Skip to main content

Posts

Showing posts from 2022

HTTP 400 Bad Request - Apache HTTP Client - HTTP POST JSON Message

  For one of our projects, we faced issue where-in -  We were utilizing Apache HTTP Client 4.5 and doing HTTP Post of JSON Message. This was resulting HTTP Response as  HTTP /1.1 400 Bad Request ...    On analysis, we found that HTTP Post was failing just for Big JSON Files of size greater then 7 MB. So, initially we thought it to be a server side issue.  But after further analysis we found that Unix CURL command was able to successfully POST message to API. Thus, we came to know that something was wrong with Scala (JAVA) client code that was using HTTP Client.  Solution -  We further updated the code and used java.net.{HttpURLConnection, URL}  instead of org.apache.http.client.methods.{HttpPost} and it worked fine for us. But, we were still buzzed with the problem why HTTPClient is not working. We tried using a proxy to capture HTTP Header and Body. So, as to compare difference between various user agents.  And, we identified that Content-Length  for User-Agent : Apache-HttpClient/4.5

Spark Streaming Kafka Errors

  We observed couple of errors / info messages while running Spark Streaming Applications, which might come  handy for debugging in future. Refer below -  1) We noticed that Spark streaming Job was running but consuming nothing. We could see following messages being iteratively printed in logs -  22/11/18 08:13:53 INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=difnrt-uat-001] Group coordinator mymachine.com:9093 (id: 601150796 rack: null) is unavailable or invalid, will attempt rediscovery 22/11/18 08:13:53 INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=difnrt-uat-001] Discovered group coordinator mymachine.com:9093 (id: 601150796 rack: null) 22/11/18 08:13:53 INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=difnrt-uat-001] (Re-)joining group Possible Causes / Fixes -  May be due to Kafka Coordinator service. Try restarting Kafka it may fix the issue.  In our case, we couldn't get help from Admin Team. So, we changed "group.i

Spark Streaming - org.apache.kafka.clients.consumer.OffsetOutOfRangeException

  While Running Spark Streaming application, we observed following exception -  Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 410, mymachine.com, executor 2): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {mygrouidid=233318826} at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1260) at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:607) at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1313) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168) at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.poll(KafkaDataConsumer.scala:200) at org.apache.spark.streaming.ka

PKIX path building failed: sun.security.provider.certpath.SunCertP athBuilderException: unable to find valid certification path to requested target

  Receiving following error while creating build using Maven PKIX path building failed: sun.security.provider.certpath.SunCertP athBuilderException: unable to find valid certification path to requested target We tried resolving it with Solution 1 (, as below). But, after lots of tries - we did went with Solution 2, which is not recommended.  Solution 1: Open your website in browser, click on Lock icon to export selected certificate of website to a file on your local machine.  Then execute below command to import Certificate to your keystore -  keytool –import –noprompt –trustcacerts –alias ALIASNAME -file /PATH/TO/YOUR/DESKTOP/CertificateName.cer -keystore /PATH/TO/YOUR/JDK/jre/lib/security/cacerts -storepass changeit You can also generate your own keystore using following command -  keytool -keystore clientkeystore -genkey -alias client And, give it maven command as below -  mvn package -Djavax.net.ssl.trustStore=clientkeystore But, clearly Solution 1 didn't work for us. So, here

Cryptography: understanding AES and RSA

  Cryptography, or cryptology is the practice and study of techniques for secure communication in the presence of adversarial behavior. More generally, cryptography is about constructing and analyzing protocols that prevent third parties or the public from reading private messages. This secures the information being transmitted in point to point communication. This at lowest level, is achieved by using data & mathematical algorithms. Cryptography prior to the modern age was effectively synonymous with encryption , converting readable information (plaintext) to unintelligible nonsense text (ciphertext), which can only be read by reversing the process (decryption). The sender of an encrypted (coded) message shares the decryption (decoding) technique only with intended recipients to preclude access from adversaries. Modern cryptography is heavily based on mathematical theory and computer science practice; cryptographic algorithms are designed around computational hardness assumptions,

Spark - Creating Sub Folder while writing to Partitioned Hive Table

  We had been writing to a Partitioned Hive Table and realized that data is being written has sub-folder. For ex- Refer Table definition as below -  Create table T1 ( name string, address string) Partitioned by (process_date string) stored as parquet location '/mytable/a/b/c/org=employee'; While writing to table HDFS path being written looks something like this -  /mytable/a/b/c/org=employee/ process_date=20220812/ org=employee The unnecessary addition of   org=employee after  process_date partition is because Hive Table has location consisting "=" operator, which Hive uses as syntax to determine partition column. Re-defining Table resolves above problem -  Create table T1 ( name string, address string) Partitioned by (process_date string) stored as parquet location '/mytable/a/b/c/employee';

Hive QL Spark SQL - Transform Rows into Columns

  For a Structured Tabular Structure it is many a times required to transform Rows into Columns. This blog explains step by step process which can be executed as one SQL to achieve same.  Lets try to understand with help of below example: where -in , we want to implement / transform input Table into table structure mentioned as output. INPUT_TABLE   topic groupId batchTimeMs Partition  offset  Count  t1  g001  1658173779  0 123  122 t1 g001  1658173779  1 2231 100 t2 g001  1658173779  0 12 11 OUTPUT_TABLE   rowkey:key offset:0 count:0     offset:1  count:1  t1:g001:1658173779  123 122 2231 100 t2:g001:1658173779  12 11 NULL NULL   FIRST STEP - Concat Topic, GroupID, and BatchTimeMS to create RowKey  Create Columns - offsets:0, counts:0, offsets:1, counts:1. Such that Columns has value only when respective partition value matches with column name. SQL as below - select concat_ws(':', topic,groupId,batchTimeMs) as rowkey, case when partition='0' then offset else null en

spark-sftp com.springml.spark.sftp org.apache.spark.sql.AnalysisException: Path does not exist:

  We had been facing issue with using spark-sftp Jar while downloading a remote SFTP file and creating Dataframe. Code being executed: val df = spark.read.format("com.springml.spark.sftp") .option("host", HOST) .option("port", PORT) .option("username", UN) .option("password", PWD) .option("fileType", "csv") .option("inferSchema", "true") .option("header", "true") .load(FILENAME) Error response: org.apache.spark.sql.AnalysisException: Path does not exist: wasb://... at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:612) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:595) at scala.collection.TraversableLike$$anonfun$flatMap$1.ap

Spark - Distributed Computing and Multi-Threading

Spark on Hadoop works on concept of data locality, such that code is tried to send to data rather then shuffling data to code. We specify following resources when creating a Spark Job -  executor cores executor memory driver cores driver memory number of executors Above property in general determines number of executors and each executor is kind of JVM process with specified cores and memory to use. The executors connects back to your driver program. Now the driver can send them commands.  Data is distributed on HDFS in blocks, and an input split kind of determine size of a data partition. YARN tries to collocate executors as close to data as possible to minimize network transfer of data. A task is a command sent from the driver to an executor by serializing your Function object. The executor deserializes the command, and executes it on a partition. The number of the Spark tasks in a single stage equals to the number of RDD partitions. Now, "executor cores" or " spark.ex

Spark Error - Caused by: org.apache.spark.SparkException: Could not execute broadcast in 300 secs

  While running Spark applications you would have seen below error -  Caused by: org.apache.spark.SparkException: Could not execute broadcast in 300 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1 at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:150) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:154) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:150) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:165) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:162) at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:150) at org.apache.spark.sql.execut

Load Balance or dynamic discovery of HiveServer2 Connection from Beeline or Hive Shell

To provide high availability or load balancing for HiveServer2, Hive provides a function called dynamic service discovery where multiple HiveServer2 instances can register themselves with Zookeeper. Instead of connecting to a specific HiveServer2 directly, clients connect to Zookeeper which returns a randomly selected registered HiveServer2 instance. For example -  Below command connects to Hive Server on MachineA beeline -u "jdbc:hive2://machineA: 10000" Below command connects to Zookeeper Node: to determine one of the available Hive Server's to make a connection beeline -u "jdbc:hive2://machineA:2181,machineB:2181,machineC:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2-mob-batch?tez.queue.name=myyarnqueue" We can Create ZNode with Zookeeper as follows -  Open Zookeeper command line interface zookeeper-client Connect to Zookeeper Server connect  machineA:2181,machineB:2181,machineC:2181 Create ZNode create / hiveserver2-mob-batch Manually,