import time import json import random from kafka import KafkaProducer from river import datasets # Create a Kafka producer that connects to Kafka on port 9092 producer = KafkaProducer( bootstrap_servers=["localhost:9092"], value_serializer=lambda x: json.dumps(x).encode("utf-8"), ) # Initialize the River credit card fraud dataset. dataset = datasets.CreditCard() # Send observations to the Kafka topic with a random sleep for x, y in dataset: print("Sending:") print(f" x: {x}") print(f" y: {y}") data = {"x": x, "y": y} data = {"x": x, "y": y} producer.send("ml_training_data", value=data) time.sleep(