Thursday, February 16, 2017

WAYS TO BULK LOAD DATA IN HBASE

Dear Friends,


Continuing with my posts, this one was prompted by a question from one of my friends about HBase. Here I am sharing my thoughts and a working procedure for bulk loading data into HBase.

HBase is an open-source, distributed, column-oriented NoSQL data store modeled on Google BigTable. It was developed as part of Apache’s Hadoop project and runs on top of HDFS (Hadoop Distributed File System). HBase provides BigTable-like capabilities on top of Hadoop. We can call HBase a “data store” rather than a “database”, as it lacks many of the features available in a traditional database, such as typed columns, secondary indexes, triggers, and advanced query languages.
The data model consists of table name, row key, column family, columns, and timestamp. Within a table, each row is uniquely identified by its row key, and each version of a cell by its timestamp. In this data model the column families are static (declared when the table is created), whereas columns are dynamic (they can be added while writing data). HBase is column-oriented, so you have to specify which column family each piece of data belongs to. An HBase table therefore needs, at minimum, a table name and at least one column family name.
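To make the data model a little more concrete, here is a small sketch in plain Java that mimics it with nested sorted maps (row key, then family:qualifier, then timestamp). It is only a toy model and does not use the HBase API; the class name DataModelSketch and its methods are my own illustrative names.

```java
import java.util.Collections;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Toy in-memory model of an HBase table. Illustrative only, not the HBase API. */
public class DataModelSketch {
    // row key -> "family:qualifier" -> timestamp (newest first) -> value
    private final NavigableMap<String, NavigableMap<String, NavigableMap<Long, String>>> rows =
            new TreeMap<String, NavigableMap<String, NavigableMap<Long, String>>>();

    /** Stores one version of a cell; older versions are kept alongside it. */
    public void put(String rowKey, String family, String qualifier, long timestamp, String value) {
        rows.computeIfAbsent(rowKey, k -> new TreeMap<String, NavigableMap<Long, String>>())
            .computeIfAbsent(family + ":" + qualifier,
                    k -> new TreeMap<Long, String>(Collections.reverseOrder()))
            .put(timestamp, value);
    }

    /** Returns the newest version of a cell, or null if the cell does not exist. */
    public String get(String rowKey, String family, String qualifier) {
        NavigableMap<String, NavigableMap<Long, String>> row = rows.get(rowKey);
        if (row == null) {
            return null;
        }
        NavigableMap<Long, String> versions = row.get(family + ":" + qualifier);
        return versions == null ? null : versions.firstEntry().getValue();
    }

    public static void main(String[] args) {
        DataModelSketch table = new DataModelSketch();
        table.put("row1", "cf1", "name", 1L, "old");
        table.put("row1", "cf1", "name", 2L, "new");
        // The newest timestamp wins when reading a cell.
        System.out.println(table.get("row1", "cf1", "name"));
    }
}
```

Notice that the column family is part of every write, while the qualifier ("name" here) can be anything, which is what "static families, dynamic columns" means in practice.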

Apache HBase is all about giving you random, real-time, read/write access to your Big Data, but how do you efficiently get that data into HBase in the first place? Intuitively, a new user will try to do that via the client APIs or by using a MapReduce job with TableOutputFormat, but those approaches are problematic, as you will learn below. Instead, the HBase bulk loading feature is much easier to use and can insert the same amount of data more quickly.

In this blog I will take you through the number of ways to achieve Bulk Loading of Data in HBase.

There are basically 3 ways to Bulk load the data in HBase:-

1. Using ImportTsv Class to load txt Data to HBase.

2. Using Hive's HCatalog & Pig command.

3. Using MapReduce API.

You can download the Sample.txt file used in this blog HERE.

(NOTE:- While running these examples, please make sure that your Hadoop daemons and HBase daemons are up and running.)

1. Using ImportTsv Class to load txt Data to HBase:-


A) Uploading Sample.txt file to HDFS:-


Upload the sample file into HDFS by the following command:

Command > hadoop fs -put Sample.txt /Input

B) Create Table in HBase:-


To use this method, we first have to create a table in HBase with as many column families as the data needs. Here I am using 2 column families for my data.

First enter the HBase shell with the command below, then create a table with the column family names:

Command > hbase shell      (To enter into HBase shell)

Command > create 'Test','cf1','cf2'                 (To create a table with column family)


C) Using ImportTsv Class to load the Sample.txt file into HBase:-


Now we are ready to load the file into HBase. To load the file we will use the ImportTsv class that ships in the HBase jar, with the command below (go to the HBase folder and run it):-

Command > ./bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator="," -Dimporttsv.columns=HBASE_ROW_KEY,cf1,cf2 Test /Input/Sample.txt


Check that the data is loaded into the HBase table (from the HBase shell):-

Command > scan 'Test'



Here’s an explanation of the different configuration elements:

-Dimporttsv.separator="," specifies that the fields in the file are separated by commas.
-Dimporttsv.bulk.output=output is a relative path to where the HFiles will be written. For example, on a Cloudera VM where the default user is “cloudera”, the files will be in /user/cloudera/output. With this option the job only generates HFiles, which then have to be loaded into the table in a separate step; skipping it makes the job write directly to HBase. (We have not used it here, but it is useful.)
-Dimporttsv.columns=HBASE_ROW_KEY,f:count is a list of all the columns contained in the file, in order. The row key needs to be identified using the all-caps HBASE_ROW_KEY string; otherwise the job won’t start. (This example uses the family “f” with the qualifier “count”, but it could be anything else; the command above uses cf1 and cf2.)
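The way the separator and the columns list work together can be sketched in a few lines of plain Java. This is not the ImportTsv source, just an illustration of the idea under my own assumptions: the class ColumnsSpecSketch, the method parseLine, and the sample line "1,apple,red" are all made up for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

/** Illustrative sketch of mapping one delimited line onto a columns spec. */
public class ColumnsSpecSketch {
    public static final String ROW_KEY_MARKER = "HBASE_ROW_KEY";

    /** Returns column name -> field value, with the row key stored under ROW_KEY_MARKER. */
    public static Map<String, String> parseLine(String line, String separator, String columnsSpec) {
        String[] columns = columnsSpec.split(",");
        int rowKeyCount = 0;
        for (String column : columns) {
            if (ROW_KEY_MARKER.equals(column)) {
                rowKeyCount++;
            }
        }
        // Without exactly one row key column there is nothing to address the row by.
        if (rowKeyCount != 1) {
            throw new IllegalArgumentException(
                    "The columns spec must contain " + ROW_KEY_MARKER + " exactly once");
        }
        // Quote the separator so characters such as '|' are not treated as regex syntax.
        String[] fields = line.split(Pattern.quote(separator), -1);
        if (fields.length != columns.length) {
            throw new IllegalArgumentException(
                    "Expected " + columns.length + " fields but found " + fields.length);
        }
        Map<String, String> cells = new LinkedHashMap<String, String>();
        for (int i = 0; i < columns.length; i++) {
            cells.put(columns[i], fields[i]);
        }
        return cells;
    }

    public static void main(String[] args) {
        // Hypothetical line; the real Sample.txt may look different.
        System.out.println(parseLine("1,apple,red", ",", "HBASE_ROW_KEY,cf1,cf2"));
    }
}
```

The point to take away is that the position of each name in the columns list decides which field of the line it receives, so the list has to follow the order of the fields in the file.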

2. Using Hive's HCatalog & Pig command:-


This method requires several jar files from Pig, Hive and HCatalog, which can be made available by exporting them in HADOOP_CLASSPATH; otherwise a ClassNotFoundException will be thrown, naming the missing class.
(To be on the safe side, and since my classpath export didn't work, I copied all jar files from pig/lib, hive/lib & hive/hcatalog/lib to hadoop/lib, after which it worked fine without any error.)

A) Create a Script using HIVE SerDe & Table Properties:-


After loading the data into HDFS, define the HBase schema for the data in Hive. Continuing with the Sample example, create a script file called sample.ddl containing the HBase schema for the data as used by Hive. To do so, write the code below into a file and name it sample.ddl:

Script sample.ddl:-

CREATE TABLE sample_hcat_load_table (id STRING, cf1 STRING, cf2 STRING) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
WITH SERDEPROPERTIES ( 'hbase.columns.mapping' = 'd:cf1,d:cf2' ) 
TBLPROPERTIES ( 'hbase.table.name' = 'sample_load_table'); 


B) Now Create and register the HBase table in HCatalog.


Use HCatalog to register the DDL file. HCatalog is located inside the Hive folder (/usr/local/hadoop/hive/hcatalog); export the HCatalog home and path in your ~/.bashrc file (like you did when installing Hive). After that, source the ~/.bashrc file to apply the change by giving the command below:

Command > source ~/.bashrc

Now register the DDL file using the syntax:- hcat -f <DDL file name>.

The following HCatalog command-line command runs the DDL script sample.ddl:

Command > hcat -f sample.ddl



Go to the HBase shell by giving the command below to check whether the table has been created:-

Command > hbase shell


C) Create the import file using PIG Script:-


The following script instructs Pig to load the data from Sample.txt and store it in sample_load_table.

Script Hbase-bulk-load.pig:-

A = LOAD '/Input/Sample.txt' USING PigStorage(',') AS (id:chararray, c1:chararray, c2:chararray);

STORE A INTO 'sample_hcat_load_table' USING org.apache.hive.hcatalog.pig.HCatStorer();



Use the Pig command to populate the HBase table via HCatalog bulk load:-

Continuing with the example, execute one of the following commands:

Command > pig -useHCatalog Hbase-bulk-load.pig

Command > pig Hbase-bulk-load.pig

(On my system Pig failed to read the Sample.txt data from HDFS, so for convenience I used local storage by giving the command pig -x local Hbase-bulk-load.pig or pig -x local -useHCatalog Hbase-bulk-load.pig)



Go to the HBase shell and give the scan command to check the result:-



Below is another example for achieving the same, this time storing directly through Pig's HBaseStorage (I have not tried it).

A = LOAD '/hbasetest.txt' USING PigStorage(',') as (id:chararray, c1:chararray, c2:chararray);
STORE A INTO 'hbase://mydata'  USING
org.apache.pig.backend.hadoop.hbase.HBaseStorage('mycf:intdata');


3. Using MapReduce API.


HBase's Put API can be used to insert the data into HBase, but then every record has to go through the complete HBase write path. So inserting data in bulk using the Put API is a lot slower than the bulk loading option. There are some references to bulk loading, but they are either incomplete or a bit too complicated. Bulk loading with MapReduce comes down to three steps:

1. Extract the data from the source (in our case a text file).
2. Transform the data into HFiles.
3. Load the files into HBase by telling the RegionServers where to find them.
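The three steps can be pictured with a tiny plain-Java sketch. It does not touch HBase or HDFS at all: the "HFile" here is just a sorted map and the "table" is another map, and the names BulkLoadStepsSketch, extract, transform and load are my own. The sample lines are made up as well.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

/** Toy walk-through of extract, transform and load. Illustrative only. */
public class BulkLoadStepsSketch {

    /** Step 1: extract the raw records from the source lines, skipping blank lines. */
    public static List<String[]> extract(List<String> lines, String separator) {
        List<String[]> records = new ArrayList<String[]>();
        for (String line : lines) {
            if (!line.trim().isEmpty()) {
                records.add(line.split(Pattern.quote(separator), -1));
            }
        }
        return records;
    }

    /** Step 2: transform the records into a structure sorted by row key, like an HFile. */
    public static TreeMap<String, String[]> transform(List<String[]> records) {
        TreeMap<String, String[]> sorted = new TreeMap<String, String[]>();
        for (String[] record : records) {
            // The first field is the row key, the remaining fields are the cell values.
            sorted.put(record[0], Arrays.copyOfRange(record, 1, record.length));
        }
        return sorted;
    }

    /** Step 3: load the finished file into the table in one go. */
    public static void load(TreeMap<String, String[]> sortedFile, Map<String, String[]> table) {
        table.putAll(sortedFile);
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("2,b,y", "10,c,z", "1,a,x");
        Map<String, String[]> table = new TreeMap<String, String[]>();
        load(transform(extract(lines, ",")), table);
        // Row keys come out in lexicographic order, so "10" sorts before "2".
        System.out.println(table.keySet()); // prints [1, 10, 2]
    }
}
```

The sorting in step 2 is the important part: HBase keeps rows ordered by row key, so the files have to be written in that order before they can be handed over. In the real job this sorting is done by the MapReduce shuffle and the reducer that HFileOutputFormat configures.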

Below is the code I used for my Sample.txt data file. You can modify it according to your requirements.

NOTE:- This code doesn't create a table in HBase, so before running it in a Hadoop environment, make sure to create the table in HBase using the create command with the column families.


HBaseBulkLoadDriver

DRIVER CLASS

package com.poc.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HBaseBulkLoadDriver extends Configured implements Tool {
    private static final String DATA_SEPERATOR = ",";
    private static final String TABLE_NAME = "sample-data";
    private static final String COLUMN_FAMILY_1="cf1";
    private static final String COLUMN_FAMILY_2="cf2";
  
    public static void main(String[] args) {
        try {
            int response = ToolRunner.run(HBaseConfiguration.create(), new HBaseBulkLoadDriver(), args);
            if(response == 0) {
                System.out.println("Job is successfully completed...");
            } else {
                System.out.println("Job failed...");
            }
        } catch(Exception exception) {
            exception.printStackTrace();
        }
    }

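    @Override
    public int run(String[] args) throws Exception {
        // Expected arguments: <input path on HDFS> <output path for the generated HFiles>
        if (args.length < 2) {
            System.err.println("Usage: HBaseBulkLoadDriver <input path> <HFile output path>");
            return -1;
        }
        String outputPath = args[1];
        Configuration configuration = getConf();
        // Pass the settings the mapper needs through the job configuration.
        configuration.set("data.seperator", DATA_SEPERATOR);
        configuration.set("hbase.table.name",TABLE_NAME);
        configuration.set("COLUMN_FAMILY_1",COLUMN_FAMILY_1);
        configuration.set("COLUMN_FAMILY_2",COLUMN_FAMILY_2);
        Job job = new Job(configuration);
        job.setJarByClass(HBaseBulkLoadDriver.class);
        job.setJobName("Bulk Loading HBase Table::"+TABLE_NAME);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setMapperClass(HBaseBulkLoadMapper.class);
        FileInputFormat.addInputPaths(job, args[0]);
        // Remove the output of a previous run. Use the job's default file system
        // (HDFS on a cluster) rather than the local file system.
        FileSystem.get(configuration).delete(new Path(outputPath), true);
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        HTable table = new HTable(configuration, TABLE_NAME);
        try {
            // Sets the reducer, partitioner and output format so that the HFiles
            // line up with the current regions of the table.
            HFileOutputFormat.configureIncrementalLoad(job, table);
            if (!job.waitForCompletion(true)) {
                return -1;
            }
        } finally {
            table.close();
        }
        // The HFiles are ready: hand them over to the region servers.
        HBaseBulkLoad.doBulkLoad(outputPath, TABLE_NAME);
        return 0;
    }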
}

HBaseBulkLoadMapper

MAPPER CLASS

package com.poc.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HBaseBulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    private String dataSeperator;
    private String columnFamily1;
    private String columnFamily2;

    @Override
    protected void setup(Context context) {
        // Read the settings that the driver stored in the job configuration.
        Configuration configuration = context.getConfiguration();
        dataSeperator = configuration.get("data.seperator");
        columnFamily1 = configuration.get("COLUMN_FAMILY_1");
        columnFamily2 = configuration.get("COLUMN_FAMILY_2");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line is expected to look like: rowKey,value1,value2
        String[] values = value.toString().split(dataSeperator);
        if (values.length < 3) {
            return; // skip blank or malformed lines
        }
        byte[] rowKey = Bytes.toBytes(values[0]);
        Put put = new Put(rowKey);
        // The qualifiers "cf1" and "cf2" simply reuse the family names here.
        // (Put.add is the older API; HBase 1.0 and later use addColumn.)
        put.add(Bytes.toBytes(columnFamily1), Bytes.toBytes("cf1"), Bytes.toBytes(values[1]));
        put.add(Bytes.toBytes(columnFamily2), Bytes.toBytes("cf2"), Bytes.toBytes(values[2]));
        // The map output key must be the row key (not the table name), so that the
        // job can partition and sort the cells by region.
        context.write(new ImmutableBytesWritable(rowKey), put);
    }
}

HBaseBulkLoad

BULK LOAD CLASS

package com.poc.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class HBaseBulkLoad {

    /** Hands the HFiles under pathToHFile over to the region servers of tableName. */
    public static void doBulkLoad(String pathToHFile, String tableName) {
        try {
            Configuration configuration = new Configuration();
            configuration.set("mapreduce.child.java.opts", "-Xmx1g");
            HBaseConfiguration.addHbaseResources(configuration);
            LoadIncrementalHFiles loadHFiles = new LoadIncrementalHFiles(configuration);
            HTable hTable = new HTable(configuration, tableName);
            try {
                loadHFiles.doBulkLoad(new Path(pathToHFile), hTable);
            } finally {
                // Always release the table connection, even if the load fails.
                hTable.close();
            }
            System.out.println("Bulk Load Completed..");
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }
}

NOTE:- If you prefer to create the table with the Java API instead of the HBase shell, you can tweak and use the code below. Here tableName, config, Constants.COLUMN_FAMILY_NAME and REGIONS_COUNT are placeholders that you have to define yourself.

// Create table and do pre-split
HTableDescriptor descriptor = new HTableDescriptor(Bytes.toBytes(tableName));
descriptor.addFamily(new HColumnDescriptor(Constants.COLUMN_FAMILY_NAME));

HBaseAdmin admin = new HBaseAdmin(config);

// Lowest split key: 16 bytes of 0x00
byte[] startKey = new byte[16];
Arrays.fill(startKey, (byte) 0);

// Highest split key: 16 bytes of 0xFF
byte[] endKey = new byte[16];
Arrays.fill(endKey, (byte) 255);

// Create the table with REGIONS_COUNT regions, split evenly between startKey and endKey
admin.createTable(descriptor, startKey, endKey, REGIONS_COUNT);
admin.close();
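To get a feel for what "pre-split" means, the sketch below computes evenly spaced keys between a start key and an end key by treating the keys as big unsigned numbers. This is my own simplified illustration and not necessarily how HBase computes its split points internally; the class SplitPointsSketch and its methods are hypothetical names.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch: evenly spaced split keys between two keys of equal length. */
public class SplitPointsSketch {

    /** Returns count keys strictly between start and end, spread evenly. */
    public static List<byte[]> splitPoints(byte[] start, byte[] end, int count) {
        if (start.length != end.length) {
            throw new IllegalArgumentException("start and end must have the same length");
        }
        // Signum 1 makes BigInteger read the bytes as an unsigned number.
        BigInteger low = new BigInteger(1, start);
        BigInteger high = new BigInteger(1, end);
        if (low.compareTo(high) >= 0) {
            throw new IllegalArgumentException("start must sort before end");
        }
        BigInteger range = high.subtract(low);
        List<byte[]> points = new ArrayList<byte[]>();
        for (int i = 1; i <= count; i++) {
            BigInteger offset = range.multiply(BigInteger.valueOf(i))
                    .divide(BigInteger.valueOf(count + 1));
            points.add(toFixedLength(low.add(offset), start.length));
        }
        return points;
    }

    /** Converts a non-negative number back into a key of exactly length bytes. */
    static byte[] toFixedLength(BigInteger value, int length) {
        byte[] raw = value.toByteArray(); // may carry an extra leading zero byte
        byte[] key = new byte[length];
        int copy = Math.min(raw.length, length);
        System.arraycopy(raw, raw.length - copy, key, length - copy, copy);
        return key;
    }

    public static void main(String[] args) {
        byte[] start = new byte[16];          // 16 bytes of 0x00
        byte[] end = new byte[16];
        java.util.Arrays.fill(end, (byte) 255); // 16 bytes of 0xFF
        for (byte[] point : splitPoints(start, end, 3)) {
            // Print the first byte of each split key as an unsigned value.
            System.out.println(point[0] & 0xFF);
        }
    }
}
```

Even splitting like this only balances the regions if the row keys themselves are spread evenly over the key space, for example hashed keys. With sequential keys most of the data would still end up in a few regions.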

Run the Jar File

Compile the above code in Eclipse with the HBase jars on the build path, export it as a jar file, and run it.

Command > hadoop jar hbase.jar com/poc/hbase/HBaseBulkLoadDriver /Input/Sample.txt /Out



Now go to the HBase shell to check that the data is loaded (the table name must match TABLE_NAME in the driver class).

Hbase shell > scan 'sample-data'



That's all friends...

Now go ahead and tweak the code to learn more about the HBase working mechanism.




Hope you all understood the procedures... 
Please do notify me for any corrections...
Kindly leave a comment for any queries/clarification...
(Detailed Description of each phase to be added soon).
ALL D BEST...

