Saturday, December 31, 2016

HADOOP (PROOF OF CONCEPT) SENSEXLOG EXCEL DATA BY MAHESH CHANDRA MAHARANA


INDUSTRY: SENSEX LOG

DATA INPUT FORMAT :- .xls (My Input Data is in excel 97-2003 Format)

Kindly check my blog to read any kind of Excel sheet and use the Excel Input format, record reader and excel parser given in that blog. Please find link to my blog below:

https://hadoop-poc-mahesh.blogspot.in/2017/01/hadoop-excel-input-format-to-read-any.html

Like this below created 3000 records on my own:-

ATTRIBUTES are like:-

      1)      SENSEX_ID 
      2)      SENSEX_NAME      
      3)      TYPE_OF_TRADING         
      4)      SENSEX_LOCATION
      5)      OPENING_BAL
      6)      CLOSING_BAL
      7)      FLUCTUATION_RATE

EXAMPLE:


100001            BSE     SIP      NEW_YORK             23486              24876              28



PROBLEM STATEMENT: -

     1)      Take the complete EXCEL Input data on HDFS.

     2)      Develop a Map Reduce Use Case to get the below filtered results from the HDFS Input data (.xls EXCEL data).

2.1)            If TYPEOFTRADING is -->'SIP'.

2.1.1)      If OPEN_BALANCE > 25000 & FLTUATION_RATE > 10 --> store "HighDemandMarket".

2.1.2)      If CLOSING_BALANCE<22000 & FLTUATION_RATE IN BETWEEN 20 - 30  --> store "OnGoingMarketStretegy".

2.2)            If TYPEOFTRADING is -->'SHORTTERM’.

2.2.1)       If OPEN_BALANCE < 5000 --> store "WealthyProducts".

2.2.2)      If SensexLoc --> "NewYork OR California"  --> “ReliableProducts”
    ELSE

2.3)            store in "OtherProducts".

  NOTE: In the mentioned file names, only 5 outputs have to be generated
Example: “WealthyProducts-r-00000”

     3)      Develop a PIG Script to filter the Map Reduce Output in the below fashion.

3.1)            Provide the Unique data
3.2)            Sort the Unique data based on Patient_ID.

     4.)    EXPORT the same PIG Output from HDFS to MySQL using SQOOP.

     5.)    Store the same PIG Output in a HIVE External Table

NOTE:- For this POC I have used custom input format to read EXCEL files using external jar. So the corresponding jar files to be added during coding and to the lib directory of hadoop for successful execution. You can use poi-xml jar for the reading .xlsx file (2010 onwards excel format ).

Below is the steps to make it work... 

1. Download and Install ant from below link.

http://muug.ca/mirror/apache-dist//ant/binaries/apache-ant-1.9.8-bin.tar.gz


2. To install give following command in terminal:

tar -xzvf <apache ant Path>

3. Update bashrc:-

nano ~/.bashrc

Add below two lines:-

export ANT_HOME=${ant_dir}

export PATH=${ANT_HOME}/bin

Now Source bashrc by command:
source ~/.bashrc

4. Then restart the system. (Very Important for the effect to take place)

5. Download the required Jar files from below link:



Place both jar files during Eclipse compilation and only SNAPSHOT.jar in hadoop lib directory.
6. If still not working try to add CLASSPATH:
export CLASSPATH=.:$CLASSPATH:<Path to the jar file 1>:<Path to jar file 2>

Hope it will work now.

POC Processing Details


MAP REDUCE PROCESS IN DETAILS:-



1.     TO TAKE XLS INPUT DATA ON HDFS

hadoop fs -mkdir /Pocinput
hadoop fs -put SENSEX.xls /Pocinput
jar xvf POC.jar


2.     MAP REDUCE CODES:-

EXCEL INPUT DRIVER 
(MAIN DRIVER CLASS)
package com.poc;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PocDriver {
            static public int count = 0;
            public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
                        Configuration conf = new Configuration();
                        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
                        args = parser.getRemainingArgs();
                        Job job = new Job(conf, "Sensex_Log");
                        job.setJarByClass(PocDriver.class);
                        job.setOutputKeyClass(Text.class);
                        job.setOutputValueClass(Text.class);
                        job.setInputFormatClass(ExcelInputFormat.class);
                        job.setOutputFormatClass(TextOutputFormat.class);
                        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
                        FileInputFormat.addInputPath(job, new Path(args[0]));
                        FileOutputFormat.setOutputPath(job, new Path(args[1]));
                        job.setMapperClass(PocMapper.class);
                        job.setReducerClass(PocReducer.class);
                        MultipleOutputs.addNamedOutput(job, "HighDemandMarket", TextOutputFormat.class, IntWritable.class, Text.class);
                        MultipleOutputs.addNamedOutput(job, "OnGoingMarketStretegy", TextOutputFormat.class, IntWritable.class,Text.class);
                        MultipleOutputs.addNamedOutput(job, "WealthyProducts", TextOutputFormat.class, IntWritable.class, Text.class);
                        MultipleOutputs.addNamedOutput(job, "ReliableProducts", TextOutputFormat.class, IntWritable.class, Text.class);
                        MultipleOutputs.addNamedOutput(job, "OtherProducts", TextOutputFormat.class, IntWritable.class, Text.class);
                        System.exit(job.waitForCompletion(true) ? 0 : 1);
            }
}

EXCEL INPUT FORMAT 
(CUSTOM INPUT FORMAT TO READ EXCEL FILES)

package com.poc;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class ExcelInputFormat extends FileInputFormat<LongWritable, Text> {
            @Override
            public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
                                    throws IOException, InterruptedException {
                        return new ExcelRecordReader();
            }
}


EXCEL RECORD READER 
(TO READ EXCEL FILE AND SEND AS KEY, VALUE FORMAT)
package com.poc;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import com.sreejithpillai.excel.parser.ExcelParser;

public class ExcelRecordReader extends RecordReader<LongWritable, Text> {
            private LongWritable key;
            private Text value;
            private InputStream is;
            private String[] strArrayofLines;
            @Override
            public void initialize(InputSplit genericSplit, TaskAttemptContext context)
                                    throws IOException, InterruptedException {
                        FileSplit split = (FileSplit) genericSplit;
                        Configuration job = context.getConfiguration();
                        final Path file = split.getPath();
                        FileSystem fs = file.getFileSystem(job);
                        FSDataInputStream fileIn = fs.open(split.getPath());
                        is = fileIn;
                        String line = new ExcelParser().parseExcelData(is);
                        this.strArrayofLines = line.split("\n");
            }
            @Override
            public boolean nextKeyValue() throws IOException, InterruptedException {
                        if (key == null) {
                                    key = new LongWritable(0);
                                    value = new Text(strArrayofLines[0]);
                        } else {
                                    if (key.get() < (this.strArrayofLines.length - 1)) {
                                                long pos = (int) key.get();
                                                key.set(pos + 1);
                                                value.set(this.strArrayofLines[(int) (pos + 1)]);
                                                pos++;
                                    } else {
                                                return false;
                                    }
                        }
                        if (key == null || value == null) {
                                    return false;
                        } else {
                                    return true;
                        }
            }
            @Override
            public LongWritable getCurrentKey() throws IOException, InterruptedException {
                        return key;
            }
            @Override
            public Text getCurrentValue() throws IOException, InterruptedException {
                        return value;
            }
            @Override
            public float getProgress() throws IOException, InterruptedException {
                        return 0;
            }
            @Override
            public void close() throws IOException {
                        if (is != null) {
                                    is.close();
                        }
            }
}


EXCEL PARSER 
(TO PARSE EXCEL SHEET)

package com.poc;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.poi.hssf.usermodel.HSSFSheet;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;

public class ExcelParser {
            private static final Log LOG = LogFactory.getLog(ExcelParser.class);
            private StringBuilder currentString = null;
            private long bytesRead = 0;
            public String parseExcelData(InputStream is) {
                        try {
                                    HSSFWorkbook workbook = new HSSFWorkbook(is);
                                    HSSFSheet sheet = workbook.getSheetAt(0);
                                    Iterator<Row> rowIterator = sheet.iterator();
                                    currentString = new StringBuilder();
                                    while (rowIterator.hasNext()) {
                                                Row row = rowIterator.next();
                                                Iterator<Cell> cellIterator = row.cellIterator();
                                                while (cellIterator.hasNext()) {
                                                            Cell cell = cellIterator.next();
                                                            switch (cell.getCellType()) {
                                                            case Cell.CELL_TYPE_BOOLEAN:
                                                                        bytesRead++;
                                                                        currentString.append(cell.getBooleanCellValue() + "\t");
                                                                        break;
                                                case Cell.CELL_TYPE_NUMERIC:
                                                                        bytesRead++;
                                                                        currentString.append(cell.getNumericCellValue() + "\t");
                                                                        break;
                                                            case Cell.CELL_TYPE_STRING:
                                                                        bytesRead++;
                                                                        currentString.append(cell.getStringCellValue() + "\t");
                                                                        break;
                                                            }
                                                }
                                                currentString.append("\n");
                                    }
                                    is.close();
                        } catch (IOException e) {
                                    LOG.error("IO Exception : File not found " + e);
                        }
                        return currentString.toString();
            }
            public long getBytesRead() {
                        return bytesRead;
            }
}

EXCEL MAPPER
(HAVING MAPPER LOGIC)

package com.poc;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PocMapper extends Mapper<LongWritable, Text, Text, Text> {
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                        try {
                                    if (value.toString().contains("SENSEX_NAME") && value.toString().contains("TYPE_OF_TRADING"))
                                                return;
                                    else {
                                                String[] str = value.toString().split(" ");
                                                String data = "";
                                                for (int i = 0; i < str.length; i++) {
                                                            if (str[i] != null || str[i] != " ") {
                                                                        data += (str[i] + " ");
                                                            }
                                                }
                                                String dr1 = data.trim().replaceAll("\\s+", "\t");
                                                String[] str1 = dr1.toString().split("\t");
                                                int id = (int)Double.parseDouble(str1[0]);
                                                int flucrate = (int)Double.parseDouble(str1[6]);
                                                int openbal = (int)Double.parseDouble(str1[4]);
                                                int closebal = (int)Double.parseDouble(str1[5]);
                                                String dr = Integer.toString(id)+"\t"+str1[1]+"\t"+str1[2]+"\t"+str1[3]+"\t"+Integer.toString(openbal)+"\t"+Integer.toString(closebal)+"\t"+Integer.toString(flucrate);
                                                context.write(new Text(""), new Text(dr));
                                    }
                        } catch (Exception e) {
                                    e.printStackTrace();
                        }
            }
}

EXCEL REDUCER
(HAVING REDUCER LOGIC)
package com.poc;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class PocReducer extends Reducer<Text, Text, IntWritable, Text> {
            MultipleOutputs<IntWritable, Text> mos;
            @Override
            public void setup(Context context) {
                        mos = new MultipleOutputs<IntWritable, Text>(context);
            }
            @Override
            public void reduce(Text k1, Iterable<Text> k2, Context context) throws IOException, InterruptedException {
                        while (k2.iterator().hasNext()) {
                                    String sr = k2.iterator().next().toString();
                                    String sr1 = sr.trim().replaceAll("\\s+", "\t");
                                    String[] str1 = sr1.split("\t");
                                    int id = (int)Double.parseDouble(str1[0]);
                                    String s = "NEW_YORK";
                                    String s1 = "CALIFORNIA";
                                    int flucrate = (int)Double.parseDouble(str1[6]);
                                    int openbal = (int)Double.parseDouble(str1[4]);
                                    int closebal = (int)Double.parseDouble(str1[5]);
                                    String dr = Integer.toString(id)+"\t"+str1[1]+"\t"+str1[2]+"\t"+str1[3]+"\t"+Integer.toString(openbal)+"\t"+Integer.toString(closebal)+"\t"+Integer.toString(flucrate);
                                    if (str1[2].equalsIgnoreCase("SIP")) {
                                                if (openbal > 25000 && flucrate > 10) {
                                                            mos.write("HighDemandMarket", null, new Text(dr), "/SensexLog/HighDemandMarket");
                                                } else if (closebal < 22000) {
                                                            mos.write("OnGoingMarketStretegy", null, new Text(dr), "/SensexLog/OnGoingMarketStretegy");
                                                } else {
                                                            mos.write("OtherProducts", null, new Text(dr), "/SensexLog/OtherProducts");
                                                }
                                    } else if (str1[2].equalsIgnoreCase("SHORTTERM")) {
                                                if (openbal > 5000) {
                                                            mos.write("WealthyProducts", null, new Text(dr), "/SensexLog/WealthyProducts");
                                                } else if (str1[3].toLowerCase().contains(s.toLowerCase()) || str1[3].toLowerCase().contains(s1.toLowerCase())) {
                                                            mos.write("ReliableProducts", null, new Text(dr), "/SensexLog/ReliableProducts");
                                                } else {
                                                            mos.write("OtherProducts", null, new Text(dr), "/SensexLog/OtherProducts");
                                                }
                                    } else {
                                                mos.write("OtherProducts", null, new Text(dr), "/SensexLog/OtherProducts");
                                    }
                        }
            }
            @Override
            protected void cleanup(Context context) throws IOException, InterruptedException {
                        mos.close();
            }
}

EXECUTING THE MAP REDUCE CODE

hadoop jar SensexLog.jar com.poc.PocDriver /Input/SENSEX.xls /Sensex



Goto Firefox and open name node page by following command:

http://localhost:50070 and browse the file system , then click on HealthCarePOC directory to check the files created.





3.     PIG SCRIPT


To execute Pig Script file from any directory in terminal type the following command:-
1. For LFS file:
pig -x local <Scriptname.pig>

2. For HDFS files:
pig <Scriptname.pig>

In this case since our data is in HDFS the follwing command should be used:


pig PigScript.pig

PigScript1.pig

A = LOAD '/ SensexLog /' USING PigStorage ('\t') AS (id:int, Name:chararray, age:int, gender:chararray, disease:chararray, hospital:chararray, admitdate:chararray, address:chararray);
B = DISTINCT A;
DUMP B; 



PigScript2.pig

A = LOAD '/ SensexPOC /' USING PigStorage ('\t') AS (id:int, Name:chararray, trade:chararray, openbal:int, closebal:int, flucrate:int);
B = DISTINCT A;
C = ORDER B by Sid;

STORE C INTO '/SensexPOC';






4.     EXPORT the PIG Output from HDFS to MySQL using SQOOP

sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "create database if not exists SENSEX;";


sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "use SENSEX;";


sqoop eval --connect jdbc:mysql://localhost/SENSEX --username root --password root --query "grant all privileges on HEALTHCARE.* to ‘localhost’@’%’;”;

sqoop eval --connect jdbc:mysql://localhost/SENSEX --username root --password root --query "grant all privileges on HEALTHCARE.* to ‘’@’localhost’;”;



sqoop eval --connect jdbc:mysql://localhost/SENSEX --username root --password root --query "create table sensexlogpoc(id int, name varchar(50), trade varchar(50), loc varchar(200), openbal int, closebal int, flucrate int);";


sqoop export --connect jdbc:mysql://localhost/SENSEX --table sensexlogpoc --export-dir /SensexPOC --fields-terminated-by '\t';




5.     STORE THE PIG OUTPUT IN A HIVE EXTERNAL TABLE

goto hive shell using command:
hive

show databases;

create database SensexPOC;

use SensexPOC;


create external table sensexpoc(id int, Name string, trading string, loc string, openbal int, closebal int, flucrate int)
row format delimited
fields terminated by '\t'
stored as textfile location '/SensexPOC';


select * from sensexlogpoc;



Hope you all understood the procedures... 
Please do notify me for any corrections...
Kindly leave a comment for any queries/clarification...
(Detailed Description of each phase to be added soon).
ALL D BEST...