sqs-sort-records-into-fifo.py
Sat Aug 14 2021 01:40:11 GMT+0000 (Coordinated Universal Time)
Saved by @whardier #aws #boto3 #sqs #fifo #sort
# Standard
import json  # Not jason.
import random  # Fair dice roll.
import time  # As if we could.
import uuid  # Not GUID!

# Boto3/Moto
import boto3  # Client factory of typing doom.
import moto  # Love and sanity for the masses.

queue_name = "EventQueue"
fifo_queue_name = "EventQueueSorted.fifo"

event_types = ["solid", "liquid", "gas"]

# Mock me if you must!
sqs_mocker = moto.mock_sqs()
sqs_mocker.start()

# Set up client.
sqs_client = boto3.client("sqs", region_name="us-east-1")

# Create a standard queue for incoming events so that we can batch them up.
sqs_create_queue_result = sqs_client.create_queue(
    QueueName=queue_name,
)

queue_url = sqs_create_queue_result["QueueUrl"]

# Create a FIFO queue and allow for deduplication using the event ID from
# the event provider.
sqs_create_fifo_queue_result = sqs_client.create_queue(
    QueueName=fifo_queue_name,
    Attributes={
        "FifoQueue": "true",  # Stringy boolean!
        "DeduplicationScope": "messageGroup",  # No comment!
        "FifoThroughputLimit": "perMessageGroupId",  # I said no comment!
    },
)

fifo_queue_url = sqs_create_fifo_queue_result["QueueUrl"]

# Make some fake records using fake timestamps. Ignore how awful this is.
records = [
    {
        "timestamp": int(
            (time.time() * 1000) + random.randint(-10000, 10000),
        ),
        "event_type": random.choice(event_types),
        "event_id": str(uuid.uuid1()),
    }
    for _ in range(10)
]

# Do the hokey pokey.
random.shuffle(records)

for record in records:
    print(record)

# {'timestamp': 1628905019640, 'event_type': 'liquid', 'event_id': '1d6d987b-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905015766, 'event_type': 'liquid', 'event_id': '1d6d987f-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905019341, 'event_type': 'gas', 'event_id': '1d6d9881-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905012019, 'event_type': 'solid', 'event_id': '1d6d9880-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905026503, 'event_type': 'liquid', 'event_id': '1d6d987c-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905024388, 'event_type': 'gas', 'event_id': '1d6d9882-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905017491, 'event_type': 'gas', 'event_id': '1d6d9883-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905013437, 'event_type': 'solid', 'event_id': '1d6d987e-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905012744, 'event_type': 'solid', 'event_id': '1d6d987d-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905010779, 'event_type': 'gas', 'event_id': '1d6d987a-fca0-11eb-9b68-cbb48e88eb48'}

# Pretend we are uberkitten.io and we are sending you 10 out-of-order
# records.
for record in records:
    sqs_send_message_result = sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(record),
    )

# ... Pretend we pooled up a bunch of messages over 300 seconds.
# With a Lambda-based subscription we can pool thousands of messages
# over a very long period up to a 1MB payload... here in moto land
# we don't have that luxury, but we can imitate it.
sqs_receive_message_result = sqs_client.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=10,
)

# ... Pretend we are a LAMBDA that is now receiving a ton of messages.
# Note: This would usually be a "Records" event and use different key
# casing just because... yeah.
messages = sqs_receive_message_result["Messages"]

# Convert the body back to JSON for all the messages before we go on.
for message in messages:
    message["DecodedBody"] = json.loads(message["Body"])  # I'm still undecided about MSG.

messages.sort(key=lambda message: message["DecodedBody"]["timestamp"])

entries = []

# Iterate through messages and create a new bulk of entries from the
# newly sorted list.
for message in messages:
    record = message["DecodedBody"]
    entries.append(
        {
            "Id": str(uuid.uuid1()),
            "MessageBody": message["Body"],
            "MessageGroupId": record["event_type"],
            "MessageDeduplicationId": record["event_id"],
        }
    )

# Enqueue FiFoables.
sqs_send_message_batch_results = sqs_client.send_message_batch(
    QueueUrl=fifo_queue_url,
    Entries=entries,
)

# ... Pretend we pooled up the maximum batch size for a FIFO queue...
# which is not hard to pretend at all.
sqs_receive_message_result = sqs_client.receive_message(
    QueueUrl=fifo_queue_url,
    MaxNumberOfMessages=10,
)

# ... Pretend we are ANOTHER LAMBDA that will be processing the "sorted"
# messages.
messages = sqs_receive_message_result["Messages"]

records_by_event_type = {}

for message in messages:
    message["DecodedBody"] = json.loads(message["Body"])
    record = message["DecodedBody"]
    records_by_event_type.setdefault(record["event_type"], [])
    records_by_event_type[record["event_type"]].append(record)

# And now demonstrate how out of control an article script can get.
for event_type, records in records_by_event_type.items():
    print(event_type)
    for record in records:
        print(record)

# gas
# {'timestamp': 1628905010779, 'event_type': 'gas', 'event_id': '1d6d987a-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905017491, 'event_type': 'gas', 'event_id': '1d6d9883-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905019341, 'event_type': 'gas', 'event_id': '1d6d9881-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905024388, 'event_type': 'gas', 'event_id': '1d6d9882-fca0-11eb-9b68-cbb48e88eb48'}
# solid
# {'timestamp': 1628905012019, 'event_type': 'solid', 'event_id': '1d6d9880-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905012744, 'event_type': 'solid', 'event_id': '1d6d987d-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905013437, 'event_type': 'solid', 'event_id': '1d6d987e-fca0-11eb-9b68-cbb48e88eb48'}
# liquid
# {'timestamp': 1628905015766, 'event_type': 'liquid', 'event_id': '1d6d987f-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905019640, 'event_type': 'liquid', 'event_id': '1d6d987b-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905026503, 'event_type': 'liquid', 'event_id': '1d6d987c-fca0-11eb-9b68-cbb48e88eb48'}

# How dare you mock me!
sqs_mocker.stop()
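The "pretend we are a LAMBDA" steps above would, in production, be a real Lambda function subscribed to the standard queue. Below is a minimal sketch of what that handler might look like; it is not part of the original script. The handler name, the FIFO_QUEUE_URL environment variable, and the deployment details are assumptions for illustration; the Records/body event shape is how SQS event source mappings deliver batches, and the sorting and MessageGroupId/MessageDeduplicationId choices mirror the script above.

# Hypothetical Lambda handler sketch (not part of the original script).
# Assumes an SQS event source mapping on the standard queue and a
# FIFO_QUEUE_URL environment variable pointing at the FIFO queue.
import json
import os
import uuid

import boto3

sqs_client = boto3.client("sqs")


def handler(event, context):
    # SQS event source mappings deliver messages under "Records",
    # with the payload in "body" (lower-case keys, unlike receive_message).
    records = [json.loads(record["body"]) for record in event["Records"]]

    # Sort the whole batch by the producer's timestamp before forwarding.
    records.sort(key=lambda record: record["timestamp"])

    entries = [
        {
            "Id": str(uuid.uuid1()),
            "MessageBody": json.dumps(record),
            # Group by event type so each type keeps its own FIFO ordering.
            "MessageGroupId": record["event_type"],
            # Reuse the producer's event ID for deduplication.
            "MessageDeduplicationId": record["event_id"],
        }
        for record in records
    ]

    # send_message_batch accepts at most 10 entries per call, so chunk
    # the sorted batch before forwarding it to the FIFO queue.
    for start in range(0, len(entries), 10):
        sqs_client.send_message_batch(
            QueueUrl=os.environ["FIFO_QUEUE_URL"],
            Entries=entries[start : start + 10],
        )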