What are UDFs?
UDFs, or user-defined functions, are a simple way of adding a function to the SparkSQL language. These functions operate on distributed DataFrames and work row by row (unless you're creating a user-defined aggregation function). You'll find UDFs across a variety of SQL environments, and they certainly come up in Apache Hive. Think of it this way: it's common in SQL to call something like a sum function. Here's an example — in this query we've got a function of our own.
That function, makeDateTime, is a user-defined function: one that we made up and added to the SQL language, which creates a datetime out of separate fields.
SELECT customer_id, makeDateTime(date_field, time_field, timezone) as datetime, amount
So what's the point? Well, it makes it a lot easier to work in a language that you understand and use in your domain. It's an easy way (just like functions in programming) of abstracting away a common operation.
Defining Spark UDFs
Let's go through the process of defining our own UDF. We're going to define the UDF from the example above and use it on a Spark DataFrame. Defining our UDF is pretty easy: we just create an anonymous function and either register it through sqlContext.udf.register, or wrap it with the udf function from org.apache.spark.sql.functions, depending on how you want to use it.
Here's the setup. Imagine purchases is a DataFrame with the layout below. Our goal is to get a datetime field that we can actually use. Let's go ahead and give it a shot.
case class Purchase(customer_id: Int, purchase_id: Int, date: String, time: String, tz: String, amount:Double)
val x = sc.parallelize(Array(
Purchase(123, 234, "2007-12-12", "20:50", "UTC", 500.99),
Purchase(123, 247, "2007-12-12", "15:30", "PST", 300.22),
Purchase(189, 254, "2007-12-13", "00:50", "EST", 122.19),
Purchase(187, 299, "2007-12-12", "07:30", "UTC", 524.37)
))
val df = sqlContext.createDataFrame(x)
Now let's define our function and register it with SparkSQL. The underscores simply signify that it's a partially applied function, which hands register a function value rather than a method.
def makeDT(date: String, time: String, tz: String) = s"$date $time $tz"
sqlContext.udf.register("makeDt", makeDT(_:String,_:String,_:String))
// Now we can use our function directly in SparkSQL.
sqlContext.sql("SELECT amount, makeDt(date, time, tz) from df").take(2)
// but not outside
df.select($"customer_id", makeDt($"date", $"time", $"tz"), $"amount").take(2) // fails
You can see above that we can use it within SQL but not outside of it. To do that, we're going to have to create a different UDF using the udf function from org.apache.spark.sql.functions:
val makeDt = udf(makeDT(_:String,_:String,_:String))
// now this works
df.select($"customer_id", makeDt($"date", $"time", $"tz"), $"amount").take(2)
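If the underscore syntax looks unfamiliar: writing makeDT(_:String,_:String,_:String) partially applies the method, turning it into a plain function value that can be passed around. Here's a minimal pure-Scala sketch of the same idea, outside of Spark:

```scala
// A method by itself is not a value in Scala; the underscores
// eta-expand it into a Function3 that we can pass around.
def makeDT(date: String, time: String, tz: String): String = s"$date $time $tz"

val makeDTFn: (String, String, String) => String =
  makeDT(_: String, _: String, _: String)

// The function value behaves exactly like the method:
println(makeDTFn("2007-12-12", "20:50", "UTC")) // prints "2007-12-12 20:50 UTC"
```

This function value is what udf receives above; Spark wraps it so it can be applied to Columns instead of plain strings.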
Pretty simple, right? Nothing too crazy; we're really just joining these fields together. We can see that it's working, but we've still got a string representation. Spark 1.5 introduced functions for creating datetimes, such as unix_timestamp. You can see them in the documentation; however, I always find that an example is worth more than the docs.
Now we can leave our function the same; we're just going to create a format string and wrap our makeDt call in the unix_timestamp function. We can do this both in and out of SparkSQL!
val fmt = "yyyy-MM-dd HH:mm z" // HH, not hh: our times like "20:50" are 24-hour
df.select($"customer_id", unix_timestamp(makeDt($"date", $"time", $"tz"), fmt), $"amount").take(2)
sqlContext.sql(s"SELECT customer_id, unix_timestamp(makeDt(date, time, tz), '$fmt'), amount FROM df").take(2)
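Under the hood, unix_timestamp parses each string with a Java SimpleDateFormat pattern and returns epoch seconds. Here's a rough pure-JVM sketch of the equivalent parse (an illustration, not Spark's actual implementation):

```scala
import java.text.SimpleDateFormat

// Same pattern as above; the 'z' token picks up the per-row timezone string,
// so "UTC" and "PST" rows parse to different instants.
val pattern = new SimpleDateFormat("yyyy-MM-dd HH:mm z")

val utcSeconds = pattern.parse("2007-12-12 20:50 UTC").getTime / 1000
val pstSeconds = pattern.parse("2007-12-12 20:50 PST").getTime / 1000

// The same wall-clock time in PST (8 hours behind UTC) is a later instant:
println(pstSeconds - utcSeconds) // prints 28800
```

That timezone-aware parsing is what lets rows with different tz values land on a single comparable timestamp column.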
As any programmer knows, working with time is extremely annoying, and luckily Spark makes it fairly straightforward: the format strings are standard Java SimpleDateFormat patterns, which are fairly easy to work with. This system is really pretty flexible too!