Kinesis

What is Kinesis?

  • It is a managed alternative to Apache Kafka which is great to gather a lot of data in real-time.
  • Kinesis is a good option to choose if we need to gather data like application logs, IoT data, etc. Overall, it is great for any kind of real-time big data.
  • It integrates with many stream processing frameworks, such as Spark and NiFi. Data is replicated synchronously across 3 Availability Zones (AZs), so a stream is highly available by default.
  • Kinesis is made up of multiple services. In short, where we would previously have run Apache Kafka ourselves, we can now use Kinesis as the managed option.
  • Great for application logs, metrics, IoT data, and clickstreams
  • Great for “real-time” big data
  • Good for stream processing frameworks
  • Data is automatically replicated synchronously to 3 AZs
  • Kinesis Streams: low-latency streaming ingest
  • Kinesis Analytics: real-time analytics on streams using SQL
  • Kinesis Firehose: loads streams into Amazon S3, Amazon Redshift, Elasticsearch, and Splunk
What are the components in Kinesis?
  • Amazon Kinesis Streams
  • Amazon Kinesis Analytics
  • Amazon Kinesis Firehose

What is the architecture surrounding Kinesis?

At the center, we have the Amazon Kinesis service. Its first component is Amazon Kinesis Streams and its second is Amazon Kinesis Firehose.

Amazon Kinesis Streams ingests the data in real time, and Kinesis Firehose delivers that data to an Amazon S3 bucket, an Amazon Redshift database, Splunk, or Elasticsearch. That is the high-level overview of the architecture surrounding Kinesis.

What is Shard in Kinesis?

  • A Kinesis stream is divided into shards; a shard is the equivalent of a partition.
  • Producers write records to the stream, and the stream is made up of shards.
  • Consumers then read that data from the shards.
  • Producers -> Shard 1, Shard 2, Shard 3 -> Consumers
  • One stream is made of many shards (partitions), and Kinesis billing is per provisioned shard.

For how many hours does Kinesis store data by default?

  • By default, Kinesis streams do not store data forever. Data is retained for 24 hours by default, so we have one day to consume it.
  • Retention can be increased to up to 7 days (current versions of the service allow up to 365 days).

Can multiple applications consume the same stream in Kinesis?

  • Kinesis can reprocess and replay data. Once you consume the data, it is not removed from the Kinesis stream.
  • It expires based on the data retention period, but you can read the same data over and over as long as it is still in the stream.
  • So multiple applications can consume the same stream.
  • That means you can do real-time processing at a very large scale of throughput.

Ability to reprocess / replay data

•           Multiple applications can accept or consume the same stream

•           Real-time processing with scale of throughput         
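
The replay behaviour can be pictured with a small in-memory model. This is only a sketch: `InMemoryShard` is a hypothetical stand-in for a shard, not a Kinesis API, and the list index plays the role of a read position.

```python
class InMemoryShard:
    """Append-only list standing in for one shard (illustration only)."""

    def __init__(self):
        self._records = []

    def put(self, data):
        self._records.append(data)
        return len(self._records) - 1  # position of the new record

    def read(self, position, limit=10):
        # Reading never removes anything; the caller tracks its own position.
        batch = self._records[position:position + limit]
        return batch, position + len(batch)


shard = InMemoryShard()
for event in ["login", "click", "logout"]:
    shard.put(event)

# Two independent applications read the same records from the start.
analytics_batch, analytics_pos = shard.read(0)
alerting_batch, alerting_pos = shard.read(0)

# Replay: reading again from position 0 returns the same records.
replayed, _ = shard.read(0)
```

Because each consumer keeps its own position, one application reading the data does not affect what another application, or a later replay, can read.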

Is Kinesis streams a database?

  • Kinesis Streams is not a database. Once data is inserted into Kinesis, it is immutable: it cannot be modified or deleted, and it only disappears when the retention period expires.

Explain about Kinesis Streams Shards.

•           One stream in Kinesis is made of many different shards

•           Billing is per provisioned shard, and a stream can have as many shards as you want

•           Producers can send records in batches or with per-message calls

•           The number of shards can evolve over time (reshard / merge)

•           Records are ordered per shard

Producers -> Shard 1, Shard 2, Shard 3, Shard 4 -> Consumers

Here we have four shards; producers write to them and consumers read the data from them.

What is Data Blob, Record Key and Sequence number? How are they linked?

Records are ordered per shard, and each record is made of a data blob, a record key, and a sequence number.

Data Blob: the data being sent, up to one megabyte. It can represent anything you want.

Record Key: sent with the record; it tells Kinesis which shard to send the data to.

Sequence number: the position of the record within its shard, assigned when the data is added to that shard.

In short, Data Blob:

  • the data being sent,
  • serialized as bytes,
  • up to 1 MB,
  • can represent anything.

• Record Key:

•           Sent with a record; it helps to group records in shards.

•           Same key = same shard

•           If we hit a “hot partition” issue, we should use a highly distributed key to avoid it.

• Sequence number:

•           A unique identifier for each record put in a shard, added by Kinesis after ingestion.
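
The "same key = same shard" rule can be sketched as follows. Kinesis maps a partition key to a 128-bit integer with an MD5 hash and routes the record to the shard whose hash key range contains that value. The evenly sized ranges and the function names `shard_ranges` and `shard_for_key` are assumptions made for this illustration.

```python
import hashlib

HASH_SPACE = 2 ** 128  # MD5 digests are 128-bit integers


def shard_ranges(shard_count):
    """Split the 128-bit hash space into equal, contiguous ranges (assumed layout)."""
    size = HASH_SPACE // shard_count
    ranges = []
    for i in range(shard_count):
        start = i * size
        end = HASH_SPACE - 1 if i == shard_count - 1 else (i + 1) * size - 1
        ranges.append((start, end))
    return ranges


def shard_for_key(partition_key, ranges):
    """Return the index of the shard whose range contains the key's MD5 hash."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    value = int.from_bytes(digest, "big")
    for index, (start, end) in enumerate(ranges):
        if start <= value <= end:
            return index
    raise ValueError("hash value outside all ranges")


ranges = shard_ranges(4)

# A low-cardinality key (the same value for most records) hits a single shard.
hot = {shard_for_key("phone", ranges) for _ in range(1000)}

# A high-cardinality key spreads records over the shards.
spread = {shard_for_key(f"device-{i}", ranges) for i in range(1000)}
```

Here `hot` contains a single shard index, which is exactly the hot partition situation, while `spread` covers several shards.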

What are Kinesis Data Streams Limits?

• Producer:

•           1 MB/s or 1000 messages/s at write PER SHARD

•           “ProvisionedThroughputExceededException” otherwise

• Consumer Classic:

•           2 MB/s at read PER SHARD across all consumers

•           5 API calls per second PER SHARD across all consumers

• Consumer Enhanced Fan-Out:

•           2 MB/s at read PER SHARD, PER ENHANCED CONSUMER

•           No API calls needed (push model)

• Data Retention:

•           24 hours data retention by default

•           Can be extended to 7 days
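
These per-shard limits translate directly into a rough shard count. The sketch below uses only the limits listed above; `shards_needed` is a hypothetical helper, and it assumes every classic consumer reads the full stream.

```python
import math

WRITE_MB_PER_SHARD = 1.0         # 1 MB/s write per shard
WRITE_RECORDS_PER_SHARD = 1000   # 1000 messages/s write per shard
READ_MB_PER_SHARD = 2.0          # 2 MB/s read per shard, shared by classic consumers


def shards_needed(write_mb_s, write_records_s, classic_consumers=1):
    """Estimate the minimum shard count that satisfies all three limits."""
    by_bytes = math.ceil(write_mb_s / WRITE_MB_PER_SHARD)
    by_records = math.ceil(write_records_s / WRITE_RECORDS_PER_SHARD)
    # Each classic consumer reads everything that is written.
    by_reads = math.ceil(write_mb_s * classic_consumers / READ_MB_PER_SHARD)
    return max(1, by_bytes, by_records, by_reads)
```

For example, `shards_needed(2, 1000, classic_consumers=3)` is driven by the read side: three classic consumers reading 2 MB/s each need 6 MB/s of read throughput, which is three shards.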

*********************************************************************

Explain about Kinesis Producers.

1. Kinesis SDK: The SDK allows us to write code, or use the CLI, to send data directly into Amazon Kinesis streams.

2. Kinesis Producer Library (KPL): The KPL provides enhanced throughput into Kinesis streams.

3. Kinesis Agent: The Kinesis Agent is a Linux program that runs on our servers. It takes a log file and sends it reliably into Amazon Kinesis streams.

4. Third-party libraries: These usually build upon the SDK, for example Apache Spark, Kafka Connect, and NiFi, and they all allow us to send data to Kinesis streams.

Explain Kinesis Producer SDK-PutRecord(s)?

  • The Producer SDK uses the PutRecord and PutRecords APIs. So whenever we see PutRecord(s), we should think of the SDK.
  • As the name indicates, PutRecord sends one record.
  • To send many records at once, we use PutRecords.
  • PutRecords uses batching, which increases throughput: many records are sent as part of one HTTP request, so we make fewer HTTP requests.
  • If we go over the limits, we get a ProvisionedThroughputExceededException.
  • The producer SDK can be used in various ways: in our applications, but also on mobile devices.
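
A minimal sketch of PutRecords batching, assuming a boto3-style Kinesis client is passed in as `client` (its `put_records` call takes `StreamName` and `Records`). The helper names `batch_entries` and `put_all` are hypothetical. The request limits used here, 500 records and 5 MB per call, are the documented PutRecords limits.

```python
MAX_RECORDS_PER_CALL = 500             # PutRecords accepts up to 500 records per request
MAX_BYTES_PER_CALL = 5 * 1024 * 1024   # and up to 5 MB per request, partition keys included


def entry_size(entry):
    """Size counted against the request limit: data blob plus partition key."""
    return len(entry["Data"]) + len(entry["PartitionKey"].encode("utf-8"))


def batch_entries(entries):
    """Yield lists of entries that respect both PutRecords request limits."""
    batch, batch_bytes = [], 0
    for entry in entries:
        size = entry_size(entry)
        full = len(batch) == MAX_RECORDS_PER_CALL
        too_big = batch_bytes + size > MAX_BYTES_PER_CALL
        if batch and (full or too_big):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(entry)
        batch_bytes += size
    if batch:
        yield batch


def put_all(client, stream_name, entries):
    """Send entries in as few PutRecords calls as the limits allow."""
    calls = 0
    for batch in batch_entries(entries):
        client.put_records(StreamName=stream_name, Records=batch)
        calls += 1
    return calls
```

With 1200 small records this makes 3 HTTP requests instead of the 1200 that per-record PutRecord calls would need. The sketch ignores partial failures, which PutRecords reports per record in its response.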

When would you choose to use the producer SDK?

  • When we have a low-throughput use case, or we don’t mind higher latency.
  • When we want a very simple API, or we are working straight from AWS Lambda.
  • These are the kinds of situations where we would use the producer SDK.
  • There are also some managed AWS sources for Kinesis Data Streams. Behind the scenes they use the producer SDK, but we don’t see it.
  • One managed source is CloudWatch Logs: we can send our logs directly from CloudWatch into Kinesis.
  • Kinesis Data Analytics can also produce back into Kinesis Data Streams.

How do we deal with exceptions for the Kinesis API?

You get a ProvisionedThroughputExceededException when you send more data than a shard allows, for example when you exceed the number of megabytes per second or the number of records per second.

When you get this exception, make sure you don’t have a hot shard. For example, if your key is the device type and 90 percent of your devices are phones, you will get a hot key and therefore a hot partition, because most of your records share one key and all go to the same partition. So we have to choose a key that is distributed as much as possible in order to avoid a hot partition.

So the solution is: retry with backoff, increase the number of shards, and make sure the partition key is a good one.

In summary,

• ProvisionedThroughputExceeded exceptions:

•           Happen when sending more data than allowed (exceeding MB/s or TPS for any shard)

•           Make sure you don’t have a hot shard (for example, a bad partition key that sends too much data to one partition)

• Solution:

•           Retries with backoff

•           Increase shards (scaling)

•           Check that your partition key is a good one, meaning well distributed
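
Retries with backoff can be sketched like this, again assuming a boto3-style client passed in as `client`. PutRecords reports failures per record: the response carries `FailedRecordCount`, and failed entries in `Records` have an `ErrorCode`. The function name, the delays, and the jitter range are assumptions chosen for illustration.

```python
import random
import time


def put_with_backoff(client, stream_name, entries, max_attempts=5,
                     base_delay=0.1, sleep=time.sleep):
    """Call PutRecords, resending only the failed entries with exponential backoff."""
    pending = list(entries)
    for attempt in range(max_attempts):
        response = client.put_records(StreamName=stream_name, Records=pending)
        if response["FailedRecordCount"] == 0:
            return []
        # Results come back in request order; failed ones carry an ErrorCode.
        pending = [entry for entry, result in zip(pending, response["Records"])
                   if "ErrorCode" in result]
        if attempt < max_attempts - 1:
            # Delay doubles each attempt (0.1 s, 0.2 s, 0.4 s, ...) with random jitter.
            sleep(base_delay * (2 ** attempt) * random.uniform(0.5, 1.0))
    return pending  # entries that still failed after all attempts
```

The function returns the entries that could not be written, so the caller can decide whether to log them, queue them, or raise. Backoff only helps with short bursts; a persistently hot shard still needs more shards or a better partition key.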

Explain about Kinesis Producer Library (KPL)?

•           It is used to build high-performance, long-running producers

•           It has an automated and configurable retry mechanism

•           It offers synchronous and asynchronous APIs (the asynchronous API gives better performance)

•           It submits metrics to CloudWatch for monitoring

•           Batching is turned on by default in the KPL. This increases throughput and decreases cost:

•           Records are collected and written to multiple shards in the same PutRecords API call

•           KPL records must be decoded with the KCL or a special helper library
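
The collection idea can be sketched in a few lines. This is not the KPL itself (the KPL is a Java library with its own implementation); `Collector`, its parameters, and its thresholds are hypothetical and only show how buffering turns many small writes into one batched call.

```python
import time


class Collector:
    """Buffers records and hands them over as one batch (illustration only)."""

    def __init__(self, flush, max_records=500, max_buffered_seconds=0.1,
                 clock=time.monotonic):
        self._flush = flush
        self._max_records = max_records
        self._max_buffered_seconds = max_buffered_seconds
        self._clock = clock
        self._buffer = []
        self._first_at = None

    def add(self, entry):
        if not self._buffer:
            self._first_at = self._clock()  # age is measured from the oldest record
        self._buffer.append(entry)
        too_many = len(self._buffer) >= self._max_records
        too_old = self._clock() - self._first_at >= self._max_buffered_seconds
        if too_many or too_old:
            self.flush()

    def flush(self):
        if self._buffer:
            batch, self._buffer = self._buffer, []
            self._flush(batch)
```

The `flush` callback would typically wrap one PutRecords call. In this sketch the age check only runs when a new record arrives; a real producer would also flush from a background timer.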

*****************************************************************************************

Explain Kinesis Agent

•           Monitors log files and sends them to Kinesis Data Streams

•           Java-based agent, built on top of the KPL

•           Installed in Linux-based server environments

• Features:

•           Write from multiple directories and write to multiple streams

•           Routing feature based on directory / log file

•           Pre-process data before sending to streams (single line, csv to json, log to json…)

•           The agent handles file rotation, checkpointing, and retry upon failures

•           Emits metrics to CloudWatch for monitoring
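
As a rough illustration of these features, the agent is driven by a JSON configuration file made of "flows", each mapping a file pattern to a stream. The sketch below builds such a configuration in Python. The file paths, stream names, and field names are hypothetical, and the option names should be checked against the agent documentation before use.

```python
import json

# Hypothetical paths and stream names; two flows show routing from
# different directories to different streams.
agent_config = {
    "cloudwatch.emitMetrics": True,  # emit agent metrics to CloudWatch
    "flows": [
        {
            "filePattern": "/var/log/app/access.log*",
            "kinesisStream": "access-log-stream",
            # Pre-processing: convert Apache common log lines to JSON.
            "dataProcessingOptions": [
                {"optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG"}
            ],
        },
        {
            "filePattern": "/var/log/app/orders/*.csv",
            "kinesisStream": "orders-stream",
            # Pre-processing: convert CSV lines to JSON with named fields.
            "dataProcessingOptions": [
                {
                    "optionName": "CSVTOJSON",
                    "customFieldNames": ["order_id", "customer_id", "amount"],
                    "delimiter": ",",
                }
            ],
        },
    ],
}

agent_json = json.dumps(agent_config, indent=2)
```

The resulting `agent_json` string is what would be written to the agent's configuration file.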

*******************************************************************

Interviewer: Explain about Kinesis Consumer?

Candidate: In Kinesis, we should know how we can consume data from Kinesis Data Streams. These are the main kinds of Kinesis consumers (classic):

1)Kinesis SDK

2)Kinesis Client Library (KCL)

We produce with the KPL and read with the KCL.

3)Kinesis Connector Library

4)3rd party libraries: Spark, Log4J Appenders, Flume, Kafka

For example, Apache Spark can read from Kinesis Data Streams as a consumer.

5)Kinesis Firehose

6)AWS Lambda

1)Kinesis SDK

Just as we can use the CLI or a programming language to push data to Kinesis Data Streams, we can use the SDK or the CLI to read data from them.

The flow is: producers first, then the Kinesis data stream (shards), and finally the consumers.

The consumer pulls records from the Kinesis data stream (shards) through GetRecords, and each shard has 2 MB/s of total aggregate read throughput. GetRecords returns up to 10 MB of data (after which the consumer is throttled for 5 seconds) or up to 10000 records.

•           Maximum of 5 GetRecords API calls per shard per second = 200 ms latency

•           If 5 consumer applications consume from the same shard, every consumer can poll once a second and receive less than 400 KB/s

Say our producer is producing to a Kinesis data stream with three shards. With three shards we have six megabytes per second of aggregate read throughput downstream, but each shard itself gets two megabytes per second.

Now a consumer application wants to read from shard number one. It makes a GetRecords API call and the shard returns some data; if the consumer wants more data, it needs to make another GetRecords API call. This is called a polling mechanism.

Each GetRecords call returns up to 10 megabytes of data or up to 10000 records. Because 10 megabytes exceeds the two megabytes per second total for the shard, after such a call we need to wait five seconds before calling GetRecords again.
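
The polling mechanism can be sketched as a loop, assuming a boto3-style Kinesis client passed in as `client` (its `get_shard_iterator` and `get_records` calls are used with their usual parameters). The function name `poll_shard`, the `handle` callback, and the fixed 200 ms pause are assumptions for this illustration.

```python
import time


def poll_shard(client, stream_name, shard_id, handle, max_polls, sleep=time.sleep):
    """Poll one shard with GetRecords, staying under 5 calls per second."""
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="TRIM_HORIZON",  # start from the oldest retained record
    )["ShardIterator"]
    for _ in range(max_polls):
        if iterator is None:  # no further iterator: the shard has been closed
            break
        response = client.get_records(ShardIterator=iterator, Limit=1000)
        for record in response["Records"]:
            handle(record["Data"])
        # Each response tells us where to continue reading.
        iterator = response.get("NextShardIterator")
        sleep(0.2)  # 5 calls per second per shard means one call every 200 ms
```

If several applications run this loop against the same shard, they share the 5 calls per second and the 2 MB/s, so each one would have to pause longer between calls.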

2)Kinesis Client Library (KCL)

The KCL is a Java-first library, but it also exists for other languages such as Golang, Python, Ruby, Node, and .NET. It allows us to read records produced with the KPL.

With the KCL, multiple consumers in one group can share multiple shards, and there is a shard discovery mechanism. That means a Kinesis data stream can be consumed by one application instance, or by a second one as well, acting together as a group. On top of that, there is a checkpointing feature.

So if one of these application instances goes down and comes back up, it remembers exactly where it last consumed and resumes from there.

How do checkpointing and shard discovery work?

The KCL uses an Amazon DynamoDB table to checkpoint progress over time and to coordinate which consumer reads which shard.

DynamoDB is used for coordination and checkpointing, and the table has one row for each shard to consume from.

Because DynamoDB is now part of the equation, we need to consider how to provision throughput for that DynamoDB table. Make sure you provision enough write capacity units (WCU) and read capacity units (RCU), or use On-Demand for DynamoDB so that no capacity needs to be provisioned.

Otherwise DynamoDB may throttle, and that throttling will in turn slow down the KCL.

Finally, there is a record processor API to process the data, which makes it convenient to treat the messages one by one. What you need to remember about the Kinesis Client Library is that it uses DynamoDB for checkpointing, that DynamoDB throughput can therefore limit it, and that it is used to de-aggregate records from the KPL.

Kinesis Client Library (KCL)

•           Java-first library but exists for other languages too (Golang, Python, Ruby, Node, .NET...)

•           Read records from Kinesis produced with the KPL (de-aggregation)

•           Share multiple shards with multiple consumers in one “group”, shard discovery

•           Checkpointing feature to resume progress

•           Leverages DynamoDB for coordination and checkpointing (one row per shard)

•           Make sure you provision enough WCU / RCU

•           Or use On-Demand for DynamoDB

•           Otherwise DynamoDB may slow down KCL

•           Record processors will process the data
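
The checkpointing idea can be modelled without any AWS dependency. In this sketch a plain dictionary stands in for the DynamoDB table (one row per shard), and `CheckpointTable`, `run_worker`, and the `crash_after` parameter are hypothetical names used only to simulate a worker going down and another one resuming.

```python
class CheckpointTable:
    """Dict standing in for the DynamoDB table: one row per shard."""

    def __init__(self):
        self.rows = {}

    def load(self, shard_id):
        return self.rows.get(shard_id, {"owner": None, "checkpoint": 0})

    def save(self, shard_id, owner, checkpoint):
        self.rows[shard_id] = {"owner": owner, "checkpoint": checkpoint}


def run_worker(worker, shard_id, records, table, process, crash_after=None):
    """Process a shard from its last checkpoint, checkpointing after each record."""
    position = table.load(shard_id)["checkpoint"]
    handled = 0
    for index in range(position, len(records)):
        if crash_after is not None and handled == crash_after:
            return  # simulate the worker going down
        process(records[index])
        # One write to the table per checkpoint: this is where WCU is consumed.
        table.save(shard_id, worker, index + 1)
        handled += 1


table = CheckpointTable()
records = ["a", "b", "c", "d"]
seen = []

run_worker("worker-1", "shard-1", records, table, seen.append, crash_after=2)
run_worker("worker-2", "shard-1", records, table, seen.append)
```

After the second call, `seen` holds every record exactly once: `worker-2` picked up at the checkpoint that `worker-1` left behind. Checkpointing after every record, as done here, is the most write-heavy choice; checkpointing less often trades table capacity for some reprocessing after a failure.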


Coming back to the classic SDK consumer: a consumer application that wants to read from, for example, shard number one makes a GetRecords API call, and the shard returns some data. If the consumer wants more data, it needs to make another GetRecords API call. This is the polling mechanism.

Each GetRecords call returns up to 10 megabytes of data or up to 10000 records. Because 10 megabytes exceeds the two megabytes per second total for the shard, after such a call you need to wait five seconds before calling GetRecords again.

There is a maximum of five GetRecords API calls per shard per second. So your consumer application cannot call GetRecords 20 times per second; it can do so only five times per second, which means you get 200 milliseconds of latency on your data.

With these constraints, adding more consumers reduces what each one gets. If five different consumer applications read the same data from the same shard, each consumer can poll only once a second and receives less than 400 kilobytes per second. The more consumers you have, the less throughput you have per consumer: consumer A, consumer B, and consumer C all share the limit of two megabytes per second per shard and the limit of five GetRecords API calls per second.

This is important to understand, and it is the problem that Kinesis Enhanced Fan-Out for consumers solves.
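
The sharing arithmetic is simple enough to write down. The function names below are hypothetical; the numbers are the per-shard limits already stated (2 MB/s and 5 GetRecords calls per second).

```python
def classic_share(consumers, shard_read_mb_s=2.0, calls_per_second=5):
    """Per-consumer share when classic consumers poll the same shard."""
    return {
        "mb_per_second": shard_read_mb_s / consumers,
        "polls_per_second": calls_per_second / consumers,
    }


def enhanced_fan_out_share(shard_read_mb_s=2.0):
    """With enhanced fan-out, every registered consumer gets the full rate."""
    return {"mb_per_second": shard_read_mb_s}
```

`classic_share(5)` gives 0.4 MB/s and one poll per second for each consumer, which matches the 400 KB/s figure, while `enhanced_fan_out_share()` stays at 2 MB/s no matter how many enhanced consumers are registered.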

3)Kinesis Connector Library

• Older Java library (2016), leverages the KCL library

• Write data to:

•           Amazon S3

•           DynamoDB

•           Redshift

•           Elasticsearch

• Kinesis Firehose replaces the Connector Library for a few of these targets, Lambda for the others

So overall, the Kinesis Connector Library is deprecated and has been replaced by Kinesis Firehose and Lambda together.

6)AWS Lambda

Lambda can read records from a Kinesis data stream.

The Lambda consumer also has a small library to de-aggregate records from the KPL, so we can produce with the KPL and read from a Lambda consumer.

Lambda can be used to do lightweight ETL, so we can send data to Amazon S3, DynamoDB, Redshift, Elasticsearch, or anywhere else we can reach with code. Lambda can also read in real time from Kinesis Data Streams and trigger notifications, for example sending an email in real time.

We can also configure the batch size, that is, how much data Lambda should read from Kinesis at a time, which helps regulate throughput.

So overall, we have seen all the ways we can read from Kinesis Data Streams.
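
A minimal sketch of a Lambda consumer. Lambda passes a batch of records in `event["Records"]`, and each record's data blob arrives base64-encoded under `record["kinesis"]["data"]`. The sketch assumes the producers sent JSON payloads that were not aggregated by the KPL; the shape of the returned list is an illustration only.

```python
import base64
import json


def handler(event, context):
    """Decode each Kinesis record in the batch and return the parsed payloads."""
    payloads = []
    for record in event["Records"]:
        kinesis = record["kinesis"]
        raw = base64.b64decode(kinesis["data"])  # the data blob is base64-encoded
        payloads.append({
            "partition_key": kinesis["partitionKey"],
            "sequence_number": kinesis["sequenceNumber"],
            "body": json.loads(raw),  # assumes JSON payloads
        })
    return payloads
```

In a real function the loop body is where the lightweight ETL would happen, for example writing each payload to S3 or DynamoDB. The number of records per invocation is governed by the batch size configured on the event source mapping.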

**********************************************

What is “Shard Splitting” in Kinesis?

Kinesis Operations – Adding Shards – called “Shard Splitting”

•           Can be used to increase the stream capacity (1 MB/s data in per shard)

•           Can be used to divide a “hot shard”

•           The old shard is closed and will be deleted once its data has expired

How can we scale Kinesis?

The first operation is to add shards, which increases the stream capacity. When you split a shard, the old shard is closed and will be deleted once the data in it has expired.

The second operation is the reverse. In short, Kinesis Operations – Merging Shards

•           Decrease the Stream capacity and save costs

•           Can be used to group two shards with low traffic

•           Old shards are closed and deleted based on data expiration
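
Splitting and merging can be pictured as operations on hash key ranges. Each shard owns a contiguous range of the 128-bit hash space; a split divides one range in two, and a merge joins two adjacent ranges. The function names are hypothetical, and splitting at the midpoint is just one possible choice of split point.

```python
def split_range(start, end):
    """Split one shard's hash key range into two child ranges at the midpoint."""
    if end <= start:
        raise ValueError("range too small to split")
    middle = (start + end) // 2
    return (start, middle), (middle + 1, end)


def merge_ranges(left, right):
    """Merge two adjacent shard ranges into a single range."""
    if left[1] + 1 != right[0]:
        raise ValueError("only adjacent shards can be merged")
    return (left[0], right[1])


whole = (0, 2 ** 128 - 1)
low, high = split_range(*whole)   # one shard becomes two: capacity doubles
merged = merge_ranges(low, high)  # two shards become one: capacity halves
```

For a hot shard, a split point other than the midpoint may be a better fit if the traffic is concentrated in one part of the range.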

What is Kinesis Security?

  • We get encryption at rest within Kinesis streams using KMS. If we encrypt a message client-side, we must also decrypt it client-side.
  • If we want to access Kinesis from within a VPC, over a private network, we can use VPC endpoints.
  • We control access and authorization to Kinesis using IAM policies, and we get encryption in flight using the HTTPS endpoints.
  • That means the data we send to Kinesis is encrypted in transit, so it cannot be read if it is intercepted.
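
Two of these controls can be sketched in code: an IAM policy that only allows writing to one stream, and turning on KMS encryption for a stream. The function names and the stream ARN are hypothetical, and `client` is assumed to be a boto3-style Kinesis client passed in by the caller.

```python
import json


def producer_policy(stream_arn):
    """IAM policy document allowing writes to one stream only."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": ["kinesis:PutRecord", "kinesis:PutRecords"],
            "Resource": stream_arn,
        }],
    }


def enable_encryption(client, stream_name, key_id="alias/aws/kinesis"):
    """Turn on server-side encryption with a KMS key for the stream."""
    client.start_stream_encryption(
        StreamName=stream_name, EncryptionType="KMS", KeyId=key_id
    )


# Hypothetical account ID and stream name, for illustration only.
policy_json = json.dumps(
    producer_policy("arn:aws:kinesis:us-east-1:123456789012:stream/demo-stream"),
    indent=2,
)
```

A consumer would get a separate policy with read actions instead, so producers and consumers each hold only the permissions they need.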