Rolling window features of delays

This notebook will take a DataFrame with scheduleDateTime or optionally a different column

  1. Calculate time features from DateTime values
  2. Output pd.DataFrame with id columns + time feature columns
  3. Write output to CSV file

Parameters

  • input_file: Filepath of model input data or flights data with column scheduleDelaySeconds
  • output_file: Filepath to CSV file to write output features to - freq: Frequency of rolling

window features. Default 10T for 10-minute intervals - window: Window size to calculate rolling window features. Default 2H for 2-hour windows

The window argument is passed a pandas DateOffset frequency string. For valid arguments to window look here, https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#dateoffset-objects
## Returns
Output CSV file with rolling window features at every timestep from the beginning to the end of the input_file date range, at an interval of freq.
At each timestamp the average delay in the previous window based on the DateTime, with window size window.

Output format

timeStamp                  |    n    |  sum_delays  |  mean_delay
--------------------------------------------------------------------
2017-12-31 15:00:00+01:00  |   1.0   |     75600.0  |  75600.000000
2017-12-31 15:10:00+01:00  |   1.0   |     75600.0  |  75600.000000
2017-12-31 15:20:00+01:00  |   1.0   |     75600.0  |  75600.000000
2017-12-31 15:30:00+01:00  |   1.0   |     75600.0  |  75600.000000
2017-12-31 15:40:00+01:00  |   1.0   |     75600.0  |  75600.000000
  ...                      |   ...   |       ...    |        ...
2018-07-12 17:10:00+02:00  |  379.0  |    223135.0  |    588.746702
2018-07-12 17:20:00+02:00  |  380.0  |    209786.0  |    552.068421
2018-07-12 17:30:00+02:00  |  363.0  |    207260.0  |    570.964187
2018-07-12 17:40:00+02:00  |  349.0  |    185570.0  |    531.719198
2018-07-12 17:50:00+02:00  |  336.0  |    164110.0  |    488.422619

Script parameters

[3]:
# parameters
input_file = "gs://lvt-schiphol-assignment-snakemake/data/model_input/delays_base_input.csv"
output_file = "rolling_delay_features__test.csv"
freq = "10T"
window = "30T+2H+1D"

Imports

[4]:
import pandas as pd
import numpy as np

import sys
sys.path.append("../")

from src.data.google_storage_io import read_csv_data, write_csv_data

Load data

[5]:
%%time

df = read_csv_data(input_file)
df.head()
Reading file from Google Storage
Bucket: lvt-schiphol-assignment-snakemake
File:   data/model_input/delays_base_input.csv

Wall time: 2.44 s
[5]:
id aircraftRegistration airlineCode terminal serviceType scheduleDateTime actualOffBlockTime scheduleDelaySeconds
0 123414481790510775 PHPXB 148.0 NaN NaN 2018-01-01 03:30:00+01:00 2018-01-01 03:22:00+01:00 -480.0
1 123414479288269149 PHHSJ 164.0 1.0 J 2018-01-01 06:00:00+01:00 2018-01-01 05:58:22+01:00 -98.0
2 123414479666542945 PHHSG 100.0 1.0 J 2018-01-01 06:05:00+01:00 2018-01-01 06:00:00+01:00 -300.0
3 123414479288365061 PHHSG 164.0 1.0 J 2018-01-01 06:05:00+01:00 2018-01-01 06:00:00+01:00 -300.0
4 123414479288274329 PHHXB 164.0 1.0 J 2018-01-01 06:15:00+01:00 2018-01-01 06:26:34+01:00 694.0

Rolling functions

After more development these should go into common modules to be reused.

[6]:
def bin_delays_by_schedule(df, freq='D'):
    # datetime floored to 10-minute interval
    df["timeStamp"] = pd.to_datetime(df["scheduleDateTime"], utc=True).dt.tz_convert("Europe/Amsterdam").dt.floor(freq)

    # groupby each floored datetime and keep n values + total delay per bin
    df_floor = df \
        .groupby("timeStamp")["scheduleDelaySeconds"] \
        .apply(lambda x: pd.DataFrame(dict(n=len(x), sum_delays=x.sum()), index=[x.name])) \
        .reset_index(level=0) \
        .set_index("timeStamp")
    return df_floor

def get_rolling_mean_delay(df_floor, freq='D', window='2D'):

    # Not every 10 minute interval has flights, in those cases we fill in timestamps
    complete_date_range = pd.date_range(df_floor.index.min(), df_floor.index.max(),
                                        freq=freq)

    # list of dates to fill because they are not in df_floor yet
    tofill_date_range = complete_date_range[~complete_date_range.isin(df_floor.index)]

    # concatenate df_floor with missing timestamps using n=0, sum_delays=0
    df_floor = pd.concat([
        df_floor,
        pd.DataFrame(dict(n=0,
                          sum_delays=0,
                          timeStamp=tofill_date_range
                    )).set_index("timeStamp")
    ]).sort_values("timeStamp")

    # calculate mean from sum and total n. If n=0, we get a NaN mean and fill with 0
    df_rolling_mean = df_floor.rolling(window).sum().assign(
        mean_delay = lambda x: (x["sum_delays"] / x["n"].replace(0, 1)))

    return df_rolling_mean

Note on DATA LEAKAGE

There may be data leakage introduced in this features in the current implementation(!)

By ways of example, assume window=2hours and freq=10minutes.

  • The time is now 18:30 and we want to predict delays for 2hours from now at 20:30
  • We want to calculate the average delay of the last 2 hours
  • A flight was scheduled at 18:00 and has still not left yet at 18:30
  • In our data we know that this flight at 18:00 will have a delay of say 1 hour
  • At the time of prediction, the only information we have is that the flight is at least 30minutes delayed

When calculating the average delay in our current implementation, we are leaking knowledge by using the fact that the flight at 18:00 had a 1-hour delay, even though this is not known at the time of prediction at 18:30(!)


Alternatively we can round delays of flights that have not yet left at the time of calculating mean-delay over the last {window}. In real-time we can then calculate the same values and therefore not introduce data leakage.

With more time permitted you could make a prediction of delay of flights that have not yet left, based on all Other features. Then use that prediction to fill in our rolling-window timeseries.

Calculate rolling features

First implementation took forever to determine rolling windows.

Implemented this step in PySpark but did not want to force a pyspark dependency unless we need to.

Then refactored in Pandas with sufficient speed. Current implementation does the trick.

[7]:
%%time
df_floor = bin_delays_by_schedule(df, freq=freq)
df_floor.head()
Wall time: 25 s
[7]:
n sum_delays
timeStamp
2018-01-01 03:30:00+01:00 1 -480.0
2018-01-01 06:00:00+01:00 3 -698.0
2018-01-01 06:10:00+01:00 1 694.0
2018-01-01 06:20:00+01:00 5 2193.0
2018-01-01 06:30:00+01:00 5 2121.0
[8]:
%%time

if isinstance(window, list):
    window_list = window
elif '+' in window:
    window_list = window.split('+')
else:
    window_list = [window]

# add rolling features for all window sizes
df_rolling_features = pd.concat([get_rolling_mean_delay(df_floor, freq=freq, window=window).add_suffix(f"__{window}")
                                     for window in window_list], axis=1) \
                        .reset_index()
df_rolling_features
Wall time: 47 ms
[8]:
timeStamp n__30T sum_delays__30T mean_delay__30T n__2H sum_delays__2H mean_delay__2H n__1D sum_delays__1D mean_delay__1D
0 2018-01-01 03:30:00+01:00 1.0 -480.0 -480.000000 1.0 -480.0 -480.000000 1.0 -480.0 -480.000000
1 2018-01-01 03:40:00+01:00 1.0 -480.0 -480.000000 1.0 -480.0 -480.000000 1.0 -480.0 -480.000000
2 2018-01-01 03:50:00+01:00 1.0 -480.0 -480.000000 1.0 -480.0 -480.000000 1.0 -480.0 -480.000000
3 2018-01-01 04:00:00+01:00 0.0 0.0 0.000000 1.0 -480.0 -480.000000 1.0 -480.0 -480.000000
4 2018-01-01 04:10:00+01:00 0.0 0.0 0.000000 1.0 -480.0 -480.000000 1.0 -480.0 -480.000000
... ... ... ... ... ... ... ... ... ... ...
27724 2018-07-12 17:10:00+02:00 102.0 51057.0 500.558824 379.0 223135.0 588.746702 2715.0 2022369.0 744.887293
27725 2018-07-12 17:20:00+02:00 71.0 15875.0 223.591549 380.0 209786.0 552.068421 2715.0 2025047.0 745.873665
27726 2018-07-12 17:30:00+02:00 42.0 9169.0 218.309524 363.0 207260.0 570.964187 2692.0 2015030.0 748.525260
27727 2018-07-12 17:40:00+02:00 24.0 2197.0 91.541667 349.0 185570.0 531.719198 2685.0 1998027.0 744.144134
27728 2018-07-12 17:50:00+02:00 2.0 -10625.0 -5312.500000 336.0 164110.0 488.422619 2669.0 1986413.0 744.253653

27729 rows × 10 columns

Check assumption that we have no NaN values in our data

[9]:
assert df["scheduleDelaySeconds"].shape == df["scheduleDelaySeconds"].dropna().shape
df["scheduleDelaySeconds"].shape, df["scheduleDelaySeconds"].dropna().shape
[9]:
((477776,), (477776,))
[10]:
assert df_floor["sum_delays"].shape == df_floor["sum_delays"].dropna().shape
df_floor["sum_delays"].shape, df_floor["sum_delays"].dropna().shape
[10]:
((20532,), (20532,))
[11]:
# check separately for each
assert df_rolling_features.shape == df_rolling_features.dropna().shape
df_rolling_features.shape, df_rolling_features.dropna().shape

[11]:
((27729, 10), (27729, 10))

Show output features

[12]:
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go

def px_facet_variable(df, x, y_cols, title=None, width=1200, height=800, log_transform=False):
    # long format with variable values in 'y' and name in 'variable'
    df_plot = pd.concat([
        df[[x, y_col]].assign(variable=y_col).rename(columns={y_col: "y"})
        for y_col in y_cols
    ]).sort_values(x)

    if log_transform:
        df_plot["y"] = np.log(df_plot["y"] + 1)

    # facet plot of each variable in a row
    fig = px.line(df_plot[[x, "y", "variable"]], x=x, y="y", facet_row="variable",
                  title=f"{title}{' [Log-transformed]' if log_transform else None}",
                  width=width, height=height)

    # Add range slider assuming 'x' is a date or datetime index
    fig.update_layout(
        xaxis=dict(
            rangeslider=dict(
                visible=True
            ),
            type="date"
        ),
        hovermode="x"
    )

    # independent y-axes
    fig.update_yaxes(matches=None)
    fig.for_each_annotation(lambda a: a.update(text=a.text.split("=")[-1]))
    return fig

# plotly figure of rolling features
flatten = lambda l: [item for sublist in l for item in sublist]
n_mean_columns = flatten([[f"n__{w}", f"mean_delay__{w}"] for w in window_list])
px_facet_variable(df_rolling_features, "timeStamp", n_mean_columns,
                  title="Rolling window features (window={w})")

Write output to CSV

Local or Google Storage is both handled

[13]:
# # write output file
write_csv_data(df_rolling_features, output_file, index=False)
Writing file to local directory
File:   rolling_delay_features__test.csv

[14]:
df_rolling_features.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 27729 entries, 0 to 27728
Data columns (total 10 columns):
 #   Column           Non-Null Count  Dtype
---  ------           --------------  -----
 0   timeStamp        27729 non-null  datetime64[ns, Europe/Amsterdam]
 1   n__30T           27729 non-null  float64
 2   sum_delays__30T  27729 non-null  float64
 3   mean_delay__30T  27729 non-null  float64
 4   n__2H            27729 non-null  float64
 5   sum_delays__2H   27729 non-null  float64
 6   mean_delay__2H   27729 non-null  float64
 7   n__1D            27729 non-null  float64
 8   sum_delays__1D   27729 non-null  float64
 9   mean_delay__1D   27729 non-null  float64
dtypes: datetime64[ns, Europe/Amsterdam](1), float64(9)
memory usage: 2.1 MB