When you create a Spark DataFrame, one or more columns can have the schema property nullable = false. This means those column(s) cannot contain null values.
When a null value is assigned to such a column, you will see an exception like the following:
2/7/2023 3:16:00 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 6)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: The 0th field 'colA' of input row cannot be null.
So, to avoid the error above, we need to update the DataFrame's schema to set nullable = true.
- One way to do that is to use a when/otherwise clause, like below:
.withColumn("col_name", when(col("col_name").isNotNull, col("col_name")).otherwise(lit(null)))
This tells Spark that the column may contain null values.
- Another way is to create a custom method, callable on a DataFrame, that returns a new DataFrame with the modified schema.
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object ExtraDataFrameOperations {
  object implicits {
    /**
     * Implicit conversion that enriches a [[DataFrame]] with the extra
     * schema-manipulation operations defined on [[DFWithExtraOperations]].
     *
     * Note: implicit definitions must declare an explicit return type;
     * relying on inference here triggers a compiler warning and can break
     * implicit resolution.
     *
     * @param df the DataFrame to wrap
     * @return a wrapper exposing the extra operations
     */
    implicit def dFWithExtraOperations(df: DataFrame): DFWithExtraOperations =
      DFWithExtraOperations(df)
  }
}
/**
 * Wrapper adding schema-manipulation helpers to a [[DataFrame]].
 *
 * @param df the source DataFrame the helpers operate on
 */
case class DFWithExtraOperations(df: DataFrame) {

  /**
   * Returns a new DataFrame identical to `df` except that the `nullable`
   * flag of the column named `cn` is set to the given value. Only the
   * schema metadata changes; the row data is reused as-is.
   *
   * @param cn       name of the column whose nullability should change
   * @param nullable the flag to set, such that the column is either nullable or not
   * @return a DataFrame with the modified schema
   */
  def setNullableStateOfColumn(cn: String, nullable: Boolean): DataFrame = {
    // Rebuild the schema, replacing only the [[StructField]] named `cn`.
    val newSchema = StructType(df.schema.map {
      case StructField(name, dataType, _, metadata) if name == cn =>
        StructField(name, dataType, nullable = nullable, metadata)
      case other => other
    })
    // Re-wrap the unchanged rows with the updated schema.
    // NOTE(review): createDataFrame(rdd, schema) does not validate the rows,
    // so declaring nullable = false on a column that actually holds nulls
    // will only fail later, at execution time.
    df.sqlContext.createDataFrame(df.rdd, newSchema)
  }
}
// Bring the implicit DataFrame -> DFWithExtraOperations conversion into scope.
import ExtraDataFrameOperations.implicits._
val df = ... // placeholder: obtain a DataFrame from your session/context here
// The implicit conversion lets us call the helper directly on the DataFrame:
// returns a new DataFrame whose "id" column is marked nullable in the schema.
val otherDF = df.setNullableStateOfColumn( "id", true )
Comments
Post a Comment