Sending Streaming Data

PHOTO EMBED

Tue Nov 21 2023 10:38:38 GMT+0000 (Coordinated Universal Time)

Saved by @GlassFlow

import time
import json
import random

from kafka import KafkaProducer
from river import datasets


def _encode_json(payload):
    """Serialize *payload* to JSON text and return it as UTF-8 bytes."""
    json_text = json.dumps(payload)
    return json_text.encode("utf-8")


# Kafka producer pointed at the broker on localhost:9092; every message value
# is run through _encode_json before it is handed to the client.
producer = KafkaProducer(
    value_serializer=_encode_json,
    bootstrap_servers=["localhost:9092"],
)

# Instantiate River's credit card fraud dataset.
dataset = datasets.CreditCard()

# Send observations to the Kafka topic with a random sleep
for x, y in dataset:
    # Echo each observation to stdout before it is published.
    print("Sending:")
    print(f"   x: {x}")
    print(f"   y: {y}")
    # Build the payload once; the producer's value_serializer JSON-encodes it.
    data = {"x": x, "y": y}
    producer.send("ml_training_data", value=data)

    time.sleep(
content_copyCOPY