Following code piece depicts integration of Kudu and Spark.
Refer - https://github.com/dinesh028/engineering/blob/master/resources/samples/Spark-Kudu-integration-code.txt
//spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.10.0
import org.apache.kudu.client._
import org.apache.kudu.spark.kudu.KuduContext
import collection.JavaConverters._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
val arr= new java.util.ArrayList[Row] ()
arr.add(Row("jai","ganesh"))
val arraySchema = new StructType().add("id",StringType, false).add("name", StringType, true)
val df = spark.createDataFrame(arr,arraySchema)
df.printSchema
val kuduContext = new KuduContext("mymaster.devhadoop.wm.com:7051", spark.sparkContext)
//This will create the table but will not insert any data
kuduContext.createTable("ds.my_test_table", df.schema, Seq("id"), new CreateTableOptions().setNumReplicas(1).addHashPartitions(List("id").asJava, 3))
// Insert data
kuduContext.insertRows(df, "ds.my_test_table")
//Read or Select from kudu table
val selectdf = spark.read.options(Map("kudu.master" -> "mymaster.devhadoop.wm.com:7051", "kudu.table" -> "ds.my_test_table")).format("kudu").load
selectdf.show
//Delete data
kuduContext.deleteRows(df, "ds.my_test_table")
selectdf.show
//Upsert data
kuduContext.insertRows(df, "ds.my_test_table")
val newselectdf = selectdf.select($"id", concat($"name",lit("_1")).as("name"))
kuduContext.updateRows(newselectdf, "ds.my_test_table")
selectdf.show
//upserts
kuduContext.upsertRows(selectdf.select($"id", lit("ganesh").as("name")), "ds.my_test_table")
selectdf.show
//Using data source - only 'append' supported
df.select(lit("2").as("id"), concat($"name",lit("_1")).as("name")).write.options(Map("kudu.master"-> "mymaster.devhadoop.wm.com:7051", "kudu.table"-> "ds.my_test_table")).mode("append").format("kudu").save
selectdf.show
// Delete a Kudu table
kuduContext.deleteTable("ds.my_test_table")
Refer - https://github.com/dinesh028/engineering/blob/master/resources/samples/Spark-Kudu-integration-code.txt
//spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.10.0
import org.apache.kudu.client._
import org.apache.kudu.spark.kudu.KuduContext
import collection.JavaConverters._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
val arr= new java.util.ArrayList[Row] ()
arr.add(Row("jai","ganesh"))
val arraySchema = new StructType().add("id",StringType, false).add("name", StringType, true)
val df = spark.createDataFrame(arr,arraySchema)
df.printSchema
val kuduContext = new KuduContext("mymaster.devhadoop.wm.com:7051", spark.sparkContext)
//This will create the table but will not insert any data
kuduContext.createTable("ds.my_test_table", df.schema, Seq("id"), new CreateTableOptions().setNumReplicas(1).addHashPartitions(List("id").asJava, 3))
// Insert data
kuduContext.insertRows(df, "ds.my_test_table")
//Read or Select from kudu table
val selectdf = spark.read.options(Map("kudu.master" -> "mymaster.devhadoop.wm.com:7051", "kudu.table" -> "ds.my_test_table")).format("kudu").load
selectdf.show
//Delete data
kuduContext.deleteRows(df, "ds.my_test_table")
selectdf.show
//Upsert data
kuduContext.insertRows(df, "ds.my_test_table")
val newselectdf = selectdf.select($"id", concat($"name",lit("_1")).as("name"))
kuduContext.updateRows(newselectdf, "ds.my_test_table")
selectdf.show
//upserts
kuduContext.upsertRows(selectdf.select($"id", lit("ganesh").as("name")), "ds.my_test_table")
selectdf.show
//Using data source - only 'append' supported
df.select(lit("2").as("id"), concat($"name",lit("_1")).as("name")).write.options(Map("kudu.master"-> "mymaster.devhadoop.wm.com:7051", "kudu.table"-> "ds.my_test_table")).mode("append").format("kudu").save
selectdf.show
// Delete a Kudu table
kuduContext.deleteTable("ds.my_test_table")
Comments
Post a Comment