
Posts

Showing posts from 2019

Machine Learning Part 5

In this blog, we will describe another example that utilizes KMeans & Spark to determine locations. Before that, we suggest you go through the previous blogs - https://querydb.blogspot.com/2019/12/machine-learning-part-4.html https://querydb.blogspot.com/2019/12/machine-learning-part-3.html https://querydb.blogspot.com/2019/12/machine-learning-part-2.html https://querydb.blogspot.com/2019/12/machine-learning-part-1.html In this blog, we will analyze and try to make predictions on Fire detection GIS data - https://fsapps.nwcg.gov/gisdata.php We have historical data of wildfires, which we will try to analyze. That is eventually helpful to reduce response time in case of fire, reduce cost, reduce damage due to fire, etc. Fire can grow exponentially based on various factors like wildlife, wind velocity, terrain surface, etc. Incident tackle time is limited by various factors, one of which is moving firefighting equipment. If we plan in advance where to pl
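A minimal sketch of the idea, assuming the GIS data has been exported to a CSV with latitude/longitude columns - the path, column names, and k value below are assumptions for illustration, not the actual dataset layout:
--------
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession

object FireLocationClustering {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("FireLocationClustering").getOrCreate()

    // Hypothetical CSV export of historical fire detections
    val fires = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("/data/fire_detections.csv")

    // Spark ML expects a single vector column of features
    val features = new VectorAssembler()
      .setInputCols(Array("latitude", "longitude"))
      .setOutputCol("features")
      .transform(fires)

    // Cluster the detections; k = 50 is just an illustrative choice
    val model = new KMeans().setK(50).setSeed(1L).setFeaturesCol("features").fit(features)

    // Cluster centres suggest candidate locations for staging equipment in advance
    model.clusterCenters.foreach(println)

    spark.stop()
  }
}
--------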

Machine Learning Part 4

In the previous blog, we learned about creating a K-Means clustering model. In this blog we will use the created model in a streaming use case for analysis in real time. For the previous blog refer @ https://querydb.blogspot.com/2019/12/machine-learning-part-3.html
1) Load the model created in the previous blog.
2) Create a dataframe with cluster id and centroid location (centroid longitude, centroid latitude).
3) Create a Kafka streaming dataframe.
4) Parse the message into a typed object.
5) Use VectorAssembler to put all features into a vector.
6) Transform the dataframe using the model to get predictions.
7) Join with the dataframe created in #2.
8) Print the results to console or save them to HBase.
Note that this example also demonstrates Spark Structured Streaming, where-in we created a streaming Kafka source and a custom foreach sink to write data to HBase. Refer code @ https://github.com/dinesh028/SparkDS/tree/master/src/indore/dinesh/sachdev/uber/streaming
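A minimal Structured Streaming sketch of steps 1-8, printing to console rather than HBase. The model path, Kafka broker/topic names, and the comma-separated message layout (dt,lat,lon,base) are placeholders, not the actual code in the linked repository:
--------
import org.apache.spark.ml.clustering.KMeansModel
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object UberStreamingPrediction {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("UberStreamingPrediction").getOrCreate()
    import spark.implicits._

    // 1) Load the model saved by the batch job (path is an assumption)
    val model = KMeansModel.load("/models/uber-kmeans")

    // 2) DataFrame of (cluster id, centroid latitude, centroid longitude)
    val centers = model.clusterCenters.zipWithIndex
      .map { case (c, id) => (id, c(0), c(1)) }
      .toSeq.toDF("prediction", "clat", "clon")

    // 3) Streaming source from Kafka (broker and topic are placeholders)
    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "uber")
      .load()

    // 4) Parse the message value into typed columns (assumed layout: dt,lat,lon,base)
    val parsed = raw.selectExpr("CAST(value AS STRING) AS value")
      .select(split($"value", ",").as("p"))
      .select($"p".getItem(0).as("dt"),
              $"p".getItem(1).cast("double").as("lat"),
              $"p".getItem(2).cast("double").as("lon"),
              $"p".getItem(3).as("base"))

    // 5) Assemble features, 6) predict the cluster, 7) join with the centroids
    val assembler = new VectorAssembler().setInputCols(Array("lat", "lon")).setOutputCol("features")
    val predictions = model.transform(assembler.transform(parsed)).join(centers, "prediction")

    // 8) Print to console (an HBase foreach sink could be plugged in here instead)
    predictions.writeStream.format("console").outputMode("append").start().awaitTermination()
  }
}
--------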

Machine Learning Part 3

Refer code @ https://github.com/dinesh028/SparkDS/blob/master/src/indore/dinesh/sachdev/uber/UberClusteringDriver.scala Nowadays, machine learning is helping to improve cities. The analysis of location and behavior patterns within cities allows optimization of traffic, better planning decisions, and smarter advertising. For example, GPS data is analyzed to optimize traffic flow, and many companies use it for field-technician optimization. It can also be used for recommendations, anomaly detection, and fraud detection. Uber uses the same approach to optimize customer experience - https://www.datanami.com/2015/10/05/how-uber-uses-spark-and-hadoop-to-optimize-customer-experience/ In this blog, we will look at clustering and the k-means algorithm, and its usage to analyze public Uber data. Clustering is a family of unsupervised machine learning algorithms that discover groupings that occur in collections of data by analyzing similarities between input examples. Some examples of clustering uses inc
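As a rough illustration (not the linked UberClusteringDriver itself), here is a minimal sketch of fitting k-means on the Uber trip data with Spark ML and saving the model for later reuse; the CSV path and the lat/lon column names are assumptions:
--------
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession

object UberClusteringSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("UberClusteringSketch").getOrCreate()

    // Public Uber trip data; path and column names are assumptions
    val uber = spark.read.option("header", "true").option("inferSchema", "true")
      .csv("/data/uber.csv")

    // k-means works on a vector column, so assemble latitude/longitude first
    val features = new VectorAssembler()
      .setInputCols(Array("lat", "lon"))
      .setOutputCol("features")
      .transform(uber)

    // Discover pickup "hot spots" as cluster centres; k = 20 is an illustrative choice
    val model = new KMeans().setK(20).setSeed(1L).fit(features)
    model.clusterCenters.foreach(println)

    // Persist the model so a streaming job (see Part 4) can reuse it
    model.write.overwrite().save("/models/uber-kmeans")
  }
}
--------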

Machine Learning Part 2

Refer code @ https://github.com/dinesh028/SparkDS/blob/master/src/indore/dinesh/sachdev/FlightDelayDriver.scala This post talks about predicting flight delays, since there is growing interest in predicting flight delays beforehand in order to optimize operations and improve customer satisfaction. We will use historic flight status data with the Random Forest Classifier algorithm to find common patterns in late departures, in order to predict flight delays and share the reasons for those delays. And, we will be using Apache Spark for the same. As mentioned in Part 1, classification is a kind of supervised learning which requires Features ("if questions") and Labels (outcome) in advance to build the model. What are we trying to predict (Label)? Whether a flight will be delayed or not (TRUE or FALSE). What are the "if questions" or properties that you can use to make predictions? What is the originating airport? What is the destination airport? What is the scheduled time
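A hedged sketch of such a Random Forest pipeline in Spark ML; the CSV path, the column names (origin, dest, carrier, crsdephour, depdelay), and the 40-minute delay threshold are assumptions for illustration, not the exact code in the linked driver:
--------
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object FlightDelaySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("FlightDelaySketch").getOrCreate()

    // Historic flight status data; path and column names are assumptions.
    // Label: delayed (1.0) if departure delay exceeded 40 minutes, else 0.0.
    val flights = spark.read.option("header", "true").option("inferSchema", "true")
      .csv("/data/flights.csv")
      .withColumn("label", when(col("depdelay") > 40, 1.0).otherwise(0.0))

    // Turn the categorical "if questions" (origin, destination, carrier) into numeric indexes
    val indexers = Seq("origin", "dest", "carrier").map { c =>
      new StringIndexer().setInputCol(c).setOutputCol(s"${c}_idx")
    }

    val assembler = new VectorAssembler()
      .setInputCols(Array("origin_idx", "dest_idx", "carrier_idx", "crsdephour"))
      .setOutputCol("features")

    val rf = new RandomForestClassifier().setLabelCol("label").setFeaturesCol("features")

    val Array(train, test) = flights.randomSplit(Array(0.7, 0.3), seed = 1L)
    val stages: Array[PipelineStage] = (indexers ++ Seq(assembler, rf)).toArray
    val model = new Pipeline().setStages(stages).fit(train)

    model.transform(test).select("origin", "dest", "label", "prediction").show(10)
  }
}
--------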

How to determine Spark JOB Resources over YARN

This is a question which I believe is not answerable in a direct way, because it depends upon various run-time factors - data distribution, skewed data, the operations we are performing (join, windowing, etc.), shuffle time, GC overhead, etc. A run-time analysis of DAG & job execution helps us tune optimal resources for a Spark job. But, we will try to provide a very basic answer to this question in this blog. Note that it is a run-time behavior, so the answer may not fit all use cases. Suppose you have a multi-tenant cluster and you determine how much hardware is available for your YARN queue or user. Say you have - 6 nodes, and each node has 16 cores and 64 GB RAM. Also, note the configuration of your edge node from where you will trigger the Spark job; as multiple jobs will be spawned from the same edge node, the resources of the edge node can be a bottleneck too. 1 core and 1 GB is needed for OS and Hadoop daemons. So, you have 6 machines with 15 cores and 63 GB
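The excerpt cuts off before the actual calculation, but the arithmetic that commonly follows from these numbers (about 5 cores per executor, one executor reserved for the YARN ApplicationMaster, roughly 7% memory overhead) can be sketched as below; treat the figures as a rule of thumb, not a universal answer:
--------
// A minimal sketch of the usual rule-of-thumb arithmetic, assuming 6 nodes with
// 15 usable cores and 63 GB usable RAM each (1 core / 1 GB reserved for OS and daemons)
object SparkResourceEstimate extends App {
  val nodes = 6
  val usableCoresPerNode = 15
  val usableMemPerNodeGb = 63

  val coresPerExecutor = 5                                      // keeps HDFS I/O throughput healthy
  val executorsPerNode = usableCoresPerNode / coresPerExecutor  // = 3
  val totalExecutors   = nodes * executorsPerNode - 1           // leave one for the ApplicationMaster, = 17

  val memPerExecutorGb = usableMemPerNodeGb / executorsPerNode  // = 21
  val overheadGb       = math.ceil(memPerExecutorGb * 0.07).toInt // ~7% off-heap overhead, = 2
  val executorMemoryGb = memPerExecutorGb - overheadGb          // = 19

  println(s"--num-executors $totalExecutors --executor-cores $coresPerExecutor --executor-memory ${executorMemoryGb}G")
}
--------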

Machine Learning Part 1

Machine learning uses algorithms to find patterns in data, and then uses a model that recognizes those patterns to make predictions on new data. Machine learning may be broken down into -
Supervised learning algorithms use labeled data - Classification, Regression.
Unsupervised learning algorithms find patterns in unlabeled data - Clustering, Collaborative Filtering, Frequent Pattern Mining.
Semi-supervised learning uses a mixture of labeled and unlabeled data.
Reinforcement learning trains algorithms to maximize rewards based on feedback.
Classification - Mailing servers like Gmail use ML to classify whether an email is spam or not based on the data of an email: the sender, recipients, subject, and message body. Classification takes a set of data with known labels and learns how to label new records based on that information. For example - an item is important or not; a transaction is fraud or not, based upon known labeled examples of transactions which were classified fraud or

Buzzwords - Deep learning, machine learning, artificial intelligence

Deep learning, machine learning, artificial intelligence – all buzzwords and representative of the future of analytics. The basic idea behind all these buzzwords is to provoke a review of your own data to identify new opportunities, across industries like retail, marketing, healthcare, telecommunication, and finance - demand forecasting, recommendation engines and targeting, predicting patient disease risk, customer churn, risk analytics, supply chain optimization, customer 360, diagnostics and alerts, system log analysis, pricing optimization, click-stream analysis, fraud and anomaly detection, market segmentation and targeting, social media analysis, preventive maintenance, credit scoring, recommendations, ad optimization, and smart meter analysis. While writing this blog, I realized that I have worked upon the highlighted use cases, but it didn't involve all these buzzwords. The basic philosophy behind these things is knowing the unknown. Once you know the business

Spark - Ways to Cache data

SQL CACHE TABLE, Dataframe.cache, spark.catalog.cacheTable - these persist in both on-heap RAM and local SSDs with the MEMORY_AND_DISK strategy. You can inspect where the RDD partitions are stored (in memory or on disk) using the Spark UI. The in-memory portion is stored in a columnar format optimized for fast columnar aggregations and automatically compressed to minimize memory and GC pressure. This cache should be considered scratch/temporary space, as the data will not survive a Worker failure. dbutils.fs.cacheTable() and Table view -> Cache Table - these only persist to the local SSDs mounted at /local_disk. This cache will survive cluster restarts.
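For the open-source Spark APIs mentioned in the first group, a small sketch (the table name and data are illustrative):
--------
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder.appName("CacheExamples").getOrCreate()
val df = spark.range(1000000).toDF("id")
df.createOrReplaceTempView("t")

// 1) SQL CACHE TABLE
spark.sql("CACHE TABLE t")

// 2) DataFrame cache (MEMORY_AND_DISK by default for DataFrames),
//    or pick the storage level explicitly
df.cache()
df.persist(StorageLevel.MEMORY_AND_DISK)

// 3) Catalog API
spark.catalog.cacheTable("t")

// Drop the cached data when done
spark.catalog.uncacheTable("t")
df.unpersist()
--------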

Spark Datasets vs Dataframe vs SQL

Datasets are composed of typed objects, which means that transformation syntax errors (like a typo in a method name) and analysis errors (like an incorrect input variable type) can be caught at compile time. DataFrames are composed of untyped Row objects, which means that only syntax errors can be caught at compile time. Spark SQL is composed of a string, which means that syntax errors and analysis errors are only caught at runtime.

Error      SQL        DataFrames     Datasets
Syntax     Run Time   Compile Time   Compile Time
Analysis   Run Time   Run Time       Compile Time

Also, note that Spark has encoders for all predefined basic data types like Int, String, etc. But, if required, we have to write a custom encoder to form a dataset of typed custom objects.
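A small sketch of the difference, using a hypothetical Person case class (case classes get encoders automatically via spark.implicits._):
--------
import org.apache.spark.sql.SparkSession

// Case classes get Encoders automatically via spark.implicits._
case class Person(name: String, age: Int)

val spark = SparkSession.builder.appName("DatasetVsDataFrame").getOrCreate()
import spark.implicits._

val ds = Seq(Person("a", 30), Person("b", 25)).toDS()   // Dataset[Person]
val df = ds.toDF()                                      // DataFrame = Dataset[Row]

// Dataset: typed - a wrong field name fails at compile time
ds.filter(_.age > 26).show()
// ds.filter(_.agee > 26)      // does not compile

// DataFrame: untyped columns - the same mistake only fails at runtime (analysis error)
df.filter($"age" > 26).show()
// df.filter($"agee" > 26)     // compiles, fails when analyzed

// SQL: both syntax and analysis errors surface only at runtime
df.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people WHERE age > 26").show()
--------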

HBase Bulk Delete Column Qualifiers

Refer to the below MapReduce program that can be used to delete a column qualifier from an HBase table -
--------
# Set HBase class path
export HADOOP_CLASSPATH=`hbase classpath`
# Execute MR
hadoop jar Test-0.0.1-SNAPSHOT.jar com.test.mymr.DeleteHBaseColumns
---------
Refer - https://github.com/dinesh028/engineering/blob/master/src/com/test/mymr/DeleteHBaseColumns.java

HBase Phoenix Cause and Solution for dummy column "_0" or "Column Family:_0"

Cause - This dummy column is added by Phoenix if someone created a Phoenix table on top of an existing HBase table.
Solutions - The following solutions can be used to delete the _0 column from each row.
Execute a Unix command like below -
echo "scan 'ns:tbl1', {COLUMNS => 'cf:_0'}" |hbase shell | grep "column=cf:_0" | cut -d' ' -f 2 | awk '{$1=$1};1'|sed -e 's/^/delete '"'"'ns:tbl1'"'"', '"'"'/' -e 's/$/'"'"', '"'"'cf:_0'"'"'/'  | hbase shell
The above command scans the rows which have this column, prepares delete statements, and executes them to remove the _0 column. But, the above will not perform well on bigger tables.
-------------------------------------------------------------------------------------------------------------------------
Other Solution can b

Hive - Create a Table delimited with Regex expression.

Example - I wanted to create a table with delimiter "~*". That can be done like below -
CREATE external TABLE db1.t1(
  a string COMMENT '',
  b string COMMENT '',
  c string COMMENT '')
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1',
  'input.regex' = '(.*?)~\\*(.*?)~\\*(.*)'
)
STORED AS
  INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '\abc'
Note - In the regex, the expression has to be repeated for as many columns as there are in the table definition; for example, we had 3 columns here. Also, RegexSerDe cannot be used for serialization - if I want to insert data into the table, I can't; it will throw an exception. Another way to do the same is -
create external TABLE `db1`.`t2` (col string) row format delimited location '\abc';

Caused by: org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans

Error -
Caused by: org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
RepartitionByExpression [src_db_id#90, ssd_id#83, last_upd_dt#89], 200
+- Filter (isnotnull(ldy#94) && (ldy#94 = 2019))
   +- HiveTableRelation `db2`.`t2`, org.apache.hadoop.hive.ql.io.orc.OrcSerde, [ssd_id#83, flctr_bu_id#84, group_name#85, eval_level#86, last_chg_actn#87, last_upd_oper#88, last_upd_dt#89, src_db_id#90, across_skill_type_eval_type#91, dlf_batch_id#92, dlf_load_dttm#93], [ldy#94]
and
Aggregate [2019]
+- Project
   +- HiveTableRelation `db1`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [ssd_id#36, flctr_bu_id#37, group_name#38, eval_level#39, last_chg_actn#40, last_upd_oper#41, last_upd_dt#42, src_db_id#43, across_skill_type_eval_type#44]
Join condition is missing or trivial. Use the CROSS JOIN syntax to allow cartesian products between these relations.;
at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProd
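The excerpt is cut off before the resolution, but the error message itself points at the two standard ways to proceed when the cartesian product is actually intended (a and b below are placeholders for the two relations in the plan above; if the join was meant to have a condition, fix the join keys instead):
--------
// Explicit cross join - Spark requires you to say the cartesian product is intentional
val joined = a.crossJoin(b)

// Equivalent SQL form:
// spark.sql("SELECT * FROM t1 CROSS JOIN t2")

// Alternatively (Spark 2.x), allow implicit cartesian products globally
spark.conf.set("spark.sql.crossJoin.enabled", "true")
--------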

Error - org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.io.IntWritable

In case your Spark application is failing with the below error -
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.io.IntWritable
at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector.get(WritableIntObjectInspector.java:36)
at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$5.apply(TableReader.scala:399)
at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$5.apply(TableReader.scala:399)
Analysis & Cause - This is a data reading error. It may be because the ORC data files have some column stored as Text (i.e. String), but the Hive table defined on top of that data has the column defined as Int.
Solution - Either update the datatype in the ORC file or in the Hive metadata, so that both are in sync. Also, to verify the above behavior, execute the query from -
Hive shell - It should work fine.
Spark shell - It should fail.
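A quick, hedged way to confirm this kind of mismatch from spark-shell - compare the schema recorded inside the ORC files with the schema in the Hive metastore (the warehouse path and table name below are placeholders):
--------
// Schema as stored in the ORC files themselves, e.g. colX: string
spark.read.orc("/apps/hive/warehouse/db.db/tbl").printSchema()

// Schema as defined in the Hive metastore, e.g. colX: int  -> out of sync
spark.table("db.tbl").printSchema()
--------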

Spark Error : Unsupported data type NullType.

Spark Job failing with exception like -
Caused by: org.apache.spark.SparkException: Task failed while writing rows
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:270)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:189)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:188)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:

Thinking in Hive or Spark SQL - Interview questions

1) Table 1 has 100 records and Table 2 has 10 records. If I do a LEFT JOIN, it is expected to get 100 records. But, I'm getting more records. How?
2) There are 2 data sets -
Table 1
1,a
2,b
3,c
Table 2
1
2
I need to select all the records from Table 1 that are there in Table 2. How can I do that?
Solution 1 - Inner JOIN
Solution 2 - Sub-query (select * from Table 1 where ID in (select ID from Table 2))
Which one is the better solution and why? (See the sketch after this list.)
3) If a table has too many partitions, does it impact performance? If yes, how can we solve the problem of too many partitions in Hive?
4) How will you solve the "large number of small files" problem in Hive / Hadoop? What is the impact of having too many files?
5) How will you write a pipe '|' character delimited file in Hive?
6) What is the difference between the ROW_NUMBER and RANK window functions?
7) I have a table as follows -
TABLE1
c1,c2,c3
1  ,2  ,3
How will I transpose columns to rows? So, that output comes i
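As a small illustration of the two approaches named in question 2 (t1 and t2 are hypothetical views registered in the session; which one is "better" is left as the interview question):
--------
// Solution 1 - inner join
spark.sql("SELECT t1.* FROM t1 JOIN t2 ON t1.id = t2.id").show()

// Solution 2 - IN sub-query (Spark typically rewrites this as a left semi join)
spark.sql("SELECT * FROM t1 WHERE id IN (SELECT id FROM t2)").show()
--------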

Read from a hive table and write back to it using spark sql

In the context of Spark 2.2 - if we read from a Hive table and write to the same table, we get the following exception -
scala> dy.write.mode("overwrite").insertInto("incremental.test2")
org.apache.spark.sql.AnalysisException: Cannot insert overwrite into table that is also being read from.;
1. This error means that our process is reading from and writing to the same table.
2. Normally, this should work as the process writes to directory .hiveStaging...
3. This error occurs in the case of the saveAsTable method, as it overwrites the entire table instead of individual partitions.
4. This error should not occur with the insertInto method, as it overwrites partitions, not the table.
5. A reason why this is happening is that the Hive table has the following Spark TBLProperties in its definition. This problem will solve for insertInto met

Spark method to save as table

The Spark 2.2 method to save as a table (def saveAsTable(tableName: String): Unit) cannot read and write data to the same table, i.e. one cannot have the input source table and the output target table be the same. If this is done, Spark throws an exception -
Caused by: org.apache.spark.sql.AnalysisException: Cannot overwrite table XXX that is also being read from;