
HBase Utility - Merging Regions in HBase

 

A growing HBase cluster and the difficulty of getting physical hardware are something every enterprise deals with.

Sometimes the volume of ingest starts putting pressure on components before new hosts can be added. As I write this post, our cluster is running 46 nodes with 600 regions per RegionServer. This is bad for performance.

You can use the following formula to estimate the number of regions for a RegionServer:

(regionserver_memory_size) * (memstore_fraction) / ((memstore_size) * (num_column_families))

Reference https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.0.0/hbase-data-access/content/memstore-size-regionservers.html
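As a rough illustration, the formula above can be evaluated with a few lines of Java. This is only a sketch: the class and method names are hypothetical, and the input values (16 GB of RegionServer memory, a memstore fraction of 0.4, a 128 MB memstore size, one column family) are assumed example numbers, not measurements from our cluster.

```java
// Sketch only: RegionEstimate is a hypothetical helper, not part of HBase.
class RegionEstimate {
    /** Applies: memory * memstoreFraction / (memstoreSize * columnFamilies), rounded down. */
    static long estimateRegions(long regionServerMemoryBytes, double memstoreFraction,
                                long memstoreSizeBytes, int numColumnFamilies) {
        if (memstoreSizeBytes <= 0 || numColumnFamilies <= 0) {
            throw new IllegalArgumentException("memstore size and family count must be positive");
        }
        double estimate = (regionServerMemoryBytes * memstoreFraction)
                / ((double) memstoreSizeBytes * numColumnFamilies);
        return (long) Math.floor(estimate);
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        long gb = 1024L * mb;
        // Assumed example values: 16 GB memory, 0.4 fraction, 128 MB memstore, 1 column family.
        System.out.println(estimateRegions(16 * gb, 0.4, 128 * mb, 1));
    }
}
```

With these assumed inputs the estimate is far below 600 regions per server, which is why a count like ours is a warning sign.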


We noticed that HBase doesn't, by default, run an automatic process to reduce and merge regions. Over time, many small or empty regions form on the cluster, which degrades performance.

While researching ways to cope with this problem, we came across the following scripts - 

  • https://appsintheopen.com/posts/51-merge-empty-hbase-regions
  • https://blg.robot-house.us/posts/merging-regions-in-hbase/
On analyzing them, we found that - 
  • They merge empty regions only.
  • The latter Ruby script occasionally creates overlaps. 
  • They don't check for degenerate, splitting, or meta regions.
  • They have no size-based check to ensure that a merge does not produce an HStore larger than hbase.hregion.max.filesize.
  • They use deprecated Java functions.
  • They can try to merge the same region again, which often leads to an error where the region is not found.

So we came up with an enhanced Java utility, with all of these checks in place, that merges adjacent regions.

Note - adjacent HBase regions are lexicographically adjacent. That is, the end key of one region is the start key of the next. Adjacent regions can be hosted on different RegionServers.
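To make the adjacency and size checks concrete, here is a minimal sketch in plain Java. It is not the code of our utility: MergePlanner, Region, and planMerges are hypothetical names, the Region class is a stand-in for real region metadata, and pairing neighbours greedily in start-key order is an assumption made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: hypothetical classes, not HBase APIs.
class MergePlanner {
    /** Minimal stand-in for region metadata: name, key range, size on disk. */
    static final class Region {
        final String name;
        final byte[] startKey;
        final byte[] endKey;
        final long sizeBytes;

        Region(String name, String startKey, String endKey, long sizeBytes) {
            this.name = name;
            this.startKey = startKey.getBytes(StandardCharsets.UTF_8);
            this.endKey = endKey.getBytes(StandardCharsets.UTF_8);
            this.sizeBytes = sizeBytes;
        }
    }

    /** True when the end key of a is the start key of b; an empty end key marks the last region. */
    static boolean isAdjacent(Region a, Region b) {
        return a.endKey.length > 0 && Arrays.equals(a.endKey, b.startKey);
    }

    /** Pairs neighbours from a list sorted by start key; each region is used at most once. */
    static List<String[]> planMerges(List<Region> sortedByStartKey, long maxFileSizeBytes) {
        List<String[]> pairs = new ArrayList<>();
        int i = 0;
        while (i + 1 < sortedByStartKey.size()) {
            Region a = sortedByStartKey.get(i);
            Region b = sortedByStartKey.get(i + 1);
            // Size check: the merged region must stay within the maximum file size.
            if (isAdjacent(a, b) && a.sizeBytes + b.sizeBytes <= maxFileSizeBytes) {
                pairs.add(new String[] {a.name, b.name});
                i += 2; // skip both so the same region is never merged twice
            } else {
                i += 1;
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<Region> regions = Arrays.asList(
                new Region("r1", "", "b", 10),
                new Region("r2", "b", "d", 20),
                new Region("r3", "d", "f", 5),
                new Region("r4", "f", "", 100));
        for (String[] pair : planMerges(regions, 50)) {
            System.out.println(pair[0] + " + " + pair[1]);
        }
    }
}
```

In this toy input, r1 and r2 are paired, while r3 and r4 are adjacent but left alone because their combined size exceeds the limit.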


Command to run Merge Script - 
 

sh merge_hbase_table.sh "<HBase Table Name>" true

Java Source code location - 

https://github.com/dinesh028/engineering/blob/master/HBaseUtility/src/main/java/com/aquaifer/HBaseMergePartitions.java 


This utility uses Online Merge ( https://docs.cloudera.com/runtime/7.2.10/managing-hbase/topics/hbase-online-merge.html ), which issues an asynchronous command to merge adjacent regions.


This utility helped us bring the cluster down from 18652 regions to 14500 in a few minutes. However, run it during off-hours, because merging regions may trigger minor and major compactions, which in turn can degrade performance.


Another way is to set an appropriate SPLIT_POLICY on the HBase table - 

alter '<TABLE_NAME>', {'SPLIT_POLICY' => '<SPLIT_POLICY>'}


<SPLIT_POLICY> can be replaced with one of the following - 

  • org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy

A RegionSplitPolicy implementation which splits a region as soon as any of its store files exceeds a maximum configurable size.

  • org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy

Split size is the number of regions that are on this server that all are of the same table, cubed, times 2x the region flush size OR the maximum region split size, whichever is smaller.

For example, if the flush size is 128MB, then after two flushes (256MB) we will split which will make two regions that will split when their size is 2^3 * 128MB*2 = 2048MB.

If one of these regions splits, then there are three regions and now the split size is 3^3 * 128MB*2 = 6912MB, and so on until we reach the configured maximum file size and then from there on out, we'll use that.
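The arithmetic in the description above can be checked with a short Java sketch. It follows only the formula as quoted (region count cubed, times twice the flush size, capped at the maximum file size). The class and method names are hypothetical, the 10 GB maximum is an assumed example value, and returning the maximum for a non-positive or very large region count is a guard added for this sketch.

```java
// Sketch only: mirrors the quoted formula, not the HBase source code.
class SplitSizeSketch {
    /** min(maxFileSize, regionCount^3 * 2 * flushSize) */
    static long splitSize(int regionCount, long flushSizeBytes, long maxFileSizeBytes) {
        if (flushSizeBytes <= 0 || maxFileSizeBytes <= 0) {
            throw new IllegalArgumentException("sizes must be positive");
        }
        // Guard chosen for this sketch: avoids overflow and undefined inputs.
        if (regionCount <= 0 || regionCount > 100) {
            return maxFileSizeBytes;
        }
        long cubed = (long) regionCount * regionCount * regionCount;
        long base = 2L * flushSizeBytes;
        // Compare by division so the multiplication below cannot overflow.
        if (cubed > maxFileSizeBytes / base) {
            return maxFileSizeBytes;
        }
        return cubed * base;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        long flush = 128 * mb;   // flush size from the example above
        long max = 10240 * mb;   // assumed maximum file size of 10 GB
        for (int regions = 1; regions <= 4; regions++) {
            System.out.println(regions + " region(s): "
                    + splitSize(regions, flush, max) / mb + " MB");
        }
    }
}
```

With a 128 MB flush size this gives 256 MB, 2048 MB, and 6912 MB for one, two, and three regions. At four regions the formula yields 16384 MB, so the assumed 10 GB cap applies from there on.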

  • org.apache.hadoop.hbase.regionserver.BusyRegionSplitPolicy

This class represents a split policy which makes the split decision based on how busy a region is. The metric that is used here is the fraction of total write requests that are blocked due to high memstore utilization. This fractional rate is calculated over a running window of "hbase.busy.policy.aggWindow" milliseconds. The rate is a time-weighted aggregated average of the rate in the current window and the true average rate in the previous window.

  • org.apache.hadoop.hbase.regionserver.DelimitedKeyPrefixRegionSplitPolicy
  • org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy
  • org.apache.hadoop.hbase.regionserver.SteppingSplitPolicy

