The "San Francisco Crime Classification" challenge, is a Kaggle competition aimed to predict the category of the crimes that occurred in the city, given the time and location of the incident.
In this post, I explain and outline my second solution to this challenge. This time using Spark and Python.
Link to the competition: San Francisco Crime Classification
The algorithm chosen for the implemented solution is a multinomial logistic regression, a classification model based on regression where the dependent variable (what we want to predict) is categorical (as opposed to continuous).
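To make the idea concrete, here is a minimal, self-contained illustration (not the Spark implementation): the model scores each class with a linear function of the features and turns those scores into probabilities with the softmax function.
import math

def softmax(scores):
    # Turn a list of raw class scores into probabilities that sum to 1
    exps = [math.exp(s) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]

# Three hypothetical class scores -> three probabilities; the largest score
# receives the largest probability
print(softmax([2.0, 1.0, 0.1]))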
The competition provides two datasets: a training dataset and a test dataset. The training dataset is made of 878049 observations and the test dataset of 884262 observations.
Both of them contain incidents from January 1, 2003 to May 13, 2015.
The first lines of the script set up the Spark configuration. The setMaster(...) parameter accepts the URL of the master. Usually, I check for this URL on the Spark Web UI, available at http://localhost:8080. The second parameter is the name of the app, and the last one is the amount of memory per worker, which I set to 2 GB.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# Configure Spark: master URL, app name and 2 GB of memory per worker
conf = SparkConf().setMaster("spark://spark.master.url:7077") \
    .setAppName("SFCrime-Kaggle") \
    .set("spark.executor.memory", "2g")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
For loading the train and test datasets (both CSV files), I used the package spark-csv. To download the package and add it to the project, pass the --packages flag when launching spark-shell, spark-submit or pyspark, for example: $SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.2.0-s_2.11
Now, we load both files and use registerTempTable on the train dataset so we can run SQL statements on it.
# Load both the train and test datasets, and register the train one as a table
train = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true') \
    .load('train.csv')
train.registerTempTable('train')

test = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true') \
    .load('test.csv')
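As a quick sanity check that the table was registered correctly, one could run a simple query against it; the count below is just a hedged example of the kind of statement registerTempTable enables, not part of the original script.
# Hedged example: count the observations in the registered train table
print(sqlContext.sql('SELECT COUNT(*) FROM train').collect())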
After loading the datasets, my next step was to make some modifications to the training dataset and prepare it for the training phase.
# Get all the unique categories and add them to a dictionary
crimeCategories = sqlContext.sql('SELECT DISTINCT Category FROM train').collect()
crimeCategories.sort()
categories = {}
for category in crimeCategories:
    categories[category.Category] = float(len(categories))
The first step was to get all the unique crime categories and add them to a dictionary, using the category as the key and an integer (the current size of the dictionary at the time of insertion) as the value. So, instead of using the category string as the response (what we want to predict), we are using an integer. ...
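As a rough illustration (with made-up category names, not the actual competition labels), the loop above builds a mapping of this shape:
# Equivalent sketch with hypothetical names: each sorted category gets a float index
sample = sorted(['ASSAULT', 'ARSON', 'BURGLARY'])
mapping = {name: float(index) for index, name in enumerate(sample)}
# mapping == {'ARSON': 0.0, 'ASSAULT': 1.0, 'BURGLARY': 2.0}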
Then, I created a HashingTF object, which does a similar job to what I did with the dictionary: HashingTF hashes each term of a sequence to an index and counts the occurrences, producing a numeric vector. We will use this object to convert the predictors of each observation into a vector of numeric values.
# HashingTF transforms a list of terms into a numeric feature vector
from pyspark.mllib.feature import HashingTF

htf = HashingTF(5000)
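To get a feel for what the transformation produces, here is a hedged sketch using the htf object created above and hypothetical values for the day of week, police district and hour (the same kinds of terms used below):
# Hypothetical terms: day of week, police district and hour of an incident
example = htf.transform(['Monday', 'MISSION', 23])
# example is a 5000-dimensional sparse vector with a count of 1.0 at the
# hashed index of each of the three terms
print(example)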
Now, we use Spark's map function to convert every observation of the dataset into a LabeledPoint, an object that contains a label and a vector (either sparse or dense) and that is used for classification in Spark.
The features or predictors that I used for the model are the day of week when the incident occurred, the police district where it occurred and the hour.
# Convert each observation into a LabeledPoint: the numeric category as the
# label, and the hashed [day of week, police district, hour] as the features
from pyspark.mllib.regression import LabeledPoint
from datetime import datetime

trainingData = train.map(lambda x: LabeledPoint(
    categories[x.Category],
    htf.transform([x.DayOfWeek, x.PdDistrict,
                   datetime.strptime(x.Dates, '%Y-%m-%d %H:%M:%S').hour])))
After pre-processing the data, the next step is to train the model.
# Train a multinomial logistic regression model with 39 classes
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

logisticRegressionModel = LogisticRegressionWithLBFGS.train(
    trainingData, iterations=100, numClasses=39)
Note that this time I didn't check the training error.
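For completeness, this is roughly how the training error could have been checked, following the usual MLlib pattern of comparing predicted and true labels over the training set (a sketch, not part of the original script):
# Compare each predicted label with the true label over the training set
labelsAndPreds = trainingData.map(
    lambda point: (point.label, logisticRegressionModel.predict(point.features)))
trainingError = labelsAndPreds.filter(
    lambda lp: lp[0] != lp[1]).count() / float(trainingData.count())
print('Training error: %f' % trainingError)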
The training step is followed by preparing the test dataset in a similar way as the training one.
# Prepare the testing dataset using the same features as the training one
testingSet = test.map(lambda x: htf.transform(
    [x.DayOfWeek, x.PdDistrict,
     datetime.strptime(x.Dates, '%Y-%m-%d %H:%M:%S').hour]))
And finally, we predict and save the result.
# Predict using the day of the week, the police district and the hour of the crime
predictions = logisticRegressionModel.predict(testingSet)
predictions.saveAsTextFile('predictionsSpark')
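The saved predictions are the numeric labels from the categories dictionary. One possible way to map them back to category names (a hypothetical helper, not part of the original script) is to invert that dictionary:
# Invert the categories dictionary: numeric label -> category name
indexToCategory = {index: name for name, index in categories.items()}
predictedCategories = predictions.map(lambda p: indexToCategory[p])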
The score received this time was a bit lower than my first attempt (26.78360 and 26.74064). Before I started working with this algorithm, my original plan was to calculate the predicted probability of each class. However, in Spark this can only be done for binary classification problems, and since this problem has 39 possible outcomes, it wasn't going to work. So, since I had already written most of the code, I decided to continue forward and finish it.