Sending data to kafka

PHOTO EMBED

Tue Nov 21 2023 10:08:25 GMT+0000 (Coordinated Universal Time)

Saved by @GlassFlow

import json
import time
import random
import csv

from confluent_kafka import Producer


def acked(err, msg):
    if err is not None:
        print(f"Failed to deliver message: {msg.value()}: {err.str()}")


producer = Producer({'bootstrap.servers': 'localhost:9092'})

with open('funnel_steps.csv', newline='') as csvfile:
    reader = csv.DictReader(csvfile)

    for row in reader:
        print(f'Sending payload: {row}')
        # Send to Kafka
        payload = json.dumps(row)
        producer.produce(topic='clickstream-events', key=str(row['user_id']),
                         value=payload, callback=acked)

        # Random sleep
        sleep_time = random.randint(1, 4)
        time.sleep(sleep_time)
content_copyCOPY