# Standard
import json # Not jason.
import random # Fair dice roll.
import time # As if we could.
import uuid # Not GUID!
# Boto3/Moto
import boto3 # Client factory of typing doom.
import moto # Love and sanity for the masses.
queue_name = "EventQueue"
fifo_queue_name = "EventQueueSorted.fifo"
event_types = ["solid", "liquid", "gas"]
# Mock me if you must!
sqs_mocker = moto.mock_sqs()
sqs_mocker.start()
# Set up client.
sqs_client = boto3.client("sqs", region_name="us-east-1")
# Create standard queue for incoming events so that we can batch it up.
sqs_create_queue_result = sqs_client.create_queue(
QueueName=queue_name,
)
queue_url = sqs_create_queue_result["QueueUrl"]
# Create FIFO queue and allow for deduplication using event ID from
# event provider.
sqs_create_fifo_queue_result = sqs_client.create_queue(
QueueName=fifo_queue_name,
Attributes={
"FifoQueue": "true", # Stringy boolean!
"DeduplicationScope": "messageGroup", # No comment!
"FifoThroughputLimit": "perMessageGroupId", # I said no comment!
},
)
fifo_queue_url = sqs_create_fifo_queue_result["QueueUrl"]
# Make some fake records using fake timestamps. Ignore this awful.
records = [
{
"timestamp": int(
(time.time() * 1000) + random.randint(-10000, 10000),
),
"event_type": random.choice(event_types),
"event_id": str(uuid.uuid1()),
}
for timestamp in range(10)
]
# Do the hokey pokey
random.shuffle(records)
for record in records:
print(record)
# {'timestamp': 1628905019640, 'event_type': 'liquid', 'event_id': '1d6d987b-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905015766, 'event_type': 'liquid', 'event_id': '1d6d987f-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905019341, 'event_type': 'gas', 'event_id': '1d6d9881-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905012019, 'event_type': 'solid', 'event_id': '1d6d9880-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905026503, 'event_type': 'liquid', 'event_id': '1d6d987c-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905024388, 'event_type': 'gas', 'event_id': '1d6d9882-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905017491, 'event_type': 'gas', 'event_id': '1d6d9883-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905013437, 'event_type': 'solid', 'event_id': '1d6d987e-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905012744, 'event_type': 'solid', 'event_id': '1d6d987d-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905010779, 'event_type': 'gas', 'event_id': '1d6d987a-fca0-11eb-9b68-cbb48e88eb48'}
# Pretend we are uberkitten.io and we are sending you 10 out of order
# records
entries = []
for record in records:
sqs_send_message_result = sqs_client.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(record),
)
# ... Pretend we pooled up a bunch of messages over 300 seconds.
# With a Lambda based subscription we can pool thousands of messages
# over a very long period up to a 1MB payload... here in moto land
# we don't have the luxury but we can imitate.
sqs_receiver_message_result = sqs_client.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10,
)
# ... Pretend we are a LAMBDA that is now receiving a ton of messages.
# Note: This would usually be a "Records" event and use different key
# casing just because.. yeh.
messages = sqs_receiver_message_result["Messages"]
# Convert body back to JSON for all the messages before we go on.
for message in messages:
message["DecodedBody"] = json.loads(message["Body"])
# I'm still undecided about MSG
messages.sort(key=lambda message: message["DecodedBody"]["timestamp"])
entries = []
# Iterate through messages and create a new bulk of entries from the
# newly sorted list.
for message in messages:
record = message["DecodedBody"]
entries.append(
{
"Id": str(uuid.uuid1()),
"MessageBody": message["Body"],
"MessageGroupId": record["event_type"],
"MessageDeduplicationId": record["event_id"],
}
)
# Enqueue FiFoables.
sqs_send_message_batch_results = sqs_client.send_message_batch(
QueueUrl=fifo_queue_url,
Entries=entries,
)
# ... Pretend we pooled up the maximum batch size for a FIFO queue..
# which is not hard to pretend at all.
sqs_receiver_message_result = sqs_client.receive_message(
QueueUrl=fifo_queue_url,
MaxNumberOfMessages=10,
)
# ... Pretend we are ANOTHER LAMBDA that will be processing the "sorted"
# messages.
messages = sqs_receiver_message_result["Messages"]
records_by_event_type = {}
for message in messages:
message["DecodedBody"] = json.loads(message["Body"])
record = message["DecodedBody"]
records_by_event_type.setdefault(record["event_type"], [])
records_by_event_type[record["event_type"]].append(record)
# And now demonstrate how out of control an article script can get.
for event_type, records in records_by_event_type.items():
print(event_type)
for record in records:
print(record)
# gas
# {'timestamp': 1628905010779, 'event_type': 'gas', 'event_id': '1d6d987a-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905017491, 'event_type': 'gas', 'event_id': '1d6d9883-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905019341, 'event_type': 'gas', 'event_id': '1d6d9881-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905024388, 'event_type': 'gas', 'event_id': '1d6d9882-fca0-11eb-9b68-cbb48e88eb48'}
# solid
# {'timestamp': 1628905012019, 'event_type': 'solid', 'event_id': '1d6d9880-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905012744, 'event_type': 'solid', 'event_id': '1d6d987d-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905013437, 'event_type': 'solid', 'event_id': '1d6d987e-fca0-11eb-9b68-cbb48e88eb48'}
# liquid
# {'timestamp': 1628905015766, 'event_type': 'liquid', 'event_id': '1d6d987f-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905019640, 'event_type': 'liquid', 'event_id': '1d6d987b-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905026503, 'event_type': 'liquid', 'event_id': '1d6d987c-fca0-11eb-9b68-cbb48e88eb48'}
# How dare you mock me!
sqs_mocker.stop()
Comments