Snippets Collections
//Useful for background tasks, uploading resources, print/task 
 
//Can be impleneted with array (use push/shift or unshift/pop) or queue class 
//Again any time you use shift/unshift, you have O(N)
//FIFO 

class Node: 
  def __init__(self, value, next=None): 
    self.value = value 
    self.next = next 
  
class Queue: 
  def __init__(self, first=None, last=None, size=0): 
    self.first = first 
    self.last = last 
    self.size = size 
  
  #add to end of queue (similar to append or push) and returns size 
  def enqueue(self, value): 
    new_node = Node(value) 
    
    if self.first == None: 
      self.first = new_node 
      self.last = new_node 
    else: 
      self.last.next = new_node
      self.last = new_node 
    
    self.size += 1
    return self.size 
  
  #remove from beginning of queue 
  def dequeue(self): 
    if self.first == None: 
      return None 
    
    temp = self.first 
    if self.first == self.last: 
      self.last = None 
    #set 1st property to be original first's next 
    self.first = self.first.next 
    self.size -= 1 
    
    return temp.value 
    
  #useful to visualize any other function works 
  def print_queue_to_list(self): 
    result = []
    current = self.first 
    while current: 
      result.append(current.value)
      current = current.next 
    print(result)

s = Queue() 
s.enqueue(1) 
s.enqueue(2)
s.enqueue(3)
s.print_queue_to_list() >> [1, 2, 3]
s.dequeue() 
s.print_queue_to_list() >> [2, 3]

// Big O: 
// Insertion and Removal: O(1) **this is mainly what stacks are good for 
// Access and Search: O(N) 
//Recall with array implenetation, insertion and removal is not constant 
# Standard
import json  # Not jason.
import random  # Fair dice roll.
import time  # As if we could.
import uuid  # Not GUID!

# Boto3/Moto
import boto3  # Client factory of typing doom.
import moto  # Love and sanity for the masses.

queue_name = "EventQueue"
fifo_queue_name = "EventQueueSorted.fifo"
event_types = ["solid", "liquid", "gas"]

# Mock me if you must!
sqs_mocker = moto.mock_sqs()
sqs_mocker.start()

# Set up client.
sqs_client = boto3.client("sqs", region_name="us-east-1")

# Create standard queue for incoming events so that we can batch it up.
sqs_create_queue_result = sqs_client.create_queue(
    QueueName=queue_name,
)

queue_url = sqs_create_queue_result["QueueUrl"]

# Create FIFO queue and allow for deduplication using event ID from
# event provider.
sqs_create_fifo_queue_result = sqs_client.create_queue(
    QueueName=fifo_queue_name,
    Attributes={
        "FifoQueue": "true",  # Stringy boolean!
        "DeduplicationScope": "messageGroup",  # No comment!
        "FifoThroughputLimit": "perMessageGroupId",  # I said no comment!
    },
)

fifo_queue_url = sqs_create_fifo_queue_result["QueueUrl"]

# Make some fake records using fake timestamps. Ignore this awful.
records = [
    {
        "timestamp": int(
            (time.time() * 1000) + random.randint(-10000, 10000),
        ),
        "event_type": random.choice(event_types),
        "event_id": str(uuid.uuid1()),
    }
    for timestamp in range(10)
]

# Do the hokey pokey
random.shuffle(records)

for record in records:
    print(record)

# {'timestamp': 1628905019640, 'event_type': 'liquid', 'event_id': '1d6d987b-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905015766, 'event_type': 'liquid', 'event_id': '1d6d987f-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905019341, 'event_type': 'gas', 'event_id': '1d6d9881-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905012019, 'event_type': 'solid', 'event_id': '1d6d9880-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905026503, 'event_type': 'liquid', 'event_id': '1d6d987c-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905024388, 'event_type': 'gas', 'event_id': '1d6d9882-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905017491, 'event_type': 'gas', 'event_id': '1d6d9883-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905013437, 'event_type': 'solid', 'event_id': '1d6d987e-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905012744, 'event_type': 'solid', 'event_id': '1d6d987d-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905010779, 'event_type': 'gas', 'event_id': '1d6d987a-fca0-11eb-9b68-cbb48e88eb48'}

# Pretend we are uberkitten.io and we are sending you 10 out of order
# records

entries = []

for record in records:
    sqs_send_message_result = sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(record),
    )

# ... Pretend we pooled up a bunch of messages over 300 seconds.

# With a Lambda based subscription we can pool thousands of messages
# over a very long period up to a 1MB payload... here in moto land
# we don't have the luxury but we can imitate.

sqs_receiver_message_result = sqs_client.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=10,
)

# ... Pretend we are a LAMBDA that is now receiving a ton of messages.

# Note: This would usually be a "Records" event and use different key
# casing just because.. yeh.

messages = sqs_receiver_message_result["Messages"]

# Convert body back to JSON for all the messages before we go on.
for message in messages:
    message["DecodedBody"] = json.loads(message["Body"])

# I'm still undecided about MSG
messages.sort(key=lambda message: message["DecodedBody"]["timestamp"])

entries = []

# Iterate through messages and create a new bulk of entries from the
# newly sorted list.
for message in messages:
    record = message["DecodedBody"]
    entries.append(
        {
            "Id": str(uuid.uuid1()),
            "MessageBody": message["Body"],
            "MessageGroupId": record["event_type"],
            "MessageDeduplicationId": record["event_id"],
        }
    )

# Enqueue FiFoables.
sqs_send_message_batch_results = sqs_client.send_message_batch(
    QueueUrl=fifo_queue_url,
    Entries=entries,
)

# ... Pretend we pooled up the maximum batch size for a FIFO queue..
# which is not hard to pretend at all.

sqs_receiver_message_result = sqs_client.receive_message(
    QueueUrl=fifo_queue_url,
    MaxNumberOfMessages=10,
)

# ... Pretend we are ANOTHER LAMBDA that will be processing the "sorted"
# messages.

messages = sqs_receiver_message_result["Messages"]

records_by_event_type = {}

for message in messages:
    message["DecodedBody"] = json.loads(message["Body"])
    record = message["DecodedBody"]

    records_by_event_type.setdefault(record["event_type"], [])
    records_by_event_type[record["event_type"]].append(record)


# And now demonstrate how out of control an article script can get.
for event_type, records in records_by_event_type.items():
    print(event_type)
    for record in records:
        print(record)

# gas
# {'timestamp': 1628905010779, 'event_type': 'gas', 'event_id': '1d6d987a-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905017491, 'event_type': 'gas', 'event_id': '1d6d9883-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905019341, 'event_type': 'gas', 'event_id': '1d6d9881-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905024388, 'event_type': 'gas', 'event_id': '1d6d9882-fca0-11eb-9b68-cbb48e88eb48'}
# solid
# {'timestamp': 1628905012019, 'event_type': 'solid', 'event_id': '1d6d9880-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905012744, 'event_type': 'solid', 'event_id': '1d6d987d-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905013437, 'event_type': 'solid', 'event_id': '1d6d987e-fca0-11eb-9b68-cbb48e88eb48'}
# liquid
# {'timestamp': 1628905015766, 'event_type': 'liquid', 'event_id': '1d6d987f-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905019640, 'event_type': 'liquid', 'event_id': '1d6d987b-fca0-11eb-9b68-cbb48e88eb48'}
# {'timestamp': 1628905026503, 'event_type': 'liquid', 'event_id': '1d6d987c-fca0-11eb-9b68-cbb48e88eb48'}

# How dare you mock me!
sqs_mocker.stop()
star

Sat Aug 14 2021 01:40:11 GMT+0000 (Coordinated Universal Time)

#aws #boto3 #sqs #fifo #sort

Save snippets that work with our extensions

Available in the Chrome Web Store Get Firefox Add-on Get VS Code extension