"""Incrementally train a binary classifier on labeled events from Kafka.

Each message on the ``ml_training_data`` topic is a JSON object of the form
``{"x": <feature dict>, "y": <binary label>}``.  For every event the script
predicts first, then learns from the label (test-then-train), and prints the
prediction together with the running ROC AUC.
"""

import json

from kafka import KafkaConsumer
from river import imblearn, linear_model, metrics, optim, preprocessing

# Running ROC AUC over the stream — a better signal than accuracy for the
# imbalanced target this pipeline is built around.
metric = metrics.ROCAUC()

# Pipeline: standard-scale the features, then counter class imbalance two
# ways — random under-sampling toward an 80/20 distribution, plus a log loss
# that up-weights the positive class by 5x.
model = (
    preprocessing.StandardScaler()
    | imblearn.RandomUnderSampler(
        classifier=linear_model.LogisticRegression(
            loss=optim.losses.Log(weight_pos=5)
        ),
        desired_dist={0: 0.8, 1: 0.2},
        seed=42,  # deterministic under-sampling for reproducibility
    )
)

# Kafka consumer: start from the earliest offset so historical events are
# replayed on first run; values arrive as UTF-8 JSON and are decoded eagerly.
consumer = KafkaConsumer(
    "ml_training_data",
    bootstrap_servers=["localhost:9092"],
    auto_offset_reset="earliest",
    enable_auto_commit=True,
    group_id="the-group-id",
    value_deserializer=lambda x: json.loads(x.decode("utf-8")),
)

# Test-then-train loop: predict on each event before learning from it, so the
# metric is always an out-of-sample estimate.
for event in consumer:
    event_data = event.value
    try:
        # Only payload extraction can legitimately fail on bad data; keep the
        # try body minimal so model bugs are not silently swallowed (the
        # original bare `except:` hid every error, even KeyboardInterrupt).
        x = event_data["x"]
        y = event_data["y"]
    except (KeyError, TypeError):
        # Missing keys or a non-dict payload: skip the event.
        print("Processing bad data...")
        continue
    prediction = model.predict_one(x)
    y_pred = model.predict_proba_one(x)
    model.learn_one(x, y)
    metric.update(y, y_pred)
    # Label fixed: the metric is ROC AUC, not accuracy.
    print(f"Prediction: {prediction} ROC AUC: {metric}")
Preview:
downloadDownload PNG
downloadDownload JPEG
downloadDownload SVG
Tip: You can change the style, width & colours of the snippet with the inspect tool before clicking Download!
Click to optimize width for Twitter