Chapter 4 : Data Types & Schema¶
Chapter Learning Objectives¶
Various data types in Spark.
Use of schemas.
Chapter Outline¶
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

from IPython.display import display_html
import pandas as pd
import numpy as np

def display_side_by_side(*args, space):
    # Render the given pandas data frames next to each other in the notebook output.
    html_str = ''
    for df in args:
        html_str += df.to_html(index=False)
        html_str += "\xa0\xa0\xa0" * space
    display_html(html_str.replace('table', 'table style="display:inline"'), raw=True)

space = "\xa0" * 10
Common data types in Spark¶
Numeric types
| Name | Description | Example |
|---|---|---|
| IntegerType | Represents 4-byte signed integer numbers. | 64 |
| LongType | Represents 8-byte signed integer numbers. | 1000 |
| FloatType | Represents 4-byte single-precision floating point numbers. | 1000.45 |
| DoubleType | Represents 8-byte double-precision floating point numbers. | 10000000.45 |
String type
| Name | Description | Example |
|---|---|---|
| StringType | Represents character string values. | "tony" |
Boolean type
| Name | Description | Example |
|---|---|---|
| BooleanType | Represents Boolean values. | true |
Datetime type
| Name | Description | Example |
|---|---|---|
| TimestampType | Represents values comprising values of fields year, month, day, hour, minute, and second, with the session local time zone. The timestamp value represents an absolute point in time. | 2019-11-03 05:30:00 UTC-05:00 |
| DateType | Represents values comprising values of fields year, month and day, without a time zone. | 2019-11-03 |
Complex types
| Name | Definition | Description | Example |
|---|---|---|---|
| ArrayType | ArrayType(elementType, containsNull) | Represents values comprising a sequence of elements with the type of elementType. containsNull indicates whether elements of an ArrayType value can be null. | ["tony", "kent", "mike"] |
| MapType | MapType(keyType, valueType, valueContainsNull) | Represents values comprising a set of key-value pairs. The data type of keys is described by keyType and the data type of values by valueType. Keys of a MapType value are not allowed to be null. valueContainsNull indicates whether values of a MapType value can be null. | {"name": "tony"} |
| StructType | StructType(fields) | Represents values with the structure described by a sequence of StructFields (fields). StructField(name, dataType, nullable) represents one field in a StructType: name is the field name, dataType its data type, and nullable indicates whether its values can be null. | {"name": "tony", "age": 30, "city": "seattle"} |
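The same class names are used when schemas are built in code. Below is a minimal sketch (not part of the original examples) that instantiates a few of these types from pyspark.sql.types and prints their compact DDL form:

from pyspark.sql.types import (ArrayType, MapType, StructType, StructField,
                               StringType, IntegerType, LongType)

# Illustrative instances of the complex types described above
colors = ArrayType(StringType(), containsNull=True)                  # e.g. ["tony", "kent", "mike"]
phones = MapType(StringType(), LongType(), valueContainsNull=True)   # e.g. {"home": 123456789}
person = StructType([
    StructField("name", StringType(), nullable=True),
    StructField("age", IntegerType(), nullable=True),
])

print(colors.simpleString())   # array<string>
print(phones.simpleString())   # map<string,bigint>
print(person.simpleString())   # struct<name:string,age:int>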
1a. How to get the data type of a column/data frame?¶
Let's first understand the syntax.
Syntax
pyspark.sql.DataFrame.dtypes
Returns all column names and their data types as a list.
Input: Spark data frame
df_mul = spark.createDataFrame([('John', 60, True, 1.7, '1960-01-01'),
                                ('Tony', 30, False, 1.8, '1990-01-01'),
                                ('Mike', 40, True, 1.65, '1980-01-01')],
                               ['name', 'age', 'smoker', 'height', 'birthdate'])
df_mul.show()
+----+---+------+------+----------+
|name|age|smoker|height| birthdate|
+----+---+------+------+----------+
|John| 60| true| 1.7|1960-01-01|
|Tony| 30| false| 1.8|1990-01-01|
|Mike| 40| true| 1.65|1980-01-01|
+----+---+------+------+----------+
Output : Spark data frame column types
df_mul.dtypes
[('name', 'string'),
('age', 'bigint'),
('smoker', 'boolean'),
('height', 'double'),
('birthdate', 'string')]
Summary:
print("Input ", "Output")
display_side_by_side(df_mul.toPandas(),pd.DataFrame([str(df_mul.dtypes[0:6])],columns=["."]),space=1)
Input  Output

| name | age | smoker | height | birthdate |
|---|---|---|---|---|
| John | 60 | True | 1.70 | 1960-01-01 |
| Tony | 30 | False | 1.80 | 1990-01-01 |
| Mike | 40 | True | 1.65 | 1980-01-01 |

| . |
|---|
| [('name', 'string'), ('age', 'bigint'), ('smoker', 'boolean'), ('height', 'double'), ('birthdate', 'string')] |
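A related lookup, not shown in the cell above: when only one column's type is needed, dtypes can be turned into a dict, or the schema can be indexed by column name. A small sketch assuming df_mul from above:

dict(df_mul.dtypes)['age']       # 'bigint'
df_mul.schema['age'].dataType    # the LongType instance for this column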
1b. How to change the data type of a column?¶
Let's first understand the syntax.
Syntax
pyspark.sql.Column.cast
Converts the column into the given data type.
Input: Spark data frame with a column "age" of long (bigint) type
df_mul = spark.createDataFrame([('John', 60, True, 1.7),
                                ('Tony', 30, False, 1.8),
                                ('Mike', 40, True, 1.65)],
                               ['name', 'age', 'smoker', 'height'])
df_mul.show()
+----+---+------+------+
|name|age|smoker|height|
+----+---+------+------+
|John| 60| true| 1.7|
|Tony| 30| false| 1.8|
|Mike| 40| true| 1.65|
+----+---+------+------+
df_mul.dtypes
[('name', 'string'),
('age', 'bigint'),
('smoker', 'boolean'),
('height', 'double')]
Output : Spark data frame with the "age" column cast to string type
df_cast = df_mul.select("name",df_mul.age.cast("string").alias('age'), "smoker", "height")
df_cast.show()
+----+---+------+------+
|name|age|smoker|height|
+----+---+------+------+
|John| 60| true| 1.7|
|Tony| 30| false| 1.8|
|Mike| 40| true| 1.65|
+----+---+------+------+
df_cast.dtypes
[('name', 'string'),
('age', 'string'),
('smoker', 'boolean'),
('height', 'double')]
Summary:
print("Input ", "Output")
display_side_by_side(df_mul.toPandas(),df_cast.toPandas(),space=25)
print("Data types ", "Data types")
display_side_by_side(pd.DataFrame([str(df_mul.dtypes)],columns=["."]),pd.DataFrame([str(df_cast.dtypes)],columns=["."]),space=5)
Input  Output

| name | age | smoker | height |
|---|---|---|---|
| John | 60 | True | 1.70 |
| Tony | 30 | False | 1.80 |
| Mike | 40 | True | 1.65 |

| name | age | smoker | height |
|---|---|---|---|
| John | 60 | True | 1.70 |
| Tony | 30 | False | 1.80 |
| Mike | 40 | True | 1.65 |

Data types  Data types

| . |
|---|
| [('name', 'string'), ('age', 'bigint'), ('smoker', 'boolean'), ('height', 'double')] |

| . |
|---|
| [('name', 'string'), ('age', 'string'), ('smoker', 'boolean'), ('height', 'double')] |
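An equivalent idiom, not used in the cell above, is withColumn, which replaces the column in place instead of re-listing every column in a select. A short sketch assuming df_mul from above:

from pyspark.sql import functions as F

# Cast "age" to string while keeping all other columns unchanged
df_cast2 = df_mul.withColumn("age", F.col("age").cast("string"))
df_cast2.dtypes
# [('name', 'string'), ('age', 'string'), ('smoker', 'boolean'), ('height', 'double')]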
2a. How to get the schema of a data frame?¶
Let's first understand the syntax.
Syntax
pyspark.sql.DataFrame.schema
Returns the schema of this DataFrame as a pyspark.sql.types.StructType.
Input: Spark data frame
df_mul = spark.createDataFrame([('John', 'Seattle', 60, True, 1.7, '1960-01-01'),
                                ('Tony', 'Cupertino', 30, False, 1.8, '1990-01-01'),
                                ('Mike', 'New York', 40, True, 1.65, '1980-01-01')],
                               ['name', 'city', 'age', 'smoker', 'height', 'birthdate'])
df_mul.show()
+----+---------+---+------+------+----------+
|name| city|age|smoker|height| birthdate|
+----+---------+---+------+------+----------+
|John| Seattle| 60| true| 1.7|1960-01-01|
|Tony|Cupertino| 30| false| 1.8|1990-01-01|
|Mike| New York| 40| true| 1.65|1980-01-01|
+----+---------+---+------+------+----------+
Output : Schema of Spark data frame
df_mul.schema
StructType(List(StructField(name,StringType,true),StructField(city,StringType,true),StructField(age,LongType,true),StructField(smoker,BooleanType,true),StructField(height,DoubleType,true),StructField(birthdate,StringType,true)))
How to print the schema in a tree format?¶
df_mul.printSchema()
root
|-- name: string (nullable = true)
|-- city: string (nullable = true)
|-- age: long (nullable = true)
|-- smoker: boolean (nullable = true)
|-- height: double (nullable = true)
|-- birthdate: string (nullable = true)
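The schema object returned above can also be walked field by field; each StructField exposes name, dataType and nullable. A minimal sketch assuming df_mul from above:

for field in df_mul.schema.fields:
    print(field.name, field.dataType.simpleString(), field.nullable)
# name string True
# city string True
# age bigint True
# smoker boolean True
# height double True
# birthdate string True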
2b. How to define a schema?¶
Let's first understand the syntax.
Syntax
pyspark.sql.types.StructType(fields)
Represents a schema as a sequence of StructField objects.
pyspark.sql.types.StructField(name, dataType, nullable)
Represents one field of a StructType.
Parameters:
name : the name of the field
dataType : the data type of the field
nullable : whether values of the field can be null
Example#1¶
from pyspark.sql.types import *

schema1 = StructType([
    StructField("name", StringType(), True),
    StructField("city", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("smoker", BooleanType(), True),
    StructField("height", FloatType(), True),
    StructField("birthdate", StringType(), True),
])
schema1
StructType(List(StructField(name,StringType,true),StructField(city,StringType,true),StructField(age,IntegerType,true),StructField(smoker,BooleanType,true),StructField(height,FloatType,true),StructField(birthdate,StringType,true)))
schema1.fieldNames()
['name', 'city', 'age', 'smoker', 'height', 'birthdate']
Example#2¶
schema2 = StructType([
    StructField("name", StringType()),
    StructField("weight", LongType()),
    StructField("smoker", BooleanType()),
    StructField("height", DoubleType()),
    StructField("birthdate", StringType()),
    StructField("phone_nos", MapType(StringType(), LongType(), True), True),
    StructField("favorite_colors", ArrayType(StringType(), True), True),
    StructField("address", StructType([
        StructField("houseno", IntegerType(), True),
        StructField("street", StringType(), True),
        StructField("city", StringType(), True),
        StructField("zipcode", IntegerType(), True),
    ])),
])
print(schema2)
StructType(List(StructField(name,StringType,true),StructField(weight,LongType,true),StructField(smoker,BooleanType,true),StructField(height,DoubleType,true),StructField(birthdate,StringType,true),StructField(phone_nos,MapType(StringType,LongType,true),true),StructField(favorite_colors,ArrayType(StringType,true),true),StructField(address,StructType(List(StructField(houseno,IntegerType,true),StructField(street,StringType,true),StructField(city,StringType,true),StructField(zipcode,IntegerType,true))),true)))
schema2.fieldNames()
['name',
'weight',
'smoker',
'height',
'birthdate',
'phone_nos',
'favorite_colors',
'address']
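For flat schemas (no arrays, maps or structs), recent Spark versions also accept a DDL-style string in place of a StructType when creating a data frame. The sketch below is an illustration, not part of the original examples:

df_ddl = spark.createDataFrame(
    [('John', 'Seattle', 60, True, 1.7, '1960-01-01')],
    schema="name STRING, city STRING, age INT, smoker BOOLEAN, height FLOAT, birthdate STRING")
df_ddl.printSchema()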
2c. How to use the schema?¶
from pyspark.sql.types import *
from pyspark.sql import functions as func

schema = StructType([
    StructField("name", StringType()),
    StructField("weight", LongType()),
    StructField("smoker", BooleanType()),
    StructField("height", DoubleType()),
    StructField("birthdate", StringType()),
    StructField("phone_nos", MapType(StringType(), LongType(), True), True),
    StructField("favorite_colors", ArrayType(StringType(), True), True),
    StructField("address", StructType([
        StructField("houseno", IntegerType(), True),
        StructField("street", StringType(), True),
        StructField("city", StringType(), True),
        StructField("zipcode", IntegerType(), True),
    ])),
])

df = spark.createDataFrame(
    [["john", 180, True, 1.7, '1960-01-01', {'office': 123456789, 'home': 223456789}, ["blue", "red"], (100, 'street1', 'city1', 12345)],
     ["tony", 180, True, 1.8, '1990-01-01', {'office': 223456789, 'home': 323456789}, ["green", "purple"], (200, 'street2', 'city2', 22345)],
     ["mike", 180, True, 1.65, '1980-01-01', {'office': 323456789, 'home': 423456789}, ["yellow", "orange"], (300, 'street3', 'city3', 32345)]],
    schema=schema)
df.printSchema()
df.toPandas()
root
|-- name: string (nullable = true)
|-- weight: long (nullable = true)
|-- smoker: boolean (nullable = true)
|-- height: double (nullable = true)
|-- birthdate: string (nullable = true)
|-- phone_nos: map (nullable = true)
| |-- key: string
| |-- value: long (valueContainsNull = true)
|-- favorite_colors: array (nullable = true)
| |-- element: string (containsNull = true)
|-- address: struct (nullable = true)
| |-- houseno: integer (nullable = true)
| |-- street: string (nullable = true)
| |-- city: string (nullable = true)
| |-- zipcode: integer (nullable = true)
|   | name | weight | smoker | height | birthdate | phone_nos | favorite_colors | address |
|---|---|---|---|---|---|---|---|---|
| 0 | john | 180 | True | 1.70 | 1960-01-01 | {'office': 123456789, 'home': 223456789} | [blue, red] | (100, street1, city1, 12345) |
| 1 | tony | 180 | True | 1.80 | 1990-01-01 | {'office': 223456789, 'home': 323456789} | [green, purple] | (200, street2, city2, 22345) |
| 2 | mike | 180 | True | 1.65 | 1980-01-01 | {'office': 323456789, 'home': 423456789} | [yellow, orange] | (300, street3, city3, 32345) |
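Once the data is loaded with this schema, nested values can be pulled out with ordinary column expressions. A short sketch assuming the df created above:

df.select(
    "name",
    func.col("address.city").alias("city"),              # struct field access
    func.col("phone_nos")["office"].alias("office_no"),  # map lookup by key
    func.explode("favorite_colors").alias("color"),       # one row per array element
).show()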
2d. How to save the schema?¶
df = spark.createDataFrame(
    [["john", 180, True, 1.7, '1960-01-01', {'office': 123456789, 'home': 223456789}, ["blue", "red"], (100, 'street1', 'city1', 12345)],
     ["tony", 180, True, 1.8, '1990-01-01', {'office': 223456789, 'home': 323456789}, ["green", "purple"], (200, 'street2', 'city2', 22345)],
     ["mike", 180, True, 1.65, '1980-01-01', {'office': 323456789, 'home': 423456789}, ["yellow", "orange"], (300, 'street3', 'city3', 32345)]],
    schema=schema)
df.printSchema()
root
|-- name: string (nullable = true)
|-- weight: long (nullable = true)
|-- smoker: boolean (nullable = true)
|-- height: double (nullable = true)
|-- birthdate: string (nullable = true)
|-- phone_nos: map (nullable = true)
| |-- key: string
| |-- value: long (valueContainsNull = true)
|-- favorite_colors: array (nullable = true)
| |-- element: string (containsNull = true)
|-- address: struct (nullable = true)
| |-- houseno: integer (nullable = true)
| |-- street: string (nullable = true)
| |-- city: string (nullable = true)
| |-- zipcode: integer (nullable = true)
df.schema
StructType(List(StructField(name,StringType,true),StructField(weight,LongType,true),StructField(smoker,BooleanType,true),StructField(height,DoubleType,true),StructField(birthdate,StringType,true),StructField(phone_nos,MapType(StringType,LongType,true),true),StructField(favorite_colors,ArrayType(StringType,true),true),StructField(address,StructType(List(StructField(houseno,IntegerType,true),StructField(street,StringType,true),StructField(city,StringType,true),StructField(zipcode,IntegerType,true))),true)))
spark.conf.set("spark.hadoop.validateOutputSpecs", "false")
rdd_schema = spark.sparkContext.parallelize(df.schema)   # distribute the schema's StructFields as an RDD
# rdd_schema.coalesce(1).saveAsPickleFile("data/text/schema_file")
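An alternative worth noting (not what the cell above does): a schema can also be serialized as plain JSON with StructType.json(), which avoids the RDD round-trip. The file path below is illustrative:

import json

# Hypothetical path; any writable location works
with open("data/text/schema_file.json", "w") as f:
    f.write(df.schema.json())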
2e. How to load the saved schema?¶
schema_rdd = spark.sparkContext.pickleFile("data/text/schema_file")
schema = StructType(schema_rdd.collect())
print(schema)
StructType(List(StructField(name,StringType,true),StructField(weight,LongType,true),StructField(smoker,BooleanType,true),StructField(height,DoubleType,true),StructField(birthdate,StringType,true),StructField(phone_nos,MapType(StringType,LongType,true),true),StructField(favorite_colors,ArrayType(StringType,true),true),StructField(address,StructType(List(StructField(houseno,IntegerType,true),StructField(street,StringType,true),StructField(city,StringType,true),StructField(zipcode,IntegerType,true))),true)))
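If the schema was instead saved as JSON as sketched in section 2d, it can be rebuilt with StructType.fromJson; the path below is the same illustrative one:

import json
from pyspark.sql.types import StructType

with open("data/text/schema_file.json") as f:
    schema_from_json = StructType.fromJson(json.load(f))
print(schema_from_json == df.schema)   # expected: True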
2f. How to get the names of all fields in the schema?¶
df_mul = spark.createDataFrame([('John', 'Seattle', 60, True, 1.7, '1960-01-01'),
                                ('Tony', 'Cupertino', 30, False, 1.8, '1990-01-01'),
                                ('Mike', 'New York', 40, True, 1.65, '1980-01-01')],
                               ['name', 'city', 'age', 'smoker', 'height', 'birthdate'])
df_mul.show()
+----+---------+---+------+------+----------+
|name| city|age|smoker|height| birthdate|
+----+---------+---+------+------+----------+
|John| Seattle| 60| true| 1.7|1960-01-01|
|Tony|Cupertino| 30| false| 1.8|1990-01-01|
|Mike| New York| 40| true| 1.65|1980-01-01|
+----+---------+---+------+------+----------+
df_mul.schema.fieldNames()
['name', 'city', 'age', 'smoker', 'height', 'birthdate']
Summary:
print("input ", "output")
display_side_by_side(df_mul.toPandas(),pd.DataFrame([[df_mul.schema.fieldNames()]],columns=["."]),space=2)
input  output

| name | city | age | smoker | height | birthdate |
|---|---|---|---|---|---|
| John | Seattle | 60 | True | 1.70 | 1960-01-01 |
| Tony | Cupertino | 30 | False | 1.80 | 1990-01-01 |
| Mike | New York | 40 | True | 1.65 | 1980-01-01 |

| . |
|---|
| [name, city, age, smoker, height, birthdate] |
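Beyond listing all field names, the fields can also be filtered by data type. A small sketch assuming df_mul from above:

from pyspark.sql.types import StringType

# Keep only the names of string-typed columns
string_cols = [f.name for f in df_mul.schema.fields if isinstance(f.dataType, StringType)]
print(string_cols)   # ['name', 'city', 'birthdate']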