# Snippets Collections
# Standard
import json  # Not jason.
import random  # Fair dice roll.
import time  # As if we could.
import uuid  # Not GUID!

# Boto3/Moto
import boto3  # Client factory of typing doom.
import moto  # Love and sanity for the masses.

# Queue names. SQS requires FIFO queue names to end in ".fifo".
queue_name = "EventQueue"
fifo_queue_name = "EventQueueSorted.fifo"

# Event categories; every fake record is assigned one of these at random.
event_types = ["solid", "liquid", "gas"]

# Mock me if you must!
# moto 5 replaced the per-service entry points (mock_sqs, ...) with a single
# mock_aws, so prefer that one and fall back to mock_sqs on older releases.
# The mock is started by hand rather than used as a decorator/context
# manager because the whole script runs at module level; the matching
# stop() call sits at the end of the script.
_mock_factory = getattr(moto, "mock_aws", None) or moto.mock_sqs
sqs_mocker = _mock_factory()
sqs_mocker.start()

# Set up client. It is created after the mock has been started, as moto
# recommends, so its requests are served by moto and never reach AWS.
sqs_client = boto3.client("sqs", region_name="us-east-1")

# Standard (unordered) queue: the landing zone where incoming events pile
# up until we are ready to process them as a batch.
sqs_create_queue_result = sqs_client.create_queue(QueueName=queue_name)
queue_url = sqs_create_queue_result["QueueUrl"]

# FIFO queue that allows deduplication using the event ID from the event
# provider. SQS attribute values are always strings, booleans included.
# Deduplication scope and throughput limit are both set per message group.
fifo_queue_attributes = {
    "FifoQueue": "true",
    "DeduplicationScope": "messageGroup",
    "FifoThroughputLimit": "perMessageGroupId",
}
sqs_create_fifo_queue_result = sqs_client.create_queue(
    QueueName=fifo_queue_name,
    Attributes=fifo_queue_attributes,
)
fifo_queue_url = sqs_create_fifo_queue_result["QueueUrl"]

# Make ten fake records with fake timestamps: "now" in milliseconds,
# nudged by up to ten seconds in either direction so the records do not
# come out in chronological order.
records = []
for _ in range(10):
    now_ms = time.time() * 1000
    jitter_ms = random.randint(-10000, 10000)
    records.append(
        {
            "timestamp": int(now_ms + jitter_ms),
            "event_type": random.choice(event_types),
            # Stands in for the event ID the provider would assign.
            "event_id": str(uuid.uuid1()),
        }
    )

# Do the hokey pokey: shuffle in place for good measure.
random.shuffle(records)

# Show the records in the (scrambled) order they are about to be sent.
for record in records:
    print(record)

# {'timestamp': 1628905019640, 'event_type': 'liquid', 'event_id': '1d6d987b-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905015766, 'event_type': 'liquid', 'event_id': '1d6d987f-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905019341, 'event_type': 'gas', 'event_id': '1d6d9881-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905012019, 'event_type': 'solid', 'event_id': '1d6d9880-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905026503, 'event_type': 'liquid', 'event_id': '1d6d987c-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905024388, 'event_type': 'gas', 'event_id': '1d6d9882-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905017491, 'event_type': 'gas', 'event_id': '1d6d9883-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905013437, 'event_type': 'solid', 'event_id': '1d6d987e-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905012744, 'event_type': 'solid', 'event_id': '1d6d987d-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905010779, 'event_type': 'gas', 'event_id': '1d6d987a-fca0-11eb-9b68-cbb48e88eb48'}

# Pretend we are uberkitten.io and we are sending you 10 out-of-order
# records: one message per record, with the JSON-encoded record as the body.
# The send_message response is not needed afterwards, so it is not kept.
for record in records:
    sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(record),
    )

# ... Pretend we pooled up a bunch of messages over 300 seconds.

# With a Lambda-based subscription we can pool thousands of messages
# over a very long period up to a 1MB payload... here in moto land
# we don't have that luxury, but we can imitate it.

sqs_receiver_message_result = sqs_client.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=10,
)

# ... Pretend we are a LAMBDA that is now receiving a ton of messages.

# Note: In a real Lambda this would usually arrive as a "Records" event
# that uses different key casing.

# The response carries no "Messages" key at all when nothing was received,
# so fall back to an empty list instead of raising KeyError.
messages = sqs_receiver_message_result.get("Messages", [])

# Parse every body back from JSON once, up front, and keep the parsed form
# next to the raw body so later steps can use either.
for message in messages:
    message["DecodedBody"] = json.loads(message["Body"])

# Sort in place, oldest first, by the timestamp inside each record.
messages.sort(key=lambda msg: msg["DecodedBody"]["timestamp"])

# Build one batch entry per message, in the current order of `messages`.
#   Id                     - fresh UUID; identifies the entry in the batch
#                            response and must be unique within the request.
#   MessageBody            - the original raw JSON body, forwarded untouched.
#   MessageGroupId         - the event type, so ordering is kept per type.
#   MessageDeduplicationId - the provider's event ID.
entries = [
    {
        "Id": str(uuid.uuid1()),
        "MessageBody": message["Body"],
        "MessageGroupId": message["DecodedBody"]["event_type"],
        "MessageDeduplicationId": message["DecodedBody"]["event_id"],
    }
    for message in messages
]

# Enqueue FiFoables.
# SQS rejects a batch request with no entries, so only call the API when
# there is something to send.
sqs_send_message_batch_results = None
if entries:
    sqs_send_message_batch_results = sqs_client.send_message_batch(
        QueueUrl=fifo_queue_url,
        Entries=entries,
    )

    # A batch call can succeed as a whole while individual entries are
    # rejected; those are reported only in the "Failed" list, so surface
    # them instead of silently losing events.
    failed_entries = sqs_send_message_batch_results.get("Failed", [])
    if failed_entries:
        raise RuntimeError(
            f"{len(failed_entries)} of {len(entries)} entries were not "
            f"enqueued on the FIFO queue: {failed_entries}"
        )

# ... Pretend we pooled up the maximum batch size for a FIFO queue..
# which is not hard to pretend at all.

sqs_receiver_message_result = sqs_client.receive_message(
    QueueUrl=fifo_queue_url,
    MaxNumberOfMessages=10,
)

# ... Pretend we are ANOTHER LAMBDA that will be processing the "sorted"
# messages.

# The response carries no "Messages" key when nothing was received, so
# fall back to an empty list instead of raising KeyError.
messages = sqs_receiver_message_result.get("Messages", [])

# Group the decoded records by event type, keeping the order in which the
# FIFO queue handed them back.
records_by_event_type = {}

for message in messages:
    message["DecodedBody"] = json.loads(message["Body"])
    record = message["DecodedBody"]

    records_by_event_type.setdefault(record["event_type"], []).append(record)


# And now demonstrate how out of control an article script can get.
# The inner list gets its own name so it does not clobber the module-level
# `records` list built earlier in the script.
for event_type, grouped_records in records_by_event_type.items():
    print(event_type)
    for record in grouped_records:
        print(record)

# gas
# {'timestamp': 1628905010779, 'event_type': 'gas', 'event_id': '1d6d987a-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905017491, 'event_type': 'gas', 'event_id': '1d6d9883-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905019341, 'event_type': 'gas', 'event_id': '1d6d9881-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905024388, 'event_type': 'gas', 'event_id': '1d6d9882-fca0-11eb-9b68-cbb48e88eb48'}
# solid
# {'timestamp': 1628905012019, 'event_type': 'solid', 'event_id': '1d6d9880-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905012744, 'event_type': 'solid', 'event_id': '1d6d987d-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905013437, 'event_type': 'solid', 'event_id': '1d6d987e-fca0-11eb-9b68-cbb48e88eb48'}
# liquid
# {'timestamp': 1628905015766, 'event_type': 'liquid', 'event_id': '1d6d987f-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905019640, 'event_type': 'liquid', 'event_id': '1d6d987b-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905026503, 'event_type': 'liquid', 'event_id': '1d6d987c-fca0-11eb-9b68-cbb48e88eb48'}

# How dare you mock me!
# Counterpart of the sqs_mocker.start() call made near the top of the
# script: stops the moto mock now that the demo is finished.
sqs_mocker.stop()
# star
#
# Sat Aug 14 2021 01:40:11 GMT+0000 (Coordinated Universal Time)
#
# #aws #boto3 #sqs #fifo #sort
#
# Save snippets that work with our extensions
#
# Available in the Chrome Web Store Get Firefox Add-on Get VS Code extension