Preview:
import time
import json
import random

from kafka import KafkaProducer
from river import datasets


# Kafka producer pointed at the broker on localhost:9092. Every message value
# is dumped to JSON and encoded as UTF-8 bytes before it is sent.
producer = KafkaProducer(
    value_serializer=lambda payload: json.dumps(payload).encode("utf-8"),
    bootstrap_servers=["localhost:9092"],
)

# River's credit card fraud dataset, iterated below as (x, y) observations.
dataset = datasets.CreditCard()

# Send each observation to the Kafka topic, with a random sleep between sends.
for x, y in dataset:
    # Echo the observation so progress is visible on the console.
    print("Sending:")
    print(f"   x: {x}")
    print(f"   y: {y}")
    # Bundle both parts of the observation into a single message; the
    # producer's value_serializer takes care of the JSON encoding.
    data = {"x": x, "y": y}
    producer.send("ml_training_data", value=data)

    time.sleep(
Download PNG | Download JPEG | Download SVG

Tip: You can change the style, width & colours of the snippet with the inspect tool before clicking Download!

Click to optimize width for Twitter