import time import json import random from kafka import KafkaProducer from river import datasets # Create a Kafka producer that connects to Kafka on port 9092 producer = KafkaProducer( bootstrap_servers=["localhost:9092"], value_serializer=lambda x: json.dumps(x).encode("utf-8"), ) # Initialize the River credit card fraud dataset. dataset = datasets.CreditCard() # Send observations to the Kafka topic with a random sleep for x, y in dataset: print("Sending:") print(f" x: {x}") print(f" y: {y}") data = {"x": x, "y": y} data = {"x": x, "y": y} producer.send("ml_training_data", value=data) time.sleep(
Preview:
downloadDownload PNG
downloadDownload JPEG
downloadDownload SVG
Tip: You can change the style, width & colours of the snippet with the inspect tool before clicking Download!
Click to optimize width for Twitter