Get the first non-null value per group in a Spark DataFrame

Suppose we need to get the first non-null value from each column of a DataFrame, per partition. In other words, we want the first non-null value from each column regardless of which row it comes from: a non-null value of column A from row 5 can be stitched together with a non-null value of column B from row 10. This can be very helpful, especially for data sampling. Also, instead of working per partition, we can group the table by any column or set of columns and extract the first non-null values per group. To do so, we use a Window specification to partition the table data by the given columns, and then apply the first function over that window to get the first non-null value per group.
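Before moving to the partitioned Hive table used in this post, here is a minimal sketch of the idea on a small in-memory DataFrame, assuming a spark-shell session where the spark variable is already available. The demoDF name, the grp column, and the sample values are made up purely for illustration.

import org.apache.spark.sql.functions.first
import org.apache.spark.sql.expressions.Window
import spark.implicits._

// Hypothetical sample data: some cells are null
val demoDF = Seq(
  ("g1", None, Some("first comment")),
  ("g1", Some("alice"), None),
  ("g2", Some("bob"), None),
  ("g2", None, Some("second comment"))
).toDF("grp", "userName", "comment")

// Consider every row of the group when picking a value
val w = Window.partitionBy("grp")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

// first(..., ignoreNulls = true) skips nulls and returns the first non-null value it finds
demoDF
  .withColumn("userName", first("userName", ignoreNulls = true).over(w))
  .withColumn("comment", first("comment", ignoreNulls = true).over(w))
  .distinct()
  .show(false)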

In this post, we are going to use the dt_Date column of the sample table as the partition column. That means we will pick the first non-null value from each column for every dt_Date value. To demonstrate this, let’s create a Hive-partitioned table with some sample data. The following script, written in Scala, creates the sample table and loads dummy data into it. Also, you can download the sample text files from here.

spark.sql("""CREATE TABLE testDB.SampleTable
(
	userName STRING,
	comment STRING
)
PARTITIONED BY (dt_Date STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'SampleTable/' """)

spark.sql(""" ALTER TABLE testDB.SampleTable ADD PARTITION (dt_Date='202204') """).show()
spark.sql(""" ALTER TABLE testDB.SampleTable ADD PARTITION (dt_Date='202205') """).show()

spark.sql(""" LOAD DATA LOCAL INPATH 'Downloads/202204_comments.txt' OVERWRITE INTO TABLE testDB.SampleTable PARTITION(dt_Date='202204') """)
spark.sql(""" LOAD DATA LOCAL INPATH 'Downloads/202205_comments.txt' OVERWRITE INTO TABLE testDB.SampleTable PARTITION(dt_Date='202205') """)

Now, let’s have a look at the table data.

spark.sql("SELECT * FROM testDB.SampleTable").show(false)
Sample table data

Get the first non-null value – Spark DataFrame

Now, to get the first non-null value from each column of the table, we can use the query below.

import org.apache.spark.sql.functions.first
import org.apache.spark.sql.expressions.Window

val df = spark.sql("SELECT * FROM testDB.SampleTable")

// Partition by dt_Date and look at every row of the partition when picking a value
val window = Window.partitionBy("dt_Date").orderBy("dt_Date")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

// Overwrite each column with its first non-null value within the partition
val sampleDF = df
  .withColumn("userName", first("userName", ignoreNulls = true).over(window))
  .withColumn("comment", first("comment", ignoreNulls = true).over(window))

// Collapse the identical rows down to one row per dt_Date partition
sampleDF.groupBy("dt_Date", "userName", "comment").count().drop("count").show(false)

Output

First non-null values from each column

In the above query, we extract the first non-null value from the userName and comment columns. First, we use Window.partitionBy to partition the data by the dt_Date column. Second, we set the window frame to Window.unboundedPreceding and Window.unboundedFollowing so that the value can come from any row of the partition. Third, we apply the first function with ignoreNulls = true to get the first non-null value for each column.
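If you prefer to stay in SQL, roughly the same logic can be written with Spark SQL's first aggregate used as a window function. This is only a sketch, assuming the first(expr, isIgnoreNull) syntax; a window without an ORDER BY spans the whole partition by default, and DISTINCT collapses the duplicated rows. The sqlDF name is just illustrative.

// Sketch: same result expressed in Spark SQL over the table created above
val sqlDF = spark.sql("""
  SELECT DISTINCT
         dt_Date,
         first(userName, true) OVER (PARTITION BY dt_Date) AS userName,
         first(comment, true) OVER (PARTITION BY dt_Date) AS comment
  FROM testDB.SampleTable """)
sqlDF.show(false)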

Once we have the first non-null value, we overwrite the userName and comment column values of every row with that same value using the withColumn DataFrame method. Then, we group the table data by the dt_Date column to keep a single row per partition. Instead of the dt_Date column, we can use any other column as well. The Window.unboundedPreceding and Window.unboundedFollowing frame boundaries allow the value to be taken from any row within the same partition.
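If all we need is a single row per group, rather than overwriting every row of the original DataFrame, a plain groupBy aggregation is a shorter alternative. The sketch below (groupedDF is just an illustrative name) uses the same first function with ignoreNulls = true; as with the window approach, which non-null value comes "first" is not guaranteed without an explicit ordering.

import org.apache.spark.sql.functions.first

// One row per dt_Date, with the first non-null value found for each column
val groupedDF = spark.table("testDB.SampleTable")
  .groupBy("dt_Date")
  .agg(
    first("userName", ignoreNulls = true).as("userName"),
    first("comment", ignoreNulls = true).as("comment")
  )
groupedDF.show(false)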

Thanks for reading. Please share your inputs in the comments section.
