Fit baseline model using flight delay averages

  • Takes the model input data for flight delays
  • Split data based on external train/test data file
  • Define baseline average model
  • Evaluate model
  • Save model as pickle
  • – save to mlflow –
  • Write prediction prediction output to csv

Parameters


  • input_file: Filepath of model input data of flight delays
  • train_test_file: Filepath of train/test csv file with columns [“id”, “model_set”]
  • output_file: Filepath to write output csv file with minimal modelling input

Returns


Trained baseline model that simply predicts the average flight delay from the training data in all predictions.

[42]:
# model params
input_file = "../lvt-schiphol-assignment-snakemake/data/model_input/delays_base_input.csv"
train_test_file = "../lvt-schiphol-assignment-snakemake/data/model_input/train_test__0.2__sample.csv"
output_predictions = "./predictions.csv"

# mlflow params
log_mlflow = True
mlflow_tracking_uri = "../mlruns"
mlflow_experiment = "from_script"
mlflow_run = "baseline_avg"
[43]:
from pathlib import Path
output_dir = Path(output_predictions).parent.absolute()
output_dir
[43]:
WindowsPath('C:/Users/lodew/qualogy/schiphol-code-assignment/scripts')

Imports

[44]:
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.base import BaseEstimator, TransformerMixin

import matplotlib.pyplot as plt
import seaborn as sns

import sys
sys.path.append("../")

from src.data.google_storage_io import read_csv_data, write_csv_data
from src.evaluation.metrics import get_regression_metrics
from src.evaluation.regression import make_regression_metrics_by_group, make_regression_metrics_by_datetime
from src.evaluation.predictions import make_predictions_dataframe
[45]:
plt.rcParams["figure.figsize"] = (16, 8)

Read data

[46]:
%%time
df = read_csv_data(input_file)
train_test = read_csv_data(train_test_file)
Reading file from local directory
File:   ../lvt-schiphol-assignment-snakemake/data/model_input/delays_base_input.csv

Reading file from local directory
File:   ../lvt-schiphol-assignment-snakemake/data/model_input/train_test__0.2__sample.csv

Wall time: 727 ms
[47]:
%%time

def split_train_test(df, train_test, target="scheduleDelaySeconds"):
    # merge by `id` and group by train/test set labels
    df_set_groups = pd.merge(df, train_test, on="id", how="left").groupby("model_set")

    # get data per train/test set
    df_train, df_test = df_set_groups.get_group("train"), df_set_groups.get_group("test")

    # split target from features
    X_train, y_train = df_train.drop(columns=[target]), df_train[target]
    X_test, y_test = df_test.drop(columns=[target]),  df_test[target]

    print(f"""
        Split data shapes
        Input: {df.shape}
        Train: {X_train.shape},\t {y_train.shape}
        Test:  {X_test.shape},\t {y_test.shape}
        """)

    # assert that we haven't dropped values at this stage
    # failed assert could indicate duplicate ids found in the data
    assert (len(X_train) + len(X_test)) == len(df)

    return X_train, X_test, y_train, y_test

# split data
X_train, X_test, y_train, y_test = split_train_test(df, train_test)



        Split data shapes
        Input: (477776, 8)
        Train: (382220, 8),      (382220,)
        Test:  (95556, 8),       (95556,)

Wall time: 536 ms

Prediction model

Define model

[48]:
class AverageBaseline(BaseEstimator):
    def __init__(self):
        self._average_y = None

    @property
    def average_y(self):
        return self._average_y

    def fit(self, X, y):
        """calculate the average values of `y` and save  internally"""
        self._average_y = np.mean(y)
        return self

    def predict(self, X):
        """return trained average y value for all observations in X"""
        return np.array([self.average_y] * X.shape[0])

Train model

[49]:
# train
avg_baseline = AverageBaseline().fit(X_train, y_train)

Evaluate model

[50]:
def datetime_to_date(dt):
    return pd.to_datetime(dt, utc=True).dt.date

def datetime_to_date_hour(dt):
    return pd.to_datetime(dt, utc=True).dt.floor('H')

[51]:
# create predictions on train/test sets
df_predictions = make_predictions_dataframe(avg_baseline, X_train, X_test, y_train, y_test)
df_predictions
[51]:
id scheduleDateTime y yhat error model_set
1 123414479288269149 2018-01-01 06:00:00+01:00 -98.0 859.825258 957.825258 train
2 123414479666542945 2018-01-01 06:05:00+01:00 -300.0 859.825258 1159.825258 train
5 123414479666545913 2018-01-01 06:20:00+01:00 611.0 859.825258 248.825258 train
6 123414478696233855 2018-01-01 06:20:00+01:00 611.0 859.825258 248.825258 train
7 123414479288370681 2018-01-01 06:20:00+01:00 180.0 859.825258 679.825258 train
... ... ... ... ... ... ...
477742 124763275285891683 2018-07-12 17:15:00+02:00 115.0 859.825258 744.825258 test
477748 124763299563775951 2018-07-12 17:15:00+02:00 -144.0 859.825258 1003.825258 test
477761 124763272032454817 2018-07-12 17:20:00+02:00 423.0 859.825258 436.825258 test
477765 124763271776654663 2018-07-12 17:25:00+02:00 80.0 859.825258 779.825258 test
477775 124763271129903067 2018-07-12 17:50:00+02:00 -8690.0 859.825258 9549.825258 test

477776 rows × 6 columns

Calculate regression metrics

[52]:
%%time

df_metrics_long = make_regression_metrics_by_group(df_predictions, group_cols = ["model_set"])
df_daily_metrics_long = make_regression_metrics_by_datetime(df_predictions, freq="D", alias="schedule_date")
df_hourly_metrics_long = make_regression_metrics_by_datetime(df_predictions, freq="H", alias="schedule_date")
Wall time: 15.2 s
[53]:
df_metrics_long.head()
[53]:
model_set variable value
0 test mae 889.598834
1 train mae 886.865285
2 test mape 103.462747
3 train mape 103.144828
4 test rmse 2273.616920
[54]:
import plotly.express as px
fig = px.line(df_hourly_metrics_long, x="schedule_date", y="value", facet_row="variable", color="model_set",
             width=1200, height=1200, title="Hourly prediction metrics")
# Add range slider
fig.update_layout(
    xaxis=dict(
        rangeslider=dict(
            visible=True
        ),
        type="date"
    ),
    hovermode="x"
)
fig.update_yaxes(matches=None)
# fig.update_xaxes(matches=None)
fig.show()

Plot some prediction results

[55]:
# def predictions_daily_mean(df_predictions):
#     df_predictions["schedule_date"] = datetime_to_date(df_predictions["scheduleDateTime"])
#     df_predictions = df_predictions.drop(columns="id")
#     df_daily_mean = df_predictions.groupby(["model_set", "schedule_date"]).mean().reset_index()
#     return df_daily_mean

# def predictions_hourly_mean(df_predictions):
#     df_predictions["schedule_date"] = datetime_to_date_hour(df_predictions["scheduleDateTime"])
#     df_predictions = df_predictions.drop(columns="id")
#     df_daily_mean = df_predictions.groupby(["model_set", "schedule_date"]).mean().reset_index()
#     return df_daily_mean

# def get_safe_ylim(y, q=0.05, q2=None):
#     if q2 is None:
#         q2 = 1 - q
#     return (np.quantile(y, q), np.quantile(y, q2))


# df_daily_mean = predictions_daily_mean(df_predictions)
# y_ylim = get_safe_ylim(df_daily_mean["y"])
# error_ylim = get_safe_ylim(df_daily_mean["error"])

# df_daily_mean[["schedule_date", "y", "yhat"]].plot(x="schedule_date", ylim=y_ylim)
# df_daily_mean[["schedule_date", "error"]].plot(x="schedule_date", ylim=error_ylim)

# df_hourly_mean = predictions_hourly_mean(df_predictions)
# y_ylim = get_safe_ylim(df_hourly_mean["y"])
# error_ylim = get_safe_ylim(df_hourly_mean["error"])

# df_hourly_mean[["schedule_date", "y", "yhat"]].plot(x="schedule_date", ylim=y_ylim)
# df_hourly_mean[["schedule_date", "error"]].plot(x="schedule_date", ylim=error_ylim)

Write output to output directory

[56]:
import joblib, pickle
from pathlib import Path
[58]:
model_file = str(Path(output_dir, "model.pkl"))
predictions_file = str(Path(output_dir, "predictions.csv"))
overall_metrics_file = str(Path(output_dir, "overall_metrics_long.csv"))
daily_metrics_file = str(Path(output_dir, "daily_metrics_long.csv"))
hourly_metrics_file = str(Path(output_dir, "hourly_metrics_long.csv"))

Pickle output files for mlflow artifacts

  • Pipeline serialized with joblib
  • Model data or sample thereof
[57]:
joblib.dump(avg_baseline, model_file)
[57]:
['C:\\Users\\lodew\\qualogy\\schiphol-code-assignment\\scripts\\model.pkl']

Write output to CSV

Local or Google Storage is both handled

[59]:
# write output file
write_csv_data(df_predictions,       predictions_file, index=False)
write_csv_data(df_metrics_long,       overall_metrics_file, index=False)
write_csv_data(df_daily_metrics_long, daily_metrics_file, index=False)
write_csv_data(df_hourly_metrics_long, hourly_metrics_file, index=False)
Writing file to local directory
File:   C:\Users\lodew\qualogy\schiphol-code-assignment\scripts\predictions.csv

Writing file to local directory
File:   C:\Users\lodew\qualogy\schiphol-code-assignment\scripts\overall_metrics_long.csv

Writing file to local directory
File:   C:\Users\lodew\qualogy\schiphol-code-assignment\scripts\daily_metrics_long.csv

Writing file to local directory
File:   C:\Users\lodew\qualogy\schiphol-code-assignment\scripts\hourly_metrics_long.csv

Log to mlflow

[60]:
import mlflow

mlflow.set_tracking_uri(mlflow_tracking_uri)
mlflow.set_experiment(mlflow_experiment)


print(f"Logging to experiment: {mlflow_experiment}")
print(f"Run name: {mlflow_run}")

with mlflow.start_run(run_name=mlflow_run):
    mlflow.log_param("Input file", input_file)
    mlflow.log_param("Train-test file", train_test_file)

    # Model metadata
    for idx, metric_row in df_metrics_long.iterrows():
        metric_name = "__".join([metric_row["variable"], metric_row["model_set"]])
        mlflow.log_metric(metric_name, metric_row["value"])

    # log artifacts
    print("Logging artifacts")
    mlflow.log_artifact(model_file)
    mlflow.log_artifact(predictions_file)
    mlflow.log_artifact(overall_metrics_file)
    mlflow.log_artifact(daily_metrics_file)
    mlflow.log_artifact(hourly_metrics_file)

Logging to experiment: test_baseline_average
Run name: baseline_avg
Logging artifacts

Overview of the output data

[61]:
df_predictions.info()
<class 'pandas.core.frame.DataFrame'>
Int64Index: 477776 entries, 1 to 477775
Data columns (total 7 columns):
 #   Column            Non-Null Count   Dtype
---  ------            --------------   -----
 0   id                477776 non-null  int64
 1   scheduleDateTime  477776 non-null  object
 2   y                 477776 non-null  float64
 3   yhat              477776 non-null  float64
 4   error             477776 non-null  float64
 5   model_set         477776 non-null  object
 6   schedule_date     477776 non-null  datetime64[ns, UTC]
dtypes: datetime64[ns, UTC](1), float64(3), int64(1), object(2)
memory usage: 29.2+ MB