End-2-End Data Science

From interactive programming to production-ready code

Track missing sensor data

Building the algorithm interactively

Load sample data in CSV format

In [2]:
df = pandas.read_csv("inputdata/fleetextract.csv", 
                     parse_dates=['time','time_local'], 
                     infer_datetime_format=True)
df.head(3)
Out[2]:
country_id site_id asset_id metric_id time time_local value hive_insert_ts country site year month
0 C005 S009 T004 dVelGenFiltplc_Avg 2017-01-31 18:30:00 2017-01-31 19:30:00 1136.900 2017-02-13 14:58:43.418 C005 S009 2017 1
1 C005 S009 T005 dTempAniGen_Avg 2017-01-31 01:10:00 2017-01-31 02:10:00 63.420 2017-02-13 14:58:43.418 C005 S009 2017 1
2 C005 S009 T005 dTempAniGen_Avg 2017-01-31 01:40:00 2017-01-31 02:40:00 63.095 2017-02-13 14:58:43.418 C005 S009 2017 1

Select columns and clean

In [3]:
selected = df[['country_id', 'site_id', 'asset_id', 'metric_id', 'time']]
deduplicated = selected.drop_duplicates()

Filter dataframe for the relevant timeframe

In [4]:
# First week of January 2017
lower = date(2017, 1, 2)
upper = date(2017, 1, 8)
after_lower = deduplicated[deduplicated['time'] >= pandas.Timestamp(lower)]
relevant = after_lower[after_lower['time'] < pandas.Timestamp(upper)]

Add a coarser-grained daily time range

In [5]:
relevant = relevant.copy()  # work on an explicit copy before adding the 'day' column
relevant['day'] = relevant['time'].apply(lambda d: d.date())
daily = relevant[['country_id', 'site_id', 'asset_id', 'metric_id', 'day']].copy()
daily.head()
Out[5]:
country_id site_id asset_id metric_id day
19008 C005 S009 T001 dAmbTemp_Avg 2017-01-04
19027 C005 S009 T001 dAnguloPitch_Avg 2017-01-06
19028 C005 S009 T001 dAnguloPitch_Avg 2017-01-06
19029 C005 S009 T001 dAnguloPitch_Avg 2017-01-06
19030 C005 S009 T001 dAnguloPitch_Avg 2017-01-07

Group and count entries per sensor per device per day

In [6]:
daily['count'] = -1  # dummy column so that count() yields a 'count' per group
grouped = daily.groupby(['country_id', 'site_id', 'asset_id', 'metric_id', 'day'])
counted = grouped.count().reset_index()
# a complete day of 10-minute readings has 144 entries
counted['percentage'] = counted['count'].apply(lambda c: float(c) / 144.0 * 100.0)

Filter for specific device and draw heatmap

In [7]:
filtered_for_country = counted[counted['country_id'] == 'C005']
filtered_for_location = filtered_for_country[filtered_for_country['site_id'] == 'S009']
filtered_for_device = filtered_for_location[filtered_for_location['asset_id'] == 'T006']
In [8]:
heatmap = pandas.pivot_table(filtered_for_device, 
                             values='percentage', 
                             index='metric_id', 
                             columns='day')
sns.set_context("notebook", font_scale=1.5)
sns.heatmap(heatmap, vmin=0, vmax=100)
plt.title("device %s-%s-%s" % ("C005", "S009", "T006"))
plt.show()

Build a scalable algorithm with PySpark

Load sample data in CSV format

In [10]:
input_file = "inputdata/fleetextract.csv"
output_file = "report_missing_datapoints.csv"

df = hsc.read.format("com.databricks.spark.csv")\
                         .option("header", "true")\
                         .option("inferschema", "true")\
                         .option("mode", "DROPMALFORMED")\
                         .load(input_file)
df.toPandas().head(3)
Out[10]:
country_id site_id asset_id metric_id time time_local value hive_insert_ts country site year month
0 C005 S009 T004 dVelGenFiltplc_Avg 2017-01-31 18:30:00 2017-01-31 19:30:00 1136.900 2017-02-13 14:58:43.418 C005 S009 2017 1
1 C005 S009 T005 dTempAniGen_Avg 2017-01-31 01:10:00 2017-01-31 02:10:00 63.420 2017-02-13 14:58:43.418 C005 S009 2017 1
2 C005 S009 T005 dTempAniGen_Avg 2017-01-31 01:40:00 2017-01-31 02:40:00 63.095 2017-02-13 14:58:43.418 C005 S009 2017 1

Select columns and clean

In [11]:
selected = df[['country_id', 'site_id', 'asset_id', 'metric_id', 'time']]
sort_criteria = ['country_id', 'site_id', 'asset_id', 'metric_id', 'time']
deduplicated = selected.sort(sort_criteria)\
                       .dropDuplicates(subset=sort_criteria)

Filter dataframe for the relevant timeframe

In [12]:
# First week of January 2017
lower = date(2017, 1, 2)
upper = date(2017, 1, 8)
after_lower = deduplicated[deduplicated['time'] >= lower]
relevant = after_lower[after_lower['time'] < upper]
relevant.toPandas().head(3)
Out[12]:
country_id site_id asset_id metric_id time
0 C005 S009 T001 dAmbTemp_Avg 2017-01-02 11:40:00
1 C005 S009 T001 dAmbTemp_Avg 2017-01-02 11:50:00
2 C005 S009 T001 dAmbTemp_Avg 2017-01-02 12:00:00

Add a coarser-grained daily time range

In [13]:
daily = relevant.withColumn('day',to_date(col('time')))
daily = daily[['country_id', 'site_id', 'asset_id', 'metric_id', 'day']]
daily.toPandas().head(3)
Out[13]:
country_id site_id asset_id metric_id day
0 C005 S009 T001 dAmbTemp_Avg 2017-01-02
1 C005 S009 T001 dAmbTemp_Avg 2017-01-02
2 C005 S009 T001 dAmbTemp_Avg 2017-01-02

Group and count entries per sensor per device per day

In [14]:
grouped = daily.groupby(['country_id', 'site_id', 'asset_id', 'metric_id', 'day'])
counted = grouped.count()
counted = counted.withColumn('percentage', col('count') / 144.0 * 100.0)  # 144 ten-minute intervals per day
report_missing = counted[counted['count'] < 144]
report_missing.coalesce(1).toPandas().head(3)
Out[14]:
country_id site_id asset_id metric_id day count percentage
0 C005 S009 T003 dDirveleta_Avg 2017-01-07 96 66.666667
1 C005 S009 T006 dTempGen_Avg 2017-01-07 143 99.305556
2 C005 S009 T001 dPotTotal_Avg 2017-01-02 132 91.666667
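
The output_file defined above is never written in the cells shown; a minimal sketch of the missing write step, assuming the same spark-csv package that was used for reading:

report_missing.coalesce(1)\
              .write.format("com.databricks.spark.csv")\
              .option("header", "true")\
              .mode("overwrite")\
              .save(output_file)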

Build a production-ready pipeline with Luigi

A few things to remember ...

Monitoring

Collect metrics
Collect and manage logs
Configure alerts

Application

Keep the application consistent in all situations
Keep track of results

DevOps

Deployability
Memory settings
Startup scripts
Scheduling

A bit about Luigi

Luigi helps stitch long-running tasks together into pipelines
It contains a wide toolbox of task templates (e.g. Hive, Pig, Spark, Python)

How to compose workflows?

A workflow consists of Tasks, Targets and Parameters
Targets correspond to a file, a database entry or some other kind of checkpoint
Tasks consume the Targets of other tasks, run a computation, then output a Target of their own
Parameters take care of task parameterization (a minimal sketch follows below)
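
A minimal, illustrative task to show how the three pieces fit together (the class, parameter and file names here are made up and not part of the report pipeline):

import luigi

class WriteGreeting(luigi.Task):
    # a Parameter makes the task configurable from the outside
    name = luigi.Parameter()

    def output(self):
        # the Target acts as a checkpoint, here a local file
        return luigi.LocalTarget("greeting_%s.txt" % self.name)

    def run(self):
        # the computation writes its result to the Target
        with self.output().open('w') as f:
            f.write("Hello %s" % self.name)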

The pipeline for our report

Start by checking the file

In [17]:
class CheckExistingFile(luigi.ExternalTask):
    # Parameters
    input_file = Parameter()

    def output(self):
        return LocalTarget(self.input_file)

Run the calculation

In [18]:
class RunCompletenessCalculation(SparkSubmitTask):
    # Parameters
    input_file = Parameter()
    output_file = Parameter()
    metrics_file = Parameter()
    # Spark job that takes a local file and outputs a CSV report
    app = 'sparkjobs/completeness_calculation.py'
    # PySpark parameters
    driver_memory = '1g'    # <-- could also come from configuration
    executor_memory = '1g'  # <-- could also come from configuration
    executor_cores = '2'    # <-- could also come from configuration
    num_executors = '4'     # <-- could also come from configuration
    master = 'local[*]'     # <-- could also come from configuration (see the sketch below)

    def requires(self):
        return CheckExistingFile(self.input_file)

    def output(self):
        return LocalTarget(self.output_file)

    def app_options(self):
        return [self.input().path, self.output().path]
    
    def run(self):
        # wrap the Spark job to record its execution time as a simple metric
        start = time.time()
        SparkSubmitTask.run(self)
        end = time.time()
        time.sleep(25)
        diff = round(end - start, 2)
        with open(self.metrics_file, 'w') as f:
            f.write("Execution time: %.2f" % diff)

Archive the raw file

In [19]:
class ArchiveRawFile(luigi.Task):
    # Parameters
    archive = Parameter()
    input_file = Parameter()
    output_file = Parameter()
    metrics_file = Parameter()

    def requires(self):
        return RunCompletenessCalculation(self.input_file, 
                                        self.output_file, 
                                        self.metrics_file)

    def output(self):
        import os
        filename = os.path.basename(self.input_file)
        return LocalTarget(os.path.join(self.archive, filename))

    def run(self):
        import os
        os.rename(self.input_file, self.output().path)
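
To execute the pipeline, the terminal task can be handed to Luigi's scheduler. A minimal sketch with illustrative parameter values (the archive directory and metrics file name are assumptions):

luigi.build([ArchiveRawFile(archive="archive",
                            input_file="inputdata/fleetextract.csv",
                            output_file="report_missing_datapoints.csv",
                            metrics_file="metrics.txt")],
            local_scheduler=True)

The same run can be triggered with the luigi command-line tool, passing the parameters as --archive, --input-file, etc. together with --local-scheduler (or pointing it at a running luigid).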

More features

Notifications for successful/failed tasks (see the sketch below)
Live view of running tasks using luigid
Locking running tasks using luigid
Experimental task history
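
A minimal sketch of how such notifications can be hooked in via Luigi's event handlers (the print calls stand in for a real notification channel such as e-mail or chat):

import luigi

@luigi.Task.event_handler(luigi.Event.SUCCESS)
def notify_success(task):
    # replace print with the notification channel of your choice
    print("Task %s finished successfully" % task)

@luigi.Task.event_handler(luigi.Event.FAILURE)
def notify_failure(task, exception):
    print("Task %s failed: %s" % (task, exception))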

Thank you!

Github: https://github.com/crazzle
Twitter: https://twitter.com/kein_mark