We wrote a program that iterates over a list of column names and adds each one to a Spark DataFrame.
We already had a table with 364 columns and wanted a final DataFrame with 864 columns, so we wrote a loop like the one below:
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StringType

// stagingDF must be a var, since withColumn() returns a new DataFrame
existingColumns.filter(x => !colList.contains(x)).foreach { x =>
  stagingDF = stagingDF.withColumn(x, lit(null).cast(StringType))
}
We realized that this caused the program to take almost 20 minutes to add the 500 columns. On analysis, we found that Spark updates the Projection after every withColumn() invocation: each call wraps the existing logical plan in a new Project node and re-analyzes it, so the cost grows with every column added.
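You can observe this plan growth directly. A minimal sketch (assuming a SparkSession named spark):

import org.apache.spark.sql.functions.lit

val df = spark.range(1).toDF("id")
val grown = (1 to 3).foldLeft(df) { (d, i) =>
  d.withColumn(s"c$i", lit(null).cast("string"))
}
// The analyzed plan shows one nested Project per withColumn() call.
grown.explain(extended = true)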
So, instead of updating the DataFrame column by column, we appended the column expressions to a StringBuilder, built a single SELECT statement from it, and executed that against the DataFrame:
val comma = ","
val select = "select "
val tmptable = "tmptable"
val from = " from "
val col = " cast(null as string) as "

val strbuild = new StringBuilder()
val colStr = colList.mkString(comma)
strbuild.append(select).append(colStr)

// Append "cast(null as string) as <name>" for each missing column
existingColumns.filter(x => !colList.contains(x)).foreach { x =>
  strbuild.append(comma).append(col).append(x)
}

val selectExpr = strbuild.append(from).append(tmptable).toString()
stagingDF.createOrReplaceTempView(tmptable) // registerTempTable is deprecated
stagingDF = spark.sql(selectExpr)
This reduced our time from 20 minutes to 2 minutes for adding 500 new columns to the DataFrame.
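For reference, the same single-projection idea can be expressed with the DataFrame API alone, avoiding SQL string building. This is a sketch using the names from the post (stagingDF, colList, existingColumns), not our original code:

import org.apache.spark.sql.functions.{col, lit}

// Build all the new column expressions up front
val newCols = existingColumns
  .filter(x => !colList.contains(x))
  .map(x => lit(null).cast("string").as(x))

// One select() means one Projection, regardless of how many columns are added
val result = stagingDF.select(colList.map(col) ++ newCols: _*)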