Dynamically Create Spark DataFrame Schema from Pandas DataFrame

Apache Spark is a widely used engine for processing large-scale data in a distributed environment. One of its key components is the Spark DataFrame, which offers a higher-level abstraction over distributed data and enables efficient data manipulation. When working with Spark, data often starts out in a Pandas DataFrame and needs to be moved into a Spark DataFrame. An important part of that move is creating a suitable schema for the Spark DataFrame. In this blog post, we’ll discuss how to dynamically create a Spark DataFrame schema from a Pandas DataFrame.

Create Spark DataFrame Schema from Pandas DataFrame Dynamically

In Spark, a DataFrame is a distributed collection of data organized into named columns. Each column has a unique name and a specific data type. This metadata, called the schema, defines the structure of the DataFrame by specifying the name and data type of each column. An accurate schema is crucial for efficient processing and transformation of data within Spark.

We can use the code below to generate a schema from a Pandas DataFrame at runtime. Then, we can use that schema to create a Spark DataFrame from the Pandas DataFrame.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType, LongType
import pandas as pd
from datetime import datetime

spark = SparkSession.builder.appName("PandasToSpark").getOrCreate()

# Sample data for the Pandas DataFrame
data = {
    'col_str': ['apple', 'banana', 'cherry'],
    'col_int': [1, 2, 3],
    'col_time': [datetime(2023, 1, 1), datetime(2023, 2, 1), datetime(2023, 3, 1)],
    'col_double': [1.1, 2.2, 3.3],
    'col_float': [4.4, 5.5, 6.6],
    'col_big': [1000000000, 2000000000, 3000000000]
}

# Create a Pandas DataFrame
df = pd.DataFrame(data)
print("Pandas dataframe schema is:")
df.info()  # info() prints its report itself and returns None

columns = df.columns.tolist()
data_types = [str(df[col].dtype) for col in columns]

# Create a list of StructField objects for each column
fields = []
for col, dtype in zip(columns, data_types):
    if dtype == 'object':
        fields.append(StructField(col, StringType()))
    elif dtype == 'datetime64[ns]':
        fields.append(StructField(col, TimestampType()))
    elif dtype == 'int64':
        fields.append(StructField(col, LongType()))
    elif dtype == 'float64':
        fields.append(StructField(col, DoubleType()))
    else:
        fields.append(StructField(col, StringType()))

# Create a StructType object using the list of StructField objects
schema = StructType(fields)

sdf = spark.createDataFrame(df, schema)
print("Spark dataframe schema is:")
sdf.printSchema()  # printSchema() prints the schema itself and returns None

The code above converts a Pandas DataFrame (df) into a Spark DataFrame (sdf) using PySpark. Let’s walk through it step by step:

1. Import Libraries:

First, import the necessary libraries including SparkSession from PySpark, and Pandas. We also need to import various data types and structures from PySpark.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType, LongType
import pandas as pd
from datetime import datetime

2. Initialize SparkSession:

Then, create a Spark session named “PandasToSpark”.

spark = SparkSession.builder.appName("PandasToSpark").getOrCreate()

3. Create a Sample Pandas DataFrame:

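Define a sample dictionary (data) and use it to create a Pandas DataFrame (df) with columns of several kinds: string, integer, datetime, floating point, and large integer. Note that Pandas stores both floating-point columns as float64 and both integer columns as int64.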

data = {
    'col_str': ['apple', 'banana', 'cherry'],
    'col_int': [1, 2, 3],
    'col_time': [datetime(2023, 1, 1), datetime(2023, 2, 1), datetime(2023, 3, 1)],
    'col_double': [1.1, 2.2, 3.3],
    'col_float': [4.4, 5.5, 6.6],
    'col_big': [1000000000, 2000000000, 3000000000]
}
df = pd.DataFrame(data)
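
Although col_int and col_big look alike in Pandas (both are stored as int64), only one of them would fit into Spark’s 32-bit IntegerType. A small sketch, using a hypothetical helper fits_in_int32 on plain Python lists rather than on the DataFrame itself, illustrates why mapping int64 to the 64-bit LongType is the safe choice:

```python
# Bounds of a signed 32-bit integer, the range of Spark's IntegerType.
INT32_MIN, INT32_MAX = -2**31, 2**31 - 1

def fits_in_int32(values):
    """Return True if every value lies within the signed 32-bit range."""
    return all(INT32_MIN <= v <= INT32_MAX for v in values)

col_int = [1, 2, 3]
col_big = [1000000000, 2000000000, 3000000000]

print(fits_in_int32(col_int))  # True
print(fits_in_int32(col_big))  # False: 3000000000 exceeds 2147483647
```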

4. Print Pandas DataFrame Schema:

Print the Pandas DataFrame schema using the info() method.

print("Pandas dataframe schema is:")
df.info()  # info() prints its report itself and returns None

5. Extract column names and data types of Pandas DataFrame:

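Collect the column names into a list and convert each column’s dtype to its string name (for example 'int64' or 'object') so that it can be compared in the next step.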

columns = df.columns.tolist()
data_types = [str(df[col].dtype) for col in columns]

6. Create schema for Spark DataFrame dynamically:

Map each Pandas dtype to a Spark type, build a StructField for each column, and wrap the resulting list in a StructType. Any dtype that is not matched explicitly falls back to StringType.

fields = []
for col, dtype in zip(columns, data_types):
    if dtype == 'object':
        fields.append(StructField(col, StringType()))
    elif dtype == 'datetime64[ns]':
        fields.append(StructField(col, TimestampType()))
    elif dtype == 'int64':
        fields.append(StructField(col, LongType()))
    elif dtype == 'float64':
        fields.append(StructField(col, DoubleType()))
    else:
        fields.append(StructField(col, StringType()))
schema = StructType(fields)
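
The if/elif chain above could also be written as a lookup table. The following is a minimal sketch under some assumptions: it works on dtype names as plain strings (as produced by str(df[col].dtype)), the names PANDAS_TO_SPARK, spark_type_name, and build_ddl_schema are hypothetical helpers that are not part of PySpark or Pandas, and the extra entries (bool, int32, float32) go beyond what the code in this post handles. Instead of StructField objects it builds a schema string, which spark.createDataFrame also accepts in place of a StructType. It runs without Spark, so the mapping can be checked on its own.

```python
# Hypothetical lookup table: pandas dtype name -> Spark SQL type name.
PANDAS_TO_SPARK = {
    "object": "string",
    "bool": "boolean",
    "int32": "int",
    "int64": "bigint",
    "float32": "float",
    "float64": "double",
}

def spark_type_name(dtype_name):
    """Return a Spark SQL type name for a pandas dtype name, defaulting to string."""
    # Prefix match covers naive ('datetime64[ns]') and tz-aware
    # ('datetime64[ns, UTC]') dtype names alike.
    if dtype_name.startswith("datetime64"):
        return "timestamp"
    return PANDAS_TO_SPARK.get(dtype_name, "string")

def build_ddl_schema(columns, data_types):
    """Join column names and mapped types into a DDL-style schema string."""
    return ", ".join(
        f"{col} {spark_type_name(dtype)}" for col, dtype in zip(columns, data_types)
    )

# Column names and dtype names matching the sample DataFrame in this post.
columns = ["col_str", "col_int", "col_time", "col_double", "col_float", "col_big"]
data_types = ["object", "int64", "datetime64[ns]", "float64", "float64", "int64"]

ddl = build_ddl_schema(columns, data_types)
print(ddl)
```

The resulting string could then be passed as the schema, for example spark.createDataFrame(df, ddl). Column names containing spaces or special characters would need backtick quoting, which this sketch does not do.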

7. Create Spark DataFrame:

Use the createDataFrame method to create a Spark DataFrame (sdf) from the Pandas DataFrame (df) and the dynamically generated schema. Finally, print the schema of the Spark DataFrame.

sdf = spark.createDataFrame(df, schema)
print("Spark dataframe schema is:")
sdf.printSchema()  # printSchema() prints the schema itself and returns None

Thanks for reading. Please share your feedback in the comments section.
