Tuesday, April 4, 2017

HIVE INTERVIEW RELATED PREPARATION

Dear Friends....

I recently spent a few days preparing for and attending interviews for a job change in HADOOP, and a few HIVE questions came up in almost every interview I faced. Some of these questions were also asked by my friends, so I decided to put together a showcase for practical understanding.

Here, with my practicals, I am showing various concepts in HIVE.

1. Difference between External & Managed Tables:-


Ans:> Almost every interviewer asks you to explain external and managed tables. Though it is a very common question, what I observed is that what really matters is how you answer it in the interview.

Answering that an external table is on HDFS and a managed table is in HIVE's data warehouse is simply not enough to crack the interview. After a brief discussion with my friends, I found that to really impress the interviewers you have to step beyond your comfort zone and explain a bit more.

Here is how I suggest answering it.

First of all, understand the concept of both. Since both tables ultimately reside in HDFS, here are the basic differences:-

1. Managed Tables:- A managed table is the default table type in HIVE: its data is stored inside HIVE's data warehouse directory and its schema is kept in the metastore (which is backed by a database such as Derby or MySQL). You don't have to give any extra command or keyword for the creation of this table.
Now the most discussed & practical part: let's take an example where the table is dropped by mistake. Not only is the schema dropped from the metastore, but the data you have loaded into the warehouse directory is also deleted. So retrieving the data in real time after dropping a managed table is difficult.

Command> create table Test (Id int, Name string, Salary int)
                > row format delimited
                > fields terminated by '\t'
                > stored as textfile;


2. External Table:- An external table in HIVE reads and stores its data at an HDFS location of your choice instead of HIVE's warehouse directory. While creating the external table we have to mention the keyword 'EXTERNAL', and we point to the data with the 'LOCATION' keyword followed by the HDFS path.
Now for the practical part: when an external table is dropped by mistake, only the schema of the table is deleted from HIVE's metastore, whereas the actual data in HDFS remains. So retrieving the data, or creating another table (even with a different schema) with the same table name over the same data, is possible.

Command> create EXTERNAL table Test (Id int, Name string, Salary int)
                > row format delimited
                > fields terminated by '\t'
                > stored as textfile LOCATION '/HDFS PATH';
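A quick way to verify which kind of table you created is DESCRIBE FORMATTED (shown here on the Test table above); the output reports the table type and the data location:

Command> DESCRIBE FORMATTED Test;

(Look for "Table Type: MANAGED_TABLE" or "Table Type: EXTERNAL_TABLE" and the "Location:" line in the output.)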

2. Different JOINS in HIVE:-


Ans:> This is a bit of a tricky question... By this question they don't mean the simple joins like outer join, inner join, etc. They are actually looking for performance optimization through the different join strategies.

Hive joins are executed as distributed jobs on an execution engine such as MapReduce, Tez or Spark. A JOIN, even across multiple tables, can often be achieved in a single job. Many optimizations have been added to Hive, giving users various options for improving join performance.

Understanding how joins are implemented with MapReduce helps to recognize the different optimization techniques in Hive today. Below are details of a few JOIN strategies used for performance optimization in HIVE:-

2.1. MAP JOIN (Broadcast Join):-


> A simple JOIN operation (shuffle join) always follows the basic MR flow, i.e. a map phase, then sort & shuffle, and finally a reduce phase to produce the JOIN output, which is very time consuming for a simple query. A MAP JOIN, on the other hand, uses only the map phase to perform the JOIN, removing the other two phases of MR and producing the output much faster. As no reducers are necessary, map joins are far faster than regular joins.


The limitation of this JOIN is that the smaller table must fit into memory: it is loaded into each mapper so that a (very fast) join can be performed entirely within the map phase without the full Map/Reduce cycle. If your queries frequently rely on small-table joins (e.g. cities or countries), you might see a very substantial speed-up from using map joins. But when both sides are large this will not work efficiently.

Hive supports MAPJOINs, which are well suited for this scenario – at least for dimensions small enough to fit in memory. A MAPJOIN could be invoked either through an optimizer hint:

The C-style comment " /*+ MAPJOIN(aliasname)*/ " should be placed immediately following the SELECT. It directs Hive to load aliasname (which is a table or alias of the query) into memory.

Command > SELECT /*+ MAPJOIN(c) */ * FROM orders o JOIN cities c ON (o.city_id = c.id);

OR via auto join conversion:

Command > set hive.auto.convert.join=true;                        ( before performing join query)
                  > SELECT * FROM orders o JOIN cities c ON (o.city_id = c.id);

OR by activating the same permanently from the configuration file, setting the property hive.auto.convert.join to true in hive-site.xml in the conf directory.
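For reference, a sketch of the corresponding hive-site.xml entry:

<property>
    <name>hive.auto.convert.join</name>
    <value>true</value>
</property>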

2.2 Hive Bucket Map Join:-


> As the name suggests, it is performed on the buckets of a HIVE table. Bucketing is a performance enhancer in HIVE where a large dataset is divided into buckets; a bucket map JOIN not only runs in the map phase alone, it also joins only the matching buckets of the two tables, thus reducing the latency.

The limitation of this JOIN is that both tables must be bucketed on the join key, and don't forget to set "hive.enforce.bucketing=true" before inserting the data. The number of buckets in one table must be a multiple of the number of buckets in the other table, so that the MAPJOIN can be applied bucket by bucket, taking the smaller table's buckets into memory.

For example:-

create table Test1(Id int,Name string,Address string,Salary int)
clustered by (Id) into 4 buckets;
create table Test2(Id int, State string,Pf int, bonus int)
clustered by (Id) into 2 buckets;

set hive.enforce.bucketing = true; 
Insert OVERWRITE  table Test1 select * From Empdata;
Insert OVERWRITE  table Test2 select * From Pfdata;

Now to perform JOIN operation:-

command > set hive.optimize.bucketmapjoin=true;
                > select /*+ MAPJOIN(Test2) */ Test1.* from Test1,Test2 where Test1.Id=Test2.Id;

2.3 Sort Merge Bucket(SMB) Map Join:-


Similar to the bucket map join, but in this join both tables must have the same number of buckets. The tables need to be created bucketed and sorted on the same join column, and the data needs to be bucketed when inserting.


For example:-

create table Test1(Id int,Name string,Address string,Salary int)
clustered by (Id) sorted by (Id) into 4 buckets;
create table Test2(Id int, State string,Pf int, bonus int)
clustered by (Id) sorted by (Id) into 4 buckets;

set hive.enforce.bucketing = true; 
Insert OVERWRITE  table Test1 select * From Empdata;
Insert OVERWRITE  table Test2 select * From Pfdata;

Now to perform JOIN operation:-

set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;

select /*+ MAPJOIN(Test2) */ Test1.* from Test1,Test2 where Test1.Id=Test2.Id;

2.4 Skew Join:-


When our data has a very large number of records associated with one (or more) keys, it leads to a skew issue and the data is said to be skewed on that key. In simple words, one key holds a disproportionately large share of the dataset. When we perform a JOIN on such data, reducers are assigned by key, and the reducer handling the skewed key takes much longer to finish while the other reducers finish early and sit idle, waiting for the skewed reducer before the results can be displayed/stored.

In this case, using a skew JOIN, we can declare the skewed key so that it is handled by a map join, while the remaining keys go through the regular MapReduce join.

To perform this JOIN we have to set the following parameters:

set hive.optimize.skewjoin=true;
 (To enable skew join optimization and let hive server optimize the join where there is skew. We need to set it to true.)

set hive.skewjoin.key=100000;
(The value of this property determines which key is a skew key. During the join, if the number of rows for a particular key exceeds this number, that key is treated as a skew join key.)

set hive.skewjoin.mapjoin.map.tasks=10000;
(Sets the number of map tasks to use in the follow-up map join job that handles the skewed key. This property should be used in association with hive.skewjoin.mapjoin.min.split.)

set hive.skewjoin.mapjoin.min.split=33554432;
(Sets the minimum split size, and hence the maximum number of mappers, for the map join job of a skew join. This property should be used with hive.skewjoin.mapjoin.map.tasks for efficient control.)

For Example:-

CREATE TABLE A (ID int,Name string, Salary int)
  SKEWED BY (ID) ON (1,5,6);

Suppose we have table A with a key column, "id" which has values 1, 2, 3 and 4, and table B with a similar column, which has values 1, 2 and 3.
We want to do a JOIN corresponding to the following query
select A.id from A join B on A.id = B.id
A set of Mappers reads the tables and sends the rows to Reducers based on the keys; e.g., rows with key 1 go to Reducer R1, rows with key 2 go to Reducer R2 and so on. These Reducers do a cross product of the values from A and B, and write the output. Reducer R4 gets rows only from A, so it will not produce any results.
Now let's assume that A was highly skewed in favor of id = 1. Reducers R2 and R3 will complete quickly but R1 will continue for a long time, thus becoming the bottleneck. If the user has information about the skew, the bottleneck can be avoided manually as follows:
Do two separate queries
select A.id from A join B on A.id = B.id where A.id <> 1;
select A.id from A join B on A.id = B.id where A.id = 1 and B.id = 1;
The first query will not have any skew, so all the Reducers will finish at roughly the same time. If we assume that B has only few rows with B.id = 1, then it will fit into memory. So the join can be done efficiently by storing the B values in an in-memory hash table. This way, the join can be done by the Mapper itself and the data do not have to go to a Reducer. The partial results of the two queries can then be merged to get the final results.



3. CAN WE UPDATE DATA IN HIVE?


Ans> You must be thinking: has the interviewer gone mad asking such a silly question, or is this a trick question? Believe me, if you are a newbie you would answer like most of my friends: that HIVE is a component of HADOOP which uses HDFS, data in HDFS cannot be updated in place, and so we can't update data in HIVE either....

All Wrong.... 

We have the concept of UPSERT (UPdate & inSERT) in HIVE, and from version 0.14 onwards an UPDATE feature has been added to HIVE.

In UPSERT we first create a new table from the old table using CTAS (Create Table As Select), skipping the rows we want to update, and then insert the corrected rows into the new table. This is a long-winded method; with the UPDATE feature we can now directly change values in a HIVE table.
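A minimal sketch of the CTAS workaround, assuming a hypothetical Emp (Id, Name, Salary) table in which the salary of Id 101 has to be corrected:

Command> create table Emp_new as select * from Emp where Id <> 101;
               > insert into table Emp_new select Id, Name, 50000 from Emp where Id = 101;

(Emp_new now holds the full data with the corrected row; you can then drop or rename tables as needed.)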

Standard Command:

command> UPDATE tablename SET column = value [, column = value ...] [WHERE expression] 

One of the parameters to be set before performing an UPDATE (a fuller list is shown in the sketch below):-

set hive.support.concurrency=true;

The limitation of this feature is that the columns used for partitioning or bucketing can't be updated.
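For reference, a minimal sketch of an UPDATE on an ACID table (Hive 0.14+). The table must be bucketed, stored as ORC and marked transactional, and a few more properties than the one above are typically required on Hive 0.14/1.x; emp_txn here is a hypothetical table:

set hive.support.concurrency=true;
set hive.enforce.bucketing=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.compactor.initiator.on=true;
set hive.compactor.worker.threads=1;

create table emp_txn (id int, name string, salary int)
clustered by (id) into 4 buckets
stored as ORC
tblproperties ('transactional'='true');

update emp_txn set salary = 60000 where id = 101;

(Note that salary, not the bucketing column id, is being updated, in line with the limitation above.)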

4. INDEXING IN HIVE:


Ans> Hive indexing is used to improve the speed of query lookups on certain columns of a table. Without an index, queries with predicates like 'WHERE tab1.col1 = 10' load the entire table or partition and process all the rows. But if an index exists for col1, then only a portion of the file needs to be loaded and processed. An index acts as a reference to the records: instead of searching all the records, we can refer to the index to find a particular record, so a lookup can be done with minimum overhead.

Types of Indexes in Hive:-

1. Compact Indexing
2. Bitmap Indexing

Bitmap indexing is commonly used for columns with few distinct values (low cardinality).

The main difference between the two lies in how the mapped row locations are stored. When the data of a Hive table is stored in HDFS, it is distributed across the blocks and nodes of the cluster. The index keeps track of which rows are present in which block, so that when a query is triggered Hive can first check the index and then go directly to the relevant blocks instead of scanning everything.

Compact indexing stores the pair of indexed column’s value and its blockid.

Bitmap indexing stores the combination of indexed column value and list of rows as a bitmap.

Syntax/Command:-

CREATE INDEX index_name ON TABLE base_table_name (col_name, ...)
AS 'index.handler.class.name'
[WITH DEFERRED REBUILD]
[IDXPROPERTIES (property_name=property_value, ...)]
[IN TABLE index_table_name]
[PARTITIONED BY (col_name, ...)]
[
   [ ROW FORMAT ...] STORED AS ...
   | STORED BY ...
]
[LOCATION hdfs_path]
[TBLPROPERTIES (...)]

For example:-

> CREATE INDEX table02_index ON TABLE table02 (column3) AS 'COMPACT' WITH DEFERRED REBUILD;
> ALTER INDEX table02_index ON table02 REBUILD;
> SHOW FORMATTED INDEX ON table02;
> DROP INDEX table02_index ON table02;
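To have the index actually consulted at query time, the following settings are commonly enabled before running a query that filters on the indexed column (a sketch; defaults vary across Hive versions):

set hive.optimize.index.filter=true;
set hive.optimize.index.filter.compact.minsize=0;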



5. Windowing Functions In Hive:


Ans> Windowing allows you to define a window over a set of rows and compute aggregates and rankings over that window. Windowing was introduced in Hive 0.11.

Windowing in Hive includes the following functions

5.1 Lead

The number of rows to lead can optionally be specified. If the number of rows to lead is not specified, the lead is one row.

Returns null when the lead for the current row extends beyond the end of the window.

5.2 Lag

The number of rows to lag can optionally be specified. If the number of rows to lag is not specified, the lag is one row.

Returns null when the lag for the current row extends before the beginning of the window.

5.3 FIRST_VALUE

5.4 LAST_VALUE

5.5 The OVER clause

OVER with standard aggregates: COUNT, SUM, MIN, MAX, AVG

OVER with a PARTITION BY statement with one or more partitioning columns.

OVER with PARTITION BY and ORDER BY with one or more partitioning and/or ordering columns.

5.6 Analytics functions

RANK
ROW_NUMBER
DENSE_RANK
CUME_DIST
PERCENT_RANK
NTILE
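For reference, a short sketch against a hypothetical employee (id, dept, salary) table, combining a few of the functions above:

SELECT id, dept, salary,
       LAG(salary) OVER (PARTITION BY dept ORDER BY salary) AS prev_salary,
       RANK() OVER (PARTITION BY dept ORDER BY salary DESC) AS salary_rank,
       SUM(salary) OVER (PARTITION BY dept) AS dept_total
FROM employee;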

Refer to the below link for more description and example on the same:-

https://acadgild.com/blog/windowing-functions-in-hive/

6. QUERY VECTORIZATION:- 

(Only the ORC file format is supported in the current implementation.)

Ans> Vectorized query execution is a Hive feature that greatly reduces the CPU usage for typical query operations like scans, filters, aggregates, and joins. Vectorization allows Hive to process a batch of rows together instead of processing one row at a time.

A standard query execution system processes one row at a time. This involves long code paths and significant metadata interpretation in the inner loop of execution. Vectorized query execution streamlines operations by processing a block of 1024 rows at a time. Within the block, each column is stored as a vector (an array of a primitive data type). Simple operations like arithmetic and comparisons are done by quickly iterating through the vectors in a tight loop, with no or very few function calls or conditional branches inside the loop. These loops compile in a streamlined way that uses relatively few instructions and finishes each instruction in fewer clock cycles, on average, by effectively using the processor pipeline and cache memory.

Enabling vectorized execution

To use vectorized query execution, you must store your data in ORC format, and set the following variable as shown in Hive:-

set hive.vectorized.execution.enabled = true;

Vectorized execution is off by default, so your queries only utilize it if this variable is turned on. To disable vectorized execution and go back to standard execution, do the following:

set hive.vectorized.execution.enabled = false;
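A minimal end-to-end sketch, assuming a hypothetical sales table (with an amount column) already exists in text format:

set hive.vectorized.execution.enabled = true;

create table sales_orc stored as ORC as select * from sales;

select count(*), sum(amount) from sales_orc where amount > 100;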

I will update and refine this post as I come across more interview-related questions in HIVE. Feel free to post more questions/suggestions, which I will include in this blog....


Hope you all understood the procedures... 
Please do notify me for any corrections...
Kindly leave a comment for any queries/clarification...
ALL D BEST...

Sunday, March 12, 2017

A USECASE ON TRAVEL APP

Dear Friends,


Welcome Back....

Day by day I am learning different things, which I like to share with you all.
As a great person said " Learning is a journey not a Destination".

I was a little busy the last few days acquiring different know-how on Hadoop. One of the travel app providers asked me whether I could solve this use case: to find out where the demand for their services is, so that they can decide where to offer discounts and lure customers into using their services.

In this blog, I am using this use case to solve the problem and help the client take decisions for better business development. The same approach can be used for a particular state or region. You can use this blog as a reference and modify it according to your need. Hope you all enjoy solving this use case.

Eclipse IDE :- Neon.1
Hadoop Version:- 1.2.1
Ubuntu Version:- 12.04 LTS
Jars Used:- Apache POI 3.15


Problem Statement:


A travel app agency has loads and loads of data about various trips made all over India. Using the existing data, they are unable to work out where the demand is highest and where they should use offers or discounts to lure customers.

1. Find the areas/places with the most and the least demand so that appropriate offers and discounts can be given.

2. Remove duplicate entries and segregate customers according to area/place.

3. Use Graphical representation for decision making.

The data is in EXCEL format, for which I used a third-party jar from Apache (POI) to parse the Excel data. (You can find the same in my previous blogs.) I have created a small data set as an example for this use case, which you can extend into a larger set for a more precise workflow/understanding.

The data is in following format:- (Download the data file HERE)




In our use case the data is in Excel format, which we will load into HDFS for storage, and using MR we will extract the required data with a custom input format. After extraction, we will load the output (split by region) into a HIVE table and then perform a count query to find the regions with the highest and lowest customer demand. Finally we use Tableau to represent the result graphically.

I kept this use case as simple as I could. You can try different variations on it to get the desired result; for example, in this use case I was also asked to map the output along with the driver ID to find the best-performing driver.


1. LOAD THE DATA IN HDFS:-


Our first step is to load the data into HDFS. I hope you all know how to do that by now, but I will still keep it as a part of my blog.

To load the data in HDFS give the below command:-

Command > hadoop fs -mkdir /Input               (To make a directory)

Command > hadoop fs -put Trip.xlsx /Input               (To put the file in HDFS)




2.     MAP REDUCE CODES:-


(DRIVER CLASS)

package com.poc.trip;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PocDriver {

static public int count = 0;

public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();

GenericOptionsParser parser = new GenericOptionsParser(conf, args);
args = parser.getRemainingArgs();

Job job = new Job(conf, "Trip_Log");
job.setJarByClass(PocDriver.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setInputFormatClass(ExcelInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setNumReduceTasks(0); // map-only job: all output is written through MultipleOutputs in the mapper

job.setMapperClass(MyMap.class);

System.exit(job.waitForCompletion(true) ? 0 : 1);

}
}

MY MAPPER 
(HAVING MAPPER LOGIC)

package com.poc.trip;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class MyMap extends Mapper<LongWritable, Text, Text, Text> {
MultipleOutputs<Text, Text> mos;

@Override
public void setup(Context context) {
mos = new MultipleOutputs<Text, Text>(context);
}

@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// Each incoming value is one row of the Excel sheet, emitted as tab-separated text by the custom ExcelInputFormat.
String[] str1 = value.toString().split("\t");
// Keep only the columns required for the analysis.
String sr1 = str1[0] + "\t" + str1[1] + "\t" + str1[3] + "\t" + str1[5] + "\t" + str1[6] + "\t" + str1[7];

// Column index 3 holds the state; route each record to its own output directory under /TripData.
if (str1[3].contains("Bihar")) {
mos.write(new Text("Bihar"), new Text(sr1), ("/TripData/Bihar"));
} else if (str1[3].contains("Pondicherry")) {
mos.write(new Text("Pondicherry"), new Text(sr1), ("/TripData/Pondicherry"));

} else if (str1[3].contains("Uttarakhand")) {
mos.write(new Text("Uttarakhand"), new Text(sr1), "/TripData/Uttarakhand");

} else if (str1[3].contains("Chhattisgarh")) {
mos.write(new Text("Chhattisgarh"), new Text(sr1), "/TripData/Chhattisgarh");

} else if (str1[3].contains("Goa")) {
mos.write(new Text("Goa"), new Text(sr1), "/TripData/Goa");

} else if (str1[3].contains("Assam")) {
mos.write(new Text("Assam"), new Text(sr1), "/TripData/Assam");

} else if (str1[3].contains("Himachal_Pradesh")) {
mos.write(new Text("Himachal_Pradesh"), new Text(sr1), "/TripData/Himachal_Pradesh");

} else {
mos.write(new Text("Other"), new Text(sr1), "/TripData/Other");

}
}

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
mos.close();
}
}

We are not using a reducer, as the entire work is already done in the map phase; there is no reduce work needed.

3. EXECUTING THE MAP REDUCE CODE


Run the Jar file by below command:-

Command >  hadoop jar trip.jar com.poc.trip.PocDriver /Input/Trip.xlsx /Tripout





4. Create HIVE table and upload Processed data:-

In this blog I will be executing the Hive commands directly from the terminal instead of going inside the HIVE shell. Each command takes a little longer to execute this way, but I am doing it to show a different approach. (You can instead go inside the HIVE shell and run the commands.)

First, create a database and an external table in it to hold the MR output data:-

You can achieve the same by the following command:-

Command >  hive -e 'create database Trip; use Trip;'



Create table in a script file and save it as .sql: 

Follow the below command for creation of table.

Command > nano hivetable.sql

Script        > create external table Trip (state string,sid int,tid int,address string,trip int)
                  > row format delimited
                  > fields terminated by '\t'
                  > stored as textfile location '/TripData';



Now run the script file using the following command.

Command > hive -f hivetable.sql 



This will create an external table and store the data in that table.

Now it's time to query the data and count the number of trips made in each state. Follow the below command for the same.

Command > hive -e 'Select state, count(*) from trip group by state;' > /home/gopal/Desktop/Tripcount.txt

This will create a text file and store the output in it, in tab-delimited format.




To get the output with a different field delimiter we can use a script. Follow the below commands for the same.

Command > nano hiveout.sql

Script > INSERT OVERWRITE LOCAL DIRECTORY '/home/gopal/Desktop/Tripcount'
           > ROW FORMAT DELIMITED
           > FIELDS TERMINATED BY ','
           > select state, count(*) from trip group by state;


Now run the script file to get desired output by below command.

Command > hive -f hiveout.sql




Now use Tableau software to give a graphical representation.

I have used the tableau software and exported the results into a PDF file.



Hope you all understood the procedures... 
Please do notify me for any corrections...
Kindly leave a comment for any queries/clarification...

ALL D BEST...

Thursday, February 23, 2017

HIVE TO THE RESCUE - A HEALTHCARE USE-CASE ON STRUCTURED DATA

Dear Friends,


We know that Hadoop's HIVE component is very good for structured data processing. 

Structured data first depends on creating a data model – a model of the types of business data that will be recorded and how they will be stored, processed and accessed. This includes defining what fields of data will be stored and how that data will be stored: data type (numeric, currency, alphabetic, name, date, address).

Structured data has the advantage of being easily entered, stored, queried and analyzed. At one time, because of the high cost and performance limitations of storage, memory and processing, relational databases and spreadsheets using structured data were the only way to effectively manage data. Anything that couldn't fit into a tightly organized structure would have to be stored on paper in a filing cabinet. 

Hadoop addresses this with its HIVE component. HDFS solves the storage part, but writing and running MR code is a bit lengthy. So Hive is used for processing large amounts of structured data more conveniently than any other component. Even though it uses MR as the internal process, writing queries is much simpler and easier compared with writing MR.

This use-case was given to me by one of my friends, who asked me not to use MR programming in it.

Seriously, having worked with MR I am used to thinking that way, but this POC made me think differently, and I am glad that I solved it without writing any MR code. (Though MR is still the internal background process in many of Hadoop's components.)

USE-CASE DEFINITION


Medicare companies (medical insurance) tie up with different hospitals so that their customers can have cashless/problem-free health checkups. Patients who recover from a certain health issue often don't go for checkups again (for fear of cost/travel/time). They may no longer visit a hospital or consult doctors at all until they have some health issue again.

But a Medicare company shouldn't give up on its customers without a fair try. Whether your customers have gone to the competition or just gone silent, they are worth the time and effort to win back with excellent proposals/policies.

There are a number of Medicare-card holders who visit different hospitals regularly. If a card holder doesn't visit a particular hospital for a period of time, that hospital considers them a PASSIVE customer. In the same way, if a card holder visits a hospital for the first time, that customer is called ACTIVE. The problem is to find the ACTIVE & PASSIVE Medicare-card holders for every hospital from the data provided, on a monthly basis, to know how their business is performing.

This provides the next step: luring back the Medicare-card holders who turned passive over a period of time by giving them offers, discounts etc.

Note # The passive period varies from hospital to hospital. For APPOLO it may be 90 days, but for small hospitals like DAVITA or DIALYSIS Care it may be 30 days.

PROBLEM STATEMENT


1. If the PASSIVE period is 31 days and the application is executed on 1 APRIL 2014 (new financial year), then count the passive Medicare-card holders as of 31st December 2013. This implies that the records are to be used on a quarterly basis.
2. If a Medicare-card holder appears in a hospital for the first time, or has not appeared in the same hospital within a period of 90 days, they will be considered a new customer.
3. The output should contain daily reports, monthly reports as well as quarterly reports.
4. Data to be used :
PASSIVE.info : Information about Passive period of a Hospital
HealthCare.csv: Total Transactions of the Hospitals WRT Medicare-Card Holder for the period of 2 years.

You can download the sample data used in this use-case from HERE.



USE_CASE SOLUTION ARCHITECTURE



In this use-case we will first separate out the hospitals for which we need to find active & passive customers, using a HIVE partitioned table. Then, using the CTAS command in HIVE, we will further divide each hospital's customers into active & passive based on their last date of activity and keep them in separate tables.

Once the above is done, based on our requirement we can run any query, such as a JOIN to find only active customers, only passive customers or both, quarterly data, etc.

1. LOAD THE DATA IN HDFS:-


Our first step is to load the data into HDFS. I hope you all know how to do that by now, but I will still keep it as a part of my blog.

To load the data in HDFS give the below command:-

Command > hadoop fs -mkdir /Health               (To make a directory)

Command > hadoop fs -put HealthCare.csv /Health         (To put the file in HDFS)



2. CREATE A DATABASE AND EXTERNAL TABLE TO KEEP THE DATA:-


Now, as we have our data in HDFS and we need to work in HIVE, we have to create a database for it. The benefit of creating an external table is that we don't have to copy the entire data into Hive's warehouse but can point to the location where the actual data resides. Because of that, if we delete the external table, only the table schema information is lost, not the actual data.

Below is the command for creating the database and external table:

Command > create database HealthCare;



Command > create external table health
                 > (cid int,card int,age int,disease string, hospital string,date date,address string,unit int)
                 > row format delimited               
                 > fields terminated by ','
                 > stored as textfile location '/Health'
                 > tblproperties ("skip.header.line.count"="1");            (Used to skip the header line)


Check that all data is showing or not by below command.

Command > select * from health;

(If everything is fine then you will see all results)

3. CREATE PARTITION TABLE FOR KEEPING EACH HOSPITAL'S RECORDS:-


Now we will create an external partitioned table to divide the entire record set by hospital and keep each hospital's records separately, using the below command:

Command > create external table HealthPart
                 > (cid int,card int,age int,disease string, phospital string,date date,address string,unit int)
                 > PARTITIONED BY (hospital string)
                 > row format delimited
                 > fields terminated by ','
                 > stored as textfile location '/PartHealth';
                                                                    
After creating the table, insert the data from the health table and it will automatically be divided by hospital name. Below is the command for inserting data from the health table:

Command > insert overwrite table healthpart PARTITION (hospital)
                  > select cid,card,age,disease,hospital,date,address,unit,hospital from health;
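If Hive complains about dynamic partitions during this insert, the following settings are usually required before running it (the values shown are the commonly used ones):

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;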





4. CREATE TABLE FROM PARTITION TABLE FOR ACTIVE & PASSIVE CUSTOMERS FOR EACH HOSPITAL:-


Now we have to create two separate tables containing Active & Passive customers respectively. This can be achieved by CTAS(Create Table As Select), creating a table by selecting columns from partition table. Below is the command for achieving the same:-

Command > create table newdavita as select cid, card, age, disease, phospital, date, address, unit
                  > from healthpart
                  > where hospital='DAVITA'                                                                              
                  > and date>date_sub('2014-04-01',30) and date<'2014-04-01';
date_sub('yyyy-mm-dd', no. of days):- subtracts the given number of days from the date. (For DAVITA it is 30, for APPOLO 90, and so on.)
(This command will take selected columns from partition table based on the date for Active customers)

Command > create table olddavita as select cid, card, age, disease, phospital, date, address, unit
                 > from healthpart
                 > where hospital='DAVITA'                                                                              
                 > and date<date_sub('2014-04-01',30) and date>'2014-01-01';

(This command will take selected columns from partition table based on the date for Passive customers). Here 30 is used as no. of days for Passiveness.

NOTE:- Do the same for each and every Hospital. You will get each Hospital's Active & Passive Customers.


(As APPOLO has a 90-day passive period, everyone within that quarter's date range will be an active customer and there will be no passive customers for that quarter.)



Check with the command > select * from olddavita; to confirm the data falls within the expected date range.

NOTE:- You can workout with different date range according to the requirement, for month & year also.

5. REMOVE DUPLICATE ENTRIES FROM NEW & OLD TABLE:-


You now have the active and passive customer data for each hospital partition. But a member may have visited both before and after the passive cut-off date, so many member-card holders will be registered in both the new & old tables.

We will consider such members as active and remove them from the passive data to avoid overlaps/duplicates. This can be achieved through a JOIN, which you can find below:-

For Getting Active Data without overlapping from Passive Data:-

Command > select a.cid,a.card,a.date,a.phospital from newdavita a left outer join olddavita b
                  > on a.card=b.card WHERE b.card IS NULL;

(Here we keep everything on the left-hand side, i.e. the active data, join it with the right-hand side, i.e. the passive data, and keep only the rows where b.card IS NULL, i.e. active members that do not also appear in the passive data.)

OR, to list the members that appear in both the active and passive data (the overlap itself), a left semi join can be used:



Command > select a.cid,a.card,a.date,a.phospital from newdavita a left semi join olddavita b
                 > on a.card=b.card;


For Getting Passive Data without overlapping from Active Data:-

Command > select a.cid,a.card,a.date,a.phospital from newdavita a right outer join olddavita b
                  > on a.card=b.card WHERE a.card IS NULL;

(Here we keep everything on the right-hand side, i.e. the passive data, join it with the left-hand side, i.e. the active data, and keep only the rows where a.card IS NULL, i.e. passive members that do not also appear in the active data.)

NOTE:- For my sample data this didn't return anything, as both data sets contained the same members. You can make some changes to the data and try it.



After removing duplicate/overlapping entries from the active & passive data, use the GROUP BY option to group the entries of the same member card number for display. Below is the command for the same.

Command > select card,max(date) from newdavita group by card;

This will collapse repeating card numbers and return only one row per card number, along with the latest visit date.


6. CONSOLIDATE EVERY NEW TABLE & OLD TABLE TO GET ACTIVE & PASSIVE CUSTOMERS BY CTAS & UNION ALL:-


Now, once we have all active & passive customers for each hospital, consolidate them to get the active & passive customers for the quarter.
Below is the command to combine all active customer data with UNION ALL and keep it in a table for further queries/analysis:-

Command > create table active_customer as select * from newdavita
                  > union all select * from newappolo;

NOTE:- You can combine multiple tables into one using UNION ALL, e.g. create table ... as select * from new1 union all select * from new2 union all select * from new3 union all select * from new4 and so on....



STORE DATA IN HDFS:-

If you want to save the Active/Passive Customer Union data then below is the command:-

Command > insert overwrite directory '/NewCustomer'
                  > select * from newdavita union all      
                  > select * from newappolo;              

NOTE:- This will create a directory in HDFS and store the result. (But in Hive's format)



STORE DATA IN LFS:-

1. From Hive shell:-

If you want to keep it in .csv format give the below command:-

Command > insert overwrite local directory '/home/gopal/Desktop/NewCustomer'
                  > row format delimited                                            
                  > fields terminated by ','                                        
                  > select * from active_customer;    




2. From Terminal:-

If you want to keep it in .tsv format follow the below procedure:-

1. Close all active HIVE Shell.
2. Open a new terminal:-
Goto the directory where hive is running/Metastore is present and give the below command:-

Command > hive -e 'use healthcare; select * from newdavita union all select * from newappolo' > /home/gopal/Desktop/ActiveCustomer.tsv



Now go on and try different queries to get other results, like monthly active and passive customers, a count of each hospital's customers, etc.

Hope you all understood the procedures... 
Please do notify me for any corrections...
Kindly leave a comment for any queries/clarification...
ALL D BEST...

Thursday, February 16, 2017

WAYS TO BULK LOAD DATA IN HBASE

Dear Friends,


Going ahead with my posts, this topic was asked about by one of my friends, so I am sharing my thoughts and a working procedure for bulk loading data into HBase.

HBase is an open-source, NoSQL, distributed, column-oriented data store modelled on Google BigTable. It was developed as part of Apache's Hadoop project and runs on top of HDFS (Hadoop Distributed File System). HBase provides BigTable-like capabilities, but we can call it a "data store" rather than a "database", as it lacks many of the features available in a traditional database, such as typed columns, secondary indexes, triggers, advanced query languages, etc.
The data model consists of a table name, row key, column families, columns and timestamps. Rows are uniquely identified by their row key, and cell versions by timestamp. In this data model the column families are static, whereas columns are dynamic. HBase is a column-oriented store where one has to specify which data belongs to which column family, so at a minimum an HBase table comprises a table name and at least one column family name.

Apache HBase is all about giving you random, real-time, read/write access to your Big Data, but how do you efficiently get that data into HBase in the first place? Intuitively, a new user will try to do that via the client APIs or by using a MapReduce job with TableOutputFormat, but those approaches are problematic, as you will learn below. Instead, the HBase bulk loading feature is much easier to use and can insert the same amount of data more quickly.

In this blog I will take you through a number of ways to achieve bulk loading of data into HBase.

There are basically 3 ways to Bulk load the data in HBase:-

1. Using ImportTsv Class to load txt Data to HBase.

2. Using Hive's HCatalog & Pig command.

3. Using MapReduce API.

You can download the Sample.txt file used in this blog HERE.

(NOTE:- While driving these examples, Please be sure to have your Hadoop Daemons & Hbase Daemons are up and running.)

1. Using ImportTsv Class to load txt Data to HBase:-


A) Uploading Sample.txt file to HDFS:-


Upload the sample file into HDFS by the following command:

Command > hadoop fs -put Sample.txt /Input

B) Create Table in HBase:-


To use this method we first have to create a table in HBase with the number of column families required by the data. Here I am using 2 column families for my data.

First go to HBase shell by giving below command and create a table with column family names:

Command > hbase shell      (To enter into HBase shell)

Command > create 'Test','cf1','cf2'                 (To create a table with column families)


 C) Using ImportTsv Class  LOAD the Sample.txt file to HBase:-


Now we are set and ready to load the file into HBase. To load the file we will use the ImportTsv class from the HBase jar, using the below command (go to the hbase folder and run it):-

Command > ./bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator="," -Dimporttsv.columns=HBASE_ROW_KEY,cf1,cf2 Test /Input/Sample.txt


Check the data is loaded in Hbase Table:-

Command >scan 'Test'



Here's an explanation of the different configuration elements:

-Dimporttsv.separator="," specifies that the field separator is a comma.
-Dimporttsv.bulk.output=<path> is the path where HFiles are written instead of loading directly into HBase; skipping this option (as we did above) makes the job write directly to HBase. (We have not used it here, but it is useful for true bulk loads.)
-Dimporttsv.columns=HBASE_ROW_KEY,cf1,cf2 lists, in order, the columns contained in the file. The row key needs to be identified using the all-caps HBASE_ROW_KEY string; otherwise the job won't start.
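For a true bulk load (writing HFiles first and then handing them to the RegionServers), a sketch of the two-step variant looks like this; the output path /tmp/hfiles is just an example:

Command > ./bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator="," -Dimporttsv.columns=HBASE_ROW_KEY,cf1,cf2 -Dimporttsv.bulk.output=/tmp/hfiles Test /Input/Sample.txt

Command > ./bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/hfiles Test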

2. Using Hive's HCatalog & Pig command:-


In this method, various jar files from PIG, HIVE and HCatalog are required; they can be added via the HADOOP_CLASSPATH variable, otherwise a ClassNotFoundException will be thrown naming the missing class.
(To be on the safe side, and since my classpath setting didn't work, I copied all the jar files from pig/lib, hive/lib & hive/hcatalog/lib to hadoop/lib, after which it worked fine without any error.)

A) Create a Script using HIVE SerDe & Table Properties:-


After loading the data into HDFS, define the HBase schema for the data on the HIVE side. Continuing with the Sample example, create a script file called sample.ddl, which will contain the HBase schema mapping used by HIVE. To do so, write the below code in a file and name it sample.ddl:

Script sample.ddl :-

CREATE TABLE sample_hcat_load_table (id STRING, cf1 STRING, cf2 STRING) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
WITH SERDEPROPERTIES ( 'hbase.columns.mapping' = 'd:cf1,d:cf2' ) 
TBLPROPERTIES ( 'hbase.table.name' = 'sample_load_table'); 


B) Now Create and register the HBase table in HCatalog.


To register the DDL file we use HCatalog. HCatalog lives inside the HIVE folder (/usr/local/hadoop/hive/hcatalog), so export the HCatalog home and path in your ~/.bashrc file (like you did when installing Hive), then source the ~/.bashrc file to update it.
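A sketch of the ~/.bashrc entries, assuming HCatalog lives under the Hive directory mentioned above (variable names and paths may differ on your installation):

export HCAT_HOME=/usr/local/hadoop/hive/hcatalog
export PATH=$PATH:$HCAT_HOME/bin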

Command > source ~/.bashrc

Now register the DDL file using the syntax: hcat -f <ddl_file>.

The following HCatalog command runs the DDL script sample.ddl:

Command > hcat -f sample.ddl



Go to the HBase shell with the below command and check whether the table has been created:-

Command > hbase shell


C) Create the import file using PIG Script:-.


The following script instructs Pig to load the data from Sample.txt and store it in sample_hcat_load_table (which is mapped to the HBase table sample_load_table).

Script Hbase-bulk-load.pig:-

A = LOAD '/Input/Sample.txt' USING PigStorage(',') AS (id:chararray, c1:chararray, c2:chararray);

STORE A INTO 'sample_hcat_load_table' USING org.apache.hive.hcatalog.pig.HCatStorer();



Use Pig command to populate the HBase table via HCatalog bulkload:-

Continuing with the example, execute the following command:

Command > pig -useHCatalog Hbase-bulk-load.pig

Command > pig Hbase-bulk-load.pig 

(Since on my system it failed to read the Sample.txt data from HDFS, I used local storage for ease of use, giving the command pig -x local Hbase-bulk-load.pig or pig -x local -useHCatalog Hbase-bulk-load.pig.)



Go to the HBase shell and give the scan command to check the result:-



Below is another example for achieving the same (I have not tried it.).

A = LOAD '/hbasetest.txt' USING PigStorage(',') as (id:chararray, c1:chararray, c2:chararray);
STORE A INTO 'hbase://mydata'  USING
org.apache.pig.backend.hadoop.hbase.HBaseStorage('mycf:intdata');


3. Using MapReduce API.


HBase's Put API can be used to insert data into HBase, but the data then has to go through the complete HBase write path, so inserting data in bulk through the Put API is a lot slower than the bulk loading option. There are references to bulk loading around the web, but they are either incomplete or a bit too complicated. The bulk load itself has three main steps:

1. Extract the data from the source (in our case from a text file).
2. Transform the data into HFiles.
3. Load the files into HBase by telling the RegionServers where to find them.

Below is the coding I used for the same for my Sample.txt data file. You can modify it according to your requirement.

NOTE:- This code doesn't create the table in HBase, so before running it in the Hadoop environment, make sure to create the table in HBase using the create command with the column families.


HBaseBulkLoadDriver

DRIVER CLASS

package com.poc.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HBaseBulkLoadDriver extends Configured implements Tool {
    private static final String DATA_SEPERATOR = ",";
    private static final String TABLE_NAME = "sample-data";
    private static final String COLUMN_FAMILY_1="cf1";
    private static final String COLUMN_FAMILY_2="cf2";
  
    public static void main(String[] args) {
        try {
            int response = ToolRunner.run(HBaseConfiguration.create(), new HBaseBulkLoadDriver(), args);
            if(response == 0) {
                System.out.println("Job is successfully completed...");
            } else {
                System.out.println("Job failed...");
            }
        } catch(Exception exception) {
            exception.printStackTrace();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        int result=0;
        String outputPath = args[1];
        Configuration configuration = getConf();
        configuration.set("data.seperator", DATA_SEPERATOR);
        configuration.set("hbase.table.name",TABLE_NAME);
        configuration.set("COLUMN_FAMILY_1",COLUMN_FAMILY_1);
        configuration.set("COLUMN_FAMILY_2",COLUMN_FAMILY_2);
        Job job = new Job(configuration);
        job.setJarByClass(HBaseBulkLoadDriver.class);
        job.setJobName("Bulk Loading HBase Table::"+TABLE_NAME);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapperClass(HBaseBulkLoadMapper.class);
        FileInputFormat.addInputPaths(job, args[0]);
        // Clear any previous output on the job's default file system (typically HDFS) before running.
        FileSystem.get(getConf()).delete(new Path(outputPath), true);
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.setMapOutputValueClass(Put.class);
        HFileOutputFormat.configureIncrementalLoad(job, new HTable(configuration,TABLE_NAME));
        job.waitForCompletion(true);
        if (job.isSuccessful()) {
            HBaseBulkLoad.doBulkLoad(outputPath, TABLE_NAME);
        } else {
            result = -1;
        }
        return result;
    }
}

HBaseBulkLoadMapper

MAPPER CLASS

package com.poc.hbase;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Mapper;

public class HBaseBulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    private String hbaseTable;
    private String dataSeperator;
    private String columnFamily1;
    private String columnFamily2;
    private ImmutableBytesWritable hbaseTableName;

    public void setup(Context context) {
        Configuration configuration = context.getConfiguration();
        hbaseTable = configuration.get("hbase.table.name");
        dataSeperator = configuration.get("data.seperator");
        columnFamily1 = configuration.get("COLUMN_FAMILY_1");
        columnFamily2 = configuration.get("COLUMN_FAMILY_2");
        hbaseTableName = new ImmutableBytesWritable(Bytes.toBytes(hbaseTable));
    }

    public void map(LongWritable key, Text value, Context context) {
        try {
            String[] values = value.toString().split(dataSeperator);
            String rowKey = values[0];
            Put put = new Put(Bytes.toBytes(rowKey));
            put.add(Bytes.toBytes(columnFamily1), Bytes.toBytes("cf1"), Bytes.toBytes(values[1]));
            put.add(Bytes.toBytes(columnFamily2), Bytes.toBytes("cf2"), Bytes.toBytes(values[2]));
            // The map output key must be the row key so that HFileOutputFormat can sort and partition the data correctly.
            context.write(new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put);
        } catch(Exception exception) {
            exception.printStackTrace();
        }
    }
}

HBaseBulkLoad

HBASE CONFIGURATION CLASS

package com.poc.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class HBaseBulkLoad {

public static void doBulkLoad(String pathToHFile, String tableName) {
try {
Configuration configuration = new Configuration();
configuration.set("mapreduce.child.java.opts", "-Xmx1g");
HBaseConfiguration.addHbaseResources(configuration);
LoadIncrementalHFiles loadFfiles = new LoadIncrementalHFiles(configuration);
HTable hTable = new HTable(configuration, tableName);
loadFfiles.doBulkLoad(new Path(pathToHFile), hTable);
System.out.println("Bulk Load Completed..");
} catch (Exception exception) {
exception.printStackTrace();
}
}
}

NOTE:- You have to create the table first. You can do it from the HBase shell (as in method 1), or via the Java API by tweaking the code below (Constants.COLUMN_FAMILY_NAME, config and REGIONS_COUNT are placeholders you have to supply):

//Create table and do pre-split
HTableDescriptor descriptor = new HTableDescriptor(
Bytes.toBytes(tableName)
);

descriptor.addFamily(
new HColumnDescriptor(Constants.COLUMN_FAMILY_NAME)
);

HBaseAdmin admin = new HBaseAdmin(config);

byte[] startKey = new byte[16];
Arrays.fill(startKey, (byte) 0);

byte[] endKey = new byte[16];
Arrays.fill(endKey, (byte)255);

admin.createTable(descriptor, startKey, endKey, REGIONS_COUNT);
admin.close();

Run the Jar File

Compile the above code in Eclipse (including the HBase jars on the build path), export the jar file and run it with the below command:

Command > hadoop jar hbase.jar com.poc.hbase.HBaseBulkLoadDriver /Input/Sample.txt /Out



Now go to the HBase shell to check that the data has been loaded.

Hbase shell > scan 'sample-data'



That's all friends...

Now go ahead and tweak the code to learn more about HBase's working mechanism.






Hope you all understood the procedures... 
Please do notify me for any corrections...
Kindly leave a comment for any queries/clarification...
(Detailed Description of each phase to be added soon).
ALL D BEST...