Rossmann operates over 3,000 drug stores in 7 European countries. Currently, Rossmann store managers are tasked with predicting their daily sales for up to six weeks in advance. Store sales are influenced by many factors, including promotions, competition, school and state holidays, seasonality, and locality. With thousands of individual managers predicting sales based on their unique circumstances, the accuracy of results can be quite varied. In their first Kaggle competition, Rossmann is challenging you to predict 6 weeks of daily sales for 1,115 stores located across Germany. Reliable sales forecasts enable store managers to create effective staff schedules that increase productivity and motivation. By helping Rossmann create a robust prediction model, you will help store managers stay focused on what's most important to them: their customers and their teams!

Before we fit a linear regression, let's recall the assumptions the model makes:

- That our input data is drawn from a multivariate normal distribution, i.e. that our variables are independent and normally distributed
- Observations are independent of one another
- A linear, additive relationship between our input variables and our output variable
- Homoscedasticity of the error terms, i.e. that the errors have constant variance around the regression line (rather than, say, fanning out as predictions grow)
- Our variables are measured without systematic error (and, like the point above, that the error values come from some normal process, not from a confounding variable)

Let's review those assumptions in this case. We're violating the first one because our SchoolHoliday and StateHoliday variables are likely to be correlated. We're working with time series data, so we're likely violating the second one as well. In general, finding problems that strictly fit a linear relationship is difficult, and this problem is no exception, so we're likely violating the third assumption too. The last two we can hope for but, again, we're likely violating them.

With all that being said, we should still at least experiment with this model because it's so well understood, and it makes for a strong baseline predictor for future exploration. This aligns with the concept of structural risk minimization (if you don't understand SRM, don't worry about it).

Now let's define the loss function for linear regression. In linear regression, we hope to minimize the squared error, defined as:

L(w) = ||Xw - y||^2

where X is our input matrix, w is our weight vector, and y is our output vector. There is a closed-form solution to this problem (the normal equations, which require inverting X^T X), but because of its orientation toward big data, Spark doesn't actually solve the problem this way. Spark leverages gradient descent methods to efficiently descend the gradient of the risk function and arrive at (hopefully) a solution near the global minimum.
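As a minimal sketch of the gradient descent idea (plain Scala, one feature, no intercept — not Spark's actual implementation, which is distributed and more sophisticated):

```scala
// Toy data: y = 2x exactly, so the optimal weight is w = 2.0
val xs = Array(1.0, 2.0, 3.0)
val ys = Array(2.0, 4.0, 6.0)

// Minimize L(w) = sum_i (w * x_i - y_i)^2 by batch gradient descent
var w = 0.0
val learningRate = 0.01
for (_ <- 1 to 200) {
  // dL/dw = 2 * sum_i (w * x_i - y_i) * x_i
  val gradient = 2.0 * xs.zip(ys).map { case (x, y) => (w * x - y) * x }.sum
  w -= learningRate * gradient
}
// w has now converged to (very nearly) 2.0
```

Each step moves the weight a small distance against the gradient of the loss; with a sane learning rate, the updates contract toward the minimum geometrically.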

```
import org.apache.log4j.Logger
// core and SparkSQL
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
// ML feature creation, tuning, models, and model evaluation
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics

// used for the progress messages in the snippets below
val logger = Logger.getLogger("RossmannRegression")
```

First, we index and one-hot encode our categorical features:

```
val stateHolidayIndexer = new StringIndexer()
  .setInputCol("StateHoliday")
  .setOutputCol("StateHolidayIndex")
val schoolHolidayIndexer = new StringIndexer()
  .setInputCol("SchoolHoliday")
  .setOutputCol("SchoolHolidayIndex")
val stateHolidayEncoder = new OneHotEncoder()
  .setInputCol("StateHolidayIndex")
  .setOutputCol("StateHolidayVec")
val schoolHolidayEncoder = new OneHotEncoder()
  .setInputCol("SchoolHolidayIndex")
  .setOutputCol("SchoolHolidayVec")
val dayOfMonthEncoder = new OneHotEncoder()
  .setInputCol("DayOfMonth")
  .setOutputCol("DayOfMonthVec")
val dayOfWeekEncoder = new OneHotEncoder()
  .setInputCol("DayOfWeek")
  .setOutputCol("DayOfWeekVec")
val storeEncoder = new OneHotEncoder()
  .setInputCol("Store")
  .setOutputCol("StoreVec")
```
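
To see what the indexer/encoder pair does conceptually, here's a hand-rolled sketch in plain Scala (not the Spark API): the indexer maps each category to an integer, and the encoder turns that integer into a binary vector. Note that StringIndexer actually orders categories by frequency; the alphabetical order here is just for illustration, and the last category is dropped, matching OneHotEncoder's default.

```scala
// Hypothetical StateHoliday values: "0" = none, "a"/"b"/"c" = holiday types
val categories = Array("0", "a", "b", "c")
val index: Map[String, Int] = categories.zipWithIndex.toMap

// One-hot encode an index, dropping the last category (Spark's default)
def oneHot(value: String): Array[Double] = {
  val vec = Array.fill(categories.length - 1)(0.0)
  val i = index(value)
  if (i < vec.length) vec(i) = 1.0
  vec
}
```

The dropped last category avoids perfect collinearity among the dummy columns, which matters for linear models.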

Next, we'll want to assemble all of those encoded columns into a single feature vector to feed into our model.
```
val assembler = new VectorAssembler()
  .setInputCols(Array("StoreVec", "DayOfWeekVec", "Open",
    "DayOfMonthVec", "StateHolidayVec", "SchoolHolidayVec"))
  .setOutputCol("features")
```
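
Conceptually, all VectorAssembler does is concatenate its input columns, row by row, into one long feature vector. A rough plain-Scala sketch with hypothetical encoded values for a single row:

```scala
// Hypothetical one-hot columns for one row, plus the raw Open flag
val storeVec = Array(0.0, 1.0, 0.0)
val dayOfWeekVec = Array(1.0, 0.0)
val open = Array(1.0)

// VectorAssembler's effect, per row: concatenate everything in order
val features = storeVec ++ dayOfWeekVec ++ open
```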

Now we set up a `TrainValidationSplit` with an `Evaluator` that judges how well each model is doing and automatically selects the best parameters based on that metric. This means we get to train lots and lots of different models and keep the best one. Super simple! Let's walk through the creation of each model.
```
def preppedLRPipeline(): TrainValidationSplit = {
  val lr = new LinearRegression()
  val paramGrid = new ParamGridBuilder()
    .addGrid(lr.regParam, Array(0.1, 0.01))
    .addGrid(lr.fitIntercept)
    .addGrid(lr.elasticNetParam, Array(0.0, 0.25, 0.5, 0.75, 1.0))
    .build()
  val pipeline = new Pipeline()
    .setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,
      stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,
      dayOfWeekEncoder, dayOfMonthEncoder,
      assembler, lr))
  val tvs = new TrainValidationSplit()
    .setEstimator(pipeline)
    .setEvaluator(new RegressionEvaluator)
    .setEstimatorParamMaps(paramGrid)
    .setTrainRatio(0.75)
  tvs
}
```
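
The ParamGridBuilder above expands into every combination of the listed values: 2 regParam values x 2 fitIntercept values x 5 elasticNetParam values = 20 candidate models, each of which the TrainValidationSplit trains and scores. A sketch of that expansion in plain Scala:

```scala
// The same value lists passed to addGrid above
val regParams = Array(0.1, 0.01)
val fitIntercepts = Array(true, false)
val elasticNetParams = Array(0.0, 0.25, 0.5, 0.75, 1.0)

// Cartesian product, mirroring what ParamGridBuilder.build() produces
val grid = for {
  r <- regParams
  f <- fitIntercepts
  e <- elasticNetParams
} yield (r, f, e)
// grid.length == 2 * 2 * 5 == 20 parameter combinations
```

Grid size multiplies quickly, so keep an eye on how many values you add per parameter.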

We're going to create our model, use the indexers and assembler created above, and run our data through it. The one thing to note is that by default the pipeline uses the `label` and `features` columns as the target and input features (these default names apply to regression as well). If we want, we can override them with the appropriate setter methods on our LinearRegression instance.
```
def loadTrainingData(sqlContext: HiveContext): DataFrame = {
  val trainRaw = sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .load("../mlproject/rossman/train.csv")
    .repartition(6)
  trainRaw.registerTempTable("raw_training_data")
  sqlContext.sql("""SELECT
    double(Sales) label, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek,
    StateHoliday, SchoolHoliday, (double(regexp_extract(Date, '\\d+-\\d+-(\\d+)', 1))) DayOfMonth
    FROM raw_training_data
  """).na.drop()
}

def loadKaggleTestData(sqlContext: HiveContext) = {
  val testRaw = sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .load("../mlproject/rossman/test.csv")
    .repartition(6)
  testRaw.registerTempTable("raw_test_data")
  val testData = sqlContext.sql("""SELECT
    Id, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek, StateHoliday,
    SchoolHoliday, (double(regexp_extract(Date, '\\d+-\\d+-(\\d+)', 1))) DayOfMonth
    FROM raw_test_data
    WHERE !(ISNULL(Id) OR ISNULL(Store) OR ISNULL(Open) OR ISNULL(DayOfWeek)
      OR ISNULL(StateHoliday) OR ISNULL(SchoolHoliday))
  """).na.drop() // weird things happen if you don't filter out the null values manually
  Array(testRaw, testData) // got to hold onto testRaw so we can make sure
  // to have all the prediction IDs to submit to kaggle
}
```
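
The `regexp_extract` in the queries above pulls the day of month out of dates formatted like `2015-07-31`; the same pattern in plain Scala looks like this:

```scala
// Same pattern the SQL uses: capture the final run of digits
val dayOfMonthPattern = """\d+-\d+-(\d+)""".r

def dayOfMonth(date: String): Double = date match {
  case dayOfMonthPattern(day) => day.toDouble
  case _ => Double.NaN // malformed dates fall through to NaN
}
```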

Bringing in the data builds on what we learned about using Spark DataFrames. It's all fairly straightforward, but feel free to comment below with any questions. One thing you'll notice is that we perform several splits of the data: our training set gets split into training and validation sets automatically by the TrainValidationSplit class, and we've already set aside our own test set that we'll use as an internal check before submitting to Kaggle.
```
def savePredictions(predictions: DataFrame, testRaw: DataFrame) = {
  val tdOut = testRaw
    .select("Id")
    .distinct()
    .join(predictions, testRaw("Id") === predictions("PredId"), "outer")
    .select("Id", "Sales")
    .na.fill(0: Double) // some of our inputs were null so we have to
    // fill these with something
  tdOut
    .coalesce(1)
    .write.format("com.databricks.spark.csv")
    .option("header", "true")
    .save("linear_regression_predictions.csv")
}
```

```
def fitModel(tvs: TrainValidationSplit, data: DataFrame) = {
  val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 12345)
  logger.info("Fitting data")
  val model = tvs.fit(training)
  logger.info("Now performing test on hold out set")
  val holdout = model.transform(test).select("prediction", "label")
  // have to do a type conversion for RegressionMetrics
  val rm = new RegressionMetrics(holdout.rdd.map(x =>
    (x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))
  logger.info("Test Metrics")
  logger.info("Test Explained Variance:")
  logger.info(rm.explainedVariance)
  logger.info("Test R^2 Coef:")
  logger.info(rm.r2)
  logger.info("Test MSE:")
  logger.info(rm.meanSquaredError)
  logger.info("Test RMSE:")
  logger.info(rm.rootMeanSquaredError)
  model
}
```
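
The metrics RegressionMetrics reports aren't magic; RMSE, for example, is just the square root of the mean squared (prediction - label) error. A quick plain-Scala sketch over hypothetical holdout pairs:

```scala
// Hypothetical (prediction, label) pairs from a holdout set
val holdout = Array((3.0, 1.0), (4.0, 2.0))

val mse = holdout.map { case (pred, label) =>
  val err = pred - label
  err * err // squared error for this row
}.sum / holdout.length
val rmse = math.sqrt(mse)
// both errors are 2.0, so mse == 4.0 and rmse == 2.0
```

RMSE is handy because it's in the same units as the label (euros of sales, here), unlike MSE.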

Here's our linear regression:
```
val data = loadTrainingData(sqlContext)
val Array(testRaw, testData) = loadKaggleTestData(sqlContext)

// the linear regression pipeline
val linearTvs = preppedLRPipeline()
logger.info("evaluating linear regression")
val lrModel = fitModel(linearTvs, data)
logger.info("Generating kaggle predictions")
val lrOut = lrModel.transform(testData)
  .withColumnRenamed("prediction", "Sales")
  .withColumnRenamed("Id", "PredId")
  .select("PredId", "Sales")
savePredictions(lrOut, testRaw)
```

Here's the output:
```
15/11/06 10:14:32 INFO RossmannRegression$: Test Explained Variance:
15/11/06 10:15:29 INFO RossmannRegression$: 1.152061820772418E7
15/11/06 10:15:29 INFO RossmannRegression$: Test R^2 Coef:
15/11/06 10:15:29 INFO RossmannRegression$: 0.7779152287252036
15/11/06 10:15:29 INFO RossmannRegression$: Test MSE:
15/11/06 10:15:29 INFO RossmannRegression$: 3273414.1560751097
15/11/06 10:15:29 INFO RossmannRegression$: Test RMSE:
15/11/06 10:15:29 INFO RossmannRegression$: 1809.257902034729
```

Which is surprisingly good!
## Questions or comments?
