[13]:
%load_ext autoreload
%autoreload 2
%matplotlib inline
The autoreload extension is already loaded. To reload it, use:
%reload_ext autoreload
Create minimal model input
- Reads the flights data
- Parses the scheduled and realized datetimes and computes the delay in seconds
- Removes observations with an unknown prediction target, duplicate ids, and flights scheduled before 2018
- Writes the prediction target with a minimal feature set to CSV
Parameters
- input_file: path to the flights data, in the format received from Schiphol
- output_file: path to write the output CSV file with the minimal modelling input
Returns
A CSV file with the minimal model input, written to output_file
Example output:
| id | aircraftRegistration | airlineCode | terminal | serviceType | scheduleDateTime | actualOffBlockTime | scheduleDelaySeconds |
|---|---|---|---|---|---|---|---|
| 124257473326719795 | PHEXI | 80.0 | 2.0 | J | 2018-05-01 16:35:00+02:00 | 2018-05-01 16:58:16+02:00 | 1396.0 |
| 124538476600837715 | PHEXL | 2481.0 | 1.0 | J | 2018-06-10 13:00:00+02:00 | 2018-06-10 13:11:25+02:00 | 685.0 |
| 123512829091050355 | PHBQO | 100.0 | 2.0 | J | 2018-01-15 10:15:00+01:00 | 2018-01-15 10:35:10+01:00 | 1210.0 |
| 123786805997701057 | PHEXG | 2481.0 | 1.0 | J | 2018-02-23 17:45:00+01:00 | 2018-02-23 17:55:52+01:00 | 652.0 |
| 124664922607744671 | PHBXP | 1551.0 | 2.0 | J | 2018-06-28 20:50:00+02:00 | 2018-06-28 22:09:23+02:00 | 4763.0 |
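The prediction target scheduleDelaySeconds is the realized off-block time minus the scheduled departure time, in seconds; a negative value means the flight left early. A minimal standard-library sketch that reproduces two rows of the example output above:

```python
from datetime import datetime

# (scheduleDateTime, actualOffBlockTime) pairs copied from the example output
rows = [
    ("2018-05-01 16:35:00+02:00", "2018-05-01 16:58:16+02:00"),
    ("2018-06-28 20:50:00+02:00", "2018-06-28 22:09:23+02:00"),
]

def delay_seconds(scheduled: str, actual: str) -> float:
    """Delay = actual off-block time minus scheduled time, in seconds."""
    return (datetime.fromisoformat(actual) - datetime.fromisoformat(scheduled)).total_seconds()

delays = [delay_seconds(s, a) for s, a in rows]
print(delays)  # [1396.0, 4763.0]
```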
File parameters
[14]:
# input parameters cell
input_file = "../lvt-schiphol-assignment-snakemake/data/raw/flights.csv"
output_file = "processed_flights.csv"
Imports
[15]:
import pandas as pd
import numpy as np
import sys
sys.path.append("../")
from src.data.google_storage_io import read_csv_data, write_csv_data
Utility functions
[16]:
def missing_values_percentages(df):
    """Calculate summary of missing values per column"""
    percent_missing = df.isnull().sum() * 100 / len(df)
    missing_value_df = pd.DataFrame({'column_name': df.columns,
                                     'percent_missing': percent_missing})
    missing_value_df = missing_value_df.sort_values('percent_missing', ascending=False)
    return missing_value_df

def check_col_singular(x):
    """Check if pd.Series contains 1 or fewer unique values excluding NaN"""
    return x.dropna().nunique() <= 1

def drop_singular_columns(df, verbose=False):
    """Drop DataFrame columns with 1 or fewer unique values excluding NaN"""
    col_singular = df.apply(check_col_singular, axis=0)
    if verbose:
        n_singular = sum(col_singular)
        print(f"Dropping {n_singular} columns")
        print(f"{col_singular[col_singular].index}")
    # explicit copy so later column assignments do not trigger SettingWithCopyWarning
    df_output = df[[col for col, is_singular in col_singular.items()
                    if not is_singular]].copy()
    return df_output

def clean_flights(df_flights, verbose=True):
    """Clean flights data by removing singular columns and formatting dates"""
    # drop flights without a schedule or a realized departure (unknown prediction target)
    df = df_flights.dropna(subset=["scheduleDate", "scheduleTime", "actualOffBlockTime"]).reset_index(drop=True)
    # remove singular columns
    df = drop_singular_columns(df, verbose=verbose)
    # format datetime fields
    df["actualOffBlockTime"] = pd.to_datetime(df["actualOffBlockTime"], utc=True).dt.tz_convert('Europe/Amsterdam')
    # scheduleDate is stored as YYYY-MM-DD and scheduleTime as HH:MM:SS
    series_datetime_str = df['scheduleDate'].astype(str) + " " + df['scheduleTime'].astype(str)
    df["scheduleDateTime"] = pd.to_datetime(series_datetime_str, format="%Y-%m-%d %H:%M:%S").dt.tz_localize('Europe/Amsterdam')
    # calculate delay as difference between scheduled and realized departure
    df["scheduleDelaySeconds"] = (df["actualOffBlockTime"] - df["scheduleDateTime"]).dt.total_seconds()
    return df

def read_flights_data(filename):
    """Read data locally or from a Google Storage bucket and clean it"""
    df = read_csv_data(filename)
    print(f"Loaded data from: {filename}\n"
          f"Shape of data: {df.shape}")
    df = clean_flights(df)
    print(f"Cleaned flights data\n"
          f"Shape of data: {df.shape}")
    return df
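To make the column-dropping rule concrete, here is a self-contained sketch on a made-up toy frame (the column names "constant" and "empty" are hypothetical, not from the flights data). It applies the same rules as check_col_singular and missing_values_percentages:

```python
import numpy as np
import pandas as pd

# Hypothetical toy frame: "constant" has one unique value, "empty" is all NaN
df_toy = pd.DataFrame({
    "id": [1, 2, 3, 4],
    "constant": ["DEP", "DEP", np.nan, "DEP"],
    "empty": [np.nan] * 4,
    "terminal": [1.0, 2.0, np.nan, 1.0],
})

# Same rule as check_col_singular: <= 1 unique value after dropping NaN
is_singular = df_toy.apply(lambda col: col.dropna().nunique() <= 1, axis=0)
df_kept = df_toy.loc[:, ~is_singular]

# Same formula as missing_values_percentages: percentage of NaN per column
percent_missing = (df_toy.isnull().sum() * 100 / len(df_toy)).sort_values(ascending=False)

print(list(df_kept.columns))  # ['id', 'terminal']
print(percent_missing)
```

Note that an all-NaN column counts as singular (zero unique values), which is why columns such as baggageClaim can disappear in the cleaning step.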
Read data
[17]:
%%time
df = read_csv_data(input_file)
df.head()
Reading file from local directory
File: ../lvt-schiphol-assignment-snakemake/data/raw/flights.csv
Wall time: 1.93 s
[17]:
| | actualOffBlockTime | aircraftRegistration | aircraftType.iatamain | aircraftType.iatasub | airlineCode | baggageClaim | estimatedLandingTime | expectedTimeBoarding | expectedTimeGateClosing | expectedTimeGateOpen | ... | prefixICAO | publicEstimatedOffBlockTime | publicFlightState.flightStates | route.destinations | scheduleDate | scheduleTime | serviceType | terminal | transferPositions | transferPositions.transferPositions |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | NaN | NaN | NaN | NaN | 148.0 | NaN | NaN | NaN | NaN | NaN | ... | ZXP | NaN | ['SCH'] | ['AMS'] | 2018-01-01 | 03:02:07 | P | NaN | NaN | NaN |
| 1 | NaN | PHPXY | AW1 | NaN | 148.0 | NaN | NaN | NaN | NaN | NaN | ... | ZXP | NaN | ['SCH'] | ['AMS'] | 2018-01-01 | 03:16:00 | NaN | NaN | NaN | NaN |
| 2 | NaN | NaN | AW1 | NaN | 148.0 | NaN | NaN | NaN | NaN | NaN | ... | ZXP | NaN | ['SCH'] | ['AMS'] | 2018-01-01 | 03:16:29 | P | NaN | NaN | NaN |
| 3 | 2018-01-01T03:22:00.000+01:00 | PHPXB | NaN | NaN | 148.0 | NaN | NaN | NaN | NaN | NaN | ... | ZXP | NaN | ['DEP'] | ['AMS'] | 2018-01-01 | 03:30:00 | NaN | NaN | NaN | NaN |
| 4 | 2018-01-01T05:58:22.000+01:00 | PHHSJ | 73H | 73H | 164.0 | NaN | NaN | NaN | NaN | NaN | ... | TRA | NaN | ['DEP'] | ['SPC'] | 2018-01-01 | 06:00:00 | J | 1.0 | NaN | NaN |
5 rows × 28 columns
[18]:
%%time
df = clean_flights(df)
df.head()
Dropping 5 columns
Index(['baggageClaim', 'estimatedLandingTime', 'expectedTimeOnBelt',
'flightDirection', 'transferPositions'],
dtype='object')
Wall time: 4.92 s
[18]:
| | actualOffBlockTime | aircraftRegistration | aircraftType.iatamain | aircraftType.iatasub | airlineCode | expectedTimeBoarding | expectedTimeGateClosing | expectedTimeGateOpen | flightName | flightNumber | ... | publicEstimatedOffBlockTime | publicFlightState.flightStates | route.destinations | scheduleDate | scheduleTime | serviceType | terminal | transferPositions.transferPositions | scheduleDateTime | scheduleDelaySeconds |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2018-01-01 03:22:00+01:00 | PHPXB | NaN | NaN | 148.0 | NaN | NaN | NaN | ZXP022 | 22.0 | ... | NaN | ['DEP'] | ['AMS'] | 2018-01-01 | 03:30:00 | NaN | NaN | NaN | 2018-01-01 03:30:00+01:00 | -480.0 |
| 1 | 2018-01-01 05:58:22+01:00 | PHHSJ | 73H | 73H | 164.0 | NaN | NaN | NaN | HV5641 | 5641.0 | ... | NaN | ['DEP'] | ['SPC'] | 2018-01-01 | 06:00:00 | J | 1.0 | NaN | 2018-01-01 06:00:00+01:00 | -98.0 |
| 2 | 2018-01-01 06:00:00+01:00 | PHHSG | 73H | 73H | 100.0 | NaN | NaN | NaN | KL2533 | 2533.0 | ... | NaN | ['DEP'] | ['LPA'] | 2018-01-01 | 06:05:00 | J | 1.0 | NaN | 2018-01-01 06:05:00+01:00 | -300.0 |
| 3 | 2018-01-01 06:00:00+01:00 | PHHSG | 73H | 73H | 164.0 | NaN | NaN | NaN | HV6455 | 6455.0 | ... | NaN | ['DEP'] | ['LPA'] | 2018-01-01 | 06:05:00 | J | 1.0 | NaN | 2018-01-01 06:05:00+01:00 | -300.0 |
| 4 | 2018-01-01 06:26:34+01:00 | PHHXB | 73H | 73H | 164.0 | NaN | NaN | NaN | HV5801 | 5801.0 | ... | NaN | ['DEP'] | ['TLV'] | 2018-01-01 | 06:15:00 | J | 1.0 | NaN | 2018-01-01 06:15:00+01:00 | 694.0 |
5 rows × 25 columns
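clean_flights localizes the combined schedule date and time with tz_localize('Europe/Amsterdam'). With pandas' defaults (ambiguous="raise", nonexistent="raise"), a wall-clock time that falls in a daylight-saving gap or overlap raises an error. This run did not hit one, but a sketch of a more forgiving variant, assuming such rows may simply be set to NaT and dropped, looks like this:

```python
import pandas as pd

# Naive wall-clock times around the 2018 Dutch DST switches
naive = pd.Series(pd.to_datetime([
    "2018-03-25 02:30:00",  # does not exist: clocks jump from 02:00 to 03:00
    "2018-10-28 02:30:00",  # ambiguous: clocks fall back from 03:00 to 02:00
    "2018-10-28 12:00:00",  # unambiguous
]))

# Turn problematic times into NaT instead of raising, so they can be inspected or dropped
localized = naive.dt.tz_localize("Europe/Amsterdam", ambiguous="NaT", nonexistent="NaT")
print(localized)
```

Other options exist (for example nonexistent="shift_forward"); which one is right depends on how Schiphol records schedule times during the switch, which this notebook does not establish.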
Check for duplicates by id
Earlier exploration showed duplicate values in the id column. Downstream, the id must be unique because it is used for indexing and merging.
If every row that shares an id is an exact copy of the others, we can safely drop the duplicates without losing information; the tests below check this before dropping.
[19]:
def duplicates_by_id(df):
    """Return every row whose `id` occurs more than once, including all occurrences"""
    return df[df["id"].duplicated(keep=False)]

def test_duplicates_by_id(df, verbose=True):
    """Test whether all rows that share an `id` are exact duplicates"""
    df_duplicates_by_id = duplicates_by_id(df)
    nrows_duplicates_by_id = df_duplicates_by_id.shape[0]
    nrows_drop_duplicates_by_id = df_duplicates_by_id.drop_duplicates("id").shape[0]
    nrows_drop_duplicates_all = df_duplicates_by_id.drop_duplicates().shape[0]
    # equal counts mean that dropping by `id` only removes exact copies
    only_full_duplicates = nrows_drop_duplicates_by_id == nrows_drop_duplicates_all
    if verbose:
        print(f"""
The number of rows with only those IDs that are duplicated, including
all their occurrences, is {nrows_duplicates_by_id}.
Rows without duplicates only based on the `id`: {nrows_drop_duplicates_by_id}
Rows without duplicates based on all columns: {nrows_drop_duplicates_all}
If they are equal then all rows that are duplicated by `id` have no differences
in other columns and are exact duplicates.
Result: {only_full_duplicates}
""")
    return only_full_duplicates

def test_duplicates_by_id_smarter(df):
    """Test if dropping all duplicates is equivalent to dropping by `id`"""
    nrows_no_dupes = df.drop_duplicates().shape[0]
    nrows_no_dupes_by_id = df.drop_duplicates("id").shape[0]
    return nrows_no_dupes == nrows_no_dupes_by_id

# test whether all rows with a duplicated id are exact duplicates
ids_ok = test_duplicates_by_id(df, verbose=True)
ids_ok_smarter = test_duplicates_by_id_smarter(df)
# asserts
assert ids_ok
assert ids_ok_smarter
# drop duplicates if test passed
df = df.drop_duplicates("id")
The number of rows with only those IDs that are duplicated, including
all their occurrences, is 19847.
Rows without duplicates only based on the `id`: 9909
Rows without duplicates based on all columns: 9909
If they are equal then all rows that are duplicated by `id` have no differences
in other columns and are exact duplicates.
Result: True
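The same check can be seen on a hypothetical toy frame: id 1 appears twice as an exact copy, while id 2 appears twice with different gates. The helper below mirrors test_duplicates_by_id_smarter; its name and the "gate" column are illustrative only.

```python
import pandas as pd

# Hypothetical toy frame with one exact duplicate (id 1) and one conflicting duplicate (id 2)
df_toy = pd.DataFrame({
    "id": [1, 1, 2, 2, 3],
    "gate": ["D4", "D4", "B2", "B5", "C1"],
})

# All occurrences of ids that appear more than once
dupe_rows = df_toy[df_toy["id"].duplicated(keep=False)]

def only_exact_duplicates(df, key="id"):
    """True if dropping duplicates by `key` keeps as many rows as dropping full-row duplicates."""
    return df.drop_duplicates(key).shape[0] == df.drop_duplicates().shape[0]

print(len(dupe_rows))                                 # 4
print(only_exact_duplicates(df_toy))                  # False: id 2 has conflicting rows
print(only_exact_duplicates(df_toy.iloc[[0, 1, 4]]))  # True: the id 1 rows are identical
```

If the check fails, drop_duplicates("id") would silently keep the first of two conflicting rows, which is why the notebook asserts before dropping.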
Filter out data from 2017
Flight data from late 2017 contains some outliers. Because these flights sit at the very start of the data, we simply remove the few observations scheduled before 2018-01-01.
[20]:
shape_b4 = df.shape
filter_date = '2018-01-01 00:00:00+01:00'
df = df.query(f"scheduleDateTime >= '{filter_date}'")
print(f"Removed {shape_b4[0] - df.shape[0]} rows from before {filter_date}")
Removed 2 rows from before 2018-01-01 00:00:00+01:00
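The query above compares the tz-aware scheduleDateTime column against a date string. An equivalent sketch compares against an explicit tz-aware pd.Timestamp with a boolean mask, so the cutoff's timezone is visible in code; the toy rows here are made up:

```python
import pandas as pd

# Hypothetical toy schedule with one late-2017 row
df_toy = pd.DataFrame({
    "scheduleDateTime": pd.to_datetime(
        ["2017-12-31 23:50:00", "2018-01-01 00:00:00", "2018-01-01 06:00:00"]
    ).tz_localize("Europe/Amsterdam"),
})

# Midnight 2018-01-01 in Amsterdam local time (+01:00)
cutoff = pd.Timestamp("2018-01-01", tz="Europe/Amsterdam")
df_filtered = df_toy[df_toy["scheduleDateTime"] >= cutoff]
print(f"Removed {len(df_toy) - len(df_filtered)} rows from before {cutoff}")
```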
Output prediction target
[21]:
df.columns
[21]:
Index(['actualOffBlockTime', 'aircraftRegistration', 'aircraftType.iatamain',
'aircraftType.iatasub', 'airlineCode', 'expectedTimeBoarding',
'expectedTimeGateClosing', 'expectedTimeGateOpen', 'flightName',
'flightNumber', 'gate', 'id', 'mainFlight', 'prefixIATA', 'prefixICAO',
'publicEstimatedOffBlockTime', 'publicFlightState.flightStates',
'route.destinations', 'scheduleDate', 'scheduleTime', 'serviceType',
'terminal', 'transferPositions.transferPositions', 'scheduleDateTime',
'scheduleDelaySeconds'],
dtype='object')
[22]:
# meta columns that we will often merge by
output_columns = ["id", "aircraftRegistration", "airlineCode", "terminal",
"serviceType", "scheduleDateTime", "actualOffBlockTime", "scheduleDelaySeconds"]
# DataFrame with id + merging columns + prediction target
df_target = df[output_columns]
df_target.head()
[22]:
| | id | aircraftRegistration | airlineCode | terminal | serviceType | scheduleDateTime | actualOffBlockTime | scheduleDelaySeconds |
|---|---|---|---|---|---|---|---|---|
| 0 | 123414481790510775 | PHPXB | 148.0 | NaN | NaN | 2018-01-01 03:30:00+01:00 | 2018-01-01 03:22:00+01:00 | -480.0 |
| 1 | 123414479288269149 | PHHSJ | 164.0 | 1.0 | J | 2018-01-01 06:00:00+01:00 | 2018-01-01 05:58:22+01:00 | -98.0 |
| 2 | 123414479666542945 | PHHSG | 100.0 | 1.0 | J | 2018-01-01 06:05:00+01:00 | 2018-01-01 06:00:00+01:00 | -300.0 |
| 3 | 123414479288365061 | PHHSG | 164.0 | 1.0 | J | 2018-01-01 06:05:00+01:00 | 2018-01-01 06:00:00+01:00 | -300.0 |
| 4 | 123414479288274329 | PHHXB | 164.0 | 1.0 | J | 2018-01-01 06:15:00+01:00 | 2018-01-01 06:26:34+01:00 | 694.0 |
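Since downstream steps rely on a unique id and a fully populated target, a cheap guard before writing can catch regressions. This is a hypothetical helper, not part of the original pipeline; it returns a list of problems so the notebook could call `assert not check_target(df_target)`:

```python
import pandas as pd

def check_target(df_target):
    """Hypothetical pre-write check: unique ids and no missing prediction target."""
    problems = []
    if not df_target["id"].is_unique:
        problems.append("id is not unique")
    if df_target["scheduleDelaySeconds"].isna().any():
        problems.append("scheduleDelaySeconds has missing values")
    return problems

# Hypothetical toy target frame with one repeated id
df_toy = pd.DataFrame({
    "id": [101, 102, 102],
    "scheduleDelaySeconds": [-480.0, 694.0, 694.0],
})
print(check_target(df_toy))                        # ['id is not unique']
print(check_target(df_toy.drop_duplicates("id")))  # []
```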
Write output to CSV
write_csv_data handles both local paths and Google Storage paths.
[23]:
# write output file
write_csv_data(df_target, output_file, index=False)
Writing file to local directory
File: processed_flights.csv
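The implementation of write_csv_data in src.data.google_storage_io is not shown in this notebook. A minimal sketch of how such a helper might decide where a path points, assuming Google Storage paths use the gs:// scheme (the function name storage_backend is hypothetical):

```python
from urllib.parse import urlparse

def storage_backend(path: str) -> str:
    """Hypothetical helper: classify a path by its URL scheme (assumes gs:// for Google Storage)."""
    scheme = urlparse(path).scheme
    if scheme == "gs":
        return "google_storage"
    # no scheme, file://, or a one-letter Windows drive such as C:\ are treated as local
    if scheme in ("", "file") or len(scheme) == 1:
        return "local"
    raise ValueError(f"Unsupported storage scheme: {scheme!r}")

print(storage_backend("processed_flights.csv"))        # local
print(storage_backend("gs://my-bucket/flights.csv"))   # google_storage
```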
[24]:
df_target.info()
<class 'pandas.core.frame.DataFrame'>
Int64Index: 477776 entries, 0 to 487714
Data columns (total 8 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 id 477776 non-null int64
1 aircraftRegistration 477773 non-null object
2 airlineCode 476594 non-null float64
3 terminal 467868 non-null float64
4 serviceType 473110 non-null object
5 scheduleDateTime 477776 non-null datetime64[ns, Europe/Amsterdam]
6 actualOffBlockTime 477776 non-null datetime64[ns, Europe/Amsterdam]
7 scheduleDelaySeconds 477776 non-null float64
dtypes: datetime64[ns, Europe/Amsterdam](2), float64(3), int64(1), object(2)
memory usage: 32.8+ MB