Python for Machine Learning & Data Science — Day 7
MLOps & Deployment • Monitoring & CI/CD • Time Series Forecasting • Unsupervised (PCA & Clustering) • Recommenders • Interpretability & Fairness • End-to-End Mini Project
Overview
Goal: Today we take the model into the real world: deploying it, monitoring it, and updating it. We'll also cover time-series forecasting, unsupervised learning (PCA + clustering), recommender systems, and model interpretability/fairness. At the end there's a mini-project that combines all the concepts in practice.
Why this matters? This is where the classroom-to-industry transition happens. Building an accurate model isn't enough; serving, scaling, versioning, testing, and monitoring matter just as much.
Prereqs: Python, pandas, scikit-learn basics (Days 1–6). If you need refreshers, check your Day 1–6 posts (links at the bottom).
1) MLOps & Deployment
Concept: MLOps = ML + DevOps practices, focused on reliable pipelines, reproducibility, and automation. Deployment means putting your trained model in front of users as an API, app, or service.
- Packaging: Pickle/Joblib for sklearn, requirements.txt/pyproject.toml.
- Serving: FastAPI/Flask for REST APIs, batch jobs for offline scoring.
- Containerization: Docker images for consistent runtimes.
- Environments: Dev → Staging → Prod with version control (Git).
1.1 Model Packaging (joblib)
Save/load the model to a file so you can reuse it in an API, notebooks, or batch jobs.
# Example 1 — Train & save a model (Iris)
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
import joblib
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(iris.data, iris.target,
test_size=0.2, random_state=42)
clf = LogisticRegression(max_iter=500).fit(X_train, y_train)
print("Test acc:", clf.score(X_test, y_test))
joblib.dump(clf, "iris_lr.joblib") # save
loaded = joblib.load("iris_lr.joblib") # load
print("Reloaded acc:", loaded.score(X_test, y_test))
# Example 2 — Save preprocessing + model via Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
pipe = Pipeline([
("scaler", StandardScaler()),
("lr", LogisticRegression(max_iter=500))
]).fit(X_train, y_train)
joblib.dump(pipe, "iris_pipe.joblib")
# Example 3 — Version your models with a naming pattern
import os, time, joblib
os.makedirs("models", exist_ok=True)  # make sure the target folder exists
version = time.strftime("%Y%m%d_%H%M%S")
joblib.dump(pipe, f"models/iris_pipe_{version}.joblib")
# Example 4 — Save feature names & metadata
import json
meta = {"features": ["sepal_len","sepal_wid","petal_len","petal_wid"], "model": "LogReg"}
with open("models/meta.json", "w") as f:
    json.dump(meta, f, indent=2)
# Example 5 — Quick unit test: predict shape & dtype
import numpy as np
X_small = np.array([[5.1,3.5,1.4,0.2]])
pred = pipe.predict(X_small)
assert pred.shape == (1,), "Prediction shape mismatch"
print("Sanity test passed!")
Practice: Save with joblib.dump(), then reload and verify the same test accuracy. Add the author name & feature list to the metadata JSON.
1.2 Serving with FastAPI
FastAPI gives us a REST endpoint that takes JSON input and returns a prediction.
# Example 1 — basic FastAPI app (save as app.py)
from fastapi import FastAPI
from pydantic import BaseModel
import joblib
import numpy as np
app = FastAPI(title="Iris Predictor API")
class IrisIn(BaseModel):
    sepal_len: float
    sepal_wid: float
    petal_len: float
    petal_wid: float
model = joblib.load("iris_pipe.joblib")
@app.get("/")
def root():
return {"msg":"API is live"}
@app.post("/predict")
def predict(inp: IrisIn):
X = np.array([[inp.sepal_len, inp.sepal_wid, inp.petal_len, inp.petal_wid]])
pred = int(model.predict(X)[0])
return {"prediction": pred}
# Example 2 — run API locally (terminal)
uvicorn app:app --reload --port 8000
# Test with curl/postman:
# curl -X POST http://127.0.0.1:8000/predict -H "Content-Type: application/json" \
# -d '{"sepal_len":5.1,"sepal_wid":3.5,"petal_len":1.4,"petal_wid":0.2}'
# Example 3 — add health & version routes
@app.get("/health")
def health():
return {"status":"ok"}
@app.get("/version")
def version():
return {"model_version":"1.0.0"}
# Example 4 — pydantic validation errors auto handled
# Try sending a string in sepal_len to see validation error response.
# Example 5 — batch predictions
from typing import List
class BatchIn(BaseModel):
    rows: List[IrisIn]
@app.post("/predict_batch")
def predict_batch(batch: BatchIn):
    X = [[r.sepal_len, r.sepal_wid, r.petal_len, r.petal_wid] for r in batch.rows]
    preds = model.predict(np.array(X)).tolist()
    return {"predictions": preds}
Practice: Build a /predict_proba route that returns class probabilities. Send an incorrect payload and observe the validation behavior.
1.3 Docker for Consistent Deployment
Build a Docker image and you get the same environment everywhere. A minimal Dockerfile is given below.
# Example 1 — Dockerfile (save as Dockerfile)
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY iris_pipe.joblib app.py ./
EXPOSE 8000
CMD ["uvicorn","app:app","--host","0.0.0.0","--port","8000"]
# Example 2 — requirements.txt
fastapi
uvicorn[standard]
scikit-learn
joblib
numpy
pydantic
# Example 3 — build & run
docker build -t iris-api:latest .
docker run -p 8000:8000 iris-api:latest
# Example 4 — attach git commit hash as label
docker build -t iris-api:$(git rev-parse --short HEAD) .
# Example 5 — environment variables for config (in app.py)
import os
MODEL_PATH = os.getenv("MODEL_PATH", "iris_pipe.joblib")
Practice: Change the MODEL_PATH env var and try loading a different model.
2) Model Monitoring & CI/CD
CI/CD (Continuous Integration/Continuous Delivery): run tests, build the package, and deploy on every code change.
Monitoring: watching model health in production: latency, error rate, data drift, concept drift, performance decay.
2.1 Unit & Integration Tests for ML
# Example 1 — pytest style unit test for schema
import numpy as np, joblib
def test_model_predict_shape():
    model = joblib.load("iris_pipe.joblib")
    X = np.array([[5.1,3.5,1.4,0.2]])
    y = model.predict(X)
    assert y.shape == (1,)
def test_model_prob_sum_to_one():
    model = joblib.load("iris_pipe.joblib")
    X = np.array([[6.2,2.8,4.8,1.8]])
    p = model.predict_proba(X)
    assert np.isclose(p.sum(), 1.0)
# Example 2 — data validation with simple checks
import pandas as pd
def validate_input(df: pd.DataFrame):
    assert set(df.columns) == {"sepal_len","sepal_wid","petal_len","petal_wid"}
    assert df.notna().all().all()
# Example 3 — performance regression test (thresholds)
def test_accuracy_threshold():
    import joblib
    from sklearn.datasets import load_iris
    from sklearn.model_selection import train_test_split
    iris = load_iris()
    X_train, X_test, y_train, y_test = train_test_split(iris.data, iris.target, test_size=0.2, random_state=42)
    model = joblib.load("iris_pipe.joblib")
    assert model.score(X_test, y_test) >= 0.9
# Example 4 — mock API test (FastAPI TestClient)
from fastapi.testclient import TestClient
from app import app
client = TestClient(app)
def test_root():
    assert client.get("/").status_code == 200
def test_predict():
    payload = {"sepal_len":5.1,"sepal_wid":3.5,"petal_len":1.4,"petal_wid":0.2}
    assert "prediction" in client.post("/predict", json=payload).json()
# Example 5 — simple GitHub Actions workflow (/.github/workflows/ci.yml)
name: ci
on: [push, pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: {python-version: "3.11"}
      - run: pip install -r requirements.txt pytest scikit-learn joblib fastapi uvicorn
      - run: pytest -q
Practice: Write a pytest file with three tests: (a) input schema, (b) probability sum, (c) accuracy threshold. Run pytest -q locally.
2.2 Production Monitoring: Drift & Metrics
Once in production, the data distribution can change (drift). Quick checks below:
# Example 1 — live vs training distribution (KS-test)
import numpy as np, pandas as pd
from scipy.stats import ks_2samp
train = pd.read_csv("train_sample.csv")
live = pd.read_csv("live_batch.csv")
for col in ["sepal_len","sepal_wid","petal_len","petal_wid"]:
    stat, p = ks_2samp(train[col], live[col])
    print(col, "KS p-value:", round(p, 4))
# Example 2 — rolling accuracy estimate using feedback
# Suppose you collect true labels later; compute moving window accuracy.
import pandas as pd
log = pd.read_csv("inference_log.csv") # columns: ts, y_pred, y_true (optional)
log["correct"] = (log["y_pred"] == log["y_true"]).astype(int)
print("Last 500 accuracy:", log["correct"].tail(500).mean())
# Example 3 — latency monitoring (simple)
import time, requests, statistics
payload = {"sepal_len":5.1,"sepal_wid":3.5,"petal_len":1.4,"petal_wid":0.2}
latencies = []
for _ in range(20):
    t0 = time.time()
    r = requests.post("http://localhost:8000/predict", json=payload, timeout=5)
    latencies.append(time.time() - t0)
print("p50:", statistics.median(latencies), "p95:", sorted(latencies)[int(0.95*len(latencies))-1])
# Example 4 — log predictions with input hash for traceability
import hashlib, json, time
def log_request(x, y_pred):
    rid = hashlib.md5(json.dumps(x, sort_keys=True).encode()).hexdigest()
    with open("requests.log", "a") as f:
        f.write(json.dumps({"ts": time.time(), "rid": rid, "x": x, "y_pred": int(y_pred)}) + "\n")
# Example 5 — trigger retrain when drift threshold crossed (pseudo)
ks_p_value = p                                # last KS p-value from Example 1
recent_acc = log["correct"].tail(500).mean()  # rolling accuracy from Example 2
if ks_p_value < 0.01 or recent_acc < 0.85:
    print("🚨 Trigger retraining pipeline")
3) Time Series Forecasting
Key ideas: autocorrelation, seasonality, trend, stationarity. The train/test split must respect time order. Common metrics: MAE, MAPE, SMAPE (sketch below).
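A minimal sketch of these metrics on toy numbers (note: MAPE is undefined when an actual value is 0):
# Forecast metrics — toy sketch
import numpy as np
y_true = np.array([100, 120, 130, 90])
y_pred = np.array([110, 115, 140, 85])
mae = np.mean(np.abs(y_true - y_pred))
mape = np.mean(np.abs(y_true - y_pred) / np.abs(y_true)) * 100
smape = np.mean(2 * np.abs(y_true - y_pred) / (np.abs(y_true) + np.abs(y_pred))) * 100
print("MAE:", mae, "| MAPE%:", round(mape, 2), "| SMAPE%:", round(smape, 2))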
3.1 Basics: Parsing, Resampling, Train/Test Split
# Example 1 — load and parse dates
import pandas as pd
df = pd.read_csv("sales_daily.csv", parse_dates=["date"])
df = df.set_index("date").sort_index()
print(df.head())
# Example 2 — resample to monthly
m = df["sales"].resample("M").sum()
print(m.tail())
# Example 3 — time-aware split
split_date = "2023-01-01"
train, test = m[:split_date], m[split_date:]
print(train.shape, test.shape)
# Example 4 — naive forecast baseline
naive = test.shift(1).fillna(train.iloc[-1])
mae = (test - naive).abs().mean()
print("Naive MAE:", mae)
# Example 5 — moving average baseline
ma3 = m.rolling(3).mean().shift(1)
mae = (test - ma3.loc[test.index]).abs().mean()
print("MA(3) MAE:", round(mae, 2))
3.2 ARIMA (statsmodels)
# Example 1 — ADF stationarity test
from statsmodels.tsa.stattools import adfuller
adf = adfuller(train.dropna())
print("ADF p-value:", adf[1])
# Example 2 — difference if not stationary
ts = train
if adf[1] > 0.05:
    ts = train.diff().dropna()
# Example 3 — fit ARIMA (p,d,q)
from statsmodels.tsa.arima.model import ARIMA
model = ARIMA(train, order=(1,1,1)).fit()
print(model.summary())
# Example 4 — forecast horizon = len(test)
pred = model.forecast(steps=len(test))
mae = (test - pred).abs().mean()
print("ARIMA MAE:", round(mae,2))
# Example 5 — seasonal SARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX
sar = SARIMAX(train, order=(1,1,1), seasonal_order=(1,1,1,12)).fit(disp=False)
pred = sar.forecast(steps=len(test))
print("SARIMA MAE:", (test-pred).abs().mean())
3.3 ML Regressors for Forecasting (Features)
# Example 1 — feature engineering from date
import numpy as np, pandas as pd
train_df = train.to_frame("y").copy()
train_df["month"] = train_df.index.month
train_df["year"] = train_df.index.year
train_df["lag1"] = train_df["y"].shift(1)
train_df["lag2"] = train_df["y"].shift(2)
train_df = train_df.dropna()
# Example 2 — train/test split again after lags
Xtr = train_df[["month","year","lag1","lag2"]]
ytr = train_df["y"]
# Example 3 — model: RandomForestRegressor
from sklearn.ensemble import RandomForestRegressor
rf = RandomForestRegressor(n_estimators=400, random_state=42)
rf.fit(Xtr, ytr)
# Example 4 — predict next points iteratively
def rf_forecast(rf, series, horizon=12):
    preds = []
    idx = pd.date_range(series.index[-1] + pd.offsets.MonthEnd(), periods=horizon, freq="M")
    y_prev1, y_prev2 = series.iloc[-1], series.iloc[-2]
    for d in idx:
        row = [[d.month, d.year, y_prev1, y_prev2]]
        yhat = rf.predict(row)[0]
        preds.append(yhat)
        y_prev2, y_prev1 = y_prev1, yhat
    return pd.Series(preds, index=idx)
pred = rf_forecast(rf, train)
# Example 5 — evaluate on test indices
mae = (test - pred[:len(test)]).abs().mean()
print("RF MAE:", round(mae,2))
4) Unsupervised Learning: PCA & Clustering
PCA reduces dimensionality while preserving variance; clustering puts similar points into groups. Real-world uses: segmentation, anomaly detection, visualization.
4.1 PCA Essentials
# Example 1 — PCA on Iris
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import load_iris
import pandas as pd
iris = load_iris(as_frame=True)
X = iris.data
Xz = StandardScaler().fit_transform(X)
pca = PCA(n_components=2).fit(Xz)
Z = pca.transform(Xz)
print("Explained variance:", pca.explained_variance_ratio_)
# Example 2 — choosing components by 95% variance
pca95 = PCA(n_components=0.95).fit(Xz)
print("n_components for 95%:", pca95.n_components_)
# Example 3 — components interpretation
loadings = pd.DataFrame(pca.components_, columns=X.columns, index=["PC1","PC2"])
print(loadings)
# Example 4 — pipeline with PCA + classifier
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
pipe = Pipeline([("scaler", StandardScaler()), ("pca", PCA(2)), ("lr", LogisticRegression(max_iter=500))])
from sklearn.model_selection import cross_val_score
print("CV acc:", cross_val_score(pipe, X, iris.target, cv=5).mean())
# Example 5 — reconstruction error (approx)
Xp = pca.inverse_transform(Z)
recon_mse = ((Xz - Xp)**2).mean()
print("Recon MSE:", recon_mse)
Practice: Try n_components = 2, 3, 4 and compare CV accuracy. Use the loadings to understand each feature's influence.
4.2 Clustering: KMeans & DBSCAN
# Example 1 — KMeans on Iris (2 PCs)
from sklearn.cluster import KMeans
km = KMeans(n_clusters=3, n_init=10, random_state=42)
labels = km.fit_predict(Z)
print(pd.Series(labels).value_counts())
# Example 2 — Silhouette score to evaluate
from sklearn.metrics import silhouette_score
sil = silhouette_score(Z, labels)
print("Silhouette:", round(sil, 3))
# Example 3 — choose k via simple loop
best = (None, -1)
for k in range(2, 7):
    lab = KMeans(n_clusters=k, n_init=10, random_state=42).fit_predict(Z)
    s = silhouette_score(Z, lab)
    if s > best[1]:
        best = (k, s)
print("Best k:", best)
# Example 4 — DBSCAN for non-spherical clusters
from sklearn.cluster import DBSCAN
db = DBSCAN(eps=0.8, min_samples=5).fit(Z)
print("Clusters (including -1 noise):", set(db.labels_))
# Example 5 — cluster centroids & simple interpretation
centroids = km.cluster_centers_
print("Centroids (PC space):", centroids)
Practice: Build a StandardScaler→PCA→KMeans pipeline. Evaluate k over the range 2..8 and choose the best silhouette.
5) Recommender Systems
Types: popularity baseline, content-based (item/user features), collaborative filtering (user–item interactions), hybrid. Metrics: Precision@K, Recall@K, MAP, NDCG (sketch below).
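A minimal sketch of Precision@K / Recall@K for a single user; the item ids below are hypothetical:
# Precision@K / Recall@K — toy sketch
def precision_recall_at_k(recommended, relevant, k=5):
    top = recommended[:k]
    hits = len(set(top) & set(relevant))
    return hits / k, hits / len(relevant) if relevant else 0.0
print(precision_recall_at_k([3, 7, 1, 9, 4], relevant=[7, 4, 8], k=5))  # -> (0.4, 0.666...)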
5.1 Popularity & Simple Baselines
# Example 1 — popularity by overall rating count
import pandas as pd
ratings = pd.read_csv("ratings.csv") # user_id,item_id,rating
pop = ratings.groupby("item_id")["rating"].count().sort_values(ascending=False)
top10 = pop.head(10).index.tolist()
print("Top-10 item_ids:", top10)
# Example 2 — per-user fallback: recommend global top
def recommend_pop(user_id, k=5):
    return top10[:k]
# Example 3 — simple rec for cold-start users
cold_recs = recommend_pop(user_id=999, k=5)
print(cold_recs)
# Example 4 — popularity weighted by mean rating
score = ratings.groupby("item_id")["rating"].mean()*ratings.groupby("item_id")["rating"].count().pow(0.5)
print(score.sort_values(ascending=False).head())
# Example 5 — business rule filters (e.g., category)
items = pd.read_csv("items.csv") # item_id,category
popular_in_cat = (ratings.merge(items)).groupby(["category","item_id"]).size().reset_index(name="n")
5.2 Content-based Filtering (TF-IDF)
# Example 1 — item descriptions → TF-IDF vectors
from sklearn.feature_extraction.text import TfidfVectorizer
desc = items["description"].fillna("")
tfidf = TfidfVectorizer(min_df=2, stop_words="english").fit_transform(desc)
# Example 2 — similarity via cosine
from sklearn.metrics.pairwise import cosine_similarity
sim = cosine_similarity(tfidf)
# Example 3 — recommend similar items to item i
def similar_items(i, k=5):
    idx = items.index[items["item_id"] == i][0]
    scores = sim[idx]
    top = scores.argsort()[::-1]
    return [items.iloc[j]["item_id"] for j in top if j != idx][:k]
# Example 4 — user profile vector (average of liked items)
import numpy as np
def user_profile(user_id):
    liked = ratings[(ratings.user_id == user_id) & (ratings.rating >= 4)]["item_id"]
    idxs = [items.index[items.item_id == i][0] for i in liked]
    # densify the sparse mean so cosine_similarity accepts it
    return np.asarray(tfidf[idxs].mean(axis=0)) if len(idxs) else None
# Example 5 — recommend via user profile
def recommend_content(user_id, k=5):
    up = user_profile(user_id)
    if up is None:
        return recommend_pop(user_id, k)
    sims = cosine_similarity(up, tfidf).ravel()
    top = sims.argsort()[::-1]
    return [items.iloc[j].item_id for j in top[:k]]  # note: may include items the user already rated
Practice: One-hot encode the tags or categories, concatenate them with the TF-IDF features, and compare the recommendations.
5.3 Collaborative Filtering (Matrix Factorization)
# Example 1 — build user-item matrix (implicit zero)
import numpy as np, pandas as pd
users = ratings.user_id.unique(); items_u = ratings.item_id.unique()
u_map = {u:i for i,u in enumerate(users)}
i_map = {it:i for i,it in enumerate(items_u)}
R = np.zeros((len(users), len(items_u)))
for _, r in ratings.iterrows():
    R[u_map[r.user_id], i_map[r.item_id]] = r.rating
# Example 2 — simple SVD with numpy
U, s, Vt = np.linalg.svd(R, full_matrices=False)
k=20
Uk, Sk, Vk = U[:,:k], np.diag(s[:k]), Vt[:k,:]
R_hat = Uk @ Sk @ Vk
# Example 3 — recommend top-K for a user
def recommend_cf(user_id, k=5):
    ui = u_map[user_id]
    scores = R_hat[ui]
    seen = set(ratings[ratings.user_id == user_id].item_id)
    idxs = np.argsort(scores)[::-1]
    return [items_u[i] for i in idxs if items_u[i] not in seen][:k]
# Example 4 — evaluate with simple holdout
from sklearn.model_selection import train_test_split
train_r, test_r = train_test_split(ratings, test_size=0.2, random_state=42)
# (Rebuild R using train_r) then compute hit-rate@K for test pairs.
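# Hedged sketch — per-interaction Hit-Rate@K: share of held-out (user, item)
# pairs whose item shows up in that user's top-K recs. Assumes R_hat / u_map
# were rebuilt from train_r only (otherwise scores leak test ratings).
def hit_rate_at_k(test_df, k=5):
    hits, total = 0, 0
    for user_id, g in test_df.groupby("user_id"):
        if user_id not in u_map:
            continue  # skip cold-start users in this demo
        recs = set(recommend_cf(user_id, k=k))
        hits += len(set(g.item_id) & recs)
        total += len(g)
    return hits / total if total else 0.0
# print("Hit-Rate@5:", hit_rate_at_k(test_r, k=5))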
# Example 5 — hybrid: blend content & CF scores
def blend_scores(user_id, alpha=0.6):
    # Simple rank-based blending (demo) — alpha is unused here; a real blend
    # would weight the two score lists by alpha and 1 - alpha.
    cont = recommend_content(user_id, k=50)
    cf = recommend_cf(user_id, k=50)
    combo = cont[:25] + cf[:25]
    seen, out = set(), []
    for it in combo:
        if it not in seen:
            out.append(it)
            seen.add(it)
        if len(out) == 10:
            break
    return out
Practice: Try k = 10/20/50. Write a small function to evaluate Hit-Rate@5 (see the sketch under Example 4 above).
6) Model Interpretability & Fairness
Interpretability: Why is the model predicting what it predicts? Techniques: permutation importance, partial dependence (PDP), SHAP-like insights, LIME-style local explanations.
Fairness: Detect/mitigate bias across groups (e.g., gender/region). Simple metrics: demographic parity (rate parity), equal opportunity (TPR parity), calibration.
6.1 Permutation Importance
# Example 1 — permutation importance on classification
from sklearn.datasets import load_breast_cancer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.inspection import permutation_importance
data = load_breast_cancer()
X_train,X_test,y_train,y_test = train_test_split(data.data,data.target,test_size=0.2,random_state=42)
rf = RandomForestClassifier(n_estimators=400, random_state=42).fit(X_train,y_train)
imp = permutation_importance(rf, X_test, y_test, n_repeats=10, random_state=42)
print(sorted(zip(imp.importances_mean, data.feature_names))[-10:])
# Example 2 — compare with model's built-in feature_importances_
fi = sorted(zip(rf.feature_importances_, data.feature_names))
print(fi[-10:])
# Example 3 — PDP with scikit-learn
from sklearn.inspection import PartialDependenceDisplay
# In notebooks: PartialDependenceDisplay.from_estimator(rf, X_train, [0,1])
# Example 4 — local explanation using simple what-if
import numpy as np
x = X_test[0].copy()
for delta in [-2, -1, 0, 1, 2]:
    xt = x.copy()
    xt[0] += delta
    print(delta, rf.predict_proba([xt])[0])
# Example 5 — simple SHAP-like logic (baseline mean prob)
# (For full SHAP, install shap; here we sketch additive contributions via permutation.)
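A rough permutation-style sketch (not real SHAP): estimate each feature's local contribution as the change in predicted probability when only that feature is swapped with background values.
# Permutation-style local contributions — rough sketch
import numpy as np
rng = np.random.default_rng(0)
background = X_train[rng.choice(len(X_train), size=50, replace=False)]
x = X_test[0]
p_x = rf.predict_proba(x.reshape(1, -1))[0, 1]
for j in range(5):  # first 5 features for brevity
    Xb = np.tile(x, (len(background), 1))  # copies of the instance...
    Xb[:, j] = background[:, j]            # ...with only feature j randomized
    contrib = p_x - rf.predict_proba(Xb)[:, 1].mean()
    print(data.feature_names[j], round(contrib, 4))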
6.2 Fairness Checks (Demo)
# Example 1 — create group column & compute positive rates
import pandas as pd, numpy as np
y_pred = rf.predict(X_test)
df = pd.DataFrame({"y_true":y_test, "y_pred":y_pred, "group":np.random.choice(["A","B"], size=len(y_pred))})
pos_rates = df.groupby("group")["y_pred"].mean()
print("Positive rate by group:", pos_rates.to_dict())
# Example 2 — equal opportunity: TPR by group
from sklearn.metrics import recall_score
tpr_by_group = df.groupby("group").apply(lambda g: recall_score(g["y_true"], g["y_pred"]))
print("TPR by group:", tpr_by_group.to_dict())
# Example 3 — calibration by group (predicted prob mean)
proba = rf.predict_proba(X_test)[:,1]
df["proba"]=proba
print(df.groupby("group")["proba"].mean())
# Example 4 — threshold adjustment per group (toy)
thr = {"A":0.55,"B":0.50}
df["y_adj"] = (df.apply(lambda r: r.proba >= thr[r.group], axis=1)).astype(int)
# Example 5 — fairness-utility tradeoff check
from sklearn.metrics import accuracy_score
acc = accuracy_score(df["y_true"], df["y_adj"])
print("Accuracy after adjustment:", acc)
Practice: Create a group feature and check the positive rate & TPR parity. Tweak the thresholds to improve parity and note how accuracy changes.
7) End-to-End Mini Project — BeepShip Retail Demand & Deployment
Scenario: BeepShip, an e-commerce platform, needs daily per-item demand forecasts plus "similar items" recommendations. You'll build one combined solution: (a) a per-item time-series forecast (for inventory), (b) content-based similar-item recommendations, (c) a FastAPI deployment with monitoring hooks.
7.1 Data Prep
# sales.csv: date,item_id,units_sold,price
# items.csv: item_id,title,description,category
import pandas as pd
sales = pd.read_csv("sales.csv", parse_dates=["date"])
items = pd.read_csv("items.csv")
sales = sales.sort_values("date")
7.2 Forecasting per Item (rolling features + RF)
# build a training table per item with lags
def build_features(df_item):
    s = df_item.set_index("date")["units_sold"].asfreq("D").fillna(0)
    feat = s.to_frame("y")
    feat["dow"] = feat.index.dayofweek
    feat["lag1"] = feat["y"].shift(1)
    feat["lag7"] = feat["y"].shift(7)
    feat["ma7"] = feat["y"].rolling(7).mean()
    return feat.dropna()
frames = []
for iid, g in sales.groupby("item_id"):
    f = build_features(g)
    f["item_id"] = iid
    frames.append(f)
train_df = pd.concat(frames)
# train/test split by date
split="2024-07-01"
tr = train_df.loc[train_df.index<split]
ts = train_df.loc[train_df.index>=split]
Xtr=tr[["dow","lag1","lag7","ma7"]]; ytr=tr["y"]
Xts=ts[["dow","lag1","lag7","ma7"]]; yts=ts["y"]
# model train
from sklearn.ensemble import RandomForestRegressor
rf = RandomForestRegressor(n_estimators=400, random_state=42)
rf.fit(Xtr,ytr)
print("Holdout MAE:", (abs(yts - rf.predict(Xts))).mean())
7.3 Similar Items (content-based)
# TF-IDF over titles+descriptions
from sklearn.feature_extraction.text import TfidfVectorizer
text = (items["title"].fillna("") +" "+ items["description"].fillna("")).values
tfidf = TfidfVectorizer(min_df=2, stop_words="english").fit_transform(text)
from sklearn.metrics.pairwise import cosine_similarity
sim = cosine_similarity(tfidf)
id_to_idx = {it:i for i,it in enumerate(items.item_id)}
idx_to_id = {i:it for it,i in id_to_idx.items()}
def similar_items(item_id, k=5):
    i = id_to_idx[item_id]
    scores = sim[i]
    order = scores.argsort()[::-1]
    return [idx_to_id[j] for j in order if j != i][:k]
7.4 FastAPI Service (single image)
# api.py
from fastapi import FastAPI
from pydantic import BaseModel
import joblib, pandas as pd
from datetime import datetime
app = FastAPI(title="BeepShip Demand & Similar API")
rf = joblib.load("rf_demand.joblib") # trained earlier (save using joblib.dump)
items = pd.read_csv("items.csv") # for content recs + tfidf precompute
# (Load TF-IDF & sim from disk similarly)
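# Hedged sketch — assumes you persisted the similarity matrix during training,
# e.g. joblib.dump(sim, "sim.joblib"); the filename is illustrative.
sim = joblib.load("sim.joblib")
id_to_idx = {it: i for i, it in enumerate(items.item_id)}
idx_to_id = {i: it for it, i in id_to_idx.items()}
def similar_items(item_id, k=5):
    i = id_to_idx[item_id]
    order = sim[i].argsort()[::-1]
    return [idx_to_id[j] for j in order if j != i][:k]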
class DemandIn(BaseModel):
    item_id: int
    lag1: float
    lag7: float
    ma7: float
    dow: int
@app.post("/forecast")
def forecast(inp: DemandIn):
X=[[inp.dow, inp.lag1, inp.lag7, inp.ma7]]
yhat = float(rf.predict(X)[0])
return {"item_id": inp.item_id, "forecast": yhat}
@app.get("/similar/{item_id}")
def similar(item_id: int, k: int = 5):
return {"item_id": item_id, "recs": similar_items(item_id, k)}
7.5 Monitoring Hooks
# simple logging middleware (FastAPI)
from fastapi import Request
import time, json
@app.middleware("http")
async def log_requests(request: Request, call_next):
t0=time.time()
response = await call_next(request)
dur=time.time()-t0
with open("api.log","a") as f:
f.write(json.dumps({"path":request.url.path,"ms":round(dur*1000,2)})+"\n")
return response
Practice: Test /forecast and /similar. Open api.log to check latency. For deployment, write a Dockerfile and run it.
8) Day 7 Practice Set — Copy • Paste • Run • See Results
- MLOps: Take a model you built earlier, save it with a Pipeline via joblib, and build a FastAPI API for it (predict + predict_proba + health).
- CI: Write 3 unit tests + add a GitHub Actions workflow and get the tests passing.
- Monitoring: Write a KS-test script to detect drift between train.csv and live.csv; print an alert.
- Time Series: On your own data, compare ARIMA vs a RandomForest with lag features and report MAE.
- PCA+Clustering: Choose the components for 95% variance and optimize the KMeans silhouette.
- Recsys: Popularity, content-based, CF SVD: compare the top-5 output of all three for one user.
- Interpretability: Generate permutation importance + PDP (notebook); discuss the top features.
- Mini-Project: Run the BeepShip API in Docker and test with cURL; note p50/p95 latency.
Inline Visuals: quick mental models (crop for slides/blog).