Apache Spark is an open-source distributed data processing framework built for large-scale analytics, helping us analyze and derive insights from massive datasets. Azure Cosmos DB, on the other hand, is Microsoft's globally distributed NoSQL database service, offering high availability and scalability with very low latency. By combining Spark's processing power with Cosmos DB's versatility, organizations can gain valuable insights from their extensive and varied data collections. In this post, we will walk through how to read data from Cosmos DB into Spark and how to write data from Spark back to Cosmos DB.
Before we start, make sure the required setup is in place. As prerequisites, we need the following:
- An Azure Cosmos DB account (NoSQL API) with a database and a container
- A working Apache Spark setup
- The Azure Cosmos DB Spark connector (a minimal way to attach it is sketched below)
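The connector is published to Maven Central, so one convenient way to attach it is through the spark.jars.packages setting when building the Spark session. The sketch below is illustrative: the Maven coordinate shown targets Spark 3.4 with Scala 2.12, so match the artifact to your own Spark and Scala versions and fill in a released connector version.
# A minimal sketch of attaching the Cosmos DB Spark connector from Maven Central.
# Match the artifact to your Spark/Scala versions and fill in a released version.
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("CosmosDBSpark")
    .config(
        "spark.jars.packages",
        "com.azure.cosmos.spark:azure-cosmos-spark_3-4_2-12:REPLACE_WITH_CONNECTOR_VERSION",
    )
    .getOrCreate()
)
Alternatively, on managed platforms such as Databricks or Synapse, the connector can be installed as a cluster library instead of being pulled in at session startup.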
Once this setup is in place, we can proceed to the next step.
Reading data from Cosmos DB using Spark – PySpark
If we want to retrieve data from Cosmos DB with PySpark, we can utilize the code provided below.
# Create a Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CosmosDBSpark").getOrCreate()
# Define the configuration options for Cosmos DB connectivity
cosmosEndpoint = "https://REPLACE_WITH_COSMOSDB_URL.documents.azure.com:443/"
cosmosMasterKey = "REPLACE_WITH_YOUR_MASTER_KEY"
cosmosDatabaseName = "sampleDB"
cosmosContainerName = "sampleContainer"
config = {
    "spark.cosmos.accountEndpoint": cosmosEndpoint,
    "spark.cosmos.accountKey": cosmosMasterKey,
    "spark.cosmos.database": cosmosDatabaseName,
    "spark.cosmos.container": cosmosContainerName,
}
# Read data from Cosmos DB into a Spark DataFrame using the OLTP connector
df = (
    spark.read.format("cosmos.oltp")
    .options(**config)
    .option("spark.cosmos.read.inferSchema.enabled", "true")
    .load()
)
# Display the data
df.show()
Above, we used PySpark to read data from a Cosmos DB container. First, we created a SparkSession; then we defined the configuration required for Cosmos DB connectivity. Finally, we used the Cosmos DB OLTP connector (the "cosmos.oltp" format) to connect and read the data from Cosmos DB into a Spark DataFrame, with schema inference enabled so the connector derives the DataFrame schema from the stored documents.
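Once loaded, the data behaves like any other Spark DataFrame, so the usual transformations and actions apply. As a quick illustration (the Name and Age fields below are assumptions about the sample documents, not fields the connector guarantees), we can filter and aggregate the container's data directly:
# Illustrative only -- "Name" and "Age" are assumed fields in the sample documents
from pyspark.sql import functions as F
# Filter and project, as with any Spark DataFrame
df.filter(F.col("Age") >= 25).select("Name", "Age").show()
# Aggregate across all documents in the container
df.agg(F.avg("Age").alias("avg_age")).show()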
Similarly, we can use PySpark to write data into a Cosmos DB container.
Writing data to Cosmos DB using Spark – PySpark
If we want to write data to Cosmos DB with PySpark, we can utilize the code provided below.
# Create a Spark Session and import the types used to define the schema
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
spark = SparkSession.builder.appName("CosmosDBSpark").getOrCreate()
# Define the configuration options for Cosmos DB connectivity
cosmosEndpoint = "https://REPLACE_WITH_COSMOSDB_URL.documents.azure.com:443/"
cosmosMasterKey = "REPLACE_WITH_YOUR_MASTER_KEY"
cosmosDatabaseName = "sampleDB"
cosmosContainerName = "sampleContainer"
config = {
    "spark.cosmos.accountEndpoint": cosmosEndpoint,
    "spark.cosmos.accountKey": cosmosMasterKey,
    "spark.cosmos.database": cosmosDatabaseName,
    "spark.cosmos.container": cosmosContainerName,
}
# Sample data -- every Cosmos DB item needs a unique string "id" field.
# This example also assumes the container's partition key path is /id.
data = [("1", "Adam", 28),
        ("2", "Smith", 24),
        ("3", "Charlie", 22),
        ("4", "Angels", 29),
        ("5", "Eva", 31)]
# Define the schema for the DataFrame
schema = StructType([
    StructField("id", StringType(), False),
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True)
])
# Create a DataFrame from the sample data and schema
df = spark.createDataFrame(data, schema)
# Write data to Cosmos DB from Spark dataframe
df.write.format("cosmos.oltp").options(**config).mode("append").save()
Above, we used PySpark to write data to a Cosmos DB container. First, we created a SparkSession; then we defined the configuration required for Cosmos DB connectivity. After that, we created a sample DataFrame, including the mandatory "id" field that Cosmos DB requires on every item. Finally, we used the Cosmos DB OLTP connector to connect and write the data to Cosmos DB from the Spark DataFrame.
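The connector also exposes a write strategy option that controls how items with conflicting ids are handled; for instance, ItemOverwrite upserts items, while ItemAppend only creates new items and ignores ones that already exist. Below is a minimal sketch of setting the strategy explicitly, reusing the config dictionary from the example above; consult the connector's configuration reference for the full list of strategies.
# A minimal sketch of setting the connector's write strategy explicitly.
# "ItemOverwrite" upserts items; "ItemAppend" creates items and skips conflicts.
upsert_config = dict(config)
upsert_config["spark.cosmos.write.strategy"] = "ItemOverwrite"
df.write.format("cosmos.oltp").options(**upsert_config).mode("append").save()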
Thanks for reading. Please feel free to share your input in the comments section.