In PySpark, a map is an RDD transformation that applies a function to every element of a data set; closely related is the map-type (MapType) column, which stores data as key-value pairs. There are various situations in which you have numerous columns in a data frame and need to convert them into a single map-type column. This is easily done using the create_map function, with map keys and column names passed as alternating arguments. Continue reading the article further to know about it in detail.
Syntax: df.withColumn("map_column_name", create_map(lit("mapkey_1"), col("column_1"), lit("mapkey_2"), col("column_2"))).drop("column_1", "column_2").show(truncate=False)
Here,
- column_1, column_2: the names of the columns that need to be converted to the map.
- mapkey_1, mapkey_2: the names of the map keys under which the data is stored on creation of the map.
- map_column_name: the name given to the column in which the map is stored.
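Before the full examples below, here is a minimal sketch of this syntax on a tiny in-memory data frame (the data, column names and map keys here are purely illustrative):
Python3
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, create_map

spark = SparkSession.builder.getOrCreate()

# Illustrative two-column data frame
df = spark.createDataFrame([("Alice", 95), ("Bob", 80)],
                           ["column_1", "column_2"])

# Pack both columns into one map column and drop the originals
df.withColumn("map_column_name",
              create_map(lit("mapkey_1"), col("column_1"),
                         lit("mapkey_2"), col("column_2"))
              ).drop("column_1", "column_2").show(truncate=False)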
Example 1:
In this example, we have used a data set (link), which is a 5×5 data frame, as follows:
Then, we converted the 'name', 'class' and 'fees' columns to a map using the create_map function, stored it in the 'student_details' column, and dropped the existing 'name', 'class' and 'fees' columns.
Python3
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, create_map

# Create (or reuse) a Spark session
spark_session = SparkSession.builder.getOrCreate()

# Read the CSV file into a data frame
data_frame = spark_session.read.csv('/content/class_data.csv',
                                    sep=',', inferSchema=True,
                                    header=True)

# Combine the name, class and fees columns into a single
# map column, then drop the original columns
data_frame = data_frame.withColumn(
    "student_details",
    create_map(lit("student_name"), col("name"),
               lit("student_class"), col("class"),
               lit("student_fees"), col("fees"))
).drop("name", "class", "fees")

data_frame.show(truncate=False)
Output:
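Once the map column exists, its individual entries can be looked up by key using bracket (getItem) syntax on the column. A minimal sketch, assuming the data_frame produced above with its 'student_name' and 'student_fees' keys:
Python3
from pyspark.sql.functions import col

# Look up single map entries by key
data_frame.select(col("student_details")["student_name"].alias("name"),
                  col("student_details")["student_fees"].alias("fees")
                  ).show(truncate=False)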
Example 2:
In this example, we have created a data frame with columns emp_id, name, superior_emp_id, year_joined, emp_dept_id, gender, and salary as follows:
Then, we converted the name, superior_emp_id, year_joined, emp_dept_id, gender and salary columns to a map using the create_map function, stored it in the 'employee_details' column, and dropped the existing name, superior_emp_id, year_joined, emp_dept_id, gender and salary columns.
Python3
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, create_map

# Create (or reuse) a Spark session
spark_session = SparkSession.builder.getOrCreate()

# Sample employee data and column names
emp = [(1, "Smith", -1, "2018", "10", "M", 3000),
       (2, "Rose", 1, "2010", "20", "M", 4000),
       (3, "Williams", 1, "2010", "10", "M", 1000),
       (4, "Jones", 2, "2005", "10", "F", 2000),
       (5, "Brown", 2, "2010", "40", "F", 4000),
       (6, "Brown", 2, "2010", "50", "M", 2000)]
empColumns = ["emp_id", "name", "superior_emp_id",
              "year_joined", "emp_dept_id",
              "gender", "salary"]

empDF = spark_session.createDataFrame(data=emp,
                                      schema=empColumns)

# Combine every column except emp_id into a single
# map column, then drop the original columns
empDF = empDF.withColumn(
    "employee_details",
    create_map(lit("name"), col("name"),
               lit("superior_emp_id"), col("superior_emp_id"),
               lit("year_joined"), col("year_joined"),
               lit("emp_dept_id"), col("emp_dept_id"),
               lit("gender"), col("gender"),
               lit("salary"), col("salary"))
).drop("name", "superior_emp_id", "year_joined",
       "emp_dept_id", "gender", "salary")

empDF.show(truncate=False)
Output:
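As a final note, the keys and values of a map column can be inspected with the built-in map_keys and map_values functions. A minimal sketch, assuming the empDF produced in Example 2:
Python3
from pyspark.sql.functions import col, map_keys, map_values

# List the keys and values stored in the map column
empDF.select(col("emp_id"),
             map_keys(col("employee_details")).alias("keys"),
             map_values(col("employee_details")).alias("values")
             ).show(truncate=False)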