SARIMA only routes.py
Thu Jun 26 2025 19:03:23 GMT+0000 (Coordinated Universal Time)
Saved by @AKAMAY001
from flask import Blueprint, jsonify, request
import pandas as pd
import os
from datetime import timedelta
from statsmodels.tsa.statespace.sarimax import SARIMAX
from models import SensorData, User, session
from werkzeug.security import generate_password_hash, check_password_hash
from flask_login import login_user, logout_user, login_required, current_user
bp = Blueprint('api', __name__)
# --- AUTHENTICATION & PASSWORD RECOVERY ROUTES (Unchanged) ---
@bp.route('/api/signup', methods=['POST'])
def signup():
data = request.get_json()
if session.query(User).filter_by(username=data.get('username')).first():
return jsonify({"message": "Username already exists."}), 409
if session.query(User).filter_by(email=data.get('email')).first():
return jsonify({"message": "Email already registered."}), 409
new_user = User(
username=data.get('username'),
email=data.get('email'),
security_question_1=data.get('security_question_1'),
security_answer_1_hash=generate_password_hash(data.get('security_answer_1')),
security_question_2=data.get('security_question_2'),
security_answer_2_hash=generate_password_hash(data.get('security_answer_2'))
)
new_user.set_password(data.get('password'))
session.add(new_user)
session.commit()
return jsonify({"message": "Account created successfully."}), 201
@bp.route('/api/login', methods=['POST'])
def login():
data = request.get_json()
user = session.query(User).filter_by(username=data.get('username')).first()
if not user or not user.check_password(data.get('password')):
return jsonify({"message": "Invalid username or password."}), 401
login_user(user)
return jsonify({"message": "Logged in successfully."}), 200
@bp.route('/api/logout', methods=['POST'])
@login_required
def logout():
logout_user()
return jsonify({"message": "You have been logged out."}), 200
@bp.route('/api/check_session', methods=['GET'])
def check_session():
if current_user.is_authenticated:
return jsonify({"loggedIn": True, "username": current_user.username}), 200
return jsonify({"loggedIn": False}), 401
@bp.route('/api/get-security-questions', methods=['POST'])
def get_security_questions():
data = request.get_json()
user = session.query(User).filter_by(username=data.get('username')).first()
if not user or not user.security_question_1:
return jsonify({"message": "Unable to retrieve security questions for this user."}), 404
return jsonify({"question1": user.security_question_1, "question2": user.security_question_2}), 200
@bp.route('/api/reset-password', methods=['POST'])
def reset_password():
data = request.get_json()
user = session.query(User).filter_by(username=data.get('username')).first()
if not user:
return jsonify({"message": "Invalid credentials."}), 401
answer1_correct = check_password_hash(user.security_answer_1_hash, data.get('answer1'))
answer2_correct = check_password_hash(user.security_answer_2_hash, data.get('answer2'))
if not (answer1_correct and answer2_correct):
return jsonify({"message": "One or more security answers are incorrect."}), 401
user.set_password(data.get('newPassword'))
session.commit()
return jsonify({"message": "Password has been reset successfully. Please log in."}), 200
# --- DATA ROUTES (Unchanged) ---
@bp.route('/status', methods=['GET'])
def status():
return jsonify({'status': 'API is running'})
@bp.route('/api/upload-mock-data', methods=['POST'])
def upload_mock_data():
file_path = "mock_sensor_data.csv"
if not os.path.exists(file_path):
return jsonify({"error": "Mock file not found"}), 404
df = pd.read_csv(file_path)
df["timestamp"] = pd.to_datetime(df["timestamp"])
for _, row in df.iterrows():
entry = SensorData(
timestamp=row["timestamp"],
sensor_id=row["sensor_id"],
rainfall_mm=row["rainfall_mm"],
water_level_cm=row["water_level_cm"],
flow_rate_lps=row["flow_rate_lps"]
)
session.add(entry)
session.commit()
return jsonify({"message": "Mock data uploaded successfully!"})
@bp.route('/api/get-sensor-data', methods=['GET'])
@login_required
def get_sensor_data():
try:
data = session.query(SensorData).all()
result = [
{
"timestamp": entry.timestamp.strftime("%Y-%m-%d %H:%M:%S"),
"sensor_id": entry.sensor_id,
"rainfall_mm": entry.rainfall_mm,
"water_level_cm": entry.water_level_cm,
"flow_rate_lps": entry.flow_rate_lps
} for entry in data
]
return jsonify(result)
except Exception as e:
return jsonify({"error": f"An internal error occurred: {str(e)}"}), 500
# --- UPDATED FORECASTING ROUTE ---
@bp.route('/api/forecast', methods=['GET'])
@login_required
def forecast():
try:
query = session.query(SensorData).statement
df = pd.read_sql(query, session.bind)
if df.empty or len(df) < 24: # Need enough data to forecast
return jsonify({"error": "Not enough data to create a forecast."}), 400
df['timestamp'] = pd.to_datetime(df['timestamp'])
df = df.set_index('timestamp').sort_index()
forecast_results = {}
metrics_to_forecast = {
"waterLevel": "water_level_cm",
"flowRate": "flow_rate_lps",
"rainfall": "rainfall_mm"
}
last_timestamp = df.index[-1]
forecast_steps = 24
future_dates = pd.date_range(start=last_timestamp + timedelta(hours=1), periods=forecast_steps, freq='H')
for key, column_name in metrics_to_forecast.items():
# Resample series for stability
series = df[column_name].resample('H').mean().fillna(method='ffill')
# Simple check to avoid trying to forecast flat-line data (common with rainfall)
if series.nunique() < 2:
predicted_mean = [series.iloc[-1]] * forecast_steps
else:
# Define and train the SARIMAX model
model = SARIMAX(series, order=(1, 1, 1), seasonal_order=(1, 1, 0, 12), enforce_stationarity=False, enforce_invertibility=False)
results = model.fit(disp=False)
prediction = results.get_forecast(steps=forecast_steps)
predicted_mean = prediction.predicted_mean.tolist()
# Format the prediction for this metric
forecast_results[key] = {
"timestamps": [d.strftime("%Y-%m-%d %H:%M:%S") for d in future_dates],
"predicted_values": predicted_mean
}
# Ensure rainfall forecast does not predict negative values
if key == 'rainfall':
forecast_results[key]['predicted_values'] = [max(0, val) for val in forecast_results[key]['predicted_values']]
return jsonify(forecast_results)
except Exception as e:
return jsonify({"error": f"Failed to generate forecast: {str(e)}"}), 500



Comments