Kinesis is a high-capacity, real-time data pipe.
What does that mean?
Let's look at some simple scenarios. Most of the time we log data hoping to analyse it at some point. Our log files keep growing and get stored somewhere. What if we could analyse them in real time and archive them once they have been analysed? This would reduce our storage costs, because we could move analysed logs to cheaper storage, and we would also be able to monitor our logs in real time for anomalies, performance issues, threats and so on. This ensures that we are proactive rather than reactive to changes.
Think of another scenario where a weather balloon sends real-time data through an IoT device, and that data needs to be analysed as it arrives.
The data mentioned above can be sent to a data pipe, where it can be analysed in real time, for instance by a Lambda function triggered by a data input event.
Basically, a data pipe stores a sequence of events, with the end of the pipe feeding a data analytics engine or a similar consumer.
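As a sketch of that idea, here is a minimal Lambda-style handler in Python. The event shape mimics the structure Kinesis delivers to Lambda (a Records list whose kinesis.data payloads are Base64 encoded); the handler name and the hand-built sample event are illustrative, not a real AWS invocation.

```python
import base64

def handler(event, context):
    """Illustrative Lambda-style handler: decode each Kinesis record's payload."""
    decoded = []
    for record in event['Records']:
        payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
        decoded.append(payload)
    return decoded

# A hand-built sample event mimicking what Kinesis hands to Lambda (illustrative).
sample_event = {
    'Records': [
        {'kinesis': {'data': base64.b64encode(b'happy streaming').decode('ascii')}}
    ]
}
print(handler(sample_event, None))  # -> ['happy streaming']
```

In a real deployment this function would be wired to the stream as an event source, and Lambda would invoke it with batches of records.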
Welcome to Kinesis. Kinesis is a high-capacity, real-time data pipe, or an event streaming service. It is similar in principle to an Apache Kafka cluster. https://kafka.apache.org/
AWS has created variants of Kinesis known as Streams, Firehose and Analytics.
Kinesis Streams
Kinesis Streams can be used, as in the examples above, to stream data to our own analytics engines or programs.
Kinesis Firehose
Kinesis Firehose can be used to deliver streams of data to S3 (the AWS object storage service), AWS Redshift (a horizontally scalable AWS data warehouse service) or AWS Elasticsearch (an AWS service that lets you search, analyze and visualize data).
Kinesis Data Analytics
Enables you to query and analyse data using SQL queries or Java-based applications.
Shards
Shards are storage partitions that store the stream's incoming data records within the main pipe. Each record carries a unique access key known as the partition key; partition keys are used to group data by shard within a stream. A stream's data records can be stored across these shards, which enable better throughput and faster access to data records. You can think of shards as horizontally scalable data partitions that enhance data throughput. When you stream data events into the Kinesis pipe, the data is stored across these shards. You can also configure a pipe with a single shard if necessary.
Data throughput and performance are proportional to the number of shards provisioned.
Each shard can support up to 5 read transactions per second, up to a maximum total data read rate of 2 MB per second, and up to 1,000 records per second for writes, up to a maximum total data write rate of 1 MB per second (including partition keys).
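Given those per-shard limits, we can roughly estimate how many shards a workload needs. This is a sketch using only the limits quoted above; real capacity planning would also consider traffic spikes and resharding.

```python
import math

# Per-shard limits quoted above.
WRITE_MB_PER_SHARD = 1.0        # MB/s ingest per shard
WRITE_RECORDS_PER_SHARD = 1000  # records/s ingest per shard
READ_MB_PER_SHARD = 2.0         # MB/s egress per shard

def estimate_shards(write_mb_s, write_records_s, read_mb_s):
    """Smallest shard count that satisfies all three per-shard limits."""
    needed = max(
        write_mb_s / WRITE_MB_PER_SHARD,
        write_records_s / WRITE_RECORDS_PER_SHARD,
        read_mb_s / READ_MB_PER_SHARD,
    )
    return max(1, math.ceil(needed))

# 3.5 MB/s and 2,500 records/s in, 4 MB/s out: the write bandwidth dominates.
print(estimate_shards(3.5, 2500, 4.0))  # -> 4
```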
Streams
A stream is a uniquely identifiable event stream of data records. A stream comprises a unique name and a sequence of related data records (each event consists of a data record).
Data is stored within the pipeline for 24 hours by default; this can be extended to 7 days if necessary. A data stream can be replayed from the start, or consumed from the latest data onwards. You can think of a Kinesis pipeline as a multitrack recorder that stores data within each track (shard). The data is deleted once the retention period (24 hours by default) elapses, whether or not a listener has consumed or analysed it. The song can be replayed from the start or while a new song is being recorded.
The start of the stored data is called the trim horizon; a stream can be replayed from the trim horizon, or consumption can begin with the newest data.
Data Record
A data record is a unit of data stored in a data stream. A data record consists of the following:
Partition Key (the shard key), provided by the source along with the blob of data; it determines the placement of the data across multiple shards.
For example:
If we have three blobs of data with corresponding partition keys "A", "B" and "C":
If we have a single shard, all three blobs of data are stored on that shard.
If we have 2 shards, "A" and "C" may be stored on shard 1 while "B" may be stored on shard 2. Through the partition key, Kinesis is able to determine the placement and retrieval of data.
Sequence Number (the sequence number of the data event, assigned by Kinesis based on the order of events in the stream).
Data Blob (The actual data).
Note: Partition keys are Unicode strings with a maximum length limit of 256 bytes. An MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards. When an application puts data into a stream, it must specify a partition key.
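That mapping can be sketched in Python: MD5 of the partition key yields a 128-bit integer, and the record lands in the shard whose hash key range contains that value. The even split of the hash key space below is an assumption for illustration; real shards carry the exact HashKeyRange reported by describe-stream.

```python
import hashlib

MAX_HASH = 2**128 - 1  # top of the 128-bit hash key space

def partition_key_to_hash(partition_key):
    """MD5 maps a partition key (a Unicode string, max 256 bytes) to a 128-bit integer."""
    digest = hashlib.md5(partition_key.encode('utf-8')).digest()
    return int.from_bytes(digest, 'big')

def shard_for_key(partition_key, shard_count):
    """Pick a shard, assuming the hash key space is split evenly (illustrative)."""
    h = partition_key_to_hash(partition_key)
    return h * shard_count // (MAX_HASH + 1)

# The same key always maps to the same shard, which is what groups data by key.
for key in ('A', 'B', 'C'):
    print(key, shard_for_key(key, 2))
```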
AWS CLI Examples:
List Existing Kinesis Streams
aws kinesis list-streams
{
"StreamNames": []
}
This returns an empty JSON array, since there aren't any streams at present on the AWS account I am currently working on.
Create Kinesis Stream Named Test With One Shard
aws kinesis create-stream --stream-name test --shard-count 1
List Created Kinesis Stream
aws kinesis list-streams
{
"StreamNames": [
"test"
]
}
The describe-stream command shows the configuration of the stream:
aws kinesis describe-stream --stream-name test
{
"StreamDescription": {
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49592235610908080812900514258860256679432992322846982146"
}
}
],
"StreamARN": "arn:aws:kinesis:eu-west-2:682894122420:stream/test",
"StreamName": "test",
"StreamStatus": "ACTIVE",
"RetentionPeriodHours": 24,
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
]
}
}
Note: The hashing number can range from 0 through
340282366920938463463374607431768211455
And the starting sequence number assigned by Kinesis is
49592235610908080812900514258860256679432992322846982146
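That ending hash key is not arbitrary: it is simply the top of the 128-bit hash key space that partition keys are mapped into. A quick Python check:

```python
# The EndingHashKey reported by describe-stream is the largest 128-bit integer.
ending_hash_key = 340282366920938463463374607431768211455
print(ending_hash_key == 2**128 - 1)  # -> True
```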
Now We Can Put A Data Record With A Partition Key Of Our Choice.
aws kinesis put-record --stream-name test --data 'happy streaming' --partition-key 'A'
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49592257543780186548562496460832369797253790639069331458"
}
Note: To query records we first need the value of what is called the shard iterator for that shard.
aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name test
{
"ShardIterator": "AAAAAAAAAAFIXgF12s3AQvjl6Xy5ure+4U5IeHjDXZZrolyrIUdFCkjzmfNcqplpikcXxMuGg
YqDmMr0zRiwvSB/bN5C8hUULOVeZFVIQAeF5QcD5m4lLX+nHDpTRR9tKRqqcdENNk
vMabOT1gj241sxjwLusK+N4RPos0FPRVJfzgJPwo6RyvR74omdczuCGRTp7eBc33iJPHeCKWYH6U+VbWUqyCKc"
}
Now to get the records
aws kinesis get-records --shard-iterator
AAAAAAAAAAFIXgF12s3AQvjl6Xy5ure+4U5IeHjDXZZrolyrIUdFCkjzmfNcqplpikcXxMuGg
YqDmMr0zRiwvSB/bN5C8hUULOVeZFVIQAeF5QcD5m4lLX+nHDpTRR9tKRqqcdENNkv
MabOT1gj241sxjwLusK+N4RPos0FPRVJfzgJPwo6RyvR74omdczuCGRTp7eBc33iJPHeCKW
YH6U+VbWUqyCKc
{
"Records": [
{
"SequenceNumber": "49592257543780186548562496460832369797253790639069331458",
"ApproximateArrivalTimestamp": 1548155258.219,
"Data": "aGFwcHkgc3RyZWFtaW5n",
"PartitionKey": "A"
}
],
"NextShardIterator": "AAAAAAAAAAFqJg5A62FHHlTdjkZ6M0mAtseRrg4PLjYiRW8IrSnyAowKM0YVzG9u2ilK6ZbQnliTd8LqtDUK6jNHGNaRtHi/xGNjpZ0a999oiwp/UGBxX3i69VNbx937On00bKsiRzoidyE8oaIDyjQGEtbTygH1UJSpv98zJDIyLvfL+wB7zUqrWjLcKwPd1xEJp2YvdqEAeyvcnpSTVsEd1PV5/ftA",
"MillisBehindLatest": 0
}
Please note that the record data is Base64 encoded, so we need to decode the value above.
"Data": "aGFwcHkgc3RyZWFtaW5n"
We can run the following Python script to decode the Base64 value back to text
import base64
encoded = 'aGFwcHkgc3RyZWFtaW5n'
decoded = base64.b64decode(encoded).decode('utf-8')
print(decoded)
happy streaming
Please note that the iterator expires after 5 minutes.
Now let's delete the stream
aws kinesis delete-stream --stream-name test