Applying a custom function on PySpark Columns with user-defined functions
What is a user-defined function in PySpark?
PySpark comes with a rich set of built-in functions that you can leverage to implement most tasks, but there may be cases where you have to roll your own custom function. In PySpark, we can easily register a custom function that takes a column value as input and returns an updated value. This guide goes over how to register a user-defined function (UDF) and use it to manipulate data in PySpark.
Applying a custom function on a column
Consider the following PySpark DataFrame:
```python
df = spark.createDataFrame([['Alex', 10], ['Bob', 20], ['Cathy', 30]], ['name', 'age'])
df.show()
```

```
+-----+---+
| name|age|
+-----+---+
| Alex| 10|
|  Bob| 20|
|Cathy| 30|
+-----+---+
```
Let's define a custom function that takes in as argument a single column value:
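A minimal definition, matching the to_upper(~) function that this guide registers for SQL use later on. It is plain Python and knows nothing about Spark yet:

```python
def to_upper(some_string):
    # Return the uppercased version of a single string value
    return some_string.upper()


print(to_upper('Alex'))  # ALEX
```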
Here, our custom to_upper(~) function returns the uppercased version of the input string.
Now, we must register our custom function using the udf(~) function, which returns a function that can be used just like any other function in the pyspark.sql.functions library:
This basic example is only for demonstration; there already exists a built-in function upper(~) in the pyspark.sql.functions library that uppercases string values:
User-defined functions are treated as black boxes by PySpark, so Spark's query optimizer cannot look inside them or optimize them. Therefore, use built-in functions whenever possible and define custom functions only when necessary.
Applying a custom function on multiple columns
We can easily extend our user-defined function such that it takes multiple columns as argument:
```
+--------------------+
|  my_func(name, age)|
+--------------------+
|Alex is 10 years old|
| Bob is 20 years old|
|Cathy is 30 years...|
+--------------------+
```
Here, note the following:
- our custom function my_func(~) now takes in two column values
- when calling my_udf(~), we now pass in two columns
Specifying the resulting column type
By default, that is, when no return type is given, the resulting column is of type string regardless of the actual return type of your custom function. For instance, consider the following custom function:
Here, the return type of our function my_double(~) is an integer, but the resulting column type is actually set to string:
```
root
 |-- my_double(age): string (nullable = true)
```
We can specify the resulting column type using the second argument of udf(~):
```python
udf_double = udf(my_double, 'int')
df.select(udf_double('age')).printSchema()
```

```
root
 |-- my_double(age): integer (nullable = true)
```
Here, we have indicated that the resulting column type should be integer.
Equivalently, we could also import an explicit PySpark type like so:
```python
from pyspark.sql.types import IntegerType

udf_double = udf(my_double, IntegerType())
df.select(udf_double('age')).printSchema()
```

```
root
 |-- my_double(age): integer (nullable = true)
```
Calling user-defined functions in SQL expressions
To use user-defined functions in SQL expressions, register the custom function using spark.udf.register(~):
```python
def to_upper(some_string):
    return some_string.upper()

spark.udf.register('udf_upper', to_upper)
df.selectExpr('udf_upper(name)').show()
```

```
+---------------+
|udf_upper(name)|
+---------------+
|           ALEX|
|            BOB|
|          CATHY|
+---------------+
```
Here, the selectExpr(~) method takes a SQL expression as its argument.
We could also register the DataFrame as a temporary view so that we can run full SQL queries against it:
```python
# Register the PySpark DataFrame as a temporary SQL view
df.createOrReplaceTempView('my_table')
spark.sql('SELECT udf_upper(name) FROM my_table').show()
```

```
+---------------+
|udf_upper(name)|
+---------------+
|           ALEX|
|            BOB|
|          CATHY|
+---------------+
```
Specifying the return type
Again, by default the resulting column is of type string regardless of what your custom function returns. Just like we did earlier with udf(~), we can specify the type of the returned column, this time as the third argument of spark.udf.register(~):
```python
def my_double(int_age):
    return 2 * int_age

spark.udf.register('udf_double', my_double, 'int')
df.selectExpr('udf_double(age)').printSchema()
```

```
root
 |-- udf_double(age): integer (nullable = true)
```
Equivalently, we could import the explicit type from pyspark.sql.types:
```python
from pyspark.sql.types import IntegerType

spark.udf.register('udf_double', my_double, IntegerType())
df.selectExpr('udf_double(age)').printSchema()
```

```
root
 |-- udf_double(age): integer (nullable = true)
```
Limitations of user-defined functions
Ordering of execution in sub-expressions is not fixed
Spark does not guarantee the order in which sub-expressions are evaluated (for example, the conditions of a WHERE or HAVING clause). As an example, consider the following:
```python
spark.udf.register('my_double', lambda val: 2 * val, 'int')
spark.sql('SELECT * FROM my_table WHERE age IS NOT NULL AND my_double(age) > 5').show()
```
Here, the WHERE clause specifies two conditions linked by the AND operator. Because AND in Spark SQL does not short-circuit from left to right, there is no guarantee that Spark will evaluate age IS NOT NULL before my_double(age) > 5. This means that the input supplied to our custom function my_double(~) may be null; since 2 * None raises a TypeError in Python, your custom function can break if you do not handle this case explicitly.
One way around this problem is to use an IF expression, which only invokes the custom function in the branch where the null check has passed:
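```python
spark.udf.register('my_double', lambda val: 2 * val, 'int')
# my_double(~) is only called when age is not null; otherwise IF yields null,
# and null > 5 is not true, so such rows are filtered out
spark.sql('SELECT * FROM my_table WHERE IF(age IS NOT NULL, my_double(age), null) > 5').show()
```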
Here, the input supplied to our my_double(~) function is guaranteed to be non-null.
Slow compared to built-in PySpark functions
Since Spark cannot optimize user-defined functions, and each row must be passed between the JVM and a Python worker process, Python UDFs are generally slower than equivalent built-in functions. Therefore, only turn to user-defined functions when built-in functions cannot achieve your task.