%%html
<a href="javascript:code_showhide_toggle()">Show/Hide Code</a>
This notebook illustrates the use of Jupyter with PySpark to harmonize a city crime incident dataset: it downloads the raw data, maps it to a standard set of 'harmonized' variables, saves the results to S3 as parquet tables registered in Amazon Athena, and indexes the data and its data dictionary into Elasticsearch to power the search UI.
# CONFIGURATION VARIABLES
import os  # used for os.environ below (and os.remove / os.rename later in the notebook)
city="Baltimore"
cityurl="https://data.baltimorecity.gov/api/views/wsfq-mvij/rows.csv?accessType=DOWNLOAD"
citynotebook="Baltimore-notebook" # citynotebook is used to automatically publish the notebook at the end
# location for raw (unprocessed) dataset in S3
scratch_bucket=os.environ["S3_SCRATCH_BUCKET"]
datasets3 = "s3://{0}/crimedata/raw/{1}.csv".format(scratch_bucket, city)
# location for harmonized (processed) dataset in S3. Structure is:
# outputroot
# |--- data - incidents - multiple CSV files
# |--- dictionary - incidents - CSV file containing data dictionary
# |--- docs - copy of notebook (published for web access)
outputroot = "s3://{0}/crimedata/harmonized/{1}".format(scratch_bucket,city)
outputpath_data = "{}/data".format(outputroot)
outputpath_dictionary="{}/dictionary".format(outputroot)
outputpath_doc="{}/docs".format(outputroot)
notebook_urlbase = "https://s3.amazonaws.com/{0}/{1}".format(outputpath_doc.replace("s3://",""), citynotebook)
# elasticsearch cluster endpoint
# - use local proxy service (installed via an EMR bootstrap) which signs ES API calls using EMR EC2 role.
esendpoint="localhost"
esport=9200
# Summary of configuration
print("City: {}".format(city))
print("Notebook Name: {}".format(citynotebook))
print("Dataset URL: {}".format(cityurl))
print("S3 dataset input: {}".format(datasets3))
print("Harmonized output: {}".format(outputroot))
print("ElasticSearch Cluster: {}.{}".format(esendpoint, esport))
print("Setup & Initialization done")
# INITIALIZATION - instantiate objects used by notebook
%matplotlib inline
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import *
from IPython.display import display, HTML
import datetime
import subprocess
# elasticsearch and harmonization objects defined in ./lib/esindex.py and ./lib/harmonizeCrimeIncidents.py
from lib.esindex import esindex
from lib.harmonizeCrimeIncidents import harmonizeCrimeIncidents
sc = SparkContext.getOrCreate()
hc = HiveContext(sc)
es = esindex(esendpoint)
hz = harmonizeCrimeIncidents(hc)
print("Initialization complete.")
# Download & install dataset from website
def copy_from_website_to_s3(city, cityurl, datasets3):
tmpfile="/tmp/{0}.csv".format(city)
print("Downloading {0} from: {1}".format(tmpfile, cityurl))
subprocess.check_output("curl {0} -o {1}".format(cityurl, tmpfile).split())
print("Copying {0} to: {1}".format(tmpfile, datasets3))
subprocess.check_output("aws s3 cp {0} {1} --sse AES256".format(tmpfile, datasets3).split())
os.remove(tmpfile)
# uncomment if you want to re-download the dataset
copy_from_website_to_s3(city, cityurl, datasets3)
# Read input datasets as DataFrames
# No need to infer schema - all variables initially typed as strings
print("Loading dataset from S3")
df = hc.read.load(datasets3,
                  format='com.databricks.spark.csv',
                  header='true',
                  inferSchema='false',
                  delimiter=',')
df_in=df # keep copy of raw data for reference
print("Dataset {0}: Loaded {1} rows.".format(city, df.count()))
Edit as desired to interactively explore the data
With the data source file loaded into a Spark Data Frame, you can get some details on the content of the data using methods from Spark, Pandas, and others. This is purely exploratory information to give users insights into the raw data set.
# examine column headers
str(df.columns)
# look at first 2 rows
df.head(2)
# look at the distinct values for 'Description'
df.select('Description').distinct().show(truncate=False)
# Graph incident count by Description
descr = df.select('Description').toPandas()
descrGrp = descr.groupby('Description').size().rename('counts')
descrPlot = descrGrp.plot(kind='bar')
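You can also check data quality at this stage - for example, counting empty or null values per column. This is a quick sketch using standard Spark functions on the raw dataframe (not part of the harmonization flow itself):
from pyspark.sql.functions import col, sum as colsum, when
# count empty or null values in each column of the raw data
empty_counts = df.select([colsum(when(col(c).isNull() | (col(c) == ''), 1).otherwise(0)).alias(c)
                          for c in df.columns])
empty_counts.show(truncate=False)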
Standard harmonized variables for crime incident datasets are defined in ./lib/harmonizeCrimeIncidents.py. (You can open this module in Jupyter and modify the variable list and associated standard variable metadata as required.)
Harmonization is the process of mapping the raw variables of each incoming dataset to the standard 'harmonized' variables and associated units of measurement, as far as possible. Harmonized datasets support cross-dataset search as well as the ability to combine/union datasets for multi-dataset analysis and research.
See the examples below for how to generate new variables from existing variables, and how to manipulate variable values. The hz (harmonizeCrimeIncidents) class provides methods that abstract the code for some common actions.
A core challenge when combining loosely coupled datasets in a single search index is dealing with different names for the same attribute - for example "Sex" versus "Gender", or "48in" versus "4ft". We have a pre-defined set of standard variable names and types used by our search page; the harmonization process ensures that attributes and values in the raw data files match that predefined set, allowing a consistent search tool across multiple datasets.
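As a rough sketch of what this kind of mapping looks like in plain PySpark (illustration only - the real implementations live in ./lib/harmonizeCrimeIncidents.py; the column and value names below are taken from this dataset):
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
# rename a raw column to its harmonized name (roughly what a mapVar-style helper does)
df_example = df_in.withColumnRenamed("Description", "description_orig")
# map raw values to standard values, leaving unmapped values unchanged
value_map = {"AGG. ASSAULT": "Assault", "COMMON ASSAULT": "Assault"}
map_udf = udf(lambda v: value_map.get(v, v), StringType())
df_example = df_example.withColumn("description", map_udf(col("description_orig")))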
# Use hz.mapVar(old, new, keepOrig=False) to create new variables from the original variables, by default dropping
# the original variable. Use 'keepOrig=True' argument to keep the original variable in the dataset.
# Metadata for the transformation will be captured and included in the data dictionary
df = hz.mapVar(df, "Description", "description_orig", keepOrig=True) # make a copy of original Description variable
df = hz.mapVar(df, "Location 1", "geolocation")
# Rename any variables that have illegal names
# no illegal characters or spaces (required to support parquet output format)
# all lowercase variable names (required to provide compatibility with Amazon Athena)
df = hz.makeValidVariableNames(df)
# Harmonize Description variable to standard values.
# map value to standard descriptions
descrMappings = {
"ARSON" : "Arson",
"AGG. ASSAULT" : "Assault",
"ASSAULT BY THREAT" : "Assault",
"COMMON ASSAULT" : "Assault",
"RAPE" : "Rape",
"BURGLARY" : "Burglary",
"ROBBERY - COMMERCIAL" : "Robbery",
"ROBBERY - RESIDENCE" : "Robbery",
"ROBBERY - STREET" : "Robbery",
"ROBBERY - CARJACKING" : "Robbery",
"LARCENY FROM AUTO" : "Theft",
"AUTO THEFT" : "Theft",
"LARCENY" : "Theft",
"HOMICIDE" : "Homicide",
"SHOOTING" : "Weapons"
}
df = hz.mapValues(df, "description", descrMappings)
# Add city variable. All rows are given the value of the current city name
df = df.withColumn('city', lit(city))
# Add TransformDescr metadata field
hz.addTransformDescr('city','"City" assigned by harmonization code')
print("Add 'city' variable")
# Format geolocation field from '(lat, lon)' to 'lat,lon' format understood by Elasticsearch - i.e. remove parentheses and whitespace
df = df.withColumn('geolocation', regexp_replace(df['geolocation'], r'[\(\)\s]', ''))
hz.addTransformDescr('geolocation','geolocation format harmonized to "latitude,longitude"')
print("Harmonized geolocation variable")
# filter out records with empty or null coordinates
c1 = df.count()
df = df.where(length('geolocation') > 0)
c2 = df.count()
print("Deleted {} rows with missing or empty geolocation coordinates".format(c1-c2))
# Generate standard datetime, date part, and time part fields from raw CrimeDate and CrimeTime fields
# for simplicity we will keep all times in localtime (ignoring timezones)
# Split CrimeDate into year, month, day (all defined as harmonized variables in the hz class)
# CrimeDate is formatted as month/day/year
dateregex = r'(\d+)/(\d+)/(\d+)'
df = df.withColumn('month', regexp_extract('CrimeDate', dateregex, 1))
hz.addTransformDescr('month','month, extracted from CrimeDate')
df = df.withColumn('day', regexp_extract('CrimeDate', dateregex, 2))
hz.addTransformDescr('day','day, extracted from CrimeDate')
df = df.withColumn('year', regexp_extract('CrimeDate', dateregex, 3))
hz.addTransformDescr('year','year, extracted from CrimeDate')
# Split CrimeTime into hour & minute (both defined as harmonized variables in the hz class)
# CrimeTime is formatted as either 18:51:00 or 1851 - both formats matched below
time1regex = r'(\d+):(\d+):(\d+)'
time2regex = r'(\d\d)(\d\d)'
df = df.withColumn('hour1', regexp_extract('CrimeTime', time1regex, 1)) # match 18:51:00 format, or ""
df = df.withColumn('hour2', regexp_extract('CrimeTime', time2regex, 1)) # match 1851 format, or ""
df = df.withColumn('hour', concat(df.hour1,df.hour2)) # combine above matches
df = df.drop('hour1').drop('hour2') # drop temporary columns
df = df.withColumn('hour', regexp_replace('hour', '24', '00')) # normalize hour '24' to '00'
hz.addTransformDescr('hour','hour, extracted from CrimeTime')
df = df.withColumn('minute1', regexp_extract('CrimeTime', time1regex, 2)) # match 18:51:00 format, or ""
df = df.withColumn('minute2', regexp_extract('CrimeTime', time2regex, 2)) # match 1851 format, or ""
df = df.withColumn('minute', concat(df.minute1,df.minute2)) # combine above matches
df = df.drop('minute1').drop('minute2') # drop temporary columns
hz.addTransformDescr('minute','minute, extracted from CrimeTime')
# Create new datetime field in format YYYY-MM-DD hh:mm:ss (defined as a harmonized variable in the hz class)
df = df.withColumn('datetime',concat(concat_ws('-',df.year,df.month,df.day),lit(' '),concat_ws(':',df.hour,df.minute,lit('00'))).cast("timestamp"))
hz.addTransformDescr('datetime','Full timestamp with date and time, e.g. 2007-04-05 14:30:00')
# Drop the original CrimeDate and CrimeTime fields - no longer needed
df = df.drop('CrimeDate').drop('CrimeTime')
# Cast all the date time part fields from string to int (allows use of numeric range filters in the search UI)
for c in ['year','month','day','hour','minute']:   # use 'c' rather than 'col' to avoid shadowing pyspark.sql.functions.col
    df = df.withColumn(c, df[c].cast("int"))
# Add dayOfWeek variable, eg Monday, Tuesday, etc. (defined as a harmonized variable in the hz class)
df = df.withColumn('dayofweek',date_format('datetime', 'EEEE'))
hz.addTransformDescr('dayofweek','day of Week, calculated from datetime')
print("Harmonized date & time variables")
df.select('datetime').show(2,truncate=False)
# Add dataset descriptive variables (defined in hz class).
# location of raw data
df = df.withColumn('rawdatapath', lit(datasets3))
hz.addTransformDescr('rawdatapath','Assigned by harmonization code')
# location of harmonized data
df = df.withColumn('harmonizeddatapath', lit(outputroot))
hz.addTransformDescr('harmonizeddatapath','Assigned by harmonization code')
# URL for this notebook (the notebook will be saved/published using cells at the end of the notebook)
df = df.withColumn('notebookhtml', lit(notebook_urlbase + ".html"))
hz.addTransformDescr('notebookhtml','Assigned by harmonization code')
print("Added dataset descriptive variables")
Create some additional variables for this specific data set.
Here we can assign a default 'vargroup' for all variables that are not part of the defined list of harmonized variables in the hz class, using the hz.addVarGroup() method. (The 'vargroup' is used by the search UI to group variables under 'accordion' panels in the search page side bar.)
We can also assign custom metadata to selected unharmonized variables as desired. NOTE:
This process adds customized search parameters to this specific data set in the UI that aren't part of our standard set.
Not all individual variables within a single data set are valuable on their own; some need additional logic or combination to make search and discovery a better experience. This process allows the harmonization routine to apply that logic and expose the result within the UI.
# vargroups are used to define the Search Filter UI 'accordions' and their order
# set default variable group - used if a variable is not explicitly assigned to a group
defaultVarGroup = "{0} (Unharmonized)".format(city)
hz.addVarGroup(defaultVarGroup,order=90,default=True)
# Metadata for the harmonized variables is already defined in 'hz' class.
# We can add metadata here for additional (non harmonized) variables to control
# how they are presented in the data dictionary and search UI.
# Here, "Inside/Outside" is not defined as one of the standard harmonized variables, so let's define its metadata here
# Set 'type' to use an enumerated dropdown selector.
# (use the hz.get_unique_values() method to build a value list for the enum selector widget.)
values=",".join(hz.get_unique_values(df,'insideoutside'))
hz.addVarMetadata("insideoutside",
vargroup=defaultVarGroup,
type="enum,%s" % values,
descr="Incident occurred inside or outside",
uifilter=True)
print("Added metadata for variable <insideoutside>")
df.head(2)
Generate a dictionary table containing field distribution stats and metadata from the mappings
All variables that a) don't match standard harmonized variables, and b) don't have added metadata will be assigned to the default vargroup, and the variable type will be derived from the data distribution characteristics, calculated by hz.buildDataDict().
The data dictionary is used to generate dynamic search widgets and tools based on the datasets themselves. By basing the search widgets on the data itself, rather than hard-coding them, the search UI can update as the available data changes.
df_dict = hz.buildDataDict(df)
print("Data Dictionary created.")
Use the HiveContext object 'hc' to create a new schema for this city, and save the data dataframe and dictionary dataframe as tables in this schema with the hz.saveAsParquetTable() method.
Data and the associated dictionary information is saved to the S3 output path as parquet files.
This allows the tables we've created or modified through harmonization to be easily restored, combined, and analysed using SQL.
# Drop and create schema
schema="incidents"
hc.sql("DROP SCHEMA IF EXISTS {0} CASCADE".format(schema))
hc.sql("CREATE SCHEMA {0} COMMENT 'Crime incident data for {0}'".format(schema))
# Create Incident Data as SparkSQL table with S3 backed storage
data_table=city.lower()
data_table_ddl=hz.saveAsParquetTable(df,schema,data_table,outputpath_data)
# Create Dictionary as SparkSQL table with S3 backed storage
dict_table = data_table+"_dict"
dict_table_ddl=hz.saveAsParquetTable(df_dict.coalesce(1),schema,dict_table,outputpath_dictionary)
print "Done creating tables"
The S3 parquet files containing the harmonized data are registered as Amazon Athena external tables.
You can use Amazon Athena to perform detailed ad-hoc analysis of this and other harmonized datasets using the familiar power of SQL. Using Athena also allows you to easily integrate the dataset with Amazon QuickSight, where you can create visual analyses and dashboards.
ddlList=[
"CREATE DATABASE IF NOT EXISTS `{0}`;".format(schema),
"DROP TABLE IF EXISTS `{0}`.`{1}`;".format(schema,data_table),
"DROP TABLE IF EXISTS `{0}`.`{1}`;".format(schema,dict_table),
data_table_ddl,
dict_table_ddl
]
athena_s3_staging_dir = "s3://{0}/athena_staging_dir".format(scratch_bucket)
hz.executeAthenaDDL(athena_s3_staging_dir, ddlList)
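For example, once the DDL above has run you can query the harmonized table directly from the Athena console with standard SQL. The database, table, and column names below come from this notebook (for Baltimore the data table is incidents.baltimore):
-- count harmonized incidents by description and year
SELECT description, "year", COUNT(*) AS incidents
FROM incidents.baltimore
GROUP BY description, "year"
ORDER BY "year", incidents DESC;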
Create or replace the Elasticsearch indexes used to store our harmonized data and associated dictionary.
We save both incident data and dictionary information to Elasticsearch to power the search page. The dictionary fields are used to dynamically build the search filter panels in the search page side bar - these fields identify each variable, its vargroup (accordion panel), type (UI selector to use), and description (hover tooltip). The incident record fields are used for the dataset record search.
Call es.createOrReplaceIndex(es_index) to reset the index and set up default field mappings (see ./lib/esindex.py for more info on default mappings). You can also optionally specify mappings for individual fields using es.addTypeMapping() as illustrated below, to support the search features you need - for example, use a mapping to set a date type for timestamps, or to specify a geo_point field if you want to use maps in your Kibana dashboard.
# set up data index
# index name includes the city name for uniqueness, plus a *harmonized* suffix to allow ES queries across all harmonized datasets
es_dataindex = "{0}_harmonized".format(city.lower())
es_datatype = 'incidents'
es.createOrReplaceIndex(es_dataindex)
# create mappings for the geolocation and datetime fields - all other fields inherit the default mapping
mapping="""
{
  "properties": {
    "geolocation": {
      "type": "geo_point"
    },
    "datetime": {
      "type": "date",
      "format": "yyyy-MM-dd HH:mm:ss"
    }
  }
}
"""
es.addTypeMapping(es_dataindex, es_datatype, mapping)
# set up dictionary index
# index name includes the city name for uniqueness, plus a *dictionary* suffix to allow ES queries across all dataset dictionaries
es_dictindex = "{0}_dictionary".format(city.lower())
es_dicttype = 'dictionary'
es.createOrReplaceIndex(es_dictindex)
All data and dictionary fields can be indexed by calling es.saveToEs() and passing the df and df_dict objects directly, as shown below. If instead you want to index only a subset of the variables, make a copy of the data dataframe, drop the columns you don't want to index, generate a new dictionary dataframe by passing the reduced dataframe to hz.buildDataDict(), and pass the new data and dictionary dataframes to es.saveToEs() (see the sketch after the indexing code below).
# index all variables in data dataframe
print("Saving data to elasticsearch - please be patient")
df = df.withColumn("datetime",df["datetime"].cast("string")) # elasticsearch needs datetimes in a string type
es.saveToEs(df,index=es_dataindex,doctype=es_datatype)
# index all variables in dictionary dataframe
print("Saving dictionary to elasticsearch - please be patient")
es.saveToEs(df_dict,index=es_dictindex,doctype=es_dicttype)
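If you only want to index a subset of the variables, the approach described above looks roughly like this (a sketch only; 'crimecode' is used purely as an example of a column you might choose not to index):
# sketch: index a reduced set of variables instead of the full dataframe
df_subset = df.drop('crimecode')                 # drop any columns you don't want indexed
df_dict_subset = hz.buildDataDict(df_subset)     # rebuild the dictionary from the reduced dataframe
es.saveToEs(df_subset, index=es_dataindex, doctype=es_datatype)
es.saveToEs(df_dict_subset, index=es_dictindex, doctype=es_dicttype)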
Test that the data table was successfully created by using SQL to load a few rows.
What we see below is what will be exposed in the search UI, so we want to see what our harmonized data looks like. Did we get all the right headers? Does it match the target data type? Is any applied logic correct? etc...
sql="SELECT * FROM %s.%s LIMIT 3" % (schema, data_table)
# run query and display the first few rows
hc.sql(sql).show(truncate=False)
Review the dictionary summary to verify the data dictionary was successfully created
The data dictionary drives how the data is organized and presented in the UI. We want to ensure we have our expected row counts, data types, mins and means, and that data is organized correctly for display in the UI. What we see here will translate into the search widgets and any available ranges that control those widgets.
sql="SELECT * FROM %s.%s ORDER BY dict_field ASC" % (schema, dict_table)
# run query, convert results to a local pandas dataframe, and display as an HTML table.
HTML(hc.sql(sql).toPandas().to_html())
Save the notebook using javascript to trigger the save_checkpoint method.
Convert the notebook .ipynb to html, and use hz.publishNotebookToS3 to copy the .ipynb and .html files to the target S3 folder with web access enabled.
This provides a record within the UI of all the harmonization logic used to transform the raw data into what is exposed through the search and discovery tool. This record allows for easier verification, enhancements, or modifications of harmonization routines.
%%html
<!-- SHOW / HIDE CODE TOGGLE -->
<script>
var code_hide=true; //true -> hide code at first
function code_showhide_toggle() {
if (code_hide){
$('div.input').hide();
$('div.prompt').hide();
} else {
$('div.input').show();
$('div.prompt').show();
}
code_hide = !code_hide
}
$( document ).ready(code_showhide_toggle);
</script>
!date
%%javascript
// save current notebook
IPython.notebook.save_checkpoint()
# convert ipynb to html, and move into subfolder
!jupyter nbconvert --to html $citynotebook
# copy saved notebook (ipynb and html formats) to target S3 bucket
hz.publishNotebookToS3(outputpath_doc, notebook_urlbase, citynotebook)
# move html copy of notebook into subfolder
f = citynotebook + ".html"
!mkdir -p ./html
os.rename(f, "./html/" + f)
print("Notebook execution complete.")
# copy the html copy of the notebook to the S3 docs folder with public read access
# (source and destination paths assumed from the configuration above)
!aws s3 cp ./html/$f {outputpath_doc}/$f --sse AES256 --grants read=uri=http://acs.amazonaws.com/groups/global/AllUsers