
Posts

Spark Custom Kafka Partitioner

A custom Partitioner can be implemented by extending org.apache.kafka.clients.producer.Partitioner. It can be used with the Spark SQL Kafka data source by setting the property "kafka.partitioner.class". For example:

    df.write.format("kafka").option("kafka.partitioner.class", "com.mycustom.ipartitioner")

We implemented one such custom partitioner by extending org.apache.kafka.clients.producer.RoundRobinPartitioner. The complete source code is available at https://github.com/dinesh028/engineering/blob/master/Kafka/com/aquaifer/producer/KeyPartitioner.scala

This partitioner reads a configuration file that maps each Kafka key to a primary-key name. The value in Kafka is a JSON message that carries this primary key with a unique value. The idea is to partition messages on that unique value, so that messages with the same primary-key value go into the same partition.

Once the configurations are loaded, for each byte-array message it: converts it to a String, parses the JSON, and gets the unique value…
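As a rough sketch of the idea (this is our illustration, not the exact code from the repository above; the default field name, the config handling, and the naive JSON lookup are simplifying assumptions):

    import java.util.{Map => JMap}

    import org.apache.kafka.clients.producer.RoundRobinPartitioner
    import org.apache.kafka.common.Cluster

    // Sketch: keep round-robin as the fallback, but when the JSON value carries
    // the configured primary key, partition on that value so that records with
    // the same primary-key value land in the same partition.
    class KeyPartitioner extends RoundRobinPartitioner {

      // Assumed default; the real implementation loads a key -> primary-key-name
      // mapping from a configuration file.
      private var primaryKeyField: String = "id"

      override def configure(configs: JMap[String, _]): Unit = {
        super.configure(configs)
        // e.g. read the mapping-file path from `configs` and load it here
      }

      override def partition(topic: String, key: Any, keyBytes: Array[Byte],
                             value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
        val numPartitions = cluster.partitionsForTopic(topic).size
        extractPrimaryKey(valueBytes) match {
          // Non-negative modulo keeps equal primary keys in one partition.
          case Some(pk) => ((pk.hashCode % numPartitions) + numPartitions) % numPartitions
          case None     => super.partition(topic, key, keyBytes, value, valueBytes, cluster)
        }
      }

      // Naive field lookup to keep the sketch dependency-free; a real
      // implementation would parse the JSON properly.
      private def extractPrimaryKey(valueBytes: Array[Byte]): Option[String] = {
        if (valueBytes == null) return None
        val json = new String(valueBytes, "UTF-8")
        ("\"" + primaryKeyField + "\"\\s*:\\s*\"?([^\",}]+)").r.findFirstMatchIn(json).map(_.group(1))
      }
    }

With such a class on the producer classpath, the option shown earlier would point at it, e.g. .option("kafka.partitioner.class", "com.aquaifer.producer.KeyPartitioner").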

Fix - HBase Master UI - hbck.jsp - NullPointerException

At times the HBase Master UI hbck report (https://hmaster:16010/hbck.jsp) shows a NullPointerException. This page displays two reports: the HBCK Chore Report and the CatalogJanitor Consistency Issues report. Only the report titles show if there are no problems to list. Note that some conditions are transitory as regions migrate. ServerNames will be links if the server is live, italic if dead, and plain if unknown.

Solution: if this page displays a NullPointerException, execute:

    echo "hbck_chore_run" | hbase shell

If the page still displays a NullPointerException instead of the report, then execute:

    echo "catalogjanitor_run" | hbase shell

Hue Oracle Database - Delete non-standard user entries from Parent and Child Tables.

We recently implemented authentication for Hue (https://gethue.com/); before that, folks were allowed to authenticate with any kind of username and use the Hue web UI. After implementing authentication, it was found that the Hue database still held entries for old, unwanted garbage users, which were still able to use the service and bypass authentication. Thus, it was required to delete such user entries from the table HUE.AUTH_USER. But the table had many associated child constraints, which made deleting an entry violate those constraints.

Thus, it was required to find all child tables associated with HUE.AUTH_USER, delete entries from the child tables first, and then delete entries from the parent HUE.AUTH_USER (see the sketch after this excerpt). We used the following query to find all child tables, constraints, and associated columns:

    select a.table_name parent_table_name, b.r_constraint_name parent_constraint, c.column_name parent_column, b.table_name child_table, b.constraint_name as child_constraint, d.column_name child_column…
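The delete order the post motivates (children first, then parent, in one transaction) can be sketched in Scala over plain JDBC. Everything below except HUE.AUTH_USER is a placeholder: the connection URL, credentials, user id, and child-table names (which in practice come from the dictionary query above); the Oracle JDBC driver is assumed to be on the classpath.

    import java.sql.DriverManager

    object PurgeHueUser {
      def main(args: Array[String]): Unit = {
        // Placeholder URL and credentials.
        val conn = DriverManager.getConnection("jdbc:oracle:thin:@//dbhost:1521/HUE", "hue", "secret")
        conn.setAutoCommit(false)
        val userId = 12345L // id of the garbage user row in HUE.AUTH_USER
        // Hypothetical child tables; use the dictionary query to find the real list.
        val childTables = Seq("HUE.AUTH_USER_GROUPS", "HUE.AUTH_USER_USER_PERMISSIONS")
        try {
          // Children first, so no foreign-key constraint is violated...
          for (child <- childTables) {
            val ps = conn.prepareStatement(s"DELETE FROM $child WHERE user_id = ?")
            ps.setLong(1, userId); ps.executeUpdate(); ps.close()
          }
          // ...then the parent row.
          val ps = conn.prepareStatement("DELETE FROM HUE.AUTH_USER WHERE id = ?")
          ps.setLong(1, userId); ps.executeUpdate(); ps.close()
          conn.commit()
        } catch {
          case e: Exception => conn.rollback(); throw e
        } finally {
          conn.close()
        }
      }
    }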

HBase Performance Optimization

Please refer to the first blog in this series, on reducing the number of regions per Region Server: https://querydb.blogspot.com/2023/03/hbase-utility-merging-regions-in-hbase.html, and the second, on deleting columns in HBase: https://querydb.blogspot.com/2019/11/hbase-bulk-delete-column-qualifiers.html

In this article, we discuss options to further optimize HBase:

- Use COMPRESSION=>'SNAPPY' for column families, and invoke a major compaction right after setting the property. This reduced the size of our tables by 70% while giving the same read and write performance (see the Admin API sketch after this excerpt).
- Once the size of regions and tables is compressed, re-invoke the merge-region utility to reduce the number of regions per server.
- Set the region split policy with SPLIT_POLICY=>'org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy'
- Enable request throttling by setting hbase.quota.enabled to true.

Our HBase cluster is used by real-time APIs as well as analytical Spark and MR jobs. Analytical workloads create…
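The compression step is normally done from the HBase shell; as a sketch under the assumption of an HBase 2.x client on the classpath, the same change and compaction can also be driven through the Admin API from Scala (the table and column-family names are placeholders):

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory}
    import org.apache.hadoop.hbase.io.compress.Compression
    import org.apache.hadoop.hbase.util.Bytes

    object EnableSnappy {
      def main(args: Array[String]): Unit = {
        val conn  = ConnectionFactory.createConnection(HBaseConfiguration.create())
        val admin = conn.getAdmin
        val table = TableName.valueOf("my_table") // placeholder table name
        try {
          // Switch the column family to SNAPPY; new store files are written compressed.
          val cf = ColumnFamilyDescriptorBuilder
            .newBuilder(Bytes.toBytes("cf"))      // placeholder family name
            .setCompressionType(Compression.Algorithm.SNAPPY)
            .build()
          admin.modifyColumnFamily(table, cf)
          // Trigger an (asynchronous) major compaction so existing store files
          // are rewritten and the size reduction applies to old data too.
          admin.majorCompact(table)
        } finally {
          admin.close()
          conn.close()
        }
      }
    }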

(AWS EMR) Spark Error - org.apache.kafka.common.TopicPartition; class invalid for deserialization

A Spark Kafka integration job fails with the error below:

    Caused by: java.io.InvalidClassException: org.apache.kafka.common.TopicPartition; class invalid for deserialization
      at java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:169)
      at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:885)

This happens when the CLASSPATH holds two or more different versions of kafka-clients-*.jar. For example, one may be a dependency jar of "spark-sql-kafka", while another version is present by default on the cluster.

In our case, AWS EMR shipped "/usr/lib/hadoop-mapreduce/kafka-clients-0.8.2.1.jar", but we provided the following on the spark-submit classpath:

- spark-sql-kafka-0-10_2.12-2.4.4.jar
- kafka-clients-2.4.0.jar

We tried removing "kafka-clients-2.4.0.jar" from the spark-submit --jars, but that led to the same error. So we finally had to remove the EMR-provided jar, "kafka-clients-0.8.2.1.jar", to fix the issue.
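To confirm which jar the JVM actually loaded the conflicting class from (and therefore which copy to remove), a quick check from spark-shell can help. This diagnostic is our illustration, not from the original post:

    import org.apache.kafka.common.TopicPartition

    // Print the code source (jar path) that TopicPartition was loaded from.
    // Run inside spark-shell on the cluster for the driver side; evaluating the
    // same line inside a task shows the executor side.
    val source = classOf[TopicPartition].getProtectionDomain.getCodeSource
    println(if (source != null) source.getLocation else "loaded from bootstrap/unknown")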