# Setup and train a streaming classification model on Kafka events.
# Saved Tue Nov 21 2023 10:42:09 GMT+0000 (Coordinated Universal Time)
# by @GlassFlow.
import json
from kafka import KafkaConsumer
from river import linear_model
from river import preprocessing
from river import metrics
from river import imblearn
from river import optim
# Evaluate with ROC AUC, which stays informative under class imbalance.
metric = metrics.ROCAUC()

# Under-sample the majority class (target 80/20 split) around a logistic
# regression whose log loss up-weights the positive class; the seed keeps
# the sampling reproducible.
sampler = imblearn.RandomUnderSampler(
    classifier=linear_model.LogisticRegression(
        loss=optim.losses.Log(weight_pos=5)
    ),
    desired_dist={0: .8, 1: .2},
    seed=42,
)

# Full pipeline: standardize the features, then feed the sampler/classifier.
model = preprocessing.StandardScaler() | sampler
# Create our Kafka consumer for the training-data topic.
def _decode_value(raw):
    """Deserialize a raw Kafka message value (UTF-8 JSON bytes) to a dict."""
    return json.loads(raw.decode("utf-8"))


consumer = KafkaConsumer(
    "ml_training_data",
    bootstrap_servers=["localhost:9092"],
    auto_offset_reset="earliest",  # replay the topic from the start on first run
    enable_auto_commit=True,
    group_id="the-group-id",
    value_deserializer=_decode_value,
)
# Test-then-train loop: score each event first, then learn from it, so the
# running metric reflects out-of-sample performance.
for event in consumer:
    event_data = event.value
    try:
        # Assumes each event is a dict with feature mapping "x" and label "y"
        # — TODO confirm against the producer's schema.
        x = event_data["x"]
        y = event_data["y"]
        prediction = model.predict_one(x)      # hard class label
        y_pred = model.predict_proba_one(x)    # class probabilities, for ROC AUC
        model.learn_one(x, y)                  # update the model after scoring
        metric.update(y, y_pred)
        # The metric is ROC AUC, not accuracy — label it truthfully.
        print(f"Prediction: {prediction} ROCAUC: {metric}")
    except Exception as exc:
        # Best-effort stream processing: skip malformed events, but log the
        # cause instead of a bare `except:` that would also swallow
        # KeyboardInterrupt/SystemExit and hide real bugs.
        print(f"Processing bad data... ({exc!r})")
# (trailing snippet-site residue, kept as a comment: "content_copyCOPY", "Comments")