Rolling window features of delays¶
This notebook will take a DataFrame with scheduleDateTime or optionally a different column
- Calculate time features from DateTime values
- Output pd.DataFrame with id columns + time feature columns
- Write output to CSV file
Parameters¶
window features. Default |
The window argument is passed a pandas DateOffset frequency string. For valid arguments
to window look here,
https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#dateoffset-objects |
| ## Returns |
Output CSV file with rolling window features at every timestep from the beginning to the end of
the input_file date range, at an interval of freq. |
At each timestamp the average delay in the previous window based on the DateTime, with window
size window. |
Output format
timeStamp | n | sum_delays | mean_delay
--------------------------------------------------------------------
2017-12-31 15:00:00+01:00 | 1.0 | 75600.0 | 75600.000000
2017-12-31 15:10:00+01:00 | 1.0 | 75600.0 | 75600.000000
2017-12-31 15:20:00+01:00 | 1.0 | 75600.0 | 75600.000000
2017-12-31 15:30:00+01:00 | 1.0 | 75600.0 | 75600.000000
2017-12-31 15:40:00+01:00 | 1.0 | 75600.0 | 75600.000000
... | ... | ... | ...
2018-07-12 17:10:00+02:00 | 379.0 | 223135.0 | 588.746702
2018-07-12 17:20:00+02:00 | 380.0 | 209786.0 | 552.068421
2018-07-12 17:30:00+02:00 | 363.0 | 207260.0 | 570.964187
2018-07-12 17:40:00+02:00 | 349.0 | 185570.0 | 531.719198
2018-07-12 17:50:00+02:00 | 336.0 | 164110.0 | 488.422619
Script parameters¶
[3]:
# parameters
input_file = "gs://lvt-schiphol-assignment-snakemake/data/model_input/delays_base_input.csv"
output_file = "rolling_delay_features__test.csv"
freq = "10T"
window = "30T+2H+1D"
Imports¶
[4]:
import pandas as pd
import numpy as np
import sys
sys.path.append("../")
from src.data.google_storage_io import read_csv_data, write_csv_data
Load data¶
[5]:
%%time
df = read_csv_data(input_file)
df.head()
Reading file from Google Storage
Bucket: lvt-schiphol-assignment-snakemake
File: data/model_input/delays_base_input.csv
Wall time: 2.44 s
[5]:
| id | aircraftRegistration | airlineCode | terminal | serviceType | scheduleDateTime | actualOffBlockTime | scheduleDelaySeconds | |
|---|---|---|---|---|---|---|---|---|
| 0 | 123414481790510775 | PHPXB | 148.0 | NaN | NaN | 2018-01-01 03:30:00+01:00 | 2018-01-01 03:22:00+01:00 | -480.0 |
| 1 | 123414479288269149 | PHHSJ | 164.0 | 1.0 | J | 2018-01-01 06:00:00+01:00 | 2018-01-01 05:58:22+01:00 | -98.0 |
| 2 | 123414479666542945 | PHHSG | 100.0 | 1.0 | J | 2018-01-01 06:05:00+01:00 | 2018-01-01 06:00:00+01:00 | -300.0 |
| 3 | 123414479288365061 | PHHSG | 164.0 | 1.0 | J | 2018-01-01 06:05:00+01:00 | 2018-01-01 06:00:00+01:00 | -300.0 |
| 4 | 123414479288274329 | PHHXB | 164.0 | 1.0 | J | 2018-01-01 06:15:00+01:00 | 2018-01-01 06:26:34+01:00 | 694.0 |
Rolling functions¶
After more development these should go into common modules to be reused.
[6]:
def bin_delays_by_schedule(df, freq='D'):
# datetime floored to 10-minute interval
df["timeStamp"] = pd.to_datetime(df["scheduleDateTime"], utc=True).dt.tz_convert("Europe/Amsterdam").dt.floor(freq)
# groupby each floored datetime and keep n values + total delay per bin
df_floor = df \
.groupby("timeStamp")["scheduleDelaySeconds"] \
.apply(lambda x: pd.DataFrame(dict(n=len(x), sum_delays=x.sum()), index=[x.name])) \
.reset_index(level=0) \
.set_index("timeStamp")
return df_floor
def get_rolling_mean_delay(df_floor, freq='D', window='2D'):
# Not every 10 minute interval has flights, in those cases we fill in timestamps
complete_date_range = pd.date_range(df_floor.index.min(), df_floor.index.max(),
freq=freq)
# list of dates to fill because they are not in df_floor yet
tofill_date_range = complete_date_range[~complete_date_range.isin(df_floor.index)]
# concatenate df_floor with missing timestamps using n=0, sum_delays=0
df_floor = pd.concat([
df_floor,
pd.DataFrame(dict(n=0,
sum_delays=0,
timeStamp=tofill_date_range
)).set_index("timeStamp")
]).sort_values("timeStamp")
# calculate mean from sum and total n. If n=0, we get a NaN mean and fill with 0
df_rolling_mean = df_floor.rolling(window).sum().assign(
mean_delay = lambda x: (x["sum_delays"] / x["n"].replace(0, 1)))
return df_rolling_mean
Note on DATA LEAKAGE¶
There may be data leakage introduced in this features in the current implementation(!)
By ways of example, assume window=2hours and freq=10minutes.
- The time is now 18:30 and we want to predict delays for 2hours from now at 20:30
- We want to calculate the average delay of the last 2 hours
- A flight was scheduled at 18:00 and has still not left yet at 18:30
- In our data we know that this flight at 18:00 will have a delay of say 1 hour
- At the time of prediction, the only information we have is that the flight is at least 30minutes delayed
When calculating the average delay in our current implementation, we are leaking knowledge by using the fact that the flight at 18:00 had a 1-hour delay, even though this is not known at the time of prediction at 18:30(!)
Alternatively we can round delays of flights that have not yet left at the time of calculating mean-delay over the last {window}. In real-time we can then calculate the same values and therefore not introduce data leakage.
With more time permitted you could make a prediction of delay of flights that have not yet left, based on all Other features. Then use that prediction to fill in our rolling-window timeseries.
Calculate rolling features¶
First implementation took forever to determine rolling windows.
Implemented this step in PySpark but did not want to force a pyspark dependency unless we need to.
Then refactored in Pandas with sufficient speed. Current implementation does the trick.
[7]:
%%time
df_floor = bin_delays_by_schedule(df, freq=freq)
df_floor.head()
Wall time: 25 s
[7]:
| n | sum_delays | |
|---|---|---|
| timeStamp | ||
| 2018-01-01 03:30:00+01:00 | 1 | -480.0 |
| 2018-01-01 06:00:00+01:00 | 3 | -698.0 |
| 2018-01-01 06:10:00+01:00 | 1 | 694.0 |
| 2018-01-01 06:20:00+01:00 | 5 | 2193.0 |
| 2018-01-01 06:30:00+01:00 | 5 | 2121.0 |
[8]:
%%time
if isinstance(window, list):
window_list = window
elif '+' in window:
window_list = window.split('+')
else:
window_list = [window]
# add rolling features for all window sizes
df_rolling_features = pd.concat([get_rolling_mean_delay(df_floor, freq=freq, window=window).add_suffix(f"__{window}")
for window in window_list], axis=1) \
.reset_index()
df_rolling_features
Wall time: 47 ms
[8]:
| timeStamp | n__30T | sum_delays__30T | mean_delay__30T | n__2H | sum_delays__2H | mean_delay__2H | n__1D | sum_delays__1D | mean_delay__1D | |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2018-01-01 03:30:00+01:00 | 1.0 | -480.0 | -480.000000 | 1.0 | -480.0 | -480.000000 | 1.0 | -480.0 | -480.000000 |
| 1 | 2018-01-01 03:40:00+01:00 | 1.0 | -480.0 | -480.000000 | 1.0 | -480.0 | -480.000000 | 1.0 | -480.0 | -480.000000 |
| 2 | 2018-01-01 03:50:00+01:00 | 1.0 | -480.0 | -480.000000 | 1.0 | -480.0 | -480.000000 | 1.0 | -480.0 | -480.000000 |
| 3 | 2018-01-01 04:00:00+01:00 | 0.0 | 0.0 | 0.000000 | 1.0 | -480.0 | -480.000000 | 1.0 | -480.0 | -480.000000 |
| 4 | 2018-01-01 04:10:00+01:00 | 0.0 | 0.0 | 0.000000 | 1.0 | -480.0 | -480.000000 | 1.0 | -480.0 | -480.000000 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 27724 | 2018-07-12 17:10:00+02:00 | 102.0 | 51057.0 | 500.558824 | 379.0 | 223135.0 | 588.746702 | 2715.0 | 2022369.0 | 744.887293 |
| 27725 | 2018-07-12 17:20:00+02:00 | 71.0 | 15875.0 | 223.591549 | 380.0 | 209786.0 | 552.068421 | 2715.0 | 2025047.0 | 745.873665 |
| 27726 | 2018-07-12 17:30:00+02:00 | 42.0 | 9169.0 | 218.309524 | 363.0 | 207260.0 | 570.964187 | 2692.0 | 2015030.0 | 748.525260 |
| 27727 | 2018-07-12 17:40:00+02:00 | 24.0 | 2197.0 | 91.541667 | 349.0 | 185570.0 | 531.719198 | 2685.0 | 1998027.0 | 744.144134 |
| 27728 | 2018-07-12 17:50:00+02:00 | 2.0 | -10625.0 | -5312.500000 | 336.0 | 164110.0 | 488.422619 | 2669.0 | 1986413.0 | 744.253653 |
27729 rows × 10 columns
Check assumption that we have no NaN values in our data¶
[9]:
assert df["scheduleDelaySeconds"].shape == df["scheduleDelaySeconds"].dropna().shape
df["scheduleDelaySeconds"].shape, df["scheduleDelaySeconds"].dropna().shape
[9]:
((477776,), (477776,))
[10]:
assert df_floor["sum_delays"].shape == df_floor["sum_delays"].dropna().shape
df_floor["sum_delays"].shape, df_floor["sum_delays"].dropna().shape
[10]:
((20532,), (20532,))
[11]:
# check separately for each
assert df_rolling_features.shape == df_rolling_features.dropna().shape
df_rolling_features.shape, df_rolling_features.dropna().shape
[11]:
((27729, 10), (27729, 10))
Show output features¶
[12]:
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go
def px_facet_variable(df, x, y_cols, title=None, width=1200, height=800, log_transform=False):
# long format with variable values in 'y' and name in 'variable'
df_plot = pd.concat([
df[[x, y_col]].assign(variable=y_col).rename(columns={y_col: "y"})
for y_col in y_cols
]).sort_values(x)
if log_transform:
df_plot["y"] = np.log(df_plot["y"] + 1)
# facet plot of each variable in a row
fig = px.line(df_plot[[x, "y", "variable"]], x=x, y="y", facet_row="variable",
title=f"{title}{' [Log-transformed]' if log_transform else None}",
width=width, height=height)
# Add range slider assuming 'x' is a date or datetime index
fig.update_layout(
xaxis=dict(
rangeslider=dict(
visible=True
),
type="date"
),
hovermode="x"
)
# independent y-axes
fig.update_yaxes(matches=None)
fig.for_each_annotation(lambda a: a.update(text=a.text.split("=")[-1]))
return fig
# plotly figure of rolling features
flatten = lambda l: [item for sublist in l for item in sublist]
n_mean_columns = flatten([[f"n__{w}", f"mean_delay__{w}"] for w in window_list])
px_facet_variable(df_rolling_features, "timeStamp", n_mean_columns,
title="Rolling window features (window={w})")
Write output to CSV¶
Local or Google Storage is both handled
[13]:
# # write output file
write_csv_data(df_rolling_features, output_file, index=False)
Writing file to local directory
File: rolling_delay_features__test.csv
[14]:
df_rolling_features.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 27729 entries, 0 to 27728
Data columns (total 10 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 timeStamp 27729 non-null datetime64[ns, Europe/Amsterdam]
1 n__30T 27729 non-null float64
2 sum_delays__30T 27729 non-null float64
3 mean_delay__30T 27729 non-null float64
4 n__2H 27729 non-null float64
5 sum_delays__2H 27729 non-null float64
6 mean_delay__2H 27729 non-null float64
7 n__1D 27729 non-null float64
8 sum_delays__1D 27729 non-null float64
9 mean_delay__1D 27729 non-null float64
dtypes: datetime64[ns, Europe/Amsterdam](1), float64(9)
memory usage: 2.1 MB