Now that we have built the producer, which we discussed in our previous blog, the next step is to build an application that retrieves blobs of data from the Kinesis stream. In our case, that means retrieving the data about users’ clicks from the “ClickStream” stream.
So let’s start building our consumer application. As explained earlier, we can use the KCL (Kinesis Client Library) provided by Amazon to build a consumer application. In our example, we will use the sample available in the downloaded SDK under the folder “AmazonKinesisApplication”.
The classes available there are:
1. SampleKinesisApplication
2. SampleRecordProcessor
3. SampleRecordProcessorFactory
When using the KCL, we first need to implement the IRecordProcessor interface. This interface bridges the communication between the Kinesis stream and our application. In the sample, this implementation is the “SampleRecordProcessor” class.
Essentially, the KCL uses the Kinesis API to retrieve data from the stream and then passes that data to the processRecords() method of the IRecordProcessor implementation mentioned above.
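To make this division of labour concrete, here is a minimal in-memory sketch of what the KCL does on our behalf: read a batch of records from a shard and hand it to processRecords(). The types SimpleRecord, SimpleRecordProcessor and FetchLoop are hypothetical stand-ins, not the KCL or AWS SDK API; the real library additionally deals with shard iterators, throttling and leases.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Kinesis data record; not the AWS SDK Record class.
final class SimpleRecord {
    final String sequenceNumber;
    final String data;

    SimpleRecord(String sequenceNumber, String data) {
        this.sequenceNumber = sequenceNumber;
        this.data = data;
    }
}

// Hypothetical, simplified counterpart of IRecordProcessor.processRecords().
interface SimpleRecordProcessor {
    void processRecords(List<SimpleRecord> records);
}

// Models the job the KCL does for us: read batches from one shard and
// hand each non-empty batch to the record processor.
final class FetchLoop {
    private final List<SimpleRecord> shard; // in-memory stand-in for one shard
    private int position = 0;               // plays the role of a shard iterator

    FetchLoop(List<SimpleRecord> shard) {
        this.shard = shard;
    }

    // Reads up to maxRecords records, passes them to the processor and
    // returns how many were delivered (0 once the shard is exhausted).
    int pollOnce(SimpleRecordProcessor processor, int maxRecords) {
        int end = Math.min(position + maxRecords, shard.size());
        List<SimpleRecord> batch = new ArrayList<>(shard.subList(position, end));
        position = end;
        if (!batch.isEmpty()) {
            processor.processRecords(batch);
        }
        return batch.size();
    }
}
```

Our own code only ever sees the processRecords() side; the fetching loop belongs to the library.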
When the application starts, it creates a worker, which does the work of processing the records. When the worker is initialized, it is given details such as the name of the stream it needs to process and the credentials to use. In the sample, this setup is done in the “SampleKinesisApplication” class.
Every record processor processes exactly one shard, so as the number of shards grows, the number of record processors must grow with it. This is the worker’s responsibility: based on the size of the stream (the number of shards), the worker may create multiple record processor instances.
Each time a new record processor instance is needed, the worker calls the record processor factory, which in our case is “SampleRecordProcessorFactory”.
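The one-processor-per-shard rule and the role of the factory can be sketched as follows. SimpleWorker, ShardProcessor and ShardProcessorFactory are hypothetical, simplified stand-ins for the KCL’s Worker, IRecordProcessor and IRecordProcessorFactory, not their real signatures. The sketch only shows processors being created for newly seen shards; it leaves out credentials, leases and shutting processors down.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified counterpart of IRecordProcessor.
interface ShardProcessor {
    void initialize(String shardId);
}

// Hypothetical counterpart of IRecordProcessorFactory: one call per new processor.
interface ShardProcessorFactory {
    ShardProcessor createProcessor();
}

// Hypothetical worker that keeps exactly one processor per shard of its stream.
final class SimpleWorker {
    private final String streamName; // e.g. "ClickStream"
    private final ShardProcessorFactory factory;
    private final Map<String, ShardProcessor> processors = new LinkedHashMap<>();

    SimpleWorker(String streamName, ShardProcessorFactory factory) {
        this.streamName = streamName;
        this.factory = factory;
    }

    // Given the stream's current shard IDs, asks the factory for a processor
    // only for shards that do not have one yet, then initializes it.
    void syncShards(List<String> shardIds) {
        for (String shardId : shardIds) {
            if (!processors.containsKey(shardId)) {
                ShardProcessor processor = factory.createProcessor();
                processor.initialize(shardId);
                processors.put(shardId, processor);
            }
        }
    }

    int processorCount() {
        return processors.size();
    }

    String streamName() {
        return streamName;
    }
}
```

For example, a worker for the “ClickStream” stream that first sees one shard and later two calls the factory exactly twice.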
Now let’s take a look at the record processor, which in our case is “SampleRecordProcessor”.
The IRecordProcessor interface has three methods that we need to implement:
1. initialize()
2. processRecords()
3. shutdown()
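A minimal sketch of this three-method lifecycle, assuming a simplified, hypothetical LifecycleProcessor interface and StopReason enum (the real IRecordProcessor methods take additional parameters, such as a checkpointer). LoggingProcessor only records each call, so the order initialize(), processRecords(), shutdown() is easy to see.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reason codes; the KCL has its own shutdown-reason type.
enum StopReason { TERMINATE, ZOMBIE }

// Hypothetical, simplified version of the three-method processor contract.
interface LifecycleProcessor {
    void initialize(String shardId);
    void processRecords(List<String> records);
    void shutdown(StopReason reason);
}

// Records every lifecycle call in order.
final class LoggingProcessor implements LifecycleProcessor {
    final List<String> events = new ArrayList<>();
    private String shardId;

    @Override
    public void initialize(String shardId) {
        this.shardId = shardId; // remember which shard this instance owns
        events.add("init " + shardId);
    }

    @Override
    public void processRecords(List<String> records) {
        events.add("process " + records.size() + " from " + shardId);
    }

    @Override
    public void shutdown(StopReason reason) {
        events.add("shutdown " + reason);
    }
}
```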
1. initialize()
This method is called when a record processor is initialized. The ID of the shard to be processed is passed to it.
Note: Amazon Kinesis has “at least once” semantics, meaning that every data record from a shard will be processed at least once by a worker in your Kinesis application (Amazon documentation).
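“At least once” also means a record can reach processRecords() more than once, for example when a new worker resumes from an older checkpoint. One common way to make that harmless is idempotent processing. The sketch below, using hypothetical ClickRecord and IdempotentHandler types, skips sequence numbers it has already applied.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a stream record: a sequence number plus a payload.
final class ClickRecord {
    final String sequenceNumber;
    final String payload;

    ClickRecord(String sequenceNumber, String payload) {
        this.sequenceNumber = sequenceNumber;
        this.payload = payload;
    }
}

// Applies each sequence number at most once, so redelivered records
// cause no extra side effects.
final class IdempotentHandler {
    private final Set<String> applied = new HashSet<>(); // in-memory only, for illustration
    int writes = 0; // counts side effects actually performed

    void handle(List<ClickRecord> batch) {
        for (ClickRecord r : batch) {
            if (applied.add(r.sequenceNumber)) { // add() returns false for duplicates
                writes++; // the real side effect (e.g. a database insert) would go here
            }
        }
    }
}
```

The in-memory set is lost on restart, so a real consumer would enforce this durably, for example with a unique key on the shard ID together with the sequence number in the database.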
2. processRecords()
This is the method that interests us most, because it is where we implement our business logic.
The records retrieved from the stream are passed to this method as one of its parameters. In the sample code, the parameter “List<Record> records” holds the retrieved records, which we in turn pass as a parameter to the method “processRecordsWithRetries()”.
processRecordsWithRetries() is where we have implemented our business logic. In our case, it simply makes a database entry for the click data sent from the dummy website.
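A hedged sketch of a retry wrapper in the spirit of processRecordsWithRetries(): each record gets up to maxAttempts tries with a pause between attempts, and a record that still fails is counted as skipped rather than blocking the rest of the batch. RetryingBatchHandler and processWithRetries() are hypothetical names, this is not the sample’s actual code, and the Consumer<String> parameter stands in for the database write.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical retry helper around per-record business logic.
final class RetryingBatchHandler {
    private final int maxAttempts;
    private final long backoffMillis;
    int skipped = 0; // records that failed on every attempt

    RetryingBatchHandler(int maxAttempts, long backoffMillis) {
        this.maxAttempts = maxAttempts;
        this.backoffMillis = backoffMillis;
    }

    void processWithRetries(List<String> records, Consumer<String> businessLogic) {
        for (String record : records) {
            boolean done = false;
            for (int attempt = 1; attempt <= maxAttempts && !done; attempt++) {
                try {
                    businessLogic.accept(record); // e.g. insert the click into the database
                    done = true;
                } catch (RuntimeException e) {
                    if (attempt < maxAttempts) {
                        sleepQuietly(backoffMillis); // wait before the next attempt
                    }
                }
            }
            if (!done) {
                skipped++; // a real application would log the failed record here
            }
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
    }
}
```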
Some of the data records being processed may be malformed, so we need to make sure that our business logic contains code to handle such cases.
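One way to handle malformed records is to parse defensively and drop (and log) anything that does not decode or validate. Kinesis record data arrives as a ByteBuffer; the sketch assumes, purely for illustration, a hypothetical comma-separated “userId,url,timestampMillis” click format, since the demo’s real format is not shown here. Click and ClickParser are likewise hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical click event in an assumed "userId,url,timestampMillis" layout.
final class Click {
    final String userId;
    final String url;
    final long timestampMillis;

    Click(String userId, String url, long timestampMillis) {
        this.userId = userId;
        this.url = url;
        this.timestampMillis = timestampMillis;
    }
}

final class ClickParser {
    // Returns empty for malformed data instead of throwing, so one bad
    // record does not stop the rest of the batch.
    static Optional<Click> parse(ByteBuffer data) {
        String text;
        try {
            // A new decoder reports invalid UTF-8 as an exception by default.
            CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
            text = decoder.decode(data.duplicate()).toString();
        } catch (CharacterCodingException e) {
            return Optional.empty(); // not valid UTF-8
        }
        String[] parts = text.split(",", -1);
        if (parts.length != 3 || parts[0].isEmpty() || parts[1].isEmpty()) {
            return Optional.empty(); // wrong number of fields or empty values
        }
        try {
            return Optional.of(new Click(parts[0], parts[1], Long.parseLong(parts[2].trim())));
        } catch (NumberFormatException e) {
            return Optional.empty(); // timestamp is not a number
        }
    }
}
```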
Another important function that we come across is checkpoint(). At times the worker application, or the EC2 instance itself, may shut down; when that happens, we want processing to resume from the last point up to which the previous instance had processed the records. For this functionality, we need to record checkpoints: the KCL passes a checkpointer object to processRecords(), and calling its checkpoint() method records the position in the shard up to which the records have been processed.
In our case, we call checkpoint() at a fixed time interval.
Note: Make sure that you call checkpoint() only after the records have been processed completely, not partway through processing the list of records.
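Time-based checkpointing, combined with the rule to checkpoint only after a batch is fully processed, can be sketched like this. Checkpointer and IntervalCheckpointingProcessor are hypothetical stand-ins (the real KCL checkpointer type is IRecordProcessorCheckpointer), and the clock is injected so the timing is easy to test.

```java
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the checkpointer the KCL hands to processRecords().
interface Checkpointer {
    void checkpoint(String sequenceNumber);
}

// Processes the whole batch first, then checkpoints at most once per
// interval at the batch's last record; never checkpoints mid-batch.
final class IntervalCheckpointingProcessor {
    private final long intervalMillis;
    private final LongSupplier clock; // injectable clock, e.g. System::currentTimeMillis
    private long nextCheckpointAt;
    int processed = 0;

    IntervalCheckpointingProcessor(long intervalMillis, LongSupplier clock) {
        this.intervalMillis = intervalMillis;
        this.clock = clock;
        this.nextCheckpointAt = clock.getAsLong() + intervalMillis;
    }

    // Each element is a record's sequence number.
    void processRecords(List<String> sequenceNumbers, Checkpointer checkpointer) {
        for (String seq : sequenceNumbers) {
            processed++; // business logic for record `seq` would run here
        }
        long now = clock.getAsLong();
        if (!sequenceNumbers.isEmpty() && now >= nextCheckpointAt) {
            checkpointer.checkpoint(sequenceNumbers.get(sequenceNumbers.size() - 1));
            nextCheckpointAt = now + intervalMillis;
        }
    }
}
```

With, say, a 60-second interval, batches arriving before the interval elapses are processed without a checkpoint, and the first batch after that checkpoints at its last sequence number.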
3. shutdown()
The shutdown() method is called in either of the following situations:
a. The record processor will not receive any further records from the shard.
b. The worker appears to be unresponsive to the Amazon Kinesis Client Library.
The Amazon Kinesis Client Library tells the record processor which situation applies through a shutdown-reason parameter: TERMINATE in case a and ZOMBIE in case b.
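The two shutdown reasons call for different handling: after TERMINATE the processor should checkpoint so the finished shard is recorded, while after ZOMBIE it should not, because another worker may already own the shard. A minimal sketch with a hypothetical EndReason enum and ShutdownHandler class (not the KCL’s own types):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reason codes modelled on the KCL's TERMINATE and ZOMBIE cases.
enum EndReason { TERMINATE, ZOMBIE }

final class ShutdownHandler {
    final List<String> actions = new ArrayList<>(); // what the handler decided to do

    void shutdown(EndReason reason) {
        switch (reason) {
            case TERMINATE:
                // The shard has no more records: checkpoint to mark it finished.
                actions.add("checkpoint");
                break;
            case ZOMBIE:
                // This worker may have lost the shard to another worker: do not checkpoint.
                actions.add("skip checkpoint");
                break;
        }
        actions.add("release resources"); // e.g. close database connections
    }
}
```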
Note:
When a record processor fails, the worker takes over and instantiates a new record processor to process the shard.
However, if the worker, the application, or the EC2 instance itself fails, we need to detect and handle the situation on our own.
We have now completed the whole Kinesis demo application. Visit the demo website here: [54.80.53.243:8080]