
Posts

Hive - Merge large number of small files

There can be multiple ways to merge files. One such way suggested by Hive is to use -

alter table <Table Name> PARTITION <Partition Name> CONCATENATE;

But the above solution does not work directly, because it triggers an MR job with only map tasks and no reduce tasks, so the number of output files will equal the number of mappers that ran. One can therefore reduce the number of running mappers, which will eventually reduce the number of output files. Setting the properties below will cause the CONCATENATE job to launch fewer mappers -

hive> set hive.merge.mapfiles=true;
hive> set hive.merge.mapredfiles=true;
hive> set hive.merge.size.per.task=1073741824;
hive> set hive.merge.smallfiles.avgsize=1073741824;
hive> set mapreduce.input.fileinputformat.split.maxsize=1073741824;
hive> set mapred.job.reuse.jvm.num.tasks=5;

Also note that CONCATENATE can cause data loss in case of improper ORC file statistics. Refer - https://issues.a
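For context, a minimal sketch of the same sequence driven from Spark, assuming a Hive-enabled context (the pattern the Spark posts below use); the table name and partition spec are placeholders, and whether a given Spark version passes CONCATENATE through to Hive varies - the identical statements can equally be typed at the hive> prompt.

// Sketch only: table name and partition spec below are hypothetical.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("concatenate-demo"))
val hc = new HiveContext(sc)

// Larger splits => fewer mappers => fewer output files per partition.
hc.sql("SET hive.merge.mapfiles=true")
hc.sql("SET hive.merge.mapredfiles=true")
hc.sql("SET hive.merge.size.per.task=1073741824")
hc.sql("SET hive.merge.smallfiles.avgsize=1073741824")
hc.sql("SET mapreduce.input.fileinputformat.split.maxsize=1073741824")

// Rewrites the partition's small files into fewer, larger ones.
hc.sql("ALTER TABLE my_table PARTITION (dt='2018-01-01') CONCATENATE")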

Spark & Hive over Spark - Performance Problems Hortonworks

I had been using Spark & Hive to insert data into a table. I have the following table in Hive -

CREATE TABLE `ds_test`(
  `name` string)
PARTITIONED BY (
  `company` string,
  `market` string,
  `eventdate` string,
  `processdate` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
  'hdfs://hdpprod/apps/hive/warehouse/ds_test'
TBLPROPERTIES (
  'transient_lastDdlTime'='1524769102')

I was inserting data into the table using Hive over Spark SQL, like below -

sqlContext.sql("INSERT OVERWRITE TABLE ds_test PARTITION(COMPANY = 'MCOM', MARKET, EVENTDATE, PROCESSDATE) Select name, MARKET, EVENTDATE, PROCESSDATE from Table1")

The above method was working fine, but we were facing performance problems -

1) We saw that the application was running t
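As a runnable sketch of that insert path, assuming the sqlContext of a Hive-enabled spark-shell and that Table1 exists with matching columns: the two SET statements are the usual Hive prerequisites for a mixed static/dynamic partition insert - they are an assumption, not something shown in the post.

// Assumed prerequisites for dynamic-partition inserts (not from the post).
sqlContext.sql("SET hive.exec.dynamic.partition=true")
sqlContext.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

// COMPANY is a static partition value; MARKET, EVENTDATE and PROCESSDATE are
// resolved dynamically from the last three selected columns, in order.
sqlContext.sql(
  """INSERT OVERWRITE TABLE ds_test PARTITION(COMPANY = 'MCOM', MARKET, EVENTDATE, PROCESSDATE)
    |Select name, MARKET, EVENTDATE, PROCESSDATE from Table1""".stripMargin)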

Hive Analytics Functions - rank() vs dense_rank() vs percent_rank() vs row_number() vs cume_dist()

RANK - Rank of each row within the partition of the result set.
DENSE_RANK - Similar to RANK, but with no gaps in the ranking.
PERCENT_RANK - Relative rank of a row within a group of rows.
ROW_NUMBER - Sequential number of a row within the partition of a result set.
CUME_DIST - For a row r, the number of rows with a value lower than or equal to the value of r, divided by the number of rows evaluated in the partition.

Practice -

hive> create table test (v string) row format delimited fields terminated by ',';
hive> alter table test add columns (t string);
hive> load data local inpath '/root/test' overwrite into table test;

The test data in the local file looks like below -

a,1
a,2
a,3
a,1
a,2
b,1
c,1
c,2
d,1
e,1

Execute the below query and analyze the result -

hive> select v, t, rank() over (partition by v), dense_rank() over (partition by v), row_number() over (partition by v), percent_rank() over (partition by v), cume_dist() over (partition by v)
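To make the five functions produce visibly different values, a variation with an explicit ORDER BY t helps (without it, every row in a partition ties). A sketch, assuming a Hive-enabled sqlContext over the same test table; the figures in the comments are worked out by hand for partition v = 'a'.

sqlContext.sql(
  """SELECT v, t,
    |       rank()         OVER (PARTITION BY v ORDER BY t) AS rnk,
    |       dense_rank()   OVER (PARTITION BY v ORDER BY t) AS drnk,
    |       row_number()   OVER (PARTITION BY v ORDER BY t) AS rn,
    |       percent_rank() OVER (PARTITION BY v ORDER BY t) AS prnk,
    |       cume_dist()    OVER (PARTITION BY v ORDER BY t) AS cd
    |FROM test""".stripMargin).show()

// Partition v = 'a' holds t values 1,1,2,2,3, so:
//   t=1 -> rnk=1, drnk=1, rn=1..2, prnk=0.0, cd=0.4
//   t=2 -> rnk=3, drnk=2, rn=3..4, prnk=0.5, cd=0.8
//   t=3 -> rnk=5, drnk=3, rn=5,    prnk=1.0, cd=1.0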

SPARK running explained - 3

SPARK running explained - 2
SPARK running explained - 1

1. YARN Cluster Manager - The basic YARN architecture is described below, and it is similar to the Spark Standalone cluster manager. Main components -
   a. Resource Manager - (just like the Spark master process)
   b. Node Manager - (similar to Spark's worker processes)

Unlike on Spark's standalone cluster, applications on YARN run in containers (JVM processes to which CPU and memory resources are granted). There is an "Application Master" for each application, running in its own container, which is responsible for requesting application resources from the Resource Manager. Node Managers track the resources used by containers and report to the Resource Manager. Below depicts a Spark application (cluster-deploy mode) running on a YARN cluster with 2 nodes -

1. The client submits the application to the Resource Manager.
2. The Resource Manager asks one Node Manager to allocate a container for the Application
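One way to reproduce that cluster-deploy-mode submission programmatically is Spark's own SparkLauncher; a minimal sketch, with a hypothetical jar path and class name (on Spark 1.x the master string would be "yarn-cluster" rather than the master/deploy-mode pair shown).

import org.apache.spark.launcher.SparkLauncher

// Submits the application to the Resource Manager; the Application Master and
// executors then run in YARN containers, as the steps above describe.
val proc = new SparkLauncher()
  .setAppResource("/path/to/my-app.jar")    // hypothetical application jar
  .setMainClass("com.example.MyApp")        // hypothetical main class
  .setMaster("yarn")
  .setDeployMode("cluster")
  .setConf("spark.executor.instances", "2")
  .launch()
proc.waitFor()                              // block until the application exits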

SPARK running explained - 2

SPARK running explained - 1

Set speculative execution by setting the following (see the configuration sketch after the lists below) -
1. "spark.speculation" to true; the default is false.
2. "spark.speculation.interval" - the interval at which Spark checks whether any tasks need to be restarted.
3. "spark.speculation.quantile" - the percentage of tasks that need to complete before speculation is started for a stage.
4. "spark.speculation.multiplier" - how many times slower than the median a task must be running before it is restarted speculatively.

Data locality means Spark tries to run tasks as close to the data location as possible. Five levels of data locality -
1. PROCESS_LOCAL - Execute a task on the executor that cached the partition.
2. NODE_LOCAL - Execute a task on the node where the partition is available.
3. RACK_LOCAL - Execute the task on the same rack as the partition; rack information is available from the YARN cluster.
4. NO_PREF - No preferred locations are associated with the task.
5. ANY - Default
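A minimal sketch of those four speculation properties set on a SparkConf; the non-boolean values shown are the documented defaults, included only to make the knobs explicit (older Spark versions take the interval as a plain millisecond number rather than "100ms").

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("speculation-demo")
  .set("spark.speculation", "true")            // default false
  .set("spark.speculation.interval", "100ms")  // how often slow tasks are checked for
  .set("spark.speculation.quantile", "0.75")   // fraction of tasks that must finish first
  .set("spark.speculation.multiplier", "1.5")  // how much slower than the median counts as slow
val sc = new SparkContext(conf)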