Set up and train the model

Saved by @GlassFlow on Nov 21, 2023

import json

from kafka import KafkaConsumer

from river import linear_model
from river import preprocessing
from river import metrics
from river import imblearn
from river import optim


# Use ROC AUC as the evaluation metric
metric = metrics.ROCAUC()

# Create a logistic regression pipeline with a standard scaler and
# random under-sampling to deal with class imbalance
model = (
    preprocessing.StandardScaler() |
    imblearn.RandomUnderSampler(
        classifier=linear_model.LogisticRegression(
            loss=optim.losses.Log(weight_pos=5)
        ),
        desired_dist={0: .8, 1: .2},
        seed=42
    )
)

# Create our Kafka consumer
consumer = KafkaConsumer(
    "ml_training_data",
    bootstrap_servers=["localhost:9092"],
    auto_offset_reset="earliest",
    enable_auto_commit=True,
    group_id="the-group-id",
    value_deserializer=lambda x: json.loads(x.decode("utf-8")),
)

# Use each event to update our model and print the prediction and metrics
for event in consumer:
    event_data = event.value
    try:
        x = event_data["x"]
        y = event_data["y"]
        prediction = model.predict_one(x)
        y_pred = model.predict_proba_one(x)

        model.learn_one(x, y)
        metric.update(y, y_pred)
        print(f"Prediction: {prediction}   Accuracy:{metric}")
    except:
        print("Processing bad data...")