Chapter 13: User-Defined Functions (UDFs)¶
Chapter Learning Objectives¶
What is a User-Defined Function, and how do you create and use one?
Chapter Outline¶
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
from IPython.display import display_html
import pandas as pd
import numpy as np
def display_side_by_side(*args):
    html_str = ''
    for df in args:
        html_str += df.to_html(index=False)
        html_str += "\xa0\xa0\xa0" * 10
    display_html(html_str.replace('table', 'table style="display:inline"'), raw=True)
space = "\xa0" * 10
import panel as pn
css = """
div.special_table + table, th, td {
border: 3px solid orange;
}
"""
pn.extension(raw_css=[css])
1. What is a User-Defined Function?¶
User-Defined Functions (UDFs) are user-programmable routines that act on one row at a time. Note: UDFs are among the most expensive operations in Spark, because they are opaque to the Catalyst optimizer and (for Python UDFs) require moving data between the JVM and Python workers. Use them only when no built-in function will do.
1a. How to create User-Defined Functions?¶
Method 1: register a named function with spark.udf.register
def squared(s):
    return s * s

spark.udf.register("squared", squared)
<function __main__.squared(s)>
You can optionally set the return type of your UDF. The default return type is StringType.
from pyspark.sql.types import LongType
def squared(s):
    return s * s

spark.udf.register("squaredWithPython", squared, LongType())
<function __main__.squared(s)>
Method 2: the @udf decorator
from pyspark.sql.functions import udf
@udf("long")
def squared(s):
    return s * s
Method 3: wrap a lambda with udf()
from pyspark.sql.types import IntegerType
squared = udf(lambda x: x*x, IntegerType())
1b. How to use UDFs in DataFrames?¶
Input: Spark dataframe
df = spark.createDataFrame([(1,),(2,),(3,)],["value"])
df.show()
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
+-----+
Use 1. Output: Spark DataFrame containing the transformed column, via select
df.select(squared(df.value).alias("squared")).show()
+-------+
|squared|
+-------+
| 1|
| 4|
| 9|
+-------+
Use 2. Output: Spark DataFrame containing both the original and the transformed column, via withColumn
df.withColumn("squared", squared("value")).show()
+-----+-------+
|value|squared|
+-----+-------+
| 1| 1|
| 2| 4|
| 3| 9|
+-----+-------+
Evaluation order and null checking¶
Spark DataFrame API does not guarantee the order of evaluation of subexpressions. In particular, the inputs of an operator or function are not necessarily evaluated left-to-right or in any other fixed order.
df = spark.createDataFrame([(1,),(2,),(3,),(None,)],["value"])
df.show()
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
| null|
+-----+
The code below will fail when executed, because the lambda-based UDF does not handle null input:
df.select(squared(df.value).alias("squared")).show()  # fails on the null row
The fix is to make the UDF itself null-aware:
from pyspark.sql.types import LongType

def squared(s):
    if s is not None:
        return s * s

squared = spark.udf.register("squaredWithPython", squared, LongType())
df.select(squared(df.value).alias("squared")).show()
+-------+
|squared|
+-------+
| 1|
| 4|
| 9|
| null|
+-------+