Sending Streaming Data
Tue Nov 21 2023 10:38:38 GMT+0000 (Coordinated Universal Time)
Saved by @GlassFlow
import time
import json
import random
from kafka import KafkaProducer
from river import datasets
def _encode_json(payload):
    """Serialize *payload* to UTF-8 encoded JSON bytes."""
    return json.dumps(payload).encode("utf-8")


# Kafka producer bound to the local broker on port 9092; every message value
# is run through the JSON serializer above before it is sent.
producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=_encode_json,
)

# River's credit card fraud dataset, the source of the observations to stream.
dataset = datasets.CreditCard()
# Send every observation to the Kafka topic, pausing for a random interval
# between consecutive sends.
for x, y in dataset:
    print("Sending:")
    print(f" x: {x}")
    print(f" y: {y}")
    data = {"x": x, "y": y}
    producer.send("ml_training_data", value=data)
    # NOTE(review): the original sleep call was truncated mid-expression; a
    # random delay in [0, 1) seconds is assumed from the "random sleep"
    # comment and the `random` import -- confirm the intended range.
    time.sleep(random.random())

# send() only queues records for asynchronous delivery; flush so buffered
# messages are actually transmitted before the script exits.
producer.flush()
content_copyCOPY
Comments