Jump to a question:
# -*- coding: utf-8 -*-
!pip install pandas numpy scikit-learn seaborn matplotlib xgboost Flask imblearn shap lime
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold
from sklearn.metrics import (classification_report, confusion_matrix, accuracy_score,
recall_score, f1_score, precision_score, roc_curve, roc_auc_score)
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, IsolationForest
from xgboost import XGBClassifier
from imblearn.over_sampling import SMOTE
import shap
from lime.lime_tabular import LimeTabularExplainer
import warnings
warnings.filterwarnings('ignore')
from google.colab import drive
# Mount Google Drive at /content/drive so the dataset path below resolves
# (this only works inside Google Colab).
drive.mount('/content/drive')
# CSV holding the transaction data, located on the mounted drive.
DATA_PATH = "/content/drive/MyDrive/Shares/UOW-FinAI/lesson5.csv"
# Alternative path for running on a local Windows machine:
#DATA_PATH="C:\\Users\\erb\\Downloads\\lesson5.csv"
# Seed handed to the train/test split and to every estimator as random_state.
RANDOM_STATE = 42
# Fraction of rows held out as the test set by train_test_split.
TEST_SIZE = 0.2
USE_SMOTE = False # Set to True for oversampling
# Notebook shell escape: lists the root of the mounted drive.
!ls "/content/drive/MyDrive"
# Read the raw transactions, then report size and data-quality figures
# before and after removing exact duplicate rows.
df = pd.read_csv(DATA_PATH)
print(f"Original shape: {df.shape}")
print(f"Duplicate: {df.duplicated().sum()}")

df = df.drop_duplicates()
print("After drop duplicates:", df.shape)

# Per-column count of missing values.
print(f"Null values:\n{df.isna().sum()}")

# Peek at the first rows plus a slice selected by index label.
print("\n====== sample data ======")
print(df.head(8))
print(df.loc[1020:1025])
# --- Column clean-up ---------------------------------------------------------
# Amount: unparseable entries become NaN and are then replaced by the median.
amount_numeric = pd.to_numeric(df["txn_amount"], errors="coerce")
df["txn_amount"] = amount_numeric.fillna(amount_numeric.median())

# New-device flag: map textual/boolean truth values to 1/0; anything that is
# still not numeric afterwards is treated as 0.
device_flag = df["is_new_device"].replace({"True": 1, "False": 0, True: 1, False: 0})
df["is_new_device"] = pd.to_numeric(device_flag, errors="coerce").fillna(0)

# Countries: collapse the alternative spellings of Hong Kong into "HK".
country_aliases = {"HKG": "HK", "Hong Kong": "HK"}
for col in ("customer_country", "merchant_country"):
    df[col] = df[col].replace(country_aliases)

# --- Time-derived features ---------------------------------------------------
# Unparseable timestamps become NaT; their hour is filled with the mean hour.
df["txn_time"] = pd.to_datetime(df["txn_time"], errors="coerce")
txn_hours = df["txn_time"].dt.hour
df["txn_hour"] = txn_hours.fillna(txn_hours.mean())

weekend_days = [5, 6]
night_hours = [0, 1, 2, 3, 4, 23]
df["is_weekend"] = df["txn_time"].dt.dayofweek.isin(weekend_days).astype(int)
df["is_night_txn"] = df["txn_hour"].isin(night_hours).astype(int)

# --- Risk indicators ---------------------------------------------------------
df["is_large_txn"] = df["txn_amount"].gt(10000).astype(int)

# 1 only when the night, cross-border and new-device flags are all set.
df["high_risk_combo"] = (
    df[["is_night_txn", "is_cross_border", "is_new_device"]]
    .eq(1)
    .all(axis=1)
    .astype(int)
)

# Additive score: one point per raised flag.
df["risk_score"] = (
    df["is_night_txn"]
    .add(df["is_cross_border"])
    .add(df["is_new_device"])
    .add(df["is_large_txn"])
)
# Target column; show its class balance as absolute counts and as proportions.
y = df["fraud_label"]
print("\n===== Fraud Label Distribution ======")
print(y.value_counts())
print(y.value_counts(normalize=True))

# Model inputs: the cleaned raw columns plus the engineered indicators.
features = [
    "txn_amount",
    "is_night_txn",
    "is_cross_border",
    "is_new_device",
    "txn_hour",
    "is_weekend",
    "is_large_txn",
    "high_risk_combo",
    "risk_score",
]
X = df[features]

# Keep a copy of the fully processed table next to the notebook.
df.to_csv("data_processed.csv", index=False)
# Hold out a test set. The split is stratified on the label so the train and
# test partitions keep the same fraud rate; with an imbalanced label a purely
# random split can leave the test set with a distorted share of fraud cases.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=TEST_SIZE,
    random_state=RANDOM_STATE,
    stratify=y,
)

# Optional SMOTE oversampling, switched on through USE_SMOTE. Only the
# training partition is resampled so the test set keeps its real proportions.
# NOTE: the logistic-regression and random-forest models below already use
# class_weight='balanced'; combining that with SMOTE compensates twice.
if USE_SMOTE:
    smote = SMOTE(random_state=RANDOM_STATE)
    X_train, y_train = smote.fit_resample(X_train, y_train)
    print("Applied SMOTE. New training shape:", X_train.shape)

# Baseline: an unconstrained decision tree.
baseline_model = DecisionTreeClassifier(random_state=RANDOM_STATE)
baseline_model.fit(X_train, y_train)
baseline_pred = baseline_model.predict(X_test)

# Logistic regression with class re-weighting.
log_model = LogisticRegression(
    max_iter=2000,
    random_state=RANDOM_STATE,
    class_weight='balanced',
)
log_model.fit(X_train, y_train)
log_pred = log_model.predict(X_test)

# Random forest with class re-weighting.
forest_model = RandomForestClassifier(
    n_estimators=1000,
    random_state=RANDOM_STATE,
    class_weight='balanced',
)
forest_model.fit(X_train, y_train)
forest_pred = forest_model.predict(X_test)

# Gradient-boosted trees.
xgb_model = XGBClassifier(
    n_estimators=100,
    max_depth=4,
    learning_rate=0.1,
    subsample=0.8,
    colsample_bytree=0.8,
    objective='binary:logistic',
    eval_metric='logloss',
    random_state=RANDOM_STATE,
)
xgb_model.fit(X_train, y_train)
xgb_pred = xgb_model.predict(X_test)
def model_evaluation(y_true, y_pred):
    """Print the standard classification diagnostics for one set of predictions.

    Prints accuracy, precision, recall and F1 (four decimals each), followed
    by the confusion matrix and scikit-learn's classification report.

    Args:
        y_true: Ground-truth labels.
        y_pred: Labels predicted by a model for the same rows.
    """
    headline_metrics = (
        ("Accuracy ", accuracy_score),
        ("Precision", precision_score),
        ("Recall ", recall_score),
        ("F1 Score ", f1_score),
    )
    for label, metric in headline_metrics:
        print(f"{label}: {metric(y_true, y_pred):.4f}")

    print("\nConfusion Matrix:")
    print(confusion_matrix(y_true, y_pred))

    print("\nClassification Report:")
    print(classification_report(y_true, y_pred))
# Report every classifier on the same held-out test set, in training order.
evaluated_predictions = (
    ("Decision Tree", baseline_pred),
    ("Logistic Regression", log_pred),
    ("Random Forest", forest_pred),
    ("XGBoost", xgb_pred),
)
banner = "=" * 10
for title, predictions in evaluated_predictions:
    print(f"\n{banner} {title} Model {banner}")
    model_evaluation(y_test, predictions)
#? Which model is best for financial fraud detection? Give reasons.
#? Which parameters are more important than the others, and why?
#? Comment on confusion matrix.
print(f"\n{'=' * 10} Isolation Forest (Anomaly Detection) {'=' * 10}")

# Unsupervised detector fitted on the full feature table (labels are not used).
iso_model = IsolationForest(
    n_estimators=100,
    contamination=0.17,
    random_state=RANDOM_STATE,
)
iso_model.fit(X)

# Store the continuous score and the hard decision (-1 = anomaly, 1 = normal).
df["anomaly_score"] = iso_model.decision_function(X)
df["anomaly"] = iso_model.predict(X)

# Cross-tabulate the detector's decision against the known fraud label.
print("Anomaly vs Fraud Label Comparison:")
print(pd.crosstab(index=df["anomaly"], columns=df["fraud_label"]))
# Registry of the fitted classifiers, keyed by display name.
models = {
    "Decision Tree": baseline_model,
    "Logistic Regression": log_model,
    "Random Forest": forest_model,
    "XGBoost": xgb_model,
}

print(f"\n{'=' * 10} Feature importance {'=' * 10}")
for model_name, model in models.items():
    # Only the tree-based models are plotted here; logistic regression is skipped.
    if model_name not in ("Decision Tree", "Random Forest", "XGBoost"):
        continue

    importance = pd.DataFrame(
        {"feature": features, "importance": model.feature_importances_}
    ).sort_values("importance", ascending=False)

    # Horizontal bar chart, most important feature at the top.
    plt.figure(figsize=(10, 6))
    sns.barplot(data=importance, x="importance", y="feature")
    plt.title(f"Feature Importance - {model_name}")
    plt.tight_layout()
    plt.show()
print(f"\n{'=' * 10} ROC, AUC (Best Model) {'=' * 10}")

# Probability of the positive (fraud) class from the XGBoost model.
y_prob = xgb_model.predict_proba(X_test)[:, 1]
fpr, tpr, thresholds = roc_curve(y_test, y_prob)
auc_score = roc_auc_score(y_test, y_prob)

for label, values in (("fpr = ", fpr), ("tpr = ", tpr), ("thresholds = ", thresholds)):
    print(label, values)
print(f"\nAUC Score (XGBoost): {auc_score:.4f}")

# ROC curve with the chance diagonal drawn dashed for reference.
roc_fig, roc_ax = plt.subplots(figsize=(8, 6))
roc_ax.plot(fpr, tpr, label=f"XGBoost (AUC = {auc_score:.4f})")
roc_ax.plot([0, 1], [0, 1], linestyle="--")
roc_ax.set_xlabel("False Positive Rate")
roc_ax.set_ylabel("True Positive Rate")
roc_ax.set_title("ROC Curve")
roc_ax.legend()
plt.show()
def lime_exp(model_name, model, sample, X_train, *, num_features=6, random_state=42):
    """Explain one prediction of ``model`` with LIME and plot the result.

    Prints the (feature condition, weight) pairs returned by LIME and shows
    them as a bar chart titled with ``model_name``.

    Args:
        model_name: Display name used in the printed banner and plot title.
        model: Fitted classifier exposing ``predict_proba``.
        sample: One row of features (pandas Series) to explain.
        X_train: Training features (DataFrame); supplies the background
            data and the feature names for the explainer.
        num_features: Maximum number of features shown in the explanation.
        random_state: Seed for LIME's random perturbation sampling, so that
            repeated runs give the same explanation. Pass ``None`` to leave
            it unseeded.
    """
    explainer = LimeTabularExplainer(
        training_data=X_train.values,
        feature_names=X_train.columns.tolist(),
        class_names=["Normal", "Fraud"],
        mode="classification",
        random_state=random_state,
    )

    print("\n", "=" * 20, " LIME Explanation: ", model_name, " ", "=" * 20)

    # Named "explanation" so the local does not shadow this function's own name.
    explanation = explainer.explain_instance(
        data_row=sample.values,
        predict_fn=model.predict_proba,
        num_features=num_features,
    )
    print(explanation.as_list())

    # as_pyplot_figure() creates the current figure; the pyplot calls below
    # then decorate and display it.
    explanation.as_pyplot_figure()
    plt.title(f"LIME Explanation - {model_name}")
    plt.tight_layout()
    plt.show()
def _fraud_class_shap(shap_values, expected_value):
    """Reduce SHAP output to the fraud class: a (rows, features) array and a float base value."""
    if isinstance(shap_values, list):
        # Older SHAP releases return one (rows, features) array per class.
        values = np.asarray(shap_values[1], dtype=float)
        per_class = True
    else:
        values = np.asarray(shap_values, dtype=float)
        # Newer releases return (rows, features, classes) for classifiers.
        per_class = values.ndim == 3
        if per_class:
            values = values[:, :, 1]

    # expected_value may be a scalar, a 1-element array or one entry per class.
    base = np.ravel(expected_value)
    use_class_entry = per_class and base.size > 1
    base_value = float(base[1] if use_class_entry else base[0])
    return values, base_value


def shap_exp(model_name, model, sample_index, x_train, x_test):
    """Show SHAP summary and waterfall plots for one fitted model.

    Tree-based models ("Decision Tree", "Random Forest", "XGBoost") use a
    tree explainer; "Logistic Regression" uses a linear explainer with
    ``x_train`` as background data. Any other name is reported and skipped.

    Args:
        model_name: Display name; also selects which explainer is used.
        model: Fitted classifier matching ``model_name``.
        sample_index: Positional row of ``x_test`` shown in the waterfall plot.
        x_train: Training features (DataFrame).
        x_test: Test features (DataFrame) to explain.
    """
    print("\n", "="*20, "SHAP Explanation: ", model_name, " ", "="*20)

    if model_name in ("Decision Tree", "Random Forest", "XGBoost"):
        features = x_test
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(features)
        print(type(shap_values))
        print(np.array(shap_values).shape)
        waterfall_label = " Waterfall plot: "
    elif model_name == "Logistic Regression":
        # The linear explainer is given float inputs throughout.
        features = x_test.astype(float)
        explainer = shap.LinearExplainer(model, x_train.astype(float))
        shap_values = np.asarray(explainer.shap_values(features), dtype=float)
        print(type(shap_values))
        print(shap_values.shape)
        print(shap_values.dtype)
        # Banner spacing kept exactly as this branch has always printed it.
        waterfall_label = " Waterfall plot : "
    else:
        print(f"No SHAP explainer configured for {model_name!r}; skipping.")
        return

    # Normalise the version-dependent SHAP output to the fraud class only.
    fraud_values, base_value = _fraud_class_shap(
        shap_values, explainer.expected_value
    )
    feature_names = list(features.columns)

    shap.summary_plot(
        fraud_values,
        features,
        feature_names=feature_names,
        show=False,
    )
    plt.tight_layout()
    plt.show()

    print("\n", "="*20, waterfall_label, model_name, " ", "="*20)
    shap.plots.waterfall(
        shap.Explanation(
            values=fraud_values[sample_index],
            base_values=base_value,
            # Plain array: the plot indexes feature values by position.
            data=features.iloc[sample_index].to_numpy(),
            feature_names=feature_names,
        ),
        show=False,
    )
    plt.tight_layout()
    plt.show()
# Positional index (within X_test) of the transaction that gets explained.
SAMPLE_INDEX = 2
sample = X_test.iloc[SAMPLE_INDEX]

# First pass: LIME explanation of that transaction for every model.
for name, fitted_model in models.items():
    lime_exp(name, fitted_model, sample, X_train)

# Second pass: SHAP summary and waterfall plots for every model.
for name, fitted_model in models.items():
    shap_exp(name, fitted_model, SAMPLE_INDEX, X_train, X_test)
# lesson8.py original
# #print(df.head())
# #print(df.duplicated().sum())
# df = df.drop_duplicates()
# #print(df.isnull().sum())
# df["txn_amount"] = pd.to_numeric(df["txn_amount"], errors="coerce")
# df["txn_amount"] = df["txn_amount"].fillna(df["txn_amount"].median())
# df["is_new_device"] = df["is_new_device"].replace({
# "True": 1,
# "False": 0,
# True: 1,
# False: 0,
# })
# df["is_new_device"] = pd.to_numeric(df["is_new_device"], errors="coerce")
# df["is_new_device"] = df["is_new_device"].fillna(0)
# df["customer_country"] = df["customer_country"].replace({
# "HKG": "HK",
# "Hong Kong": "HK"
# })
# df["merchant_country"] = df["merchant_country"].replace({
# "HKG": "HK",
# "Hong Kong": "HK"
# })
# df["txn_time"] = pd.to_datetime(df["txn_time"], errors="coerce")
# df["txn_hour"] = df["txn_time"].dt.hour
# df["txn_hour"] = df["txn_hour"].fillna(df["txn_hour"].mean())
# df["is_weekend"] = df["txn_time"].dt.day_of_week.isin([5,6]).astype(int)
# df["is_large_txn"] = (df["txn_amount"] > 10000).astype(int)
# df["high_risk_combo"] = (
# (df["is_night_txn"].astype(int) == 1) &
# (df["is_cross_border"].astype(int) == 1) &
# (df["is_new_device"].astype(int) ==1)
# )
# df["high_risk_combo"] = df["high_risk_combo"].astype(int)
# df["risk_score"] = (
# df["is_night_txn"].astype(int) +
# df["is_cross_border"].astype(int) +
# df["is_large_txn"].astype(int) +
# df["is_new_device"].astype(int) +
# df["is_large_txn"].astype(int)
# )
# print(df["fraud_label"].value_counts())
# print(df["fraud_label"].value_counts(normalize=True))
# features = [
# "txn_amount", "is_night_txn", "is_cross_border", "is_new_device",
# "txn_hour", "is_weekend", "is_large_txn", "high_risk_combo", "risk_score"
# ]
# x = df[features]
# y = df["fraud_label"]
# #df.to_csv('/Users/christophertang/Downloads/lesson5.outout.csv', index=False)
# x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=42)
# baseline_model = DecisionTreeClassifier(random_state=42)
# baseline_model.fit(x_train, y_train)
# baseline_pred = baseline_model.predict(x_test)
# log_model = LogisticRegression(max_iter=2000, random_state=42)
# log_model.fit(x_train, y_train)
# log_pred = log_model.predict(x_test)
# forest_model = RandomForestClassifier(n_estimators=1000, random_state=42)
# forest_model.fit(x_train, y_train)
# forest_pred = forest_model.predict(x_test)
# xgb_model = XGBClassifier(
# n_estimators=100,
# max_depth=4,
# learning_rate=0.1,
# subsample=0.8,
# colsample_bytree=0.8,
# objective='binary:logistic',
# eval_metric='logloss',
# random_state=42
# )
# xgb_model.fit(x_train, y_train)
# xgb_pred = xgb_model.predict(x_test)
# print("Accuracy:", accuracy_score(y_test, baseline_pred))
# print("Recall:", recall_score(y_test, baseline_pred))
# print("F1:", f1_score(y_test, baseline_pred))
# print(confusion_matrix(y_test, baseline_pred))
# print(classification_report(y_test, baseline_pred))
# '''
# baseline_importance_df = pd.DataFrame({
# 'feature': x_train.columns,
# 'importance': baseline_model.feature_importances_
# }).sort_values('importance', ascending=False)
# plt.barh(baseline_importance_df['feature'], baseline_importance_df['importance'])
# plt.xlabel("importance")
# plt.ylabel("Feature")
# plt.tight_layout()
# plt.show()
# '''
# '''
# important_features = baseline_importance_df[
# baseline_importance_df['importance'] > 0.05
# ]['feature'].tolist()
# print(features)
# print(important_features)
# x_train_important_selected = x_train[important_features]
# x_test_important_selected = x_test[important_features]
# tree_important_model = DecisionTreeClassifier(random_state=42)
# tree_important_model.fit(x_train_important_selected, y_train)
# tree_important_pred = tree_important_model.predict(x_test_important_selected)
# print("Accuracy:", accuracy_score(y_test, tree_important_pred))
# print("Recall:", recall_score(y_test, tree_important_pred))
# print("F1:", f1_score(y_test, tree_important_pred))
# print(confusion_matrix(y_test, tree_important_pred))
# print(classification_report(y_test, tree_important_pred))
# '''
# tree_model = DecisionTreeClassifier(random_state=42)
# param_grid = {
# 'max_depth': [3, 5, 10, None],
# 'min_samples_split': [2, 5, 10],
# 'min_samples_leaf': [1, 2, 4],
# 'criterion': ['gini', 'entropy']
# }
# grid_search = GridSearchCV(
# tree_model, param_grid,
# scoring='f1', n_jobs=-1,
# cv = StratifiedKFold(
# n_splits=5,
# shuffle=True,
# random_state=42
# )
# )
# grid_search.fit(x_train, y_train)
# print("Best parameter:", grid_search.best_params_)
# print("best score:", grid_search.best_score_)
# best_tree_model = grid_search.best_estimator_
# best_tree_pred = best_tree_model.predict(x_test)
# print("Accuracy:", accuracy_score(y_test, best_tree_pred))
# print("Recall:", recall_score(y_test, best_tree_pred))
# print("F1:", f1_score(y_test, best_tree_pred))
# print(confusion_matrix(y_test, best_tree_pred))
# print(classification_report(y_test, best_tree_pred))
# '''
# log_importance_df = pd.DataFrame({
# 'feature': x_train.columns,
# 'importance': log_model.coef_[0]
# }).sort_values('importance', ascending=False)
# plt.barh(log_importance_df['feature'], log_importance_df['importance'])
# plt.xlabel("importance")
# plt.ylabel("Feature")
# plt.tight_layout()
# fig = plt.gcf()
# plt.show()
# fig.show()
# print(log_model.coef_)
# print("Accuracy:", accuracy_score(y_test, log_pred))
# print("Recall:", recall_score(y_test, log_pred))
# print("F1:", f1_score(y_test, log_pred))
# print(confusion_matrix(y_test, log_pred))
# print(classification_report(y_test, log_pred))
# print("Accuracy:", accuracy_score(y_test, forest_pred))
# print("Recall:", recall_score(y_test, forest_pred))
# print("F1:", f1_score(y_test, forest_pred))
# print(confusion_matrix(y_test, forest_pred)
# print(classification_report(y_test, forest_pred))
# print("Accuracy:", accuracy_score(y_test, xgb_pred))
# print("Recall:", recall_score(y_test, xgb_pred))
# print("F1:", f1_score(y_test, xgb_pred))
# print(confusion_matrix(y_test, xgb_pred))
# print(classification_report(y_test, xgb_pred))
# '''
# print("============isolation ===")
# iso_model = IsolationForest(
# contamination=0.17,
# random_state=42,
# n_estimators=100
# )
# iso_model.fit(x)
# df["anomaly_score"] = iso_model.decision_function(x)
# df["anomaly"] = iso_model.predict(x)
# df.to_csv('lesson5.outout.csv', index=False)
# '''
# # Visualization of the results
# plt.figure(figsize=(10, 5))
# # Plot normal instances
# normal = df[df['anomaly'] == 1]
# plt.scatter(normal.index, normal['anomaly_score'], label='Normal')
# # Plot anomalies
# anomalies = df[df['anomaly'] == -1]
# plt.scatter(anomalies.index, anomalies['anomaly_score'], label='Anomaly')
# plt.xlabel("Instance")
# plt.ylabel("Anomaly Score")
# plt.legend()
# plt.show()
# '''
# y_prob = baseline_model.predict_proba(x_test)[:, 1] #ROC, AUC
# fpr, tpr, thresholds = roc_curve(y_test, y_prob)
# auc_score = roc_auc_score(y_test, y_prob)
# print("fpr = ", fpr)
# print("tpr = ", tpr)
# print("thresholds = ", thresholds)
# print("auc_score = ", auc_score)
# plt.plot(fpr, tpr)
# plt.plot([0, 1], [0, 1], linestyle='--')
# plt.xlabel("False Positive Rate")
# plt.ylabel("True Positive Rate")
# plt.title("ROC Curve")
# plt.show()
# models = {
# "Decision Tree": baseline_model,
# "Logistic Regression": log_model,
# "Random Forest": forest_model,
# "XGBoost": xgb_model
# }
# sample_index = 2 # fault case
# sample = x_test.iloc[sample_index]
# # LIME needs numeric training data
# lime_explainer = LimeTabularExplainer(
# training_data=x_train.values,
# feature_names=x_train.columns.tolist(),
# class_names=["Normal", "Fraud"],
# mode="classification"
# )
# for model_name, model in models.items():
# print("=" * 80)
# print("LIME Explanation:", model_name)
# print("=" * 80)
# lime_exp = lime_explainer.explain_instance(
# data_row=sample.values,
# predict_fn=model.predict_proba,
# num_features=6
# )
# print(lime_exp.as_list())
# fig = lime_exp.as_pyplot_figure()
# plt.title(f"LIME Explanation - {model_name}")
# plt.tight_layout()
# plt.show()
# for model_name, model in models.items():
# print("=" * 80)
# print("SHAP Explanation:", model_name)
# print("=" * 80)
# if model_name in ["Decision Tree", "Random Forest", "XGBoost"]:
# explainer = shap.TreeExplainer(model)
# shap_values = explainer.shap_values(x_test)
# print(type(shap_values))
# print(np.array(shap_values).shape)
# # SHAP 0.52 binary classification:
# # possible shape = (rows, features, 2)
# if np.array(shap_values).ndim == 3:
# shap_values_fraud = shap_values[:, :, 1]
# base_value = explainer.expected_value[1]
# else:
# shap_values_fraud = shap_values
# base_value = explainer.expected_value
# shap.summary_plot(
# shap_values_fraud,
# x_test,
# feature_names=x_test.columns,
# show=False
# )
# plt.tight_layout()
# plt.show()
# shap.plots.waterfall(
# shap.Explanation(
# values=shap_values_fraud[sample_index],
# base_values=base_value,
# data=x_test.iloc[sample_index],
# feature_names=x_test.columns
# ),
# show = False
# )
# plt.tight_layout()
# plt.show()
# elif model_name == "Logistic Regression":
# # Important: make sure every input is a numeric float
# x_train_lr = x_train.astype(float)
# x_test_lr = x_test.astype(float)
# explainer = shap.LinearExplainer(
# model,
# x_train_lr
# )
# shap_values = explainer.shap_values(x_test_lr)
# # Important: make sure shap_values is also a float array
# shap_values = np.asarray(shap_values, dtype=float)
# print(type(shap_values))
# print(shap_values.shape)
# print(shap_values.dtype)
# shap.summary_plot(
# shap_values,
# x_test_lr,
# feature_names=x_test_lr.columns
# )
# shap.plots.waterfall(
# shap.Explanation(
# values=shap_values[sample_index],
# base_values=float(explainer.expected_value),
# data=x_test_lr.iloc[sample_index].values.astype(float),
# feature_names=x_test_lr.columns
# ),
# show = False
# )
# plt.tight_layout()
# plt.show()
#
Here is where you can write the answer to that question.
Here is where you can write the answer to that question.
Here is where you can write the answer to that question.
Here is where you can write the answer to that question.