%%html
<a href="javascript:code_showhide_toggle()">Show/Hide Code</a>
This notebook illustrates the use of Jupyter with PySpark to download a city crime incident dataset, harmonize it to a standard set of variables, publish the results to S3 as Athena tables, and index the data and its data dictionary into Elasticsearch to power the search UI.
# CONFIGURATION VARIABLES
import os  # used below for os.environ and os.remove
city="LosAngeles"
cityurl="https://data.lacounty.gov/api/views/ca5f-5zzs/rows.csv?accessType=DOWNLOAD"
citynotebook="LosAngeles-notebook" # citynotebook is used to automatically publish the notebook at the end
# location for raw (unprocessed) dataset in S3
scratch_bucket=os.environ["S3_SCRATCH_BUCKET"]
datasets3 = "s3://{0}/crimedata/raw/{1}.csv".format(scratch_bucket, city)
# location for harmonized (processed) dataset in S3. Structure is:
# outputroot
# |--- data - incidents - multiple CSV files
# |--- dictionary - incidents - CSV file containing data dictionary
# |--- docs - copy of notebook (published for web access)
outputroot = "s3://{0}/crimedata/harmonized/{1}".format(scratch_bucket,city)
outputpath_data = "{}/data".format(outputroot)
outputpath_dictionary="{}/dictionary".format(outputroot)
outputpath_doc="{}/docs".format(outputroot)
notebook_urlbase = "https://s3.amazonaws.com/{0}/{1}".format(outputpath_doc.replace("s3://",""), citynotebook)
# elasticsearch cluster endpoint
# - use local proxy service (installed via an EMR bootstrap) which signs ES API calls using EMR EC2 role.
esendpoint="localhost"
esport=9200
# Summary of configuration
print("City: {}".format(city))
print("Notebook Name: {}".format(citynotebook))
print("Dataset URL: {}".format(cityurl))
print("S3 dataset input: {}".format(datasets3))
print("Harmonized output: {}".format(outputroot))
print("ElasticSearch Cluster: {}.{}".format(esendpoint, esport))
print("Setup & Initialization done")
# INITIALIZATION - instantiate objects used by notebook
%matplotlib inline
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import *
from IPython.display import display, HTML
import datetime
import subprocess
# elasticsearch and harmonization objects defined in ./lib/esindex.py and ./lib/harmonizeCrimeIncidents.py
from lib.esindex import esindex
from lib.harmonizeCrimeIncidents import harmonizeCrimeIncidents
sc = SparkContext.getOrCreate()
hc = HiveContext(sc)
es = esindex(esendpoint)
hz = harmonizeCrimeIncidents(hc)
print("Initialization complete.")
# Download & install dataset from website
def copy_from_website_to_s3(city, cityurl, datasets3):
    tmpfile="/tmp/{0}.csv".format(city)
    print("Downloading {0} from: {1}".format(tmpfile, cityurl))
    subprocess.check_output("curl {0} -o {1}".format(cityurl, tmpfile).split())
    print("Copying {0} to: {1}".format(tmpfile, datasets3))
    subprocess.check_output("aws s3 cp {0} {1} --sse AES256".format(tmpfile, datasets3).split())
    os.remove(tmpfile)
# comment out the following line if you don't want to re-download the dataset
copy_from_website_to_s3(city, cityurl, datasets3)
# Read input datasets as DataFrames
# No need to infer schema - all variables typed as strings
print("Loading dataset from S3")
df = hc.read.load( datasets3,
format='com.databricks.spark.csv',
header='true',
inferSchema='false',
delimiter=',')
df_in=df # keep copy of raw data for reference
print("Dataset {0}: Loaded {1} rows.".format(city, df.count()))
Edit as desired to interactively explore the data
With the data source file loaded into a Spark Data Frame, you can get some details on the content of the data using methods from Spark, Pandas, and others. This is purely exploratory information to give users insights into the raw data set.
# examine column headers
str(df.columns)
# look at first 2 rows
df.head(2)
# look at the distinct values for 'CRIME_CATEGORY_DESCRIPTION'
df.select('CRIME_CATEGORY_DESCRIPTION').distinct().show(truncate=False)
# Graph incident count by Description
descr = df.select('CRIME_CATEGORY_DESCRIPTION').toPandas()
descrGrp = descr.groupby('CRIME_CATEGORY_DESCRIPTION').size().rename('counts')
descrPlot = descrGrp.plot(kind='bar')
Standard harmonized variables for crime incident datasets are defined in ./lib/harmonizeCrimeIncidents.py. (You can open this module in Jupyter and modify the variable list and associated standard variable metadata as required.)
Harmonization is the process of mapping the raw variables of each incoming dataset to the standard 'harmonized' variables and associated units of measurement, as far as possible. Harmonized datasets support cross-dataset search as well as the ability to combine/union datasets for multi-dataset analysis and research.
See the examples below for how to generate new variables from existing variables, and how to manipulate variable values. The hz (harmonizeCrimeIncidents) class provides methods that abstract the code for some common actions.
A core challenge when combining loosely coupled datasets in a single search index is dealing with different names for the same attribute, for example "Sex" versus "Gender", or "48in" versus "4ft". We have a pre-defined set of standard variable names and types used by our search page; the harmonization process maps the attributes and values in the raw data files to that predefined set, allowing a consistent search tool across multiple data sets.
# Use hz.mapVar(old,new, keepOrig=False) to create new variables from the original variables, by default dropping
# the original variable. Use keepOrig=True argument to keep the original variable in the dataset.
# Metadata for the transformation will be captured and included in the data dictionary
df = hz.mapVar(df, "STREET", "location")
df = hz.mapVar(df, "STATION_NAME", "neighbourhood", keepOrig=True)
# Rename any variables that have illegal names
# no illegal characters or spaces (required to support parquet output format)
# all lowercase variable names (required to provide compatibility with Amazon Athena)
df = hz.makeValidVariableNames(df)
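As an illustrative sketch (the real logic lives in hz.makeValidVariableNames() in ./lib/harmonizeCrimeIncidents.py), a renamer enforcing these two rules could look like:
import re
# Illustrative sketch only - replace illegal characters with '_' and lowercase each column name
def make_valid_variable_names_sketch(df):
    for old_name in df.columns:
        new_name = re.sub(r'[^0-9a-zA-Z_]', '_', old_name).lower()
        if new_name != old_name:
            df = df.withColumnRenamed(old_name, new_name)
    return df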
# Harmonize Description variable to standard values.
df = hz.mapVar(df, "CRIME_CATEGORY_DESCRIPTION", "Description", keepOrig=True) # make a new copy of Description variable, original renamed
# map value to standard descriptions
descrMappings = {
"AGGRAVATED ASSAULT" : "Assault",
"ARSON" : "Arson",
"BURGLARY" : "Burglary",
"CRIMINAL HOMICIDE" : "Homicide",
"DISORDERLY CONDUCT" : "Miscellaneous",
"DRUNK / ALCOHOL / DRUGS" : "OUI",
"DRUNK DRIVING VEHICLE / BOAT" : "OUI",
"FEDERAL OFFENSES W/O MONEY" : "Miscellaneous",
"FEDERAL OFFENSES WITH MONEY" : "Miscellaneous",
"FELONIES MISCELLANEOUS" : "Miscellaneous",
"FORCIBLE RAPE" : "Rape",
"FORGERY" : "Miscellaneous",
"FRAUD AND NSF CHECKS" : "Fraud",
"GAMBLING" : "Miscellaneous",
"GRAND THEFT AUTO" : "Vehicle Theft",
"LARCENY THEFT" : "Theft",
"LIQUOR LAWS" : "Miscellaneous",
"MISDEMEANORS MISCELLANEOUS" : "Miscellaneous",
"NARCOTICS" : "Narcotics",
"NON-AGGRAVATED ASSAULTS" : "Assault",
"OFFENSES AGAINST FAMILY" : "Miscellaneous",
"RECEIVING STOLEN PROPERTY" : "Miscellaneous",
"ROBBERY" : "Robbery",
"SEX OFFENSES FELONIES" : "Sex Offenses",
"SEX OFFENSES MISDEMEANORS" : "Sex Offenses",
"VAGRANCY" : "Miscellaneous",
"VANDALISM" : "Miscellaneous",
"VEHICLE / BOATING LAWS" : "Miscellaneous",
"WARRANTS" : "Miscellaneous",
"WEAPON LAWS" : "Weapons"
}
df = hz.mapValues(df, "description", descrMappings)
# Add city variable. All rows are given the value of the current city name
df = df.withColumn('city', lit(city))
# Add TransformDescr metadata field
hz.addTransformDescr('city','"city" assigned by harmonization code')
print("Add 'city' variable")
Clean up and harmonize the geographic coordinate variables. As we can see below, both the latitude and longitude variables have rows with invalid values - empty strings and invalid coordinates. For this example we will simply remove those rows from the dataset, to avoid constructing invalid geo-point coordinates that would cause failures when indexing into Elasticsearch later.
df.select('latitude').distinct().sort('latitude', ascending=True).show(10)
df.select('longitude').distinct().sort('longitude', ascending=True).show(10)
# filter out records with invalid latitude or longitude coordinates
c1 = df.count()
# eliminate empty & null values
df = df.where(length('latitude') > 0).where(length('longitude') > 0)
# eliminate invalid (negative) latitude values - all northern hemisphere values are expected to be positive
df = df.where(df.latitude > 0)
c2 = df.count()
print("Deleted {} rows with corrupted coordinates in latitude and longitude".format(c1-c2))
# Format GeoLocation field by combining latitude and longitude
df = df.withColumn('geolocation', concat(df.latitude, lit(','),df.longitude))
df = df.drop('latitude').drop('longitude')
hz.addTransformDescr('geolocation','geolocation variable created from latitude and longitude variables')
print("Generated Harmonized geolocation variable, and dropped original latitude and longitude variables")
# Generate standard datetime, date part, and time part fields from raw crime_date field
# for simplicity we will keep all times in localtime (ignoring timezones)
# Split crime_date into year, month, day (all defined as harmonized variables in the hz class)
# crime_date is formatted as month/day/year hour:minute:second AM|PM
regex=r'(\d+)/(\d+)/(\d+) (\d+):(\d+):(\d+) (\w\w)'
df = df.withColumn('month', regexp_extract('crime_date', regex, 1))
hz.addTransformDescr('month','month, extracted from crime_date')
df = df.withColumn('day', regexp_extract('crime_date', regex, 2))
hz.addTransformDescr('day','day, extracted from crime_date')
df = df.withColumn('year', regexp_extract('crime_date', regex, 3))
hz.addTransformDescr('year','year, extracted from crime_date')
df = df.withColumn('hour', regexp_extract('crime_date', regex, 4))
hz.addTransformDescr('hour','hour, extracted from crime_date')
df = df.withColumn('minute', regexp_extract('crime_date', regex, 5))
hz.addTransformDescr('minute','minute, extracted from crime_date')
df = df.withColumn('AMPM', regexp_extract('crime_date', regex, 7))
# Convert hour to 24hr format: 12 AM -> 00, 1-11 AM unchanged, 12 PM -> 12, 1-11 PM -> +12
df = df.withColumnRenamed('hour', 'hour_tmp')
selectExpr = "IF( ampm = 'PM', IF(hour_tmp = '12', '12', cast((cast(hour_tmp as int) + 12) as string)), IF(hour_tmp = '12', '00', hour_tmp) ) AS hour"
df = df.selectExpr(selectExpr,"*").drop('hour_tmp').drop('ampm')
# Create new datetime field in format YYYY-MM-DD hh:mm:ss (defined as a harmonized variable in the hz class)
df = df.withColumn('datetime',concat(concat_ws('-',df.year,df.month,df.day),lit(' '),concat_ws(':',df.hour,df.minute,lit('00'))).cast("timestamp"))
hz.addTransformDescr('datetime','Full timestamp with date and time, eg 2007-04-05 14:30')
# Drop the original crime_date and CRIME_YEAR variables - no longer needed
df = df.drop('crime_date').drop('crime_year')
# Cast all the date time part fields from string to int (allows use of numeric range filters in the search UI)
for colname in ['year','month','day','hour','minute']:
    df = df.withColumn(colname, df[colname].cast("int"))
# Add dayofweek variable, eg Monday, Tuesday, etc. (defined as a harmonized variable in the hz class)
df = df.withColumn('dayofweek',date_format('datetime', 'EEEE'))
hz.addTransformDescr('dayofweek','day of Week, calculated from datetime')
print("Harmonized date & time variables")
df.select('datetime').show(2,truncate=False)
# Add dataset descriptive variables (defined in hz class).
# location of raw data
df = df.withColumn('rawdatapath', lit(datasets3))
hz.addTransformDescr('rawdatapath','Assigned by harmonization code')
# location of harmonized data
df = df.withColumn('harmonizeddatapath', lit(outputroot))
hz.addTransformDescr('harmonizeddatapath','Assigned by harmonization code')
# URL for this notebook (the notebook will be saved/published using cells at the end of the notebook)
df = df.withColumn('notebookhtml', lit(notebook_urlbase + ".html"))
hz.addTransformDescr('notebookhtml','Assigned by harmonization code')
print("Added dataset descriptive variables")
Create some additional variables for this specific data set.
Here we can assign a default 'vargroup' for all variables that are not part of the defined list of harmonized variables in the hz class, using the hz.addVarGroup() method. (The 'vargroup' is used by the search UI to group variables under 'accordion' panels in the search page side bar.)
We can also assign custom metadata to selected unharmonized variables as desired. NOTE:
This process adds customized search parameters for this specific data set to the UI that aren't part of our standard set.
Not all individual variables within a single data set are valuable on their own; some need additional logic or combination to make search and discovery a better experience. This process allows the harmonization routine to apply that logic and expose it within the UI.
# vargroups are used to define the Search Filter UI 'accordions' and their order
# set default variable group - used if a variable is not explicitly assigned to a group
defaultVarGroup = "{0} (Unharmonized)".format(city)
hz.addVarGroup(defaultVarGroup,order=90,default=True)
# Metadata for the harmonized variables is already defined in 'hz' class.
# Use hz.addVarMetadata() to add metadata for additional (non harmonized) variables to control
# how they are presented in the data dictionary and search UI.
# Here, "GANG_RELATED" is not defined as one of the standard harmonized variables, but we can configure how it will be
## handled in the filter UI by setting metadata here.
# First, convert the Y/N values to 1|0 (true|false) and cast to int, to support a boolean selector
df = hz.mapValues(df, "gang_related", {"Y":1,"N":0})
df = df.withColumn('gang_related', df['gang_related'].cast("int"))
# Then set 'type' to boolean, which will generate a True/False selector in the UI.
hz.addVarMetadata("gang_related",
vargroup=defaultVarGroup,
type="boolean",
descr="Incident is gang related?",
uifilter=True)
print("Converted variable <gang_related> to boolean and added metadata")
df.head(2)
Generate a dictionary table containing field distribution stats and metadata from the mappings.
All variables that (a) don't match standard harmonized variables and (b) don't have added metadata will be assigned to the default vargroup, and the variable type will be derived from the data distribution characteristics calculated by hz.buildDataDict().
The data dictionary is used to generate dynamic search widgets and tools based on the data sets themselves. By basing the search widgets on the data itself rather than hard-coding them, the search UI can update as the available data changes.
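As an illustration of the kind of per-field statistics the dictionary holds, the sketch below computes distinct counts and min/max per column. It is not the library's actual code - hz.buildDataDict() also merges in the vargroup, type, and description metadata registered earlier:
from pyspark.sql.functions import countDistinct, min as sql_min, max as sql_max
# Illustrative sketch only - simple distribution stats per field
def build_data_dict_sketch(df):
    rows = []
    for field in df.columns:
        stats = df.agg(countDistinct(field).alias('distinct'),
                       sql_min(field).alias('min'),
                       sql_max(field).alias('max')).collect()[0]
        rows.append((field, stats['distinct'], str(stats['min']), str(stats['max'])))
    return hc.createDataFrame(rows, ['dict_field', 'distinct_values', 'min', 'max'])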
df_dict = hz.buildDataDict(df)
print("Data Dictionary created.")
Use the HiveContext object 'hc' to create a new schema for this city, and save the data dataframe and dictionary dataframe as tables in this schema with the hz.saveAsParquetTable() method.
The data and the associated dictionary information are saved to the S3 output path as parquet files.
This allows the tables we've created or modified through harmonization to be easily restored, combined, and analyzed using SQL.
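In outline, hz.saveAsParquetTable() follows the pattern sketched below - write the dataframe to S3 as parquet, register an external table over that location, and return the DDL so it can be replayed in Athena. This is an illustrative sketch only, not the library's actual code:
# Illustrative sketch only - see ./lib/harmonizeCrimeIncidents.py for the real helper
def save_as_parquet_table_sketch(df, schema, table, s3path):
    df.write.mode('overwrite').parquet(s3path)   # write parquet files to S3
    columns = ", ".join("`{0}` {1}".format(f.name, f.dataType.simpleString()) for f in df.schema.fields)
    ddl = "CREATE EXTERNAL TABLE IF NOT EXISTS `{0}`.`{1}` ({2}) STORED AS PARQUET LOCATION '{3}'".format(schema, table, columns, s3path)
    hc.sql(ddl)                                  # register the table in the Hive metastore
    return ddl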
# Drop and create schema
schema="incidents"
hc.sql("DROP SCHEMA IF EXISTS {0} CASCADE".format(schema))
hc.sql("CREATE SCHEMA {0} COMMENT 'Crime incident data for {0}'".format(schema))
# Create Incident Data as SparkSQL table with S3 backed storage
data_table=city.lower()
data_table_ddl=hz.saveAsParquetTable(df,schema,data_table,outputpath_data)
# Create Dictionary as SparkSQL table with S3 backed storage
dict_table = data_table+"_dict"
dict_table_ddl=hz.saveAsParquetTable(df_dict.coalesce(1),schema,dict_table,outputpath_dictionary)
print "Done creating tables"
The S3 parquet files containing the harmonized data are registered as Amazon Athena external tables.
You can use Amazon Athena to perform detailed ad-hoc analysis of this and other harmonized datasets using the familiar power of SQL. Using Athena also allows you to easily integrate the dataset with Amazon QuickSight, where you can create visual analyses and dashboards.
ddlList=[
"CREATE DATABASE IF NOT EXISTS `{0}`;".format(schema),
"DROP TABLE IF EXISTS `{0}`.`{1}`;".format(schema,data_table),
"DROP TABLE IF EXISTS `{0}`.`{1}`;".format(schema,dict_table),
data_table_ddl,
dict_table_ddl
]
athena_s3_staging_dir = "s3://{0}/athena_staging_dir".format(scratch_bucket)
hz.executeAthenaDDL(athena_s3_staging_dir, ddlList)
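Once the DDL has been applied, you can run ad-hoc queries against the harmonized table from the Athena console (or via hc.sql in this notebook). The query below is purely an illustrative example:
# Illustrative example - count incidents by harmonized description
sample_sql = "SELECT description, count(*) AS incident_count FROM {0}.{1} GROUP BY description ORDER BY incident_count DESC".format(schema, data_table)
print(sample_sql)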
Create or replace the Elasticsearch indexes used to store our harmonized data and associated dictionary.
We save both incident data and dictionary information to Elasticsearch to power the search page. The dictionary fields are used to dynamically build the search filter panels in the search page side bar - these fields identify each variable, its vargroup (accordion panel), type (UI selector to use), and description (hover tooltip). The incident record fields are used for the dataset record search.
Call es.createOrReplaceIndex(es_index) to reset the index and set up default field mappings (see ./lib/esindex.py for more info on default mappings). You can also optionally specify mappings for individual fields using es.addTypeMapping(), as illustrated below, to support the search features you need - for example, use a mapping to set a date type for timestamps, or to specify a geo_point field if you want to use maps in your Kibana dashboard.
# set up data index
# the index name includes the city name for uniqueness, and the suffix *harmonized* to allow ES queries across all datasets
es_dataindex = "{0}_harmonized".format(city.lower())
es_datatype = 'incidents'
es.createOrReplaceIndex(es_dataindex)
# create mappings for geolocation and datetime field.. all other fields inherit default mapping
mapping="""
{
"properties": {
"geolocation": {
"type": "geo_point"
},
"datetime": {
"type": "date",
"format" : "yyyy-MM-dd HH:mm:ss"}
}
}
}
"""
es.addTypeMapping(es_dataindex, es_datatype, mapping)
# set up dictionary index
# the index name includes the city name for uniqueness, and the suffix *dictionary* to allow ES queries across all datasets
es_dictindex = "{0}_dictionary".format(city.lower())
es_dicttype = 'dictionary'
es.createOrReplaceIndex(es_dictindex)
All data and dictionary fields can be indexed by calling es.saveToEs() and passing the df and df_dict objects directly, as shown below. If instead you want to index only a subset of the variables, make a copy of the data dataframe, drop the columns you don't want to index, generate a new dictionary dataframe by passing the new data dataframe to hz.buildDataDict(), and pass the new data and dictionary dataframes to es.saveToEs(), as sketched below.
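An illustrative subset-indexing flow follows (commented out so it doesn't run by default; the columns dropped are example choices, not requirements):
# Illustrative sketch only - index a subset of columns instead of the full dataframe
# df_subset = df.drop('rawdatapath').drop('harmonizeddatapath')
# df_subset_dict = hz.buildDataDict(df_subset)
# es.saveToEs(df_subset, index=es_dataindex, doctype=es_datatype)
# es.saveToEs(df_subset_dict, index=es_dictindex, doctype=es_dicttype)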
# index all variables in data dataframe
print("Saving data to elasticsearch - please be patient")
df = df.withColumn("datetime",df["datetime"].cast("string")) # elasticsearch needs datetimes in a string type
es.saveToEs(df,index=es_dataindex,doctype=es_datatype)
# index all variables in dictionary dataframe
print("Saving dictionary to elasticsearch - please be patient")
es.saveToEs(df_dict,index=es_dictindex,doctype=es_dicttype)
Test that the data table was successfully created by using SQL to load a few rows.
What we see below is what will be exposed in the search UI, so we want to see what our harmonized data looks like. Did we get all the right headers? Does it match the target data type? Is any applied logic correct? etc...
sql="SELECT * FROM %s.%s LIMIT 3" % (schema, data_table)
# run query, convert results to a local pandas dataframe, and display as an HTML table.
HTML(hc.sql(sql).toPandas().to_html())
Test the summary of the data to verify our dictionary was successfully created
The data dictionary drives how the data is organized and presented in the UI. We want to ensure we have our expected row counts, data types, mins and means, and that data is organized correctly for display in the UI. What we see here will translate into the search widgets and any available ranges that control those widgets.
sql="SELECT * FROM %s.%s ORDER BY dict_field ASC" % (schema, dict_table)
# run query, convert results to a local pandas dataframe, and display as an HTML table.
HTML(hc.sql(sql).toPandas().to_html())
Save the notebook using JavaScript to trigger the save_checkpoint method.
Convert the notebook .ipynb to HTML, and use hz.publishNotebookToS3 to copy the .ipynb and .html files to the target S3 folder with web access enabled.
This provides a record within the UI of all the harmonization logic used to transform the raw data into what is exposed through the search and discovery tool. This record allows for easier verification, enhancements, or modifications of harmonization routines.
%%html
<!-- SHOW / HIDE CODE TOGGLE -->
<script>
var code_hide=true; //true -> hide code at first
function code_showhide_toggle() {
if (code_hide){
$('div.input').hide();
$('div.prompt').hide();
} else {
$('div.input').show();
$('div.prompt').show();
}
code_hide = !code_hide
}
$( document ).ready(code_showhide_toggle);
</script>
!date
%%javascript
// save current notebook
IPython.notebook.save_checkpoint()
# convert ipynb to html
!jupyter nbconvert --to html $citynotebook
# copy saved notebook (ipynb and html formats) to target S3 bucket
hz.publishNotebookToS3(outputpath_doc, notebook_urlbase, citynotebook)