Thursday, February 23, 2017

HIVE TO THE RESCUE - A HEALTHCARE USE-CASE ON STRUCTURED DATA

Dear Friends,


We know that Hadoop's HIVE component is very good for structured data processing. 

Structured data first depends on creating a data model – a model of the types of business data that will be recorded and how they will be stored, processed and accessed. This includes defining what fields of data will be stored and how that data will be stored: data type (numeric, currency, alphabetic, name, date, address).

Structured data has the advantage of being easily entered, stored, queried and analyzed. At one time, because of the high cost and performance limitations of storage, memory and processing, relational databases and spreadsheets using structured data were the only way to effectively manage data. Anything that couldn't fit into a tightly organized structure would have to be stored on paper in a filing cabinet. 

Hadoop has an advantage here through its HIVE component. HDFS solves the storage part, but writing and running raw MapReduce code is fairly lengthy. Hive therefore lets us process large amounts of structured data more conveniently than any other component: even though it uses MR internally, writing a query is much simpler and easier than writing the equivalent MR program.

This use-case was given to me by one of my friends, who asked me not to use MR programming in it.

Seriously, having worked with MR I am used to thinking that way, but this POC made me think differently, and I am glad that I solved it without writing any MR program (though MR is the internal process running in the background of all of Hadoop's components).

USE-CASE DEFINITION


Medicare Companies (Medical Insurance) tie up with different Hospitals so that their customers can have cashless/problem-free health checkups. Patients who recover from a certain health issue often don't go for checkups again (for fear of cost/travel/time). They may no longer visit a hospital or consult doctors at all until they have some health issue again.

But a Medicare Company shouldn't give up on its customers without a fair try. Whether your customers have gone to the competition or just gone silent, they are worth your time and effort to win back with excellent proposals/policies.

There are a number of Medicare-card holders who visit different Hospitals regularly. If a Medicare-card holder doesn't visit a particular Hospital for a period of time, that Hospital considers them a PASSIVE customer. Likewise, if a Medicare-card holder visits a Hospital for the first time, that customer is called ACTIVE. The problem is to find the ACTIVE & PASSIVE Medicare-card holders for every Hospital from the data provided, on a monthly basis, to know how the business is performing.

This provides the next step for luring back the Medicare-card holders who turned passive over a period of time, by giving them offers, discounts, etc.

Note # The Passive period varies from Hospital to Hospital. For APPOLO it may be 90 days, but for small Hospitals like DAVITA and DIALYSIS Care it may be 30 days.

PROBLEM STATEMENT


1. If the PASSIVE period is 31 days and the application is executed on 1 APRIL 2014 (new financial year), then count the Passive medicare-card holders as of 31st December 2013. This implies that the records are to be used on a quarterly basis.
2. If a Medicare-Card Holder appears for the first time in a Hospital, or has not appeared within a period of 90 days in the same Hospital, then it will be considered a new customer.
3. The output should contain daily reports, monthly reports as well as quarterly reports.
4. Data to be used :
PASSIVE.info : Information about Passive period of a Hospital
HealthCare.csv: Total Transactions of the Hospitals WRT Medicare-Card Holder for the period of 2 years.

You can download the sample data used in this use-case from HERE.



USE-CASE SOLUTION ARCHITECTURE



In this use-case we will first separate the records of the Hospitals for which we need to find Active & Passive customers, using a HIVE partitioned table. Then, using the CTAS command in HIVE, we will further divide each Hospital's records into Active & Passive customers based on their last date of activity and keep them in separate tables.

After the above is done, we can run whatever query the requirement calls for, such as a JOIN to find only Active/Passive customers or both, quarterly data, etc.

1. LOAD THE DATA IN HDFS:-


Our first step is to load the data into HDFS. I hope you all know how to do that by this time; still, as part of my blog, I will keep it here.

To load the data in HDFS give the below command:-

Command > hadoop fs -mkdir /Health               (To make a directory)

Command > hadoop fs -put HealthCare.csv /Health         (To put the file in HDFS)



2. CREATE A DATABASE AND EXTERNAL TABLE TO KEEP THE DATA:-


Now that we have our data in HDFS and we need to work in HIVE, we have to create a database for it. The benefit of creating an external table is that we don't have to copy the entire data into Hive's warehouse; we can point to the location where the actual data resides. Because of that, if we delete the external table, only the table schema information is lost, not the actual data.

Below is the command for creating the database and external table:

Command > create database HealthCare;



Command > create external table health
                 > (cid int,card int,age int,disease string, hospital string,date date,address string,unit int)
                 > row format delimited               
                 > fields terminated by ','
                 > stored as textfile location '/Health'
                 > tblproperties ("skip.header.line.count"="1");            (Used to skip the header line)


Check whether all the data shows up with the command below.

Command > select * from health;

(If everything is fine then you will see all results)

3. CREATE PARTITION TABLE FOR KEEPING EACH HOSPITAL'S RECORDS:-


Now we will create an external partitioned table to divide the entire data set by individual hospital and keep each hospital's records separately, using the command below:

Command > create external table HealthPart
                 > (cid int,card int,age int,disease string, phospital string,date date,address string,unit int)
                 > PARTITIONED BY (hospital string)
                 > row format delimited
                 > fields terminated by ','
                 > stored as textfile location '/PartHealth';
                                                                    
After creating the table, insert the data from the health table; it will automatically be divided by Hospital name. Below is the command for inserting data from the health table:

Command > insert overwrite table healthpart PARTITION (hospital)
                  > select cid,card,age,disease,hospital,date,address,unit,hospital from health;
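NOTE:- If Hive complains about dynamic partitioning when running the insert above, the standard Hive settings below (run once in the same session, before the insert) usually take care of it; your installation may already have them enabled.

Command > set hive.exec.dynamic.partition=true;

Command > set hive.exec.dynamic.partition.mode=nonstrict;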





4. CREATE TABLE FROM PARTITION TABLE FOR ACTIVE & PASSIVE CUSTOMERS FOR EACH HOSPITAL:-


Now we have to create two separate tables containing the Active & Passive customers respectively. This can be achieved with CTAS (Create Table As Select): creating a table by selecting columns from the partitioned table. Below is the command for achieving the same:-

Command > create table newdavita as select cid, card, age, disease, phospital, date, address, unit
                  > from healthpart
                  > where hospital='DAVITA'                                                                              
                  > and date>date_sub('2014-04-01',30) and date<'2014-04-01';
date_sub('yyyy-mm-dd', no. of days):- returns the date that many days before the given date. (For DAVITA it is 30 and for APPOLO 90, and so on.)
(This command takes the selected columns from the partitioned table, filtered by date, to get the Active customers.)

Command > create table olddavita as select cid, card, age, disease, phospital, date, address, unit
                 > from healthpart
                 > where hospital='DAVITA'                                                                              
                 > and date<date_sub('2014-04-01',30) and date>'2014-01-01';

(This command takes the selected columns from the partitioned table, filtered by date, to get the Passive customers. Here 30 is used as the number of days of passiveness.)

NOTE:- Do the same for every Hospital; see the example sketch below. You will get each Hospital's Active & Passive customers.
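For example, for APPOLO (with its 90-day passive period) the Active-customer CTAS would look roughly like the sketch below; the table and hospital names follow the pattern above and should be adjusted to your data:

Command > create table newappolo as select cid, card, age, disease, phospital, date, address, unit
                  > from healthpart
                  > where hospital='APPOLO'
                  > and date>date_sub('2014-04-01',90) and date<'2014-04-01';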


(As APPOLO has a 90-day passive period, everyone within that date range will be an Active customer for that quarter and there will be no Passive customers.)



Check with command > select * from olddavita; to confirm that the data falls within the expected date range.

NOTE:- You can work with a different date range according to the requirement, for a month and a year as well.
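For instance, a month-wise Active table for DAVITA covering March 2014 might look like the sketch below (same column list as before; the table name and date bounds are just an example):

Command > create table newdavita_march as select cid, card, age, disease, phospital, date, address, unit
                  > from healthpart
                  > where hospital='DAVITA'
                  > and date>='2014-03-01' and date<'2014-04-01';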

5. REMOVE DUPLICATE ENTRIES FROM NEW & OLD TABLE:-


You have now obtained the Active and Passive customer data from each Hospital's partition. But a member may have visited both before and after the passive cut-off date, so many Member-Card holders will end up registered in both the New & Old tables.

We will consider such a member only in the Active data and remove them from the Passive data, to avoid overlaps/duplicates. This can be achieved with a JOIN, as shown below:-

For Getting Active Data without overlapping from Passive Data:-

Command > select a.cid,a.card,a.date,a.phospital from newdavita a left outer join olddavita b
                  > on a.card=b.card WHERE b.card IS NULL;

(Here we take everything on the left-hand side, i.e. the Active data, join it with the right-hand side, i.e. the Passive data, and keep only the rows with no match (b.card IS NULL), which removes the card numbers that also appear in the Passive data.)

Alternatively, a LEFT SEMI JOIN can be used; note that a left semi join returns only the rows of the left table that have a match in the right table, so the query below gives the overlapping members (those present in both Active and Passive data), which is handy for inspecting the duplicates:



Command > select a.cid,a.card,a.date,a.phospital from newdavita a left semi join olddavita b
                 > on a.card=b.card;


For Getting Passive Data without overlapping from Active Data:-

Command > select b.cid,b.card,b.date,b.phospital from newdavita a right outer join olddavita b
                  > on a.card=b.card WHERE a.card IS NULL;

(Here everything on the right-hand side, i.e. the Passive data, is kept; the a.card IS NULL filter drops the rows that also have a match in the Active data, removing the duplicate card numbers from the Passive data.)

NOTE:- For my use-case this didn't work out, as both data sets contained the same members. You can try making some changes and experiment.



After removing the duplicate/overlapping entries from the Active & Passive data, use GROUP BY to collapse the repeated Member-card numbers for display. Below is the command for the same.

Command > select card,max(date) from newdavita group by card;

This removes the repeating card numbers and returns only one row per card number (with its latest date).


6. CONSOLIDATE EVERY NEW TABLE & OLD TABLE TO GET ACTIVE & PASSIVE CUSTOMERS BY CTAS & UNION ALL:-


Now, once we have the Active & Passive customers for each Hospital, consolidate them to get the Active & Passive customers for the quarter.
Below is the command to combine all the Active customer data with UNION ALL and keep it in a table for further queries/analysis:-

Command > create table active_customer as select * from newdavita
                  > union all select * from newappolo;

NOTE:- You can combine multiple tables using UNION ALL into one table, e.g. CTAS select * from new1 union all select * from new2 union all select * from new3 union all select * from new4 and so on....
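Once the consolidated table exists, a per-hospital count of Active card holders is a simple GROUP BY (a sketch; column names follow the tables created above):

Command > select phospital, count(distinct card) from active_customer group by phospital;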



STORE DATA IN HDFS:-

If you want to save the Active/Passive Customer Union data then below is the command:-

Command > insert overwrite directory '/NewCustomer'
                  > select * from newdavita union all      
                  > select * from newappolo;              

NOTE:- This will create a directory in HDFS and store the result (but with Hive's default delimiters).
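On newer Hive versions (0.11 onwards, if I remember correctly) you can also ask for a delimited format while writing to an HDFS directory, which avoids Hive's default control-character delimiters; a sketch:

Command > insert overwrite directory '/NewCustomer'
                  > row format delimited fields terminated by ','
                  > select * from active_customer;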



STORE DATA IN LFS:-

1. From Hive shell:-

If you want to keep it in .csv format give the below command:-

Command > insert overwrite local directory '/home/gopal/Desktop/NewCustomer'
                  > row format delimited                                            
                  > fields terminated by ','                                        
                  > select * from active_customer;    




2. From Terminal:-

If you want to keep it in .tsv format follow the below procedure:-

1. Close all active HIVE shells.
2. Open a new terminal:-
Go to the directory where Hive is running / where the metastore is present and give the command below:-

Command > hive -e 'use healthcare; select * from newdavita union all select * from newappolo' > /home/gopal/Desktop/ActiveCustomer.tsv



Now go on and try different queries to get various results, like monthly active and passive customers, a count of each hospital's customers, etc.

Hope you all understood the procedures... 
Please do notify me for any corrections...
Kindly leave a comment for any queries/clarification...
ALL D BEST...

Thursday, February 16, 2017

WAYS TO BULK LOAD DATA IN HBASE

Dear Friends,


Going ahead with my posts, this one was asked of me by one of my friends about HBase, so I am sharing my thoughts and a working procedure for bulk loading data into HBase.

HBase is an open-source, NoSQL, distributed, column-oriented data store modelled on Google BigTable and running on top of HDFS (Hadoop Distributed File System). It was developed as part of Apache's Hadoop project and provides all the features of Google BigTable. We can call HBase a "Data Store" rather than a "Data Base", as it lacks many of the features available in traditional databases, such as typed columns, secondary indexes, triggers, advanced query languages, etc.
The data model consists of table name, row key, column family, columns and timestamp. When creating tables in HBase, the rows are uniquely identified with the help of row keys and timestamps. In this data model the column families are static whereas the columns are dynamic. Now let us look into the HBase architecture. HBase is a column-oriented database where one has to specify which data belongs to which column family, so an HBase table comprises, at minimum, a table name and at least one column family name.
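As a minimal illustration of that data model, the HBase shell commands below (the table name 'demo' is just an example) create a table with a single column family and then address one cell by row key and family:qualifier:

Command > create 'demo','cf'

Command > put 'demo','row1','cf:col1','value1'

Command > get 'demo','row1'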

Apache HBase is all about giving you random, real-time, read/write access to your Big Data, but how do you efficiently get that data into HBase in the first place? Intuitively, a new user will try to do that via the client APIs or by using a MapReduce job with TableOutputFormat, but those approaches are problematic, as you will learn below. Instead, the HBase bulk loading feature is much easier to use and can insert the same amount of data more quickly.

In this blog I will take you through the number of ways to achieve Bulk Loading of Data in HBase.

There are basically 3 ways to Bulk load the data in HBase:-

1. Using ImportTsv Class to load txt Data to HBase.

2. Using Hive's HCatalog & Pig command.

3. Using MapReduce API.

You can download the Sample.txt file used in this blog HERE.

(NOTE:- While trying out these examples, please make sure your Hadoop daemons & HBase daemons are up and running.)

1. Using ImportTsv Class to load txt Data to HBase:-


A) Uploading Sample.txt file to HDFS:-


Upload the sample file into HDFS by the following command:

Command > hadoop fs -put Sample.txt /Input

B) Create Table in HBase:-


For this method we first have to create a table in HBase with the number of column families matching the data. Here I am using 2 column families for my data.

First go to HBase shell by giving below command and create a table with column family names:

Command > hbase shell      (To enter into HBase shell)

Command > create 'Test','cf1','cf2'                 (To create a table with column families)


 C) Using ImportTsv Class  LOAD the Sample.txt file to HBase:-


Now we are set and ready to load the file into HBase. To load the file we will use the ImportTsv class from the HBase jar, with the command below (go to the hbase folder and give the command):-

Command > ./bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator="," -Dimporttsv.columns=HBASE_ROW_KEY,cf1,cf2 Test /Input/Sample.txt


Check that the data is loaded into the HBase table:-

Command >scan 'Test'



Here's an explanation of the different configuration elements:

-Dimporttsv.separator="," specifies that the separator is a comma.
-Dimporttsv.bulk.output=output is a relative path to where the HFiles will be written. Since your user on the VM is "cloudera" by default, it means the files will be in /user/cloudera/output. Skipping this option makes the job write directly to HBase. (We have not used it here, but it is useful.)
-Dimporttsv.columns=HBASE_ROW_KEY,f:count is a list of all the columns contained in the file. The row key needs to be identified using the all-caps HBASE_ROW_KEY string; otherwise it won't start the job. (In this example line I used the qualifier "count", but it could be anything else.)
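If you do use -Dimporttsv.bulk.output, the generated HFiles still have to be handed over to HBase in a second step; assuming the output directory and table name used above, that step would typically be:

Command > ./bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output Test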

2. Using Hive's HCatalog & Pig command:-


This method needs various jar files from PIG, HIVE and HCatalog, which can be made available by exporting HADOOP_CLASSPATH; otherwise a ClassNotFoundException will be thrown with the respective class details.
(To be on the safe side, and since my classpath command didn't work, I copied all the jar files from pig/lib, hive/lib & hive/hcatalog/lib to hadoop/lib, after which it worked fine without any error.)

A) Create a Script using HIVE SerDe & Table Properties:-


After loading the data into HDFS, define the HBase schema for the data in the HIVE shell. Continuing with the Sample example, create a script file called Sample.ddl, which will contain the HBase schema for the data used by HIVE. To do so, write the code below in a file and name it Sample.ddl:

Script Sample.ddl :- 

CREATE TABLE sample_hcat_load_table (id STRING, cf1 STRING, cf2 STRING) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
WITH SERDEPROPERTIES ( 'hbase.columns.mapping' = 'd:cf1,d:cf2' ) 
TBLPROPERTIES ( 'hbase.table.name' = 'sample_load_table'); 


B) Now Create and register the HBase table in HCatalog.


To register the DDL file, use HCatalog. (HCatalog is inside the HIVE folder (/usr/local/hadoop/hive/hcatalog); export the HCatalog home and path in the ~/.bashrc file (like you did when installing Hive).) After that, source the ~/.bashrc file to update it, with the command below:

Command > source ~/.bashrc

Now register the DDL file using the syntax: hcat -f <ddl file name>.

The following HCatalog command-line command runs the DDL script Sample.ddl:

Command > hcat -f sample.ddl



Go to the HBase shell with the command below to check whether the table has been created:-

Command > hbase shell


C) Create the import file using PIG Script:-.


The following script instructs Pig to load data from Sample.txt and store it in the HCatalog table sample_hcat_load_table (which is mapped to the HBase table sample_load_table).

Script Hbase-bulk-load.pig:-

A = LOAD '/Input/Sample.txt' USING PigStorage(',') AS (id:chararray, c1:chararray, c2:chararray);

STORE A INTO 'sample_hcat_load_table' USING org.apache.hive.hcatalog.pig.HCatStorer();



Use Pig command to populate the HBase table via HCatalog bulkload:-

Continuing with the example, execute the following command:

Command > pig -useHCatalog Hbase-bulk-load.pig

Command > pig Hbase-bulk-load.pig 

(Since on my system it failed to read the Sample.txt data from HDFS, I used local storage for ease of use, with the command pig -x local Hbase-bulk-load.pig or pig -x local -useHCatalog Hbase-bulk-load.pig.)



Go to the HBase shell and give the scan command to check the result:-



Below is another example for achieving the same (I have not tried it.).

A = LOAD '/hbasetest.txt' USING PigStorage(',') as (id:chararray, c1:chararray, c2:chararray);
STORE A INTO 'hbase://mydata'  USING
org.apache.pig.backend.hadoop.hbase.HBaseStorage('mycf:intdata');


3. Using MapReduce API.


HBase's Put API can be used to insert data, but then every record has to go through the complete HBase write path as explained here. So, inserting data in bulk using the Put API is a lot slower than the bulk loading option. There are some references to bulk loading (1, 2), but either they are incomplete or a bit too complicated. Bulk loading boils down to three steps:

1. Extract the data from the source (in our case, a text file).
2. Transform the data into HFiles.
3. Load the files into HBase by telling the RegionServers where to find them.

Below is the coding I used for the same for my Sample.txt data file. You can modify it according to your requirement.

NOTE:- This code doesn't create a table in HBase, so before running it in the Hadoop environment, make sure to create a table in HBase using the create command with the column families.


HBaseBulkLoadDriver

DRIVER CLASS

package com.poc.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HBaseBulkLoadDriver extends Configured implements Tool {
    private static final String DATA_SEPERATOR = ",";
    private static final String TABLE_NAME = "sample-data";
    private static final String COLUMN_FAMILY_1="cf1";
    private static final String COLUMN_FAMILY_2="cf2";
  
    public static void main(String[] args) {
        try {
            int response = ToolRunner.run(HBaseConfiguration.create(), new HBaseBulkLoadDriver(), args);
            if(response == 0) {
                System.out.println("Job is successfully completed...");
            } else {
                System.out.println("Job failed...");
            }
        } catch(Exception exception) {
            exception.printStackTrace();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        int result=0;
        String outputPath = args[1];
        Configuration configuration = getConf();
        configuration.set("data.seperator", DATA_SEPERATOR);
        configuration.set("hbase.table.name",TABLE_NAME);
        configuration.set("COLUMN_FAMILY_1",COLUMN_FAMILY_1);
        configuration.set("COLUMN_FAMILY_2",COLUMN_FAMILY_2);
        Job job = new Job(configuration);
        job.setJarByClass(HBaseBulkLoadDriver.class);
        job.setJobName("Bulk Loading HBase Table::"+TABLE_NAME);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapperClass(HBaseBulkLoadMapper.class);
        FileInputFormat.addInputPaths(job, args[0]);
        FileSystem.get(getConf()).delete(new Path(outputPath), true); // clear any previous output directory on the job's default file system (HDFS)
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.setMapOutputValueClass(Put.class);
        HFileOutputFormat.configureIncrementalLoad(job, new HTable(configuration,TABLE_NAME));
        job.waitForCompletion(true);
        if (job.isSuccessful()) {
            HBaseBulkLoad.doBulkLoad(outputPath, TABLE_NAME);
        } else {
            result = -1;
        }
        return result;
    }
}

HBaseBulkLoadMapper

MAPPER CLASS

package com.poc.hbase;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Mapper;

public class HBaseBulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    private String hbaseTable;
    private String dataSeperator;
    private String columnFamily1;
    private String columnFamily2;
    private ImmutableBytesWritable hbaseTableName;

    public void setup(Context context) {
        Configuration configuration = context.getConfiguration();
        hbaseTable = configuration.get("hbase.table.name");
        dataSeperator = configuration.get("data.seperator");
        columnFamily1 = configuration.get("COLUMN_FAMILY_1");
        columnFamily2 = configuration.get("COLUMN_FAMILY_2");
        hbaseTableName = new ImmutableBytesWritable(Bytes.toBytes(hbaseTable));
    }

    public void map(LongWritable key, Text value, Context context) {
        try {
            String[] values = value.toString().split(dataSeperator);
            String rowKey = values[0];
            Put put = new Put(Bytes.toBytes(rowKey));
            put.add(Bytes.toBytes(columnFamily1), Bytes.toBytes("cf1"), Bytes.toBytes(values[1]));
            put.add(Bytes.toBytes(columnFamily2), Bytes.toBytes("cf2"), Bytes.toBytes(values[2]));
            context.write(hbaseTableName, put);
        } catch(Exception exception) {
            exception.printStackTrace();
        }
    }
}

HBaseBulkLoad

HBASE CONFIGURATION CLASS

package com.poc.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class HBaseBulkLoad {

public static void doBulkLoad(String pathToHFile, String tableName) {
try {
Configuration configuration = new Configuration();
configuration.set("mapreduce.child.java.opts", "-Xmx1g");
HBaseConfiguration.addHbaseResources(configuration);
LoadIncrementalHFiles loadFfiles = new LoadIncrementalHFiles(configuration);
HTable hTable = new HTable(configuration, tableName);
loadFfiles.doBulkLoad(new Path(pathToHFile), hTable);
System.out.println("Bulk Load Completed..");
} catch (Exception exception) {
exception.printStackTrace();
}
}
}

NOTE:- To create the table you can tweak and use the code below.

You have to create the table first using the Java API; the fragment below shows how (it assumes that tableName, config, Constants.COLUMN_FAMILY_NAME and REGIONS_COUNT are defined elsewhere, and it needs the HTableDescriptor, HColumnDescriptor, HBaseAdmin and Bytes classes from the HBase client API).

//Create table and do pre-split
HTableDescriptor descriptor = new HTableDescriptor(
Bytes.toBytes(tableName)
);

descriptor.addFamily(
new HColumnDescriptor(Constants.COLUMN_FAMILY_NAME)
);

HBaseAdmin admin = new HBaseAdmin(config);

byte[] startKey = new byte[16];
Arrays.fill(startKey, (byte) 0);

byte[] endKey = new byte[16];
Arrays.fill(endKey, (byte)255);

admin.createTable(descriptor, startKey, endKey, REGIONS_COUNT);
admin.close();

Run the Jar File

Compile the above code in Eclipse (including the HBase jars on the build path), export the jar file and run it.

Command > hadoop jar hbase.jar com/poc/hbase/HBaseBulkLoadDriver /Input/Sample.txt /Out



Now go to the HBase shell to check that the data is loaded.

Hbase shell > scan 'sample-data'



That's all friends...

Now go ahead and tweak the code to learn more about HBase's working mechanism.



References:-



Hope you all understood the procedures... 
Please do notify me for any corrections...
Kindly leave a comment for any queries/clarification...
(Detailed Description of each phase to be added soon).
ALL D BEST...


Friday, February 10, 2017

MULTIPLE OUTPUT WITH MULTIPLE INPUT FILE NAME

Dear Friends,


I was asked to solve how to process several files at a time and store each result under the corresponding file name. It's a real-time problem: say, for example, you have log files from different places and you have to run the same logic on all of them but store the results under different file names. How do you do this?

In this blog, I will take you through how to do this using the simple MultipleOutputs method in a MapReduce program. Here I am using the word-count program logic.

Problem Statement is as below.
1. N no. of input files will be in HDFS. Each input file has a list of sentences/words.
2. Write a MapReduce program which will give the word count of each input file in a corresponding part-r file, where the part-r file name has to be <input file name>-r-0000.

The problem statement, though it looks difficult, is very easy to understand and implement. (Just think simply and logically.)

Solution:-

The simple logical solution is:-
1. Extract the name of each file using the FileSplit method.
2. Write the output of each file, after processing, under the name extracted via FileSplit, using the MultipleOutputs method.



DOWNLOAD MY INPUT FILE FROM BELOW LINK:

https://drive.google.com/file/d/0BzYUKIo7aWL_M0s2UFRKS2xoMVE/view?usp=sharing




1. TO TAKE INPUT DATA ON HDFS


hadoop fs -mkdir /Input
hadoop fs -put Input* /Input
jar xvf mulout.jar 






2. MAP REDUCE CODES:-


DRIVER CLASS


package com.mulout.wordcount;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Multiwordcnt {

public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

Configuration conf = new Configuration();
Job myJob = new Job(conf, "Multiwordcnt");
args = new GenericOptionsParser(conf, args).getRemainingArgs();
FileSystem fs = FileSystem.get(new Configuration());
fs.delete(new Path("/NewOut/"), true);

myJob.setJarByClass(Multiwordcnt.class);
myJob.setMapperClass(MyMapper.class);
myJob.setReducerClass(MyReducer.class);
myJob.setMapOutputKeyClass(Text.class);
myJob.setMapOutputValueClass(IntWritable.class);
// myJob.setNumReduceTasks(0);
myJob.setOutputKeyClass(Text.class);
myJob.setOutputValueClass(IntWritable.class);
LazyOutputFormat.setOutputFormatClass(myJob, TextOutputFormat.class);

myJob.setInputFormatClass(TextInputFormat.class);
// myJob.setOutputFormatClass(TextOutputFormat.class); // not needed: LazyOutputFormat above already wraps TextOutputFormat

FileInputFormat.addInputPath(myJob, new Path(args[0]));
FileOutputFormat.setOutputPath(myJob, new Path(args[1]));

System.exit(myJob.waitForCompletion(true) ? 0 : 1);
}

}


EXPLANATION:- In the driver class, LazyOutputFormat is used so that output files are created only when records are actually written to them; it keeps the job from producing empty default part-r-0000 files alongside the per-input-file outputs.
(Here I have used the delete call to remove the output folder from HDFS if it already exists.)

MAPPER CLASS


package com.mulout.wordcount;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

Text emitkey = new Text();
IntWritable emitvalue = new IntWritable(1);

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String filePathString = ((FileSplit) context.getInputSplit()).getPath().getName().toString();
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {

String filepathword = filePathString + "*" + tokenizer.nextToken();
emitkey.set(filepathword);
context.write(emitkey, emitvalue);
}
}
}

EXPLANATION:- In the Mapper class we take the input file name using the FileSplit method, combine it with the individual word and use that as the output key. Then we assign 1 to each word as the output value for further processing in the reducer.

REDUCER CLASS


package com.mulout.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
Text emitkey = new Text();
IntWritable emitvalue = new IntWritable();
private MultipleOutputs<Text, IntWritable> multipleoutputs;

public void setup(Context context) throws IOException, InterruptedException {
multipleoutputs = new MultipleOutputs<Text, IntWritable>(context);
}

public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;

for (IntWritable value : values) {
sum = sum + value.get();
}
String pathandword = key.toString();
String[] splitted = pathandword.split("\\*");
String path = splitted[0];
String word = splitted[1];
emitkey.set(word);
emitvalue.set(sum);
System.out.println("word:" + word + "\t" + "sum:" + sum + "\t" + "path:  " + path);
multipleoutputs.write(emitkey, emitvalue, ("/NewOut/"+path));
}

public void cleanup(Context context) throws IOException, InterruptedException {
multipleoutputs.close();
}
}

EXPLANATION:- In the Reducer class we split the key containing the input file name, add up all the 1s to get the number of times the word occurred, and then use the MultipleOutputs write method with 3 parameters <Key, Value, Path> to write the result under the individual file name.
(Here I have used a separate output folder "/NewOut/" for storing my results.)



3. EXECUTING THE MAP REDUCE CODE


Command > hadoop jar mulout.jar com/mulout/wordcount/Multiwordcnt /Input /Out1
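To verify the per-file outputs, list the /NewOut directory that the reducer writes to; each input file should have its own <input file name>-r-00000 file there:

Command > hadoop fs -ls /NewOut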







That's all....

Now you can take N number of input files, process them and store the results under the corresponding file names.



Hope you all understood the procedures... 
Please do notify me for any corrections...
Kindly leave a comment for any queries/clarification...
(Detailed Description of each phase to be added soon).
ALL D BEST...


Saturday, February 4, 2017

XML FILE PROCESSING IN HADOOP

Dear Friends,


Welcome back, after a long time. I was asked by one of my friends to explain XML processing in Hadoop.

I went through many articles, web links, etc. in search of the answer, and now I am ready to showcase it in this blog.

PROBLEM


Working with XML is painful. XML structure is variable by design, which means no universal mapping to native Pig data structures. This is the price we pay for such a flexible, robust markup, but, as Software Developers, we can’t continue to ignore this problem. There’s XML data everywhere just waiting for us to crack it open and extract value for analysis.

XML processing is quite different from other formats such as Word, Excel or PDF, as XML contains tags that differ from file to file, along with any number of sub-tags. XML is a semi-structured format, and since its structure is variable by design, we cannot have a fixed mapping. Thus, to process XML in Hadoop, you need to know the tags required to extract the data, and they have to be defined each time according to the data contents.

SOLUTION


There are 3 ways of processing xml files in Hadoop:-

1. PIG:- Using classes from the Piggybank jar file.
2. HIVE:- Using the SerDe (Serialization/Deserialization) method.
3. MapReduce coding:- Lengthier coding using a custom XML InputFormat and RecordReader.

Input data used in this Blog can be downloaded here. simple.xml, cd_catlog.xml, plant_catlog.xml

In my use-case I will be using the simple.xml file.

<?xml version="1.0" encoding="ISO8859-1" ?>
<breakfast-menu>
  <food>
    <name>Belgian Waffles</name>
    <price>$5.95</price>
    <description>two of our famous Belgian Waffles with plenty of real maple syrup</description>
    <calories>650</calories>
  </food>
  <food>
    <name>Strawberry Belgian Waffles</name>
    <price>$7.95</price>
    <description>light Belgian waffles covered with strawberrys and whipped cream</description>
    <calories>900</calories>
  </food>
  <food>
    <name>Berry-Berry Belgian Waffles</name>
    <price>$8.95</price>
    <description>light Belgian waffles covered with an assortment of fresh berries and whipped cream</description>
    <calories>900</calories>
  </food>
  <food>
    <name>French Toast</name>
    <price>$4.50</price>
    <description>thick slices made from our homemade sourdough bread</description>
    <calories>600</calories>
  </food>
  <food>
    <name>Homestyle Breakfast</name>
    <price>$6.95</price>
    <description>two eggs, bacon or sausage, toast, and our ever-popular hash browns</description>
    <calories>950</calories>
  </food>
</breakfast-menu>

1. XML processing using PIG


Apache Pig is a tool that can be used to analyse XML, and it represents data as data flows. Pig Latin is a scripting language with which the operations of Extract, Transform, Load (ETL), ad hoc data analysis and iterative processing can be easily achieved. Pig is an abstraction over MapReduce: all Pig scripts are internally converted into Map and Reduce tasks to get the work done. Pig was built to make programming MapReduce applications easier. Pig scripts are procedural and implement lazy evaluation, i.e., unless an output is required, the steps aren't executed.

The XMLLoader currently included with Pig allows us to specify the tags that delimit our documents. This is nice. What it returns, though, is raw XML, which will invariably require a pile of custom UDFs to handle parsing and semantics. To process XMLs in Pig, piggybank.jar is essential. This jar contains a UDF called XMLLoader() that will be used to load the XML document.

Below is the flow diagram to describe the complete flow.




Step 1:- Download and register Jar

To use Piggybank jar in XML, first download the jar and register the path of the jar in Pig.

(Download the Piggybank jar file from HERE)

Use the following command for registering the jar file:

Command > register piggybank.jar;

Step 2:- Loading the XML file

Load the document using XMLLoader() into a chararray. Specify the parent tag to be extracted. If all the elements are defined under the root element without a parent tag, then the root element itself will be loaded using XMLLoader().

In the simple.xml file, breakfast-menu is the root element and the tag to be extracted is food.


Command > A = LOAD '/xmlfile/simple.xml' using org.apache.pig.piggybank.storage.XMLLoader ('breakfast-menu') as (x:chararray);

Step 3:- Extracting data from the tags


To extract data from XML tags in Pig, there are two methods:
1. Using regular expressions
2. Using XPath

1. Using regular expressions

Use regular expressions to extract the data between the tags. Regular expressions can be used to pick out simple tags in the document [e.g., the <name> tag in the document].

For nested tags, writing regular expressions becomes tedious, because if any small character is missed in the expression, it will give null output.

Command > B = FOREACH A GENERATE FLATTEN (REGEX_EXTRACT_ALL(x,'(?s)<breakfast-menu>.*?<name>([^>]*?)</name>.*?</breakfast-menu>'));

OR

Command  >  B = FOREACH A GENERATE FLATTEN (REGEX_EXTRACT_ALL(x,'(?s)<breakfast-menu>\\s*<calories>(.*)</calories>\\s*<price>(.*)</price>\\s*</breakfast-menu>'));

Then DUMP B to see Results.



2. Using XPath

XPath uses path expressions to access a node.

The fully qualified name of the XPath UDF is a long string: org.apache.pig.piggybank.evaluation.xml.XPath. Thus, you should define a short temporary function name (alias) for simplicity and ease of use.

To access a particular element, start from loading the parent node and navigate to the required tag.

Note that every repeating parent node becomes a separate row, with its child nodes becoming the columns. In our file the tag <food> repeats, so upon extraction each <food> element becomes a new row, with the tags under it becoming its attributes.

Older piggybank.jar builds don't have the XPath class, so download the piggybank-0.15.0 jar file HERE.

Register the above jar file.

Load the XML file.

Use the XPath class to get the element in the title.

Command > B = FOREACH A GENERATE (XPath (x, 'food/name')), (XPath (x, 'food/price')), (XPath (x, 'food/calories'));

Then DUMP or STORE the result.
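The extracted tuples can then be written out, for example as comma-separated text (the output path here is just an example):

Command > STORE B INTO '/xmlout' USING PigStorage(',');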




NOTE:- In both of the above approaches you have to mention every field that you want displayed or stored, using the same coding and adding an extra expression for each additional field.

2. XML Processing Using HIVE


To process XML files in HIVE there are two methods: 1. Using UDFs. 2. Using a SerDe.

In this blog I am using SerDe method to process XML file. 

Hive SerDe method uses a class from hivexmlserde jar file which can be downloaded from HERE.

Step 1: Add the SerDe Jar file to Hive Path.

Command > add jar /home/gopal/Desktop/hivexmlserde-1.0.5.3.jar;
The ADD JAR statement ensures the document reader is available for job execution. On successful addition you will get the message below:

Added /home/gopal/Desktop/hivexmlserde-1.0.5.3.jar to class path
Added resource: /home/gopal/Desktop/hivexmlserde-1.0.5.3.jar


Step 2:- Create an external table using the SerDe class, pointing at the existing XML file location.

Command > CREATE EXTERNAL TABLE xml (name string, price string, description string, calories string)

> ROW FORMAT SERDE 'com.ibm.spss.hive.serde2.xml.XmlSerDe'
> WITH SERDEPROPERTIES ("column.xpath.name"="/food/name/text()",
> "column.xpath.price"="/food/price/text()",
> "column.xpath.description"="/food/description/text()",
> "column.xpath.calories"="/food/calories/text()")
> STORED AS INPUTFORMAT 'com.ibm.spss.hive.serde2.xml.XmlInputFormat'
> OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
> LOCATION '/xmlfile/'
> TBLPROPERTIES ("xmlinput.start"="<food","xmlinput.end"="</food>");


Explanation:

The INPUTFORMAT option allows the definition of the required document reader. The OUTPUTFORMAT specified is the default for Hive. In this example I have defined an EXTERNAL table over the directory containing the XML, which was copied to the Hadoop cluster independently.
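A quick sanity check after the table is created is a plain query over the mapped columns:

Command > select name, price, calories from xml;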

3. XML Processing Using MapReduce


XML processing with MapReduce needs a custom XML InputFormat, which reads the XML files using a custom XML RecordReader. XML files have a start tag and an end tag; you have to specify these in your driver class so that each record can be identified, and inside the mapper you specify the sub-tags and the elements inside them.
(Kindly change the tag names in the Driver and Mapper classes according to your data.)

Please find below the code for the custom input format and record reader used for XML processing.

FIRST LOAD THE DATA IN HDFS


To load the data in HDFS use the following command:-

Command > hadoop fs -mkdir /xmlfile

Command > hadoop fs -put simple.xml /xmlfile

XML INPUT FORMAT


package com.poc.xml;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class XmlInputFormat extends TextInputFormat {

public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
return new XmlRecordReader();
}
}

XML RECORD READER


package com.poc.xml;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class XmlRecordReader extends RecordReader<LongWritable, Text> {
private byte[] startTag;
private byte[] endTag;
private long start;
private long end;
private FSDataInputStream fsin;
private DataOutputBuffer buffer = new DataOutputBuffer();

public static final String START_TAG_KEY = "xmlinput.start";
public static final String END_TAG_KEY = "xmlinput.end";

private LongWritable key = new LongWritable();
private Text value = new Text();

@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
startTag = conf.get(START_TAG_KEY).getBytes("utf-8");
endTag = conf.get(END_TAG_KEY).getBytes("utf-8");
FileSplit fileSplit = (FileSplit) split;

// open the file and seek to the start of the split
start = fileSplit.getStart();
end = start + fileSplit.getLength();
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
fsin = fs.open(fileSplit.getPath());
fsin.seek(start);

}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (fsin.getPos() < end) {
if (readUntilMatch(startTag, false)) {
try {
buffer.write(startTag);
if (readUntilMatch(endTag, true)) {
key.set(fsin.getPos());
value.set(buffer.getData(), 0, buffer.getLength());
return true;
}
} finally {
buffer.reset();
}
}
}
return false;
}

@Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
return key;
}

@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}

@Override
public void close() throws IOException {
fsin.close();
}

@Override
public float getProgress() throws IOException {
return (fsin.getPos() - start) / (float) (end - start);
}

private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
int i = 0;
while (true) {
int b = fsin.read();
// end of file:
if (b == -1)
return false;
// save to buffer:
if (withinBlock)
buffer.write(b);
// check if we're matching:
if (b == match[i]) {
i++;
if (i >= match.length)
return true;
} else
i = 0;
// see if we've passed the stop point:
if (!withinBlock && i == 0 && fsin.getPos() >= end)
return false;
}
}
}

XML PROCESSING 

Driver class

package com.poc.xml;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class XMLProcessing {

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();

conf.set("xmlinput.start", "<breakfast-menu>");
conf.set("xmlinput.end", "</breakfast-menu>");
Job job = new Job(conf);
job.setJarByClass(XMLProcessing.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(0);
job.setMapperClass(Map.class);
// Map-only job: numReduceTasks is set to 0 above, so no reducer class is configured.

job.setInputFormatClass(XmlInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.waitForCompletion(true);
}
}

XML MAPPER CLASS


package com.poc.xml;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
//import mrdp.logging.LogWriter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
public class Map extends Mapper<LongWritable, Text, Text, NullWritable> {
    private static final Log LOG = LogFactory.getLog(Map.class);
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        try {
            InputStream is = new ByteArrayInputStream(value.toString().getBytes());
            DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
            DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
            Document doc = dBuilder.parse(is);
            doc.getDocumentElement().normalize();
            NodeList nList = doc.getElementsByTagName("food");
            for (int temp = 0; temp < nList.getLength(); temp++) {
                Node nNode = nList.item(temp);
                if (nNode.getNodeType() == Node.ELEMENT_NODE) {
                    Element eElement = (Element) nNode;
                    String name = eElement.getElementsByTagName("name").item(0).getTextContent();
                    String price = eElement.getElementsByTagName("price").item(0).getTextContent();
                    String calories = eElement.getElementsByTagName("calories").item(0).getTextContent();
                    context.write(new Text(name + "," + price + "," + calories), NullWritable.get());
                }
            }
        } catch (Exception e) {
            // newDocumentBuilder()/parse() throw checked exceptions; log the error and skip the bad record.
            LOG.error("Failed to parse XML record", e);
        }
    }
}

NOTE:- I am using only the mapper class to turn the XML into CSV-format output (I used "," to get the CSV format; you can use a tab instead). After that, based on the requirement, we can add a reducer class for reducer logic.
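Assuming the three classes above are packaged into a jar (the jar name below is just an example), the job can be run and the map-only output inspected like this:

Command > hadoop jar xmlpoc.jar com/poc/xml/XMLProcessing /xmlfile /xmlout

Command > hadoop fs -cat /xmlout/part-m-00000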







Now go ahead and feed XML to your PIG and Store XML honey in your HIVE.

References:-


Hope you all understood the procedures... 
Please do notify me for any corrections...
Kindly leave a comment for any queries/clarification...
(Detailed Description of each phase to be added soon).


ALL D BEST...