Saturday, January 7, 2017

HADOOP (PROOF OF CONCEPT) RETAIL DATA BY MAHESH CHANDRA MAHARANA

INDUSTRY: RETAIL

Data Input Format :- .xls (My Input Data is in excel 2007-2003 Format)


Kindly check my blog to read any kind of Excel sheet and use the Excel Input format, record reader and excel parser given in that blog. Please find link to my blog below:


This POC Input file and Problem statement was shared to me by Mr. Sunil Pashikanti like this below was created 3000 records:-

ATTRIBUTES are like:-
1. RETAIL_ID
2. RETAIL_NAME
3. TYPE_OF_CRAWLING
4. PRODUCT_URL
5. TITTLE
6. SALE_PRICE
7. REG_PRICE
8. REBATE_PERCENTAGE
9. STOCK_INFO

Example:

12 Amazon BS http://www.amazon.com/dell/lp Amazon.com:Dell Laptop 100.00
150.00 33 InStock


DOWNLOAD MY INPUT FILE FROM BELOW LINK:

https://drive.google.com/file/d/0BzYUKIo7aWL_Sm5mT2l1cnZSQ0E/view?usp=sharing

PROBLEM STATEMENT: -

1. take the complete Excel Input data on HDFS
  
2. Develop a Map Reduce Use Case to get the below filtered results from the HDFS Input data(Excel data)

     IF Type_Of_Crawling is -->'BS'
          -salePrice < 100.00  & RebatePercent>50  --> store "HighBuzzProducts"
          -RegPrice<150.00 & RebatePercent in 25-50 --> store "NormalProducts"
          -lengthOf(title)>100 ---> 'rare products'
          
     IF Type_Of_Crawling is -->'ODC'
          - salePrice < 150.00 --> store "OnDemandCrawlProducts"
          - StockInfo --> "InStock"  -->store "AvailableProducts"
    ELSE
          store in "OtherProducts"

  NOTE: In the mentioned file names only 5 outputs have to be generated

3. Develop a PIG Script to filter the Map Reduce Output in the below fashion
       - Provide the Unique data
       - Sort the Unique data based on RETAIL_ID in DESC order

4. EXPORT the same PIG Output from HDFS to MySQL using SQOOP

5. Store the same PIG Output in a HIVE External Table.


NOTE:- For this POC I have used custom input format to read EXCEL files using external jar. So the corresponding jar files to be added during coding and to the lib directory of hadoop for successful execution. You can use poi-xml jar for the reading .xlsx file (2010 onwards excel format ).

Below is the steps to make it work... 

1. Download and Install ant from below link.

http://muug.ca/mirror/apache-dist//ant/binaries/apache-ant-1.9.8-bin.tar.gz


2. To install give following command in terminal:

tar -xzvf <apache ant Path>

3. Update bashrc:-

nano ~/.bashrc

Add below two lines:-

export ANT_HOME=${ant_dir}

export PATH=${ANT_HOME}/bin

Now Source bashrc by command:
source ~/.bashrc

4. Then restart the system. (Very Important for the effect to take place)

5. Download the required Jar files from below link:



Place both jar files during Eclipse compilation and only SNAPSHOT.jar in hadoop lib directory.
6. If still not working try to add CLASSPATH:
export CLASSPATH=.:$CLASSPATH:<Path to the jar file 1>:<Path to jar file 2>

Hope it will work now.

POC Processing Details



MAP REDUCE PROCESS IN DETAILS:-


1.     TO TAKE XLS INPUT DATA ON HDFS

hadoop fs -mkdir /Input
hadoop fs -put POC.xls /Input
jar xvf poc.jar 



2.     MAP REDUCE CODES:-

EXCEL INPUT DRIVER 
(DRIVER CLASS)

package com.poc;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PocDriver {

static public int count = 0;

public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();

GenericOptionsParser parser = new GenericOptionsParser(conf, args);
args = parser.getRemainingArgs();

Job job = new Job(conf, "Retail_Poc");
job.setJarByClass(PocDriver.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setInputFormatClass(ExcelInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// job.setNumReduceTasks(0);

job.setMapperClass(PocMapper.class);
job.setReducerClass(PocReducer.class);

MultipleOutputs.addNamedOutput(job, "HighBuzzProducts", TextOutputFormat.class, IntWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "NormalProducts", TextOutputFormat.class, IntWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "RareProducts", TextOutputFormat.class, IntWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "OnDemandCrawlProducts", TextOutputFormat.class, IntWritable.class,
Text.class);
MultipleOutputs.addNamedOutput(job, "AvailableProducts", TextOutputFormat.class, IntWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "OtherProducts", TextOutputFormat.class, IntWritable.class, Text.class);

System.exit(job.waitForCompletion(true) ? 0 : 1);

}
}


EXCEL INPUT FORMAT 
(CUSTOM INPUT FORMAT TO READ EXCEL FILES)

package com.poc;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class ExcelInputFormat extends FileInputFormat<LongWritable, Text> {
            @Override
            public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
                                    throws IOException, InterruptedException {
                        return new ExcelRecordReader();
            }
}

EXCEL RECORD READER 
(TO READ EXCEL FILE AND SEND AS KEY, VALUE FORMAT)

package com.poc;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import com.sreejithpillai.excel.parser.ExcelParser;

public class ExcelRecordReader extends RecordReader<LongWritable, Text> {
            private LongWritable key;
            private Text value;
            private InputStream is;
            private String[] strArrayofLines;
            @Override
            public void initialize(InputSplit genericSplit, TaskAttemptContext context)
                                    throws IOException, InterruptedException {
                        FileSplit split = (FileSplit) genericSplit;
                        Configuration job = context.getConfiguration();
                        final Path file = split.getPath();
                        FileSystem fs = file.getFileSystem(job);
                        FSDataInputStream fileIn = fs.open(split.getPath());
                        is = fileIn;
                        String line = new ExcelParser().parseExcelData(is);
                        this.strArrayofLines = line.split("\n");
            }
            @Override
            public boolean nextKeyValue() throws IOException, InterruptedException {
                        if (key == null) {
                                    key = new LongWritable(0);
                                    value = new Text(strArrayofLines[0]);
                        } else {
                                    if (key.get() < (this.strArrayofLines.length - 1)) {
                                                long pos = (int) key.get();
                                                key.set(pos + 1);
                                                value.set(this.strArrayofLines[(int) (pos + 1)]);
                                                pos++;
                                    } else {
                                                return false;
                                    }
                        }
                        if (key == null || value == null) {
                                    return false;
                        } else {
                                    return true;
                        }
            }
            @Override
            public LongWritable getCurrentKey() throws IOException, InterruptedException {
                        return key;
            }
            @Override
            public Text getCurrentValue() throws IOException, InterruptedException {
                        return value;
            }
            @Override
            public float getProgress() throws IOException, InterruptedException {
                        return 0;
            }
            @Override
            public void close() throws IOException {
                        if (is != null) {
                                    is.close();
                        }
            }
}


EXCEL PARSER 
(TO PARSE EXCEL SHEET)

package com.poc;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.poi.hssf.usermodel.HSSFSheet;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;

public class ExcelParser {
            private static final Log LOG = LogFactory.getLog(ExcelParser.class);
            private StringBuilder currentString = null;
            private long bytesRead = 0;
            public String parseExcelData(InputStream is) {
                        try {
                                    HSSFWorkbook workbook = new HSSFWorkbook(is);
                                    HSSFSheet sheet = workbook.getSheetAt(0);
                                    Iterator<Row> rowIterator = sheet.iterator();
                                    currentString = new StringBuilder();
                                    while (rowIterator.hasNext()) {
                                                Row row = rowIterator.next();
                                                Iterator<Cell> cellIterator = row.cellIterator();
                                                while (cellIterator.hasNext()) {
                                                            Cell cell = cellIterator.next();
                                                            switch (cell.getCellType()) {
                                                            case Cell.CELL_TYPE_BOOLEAN:
                                                                        bytesRead++;
                                                                        currentString.append(cell.getBooleanCellValue() + "\t");
                                                                        break;
                                                case Cell.CELL_TYPE_NUMERIC:
                                                                        bytesRead++;
                                                                        currentString.append(cell.getNumericCellValue() + "\t");
                                                                        break;
                                                            case Cell.CELL_TYPE_STRING:
                                                                        bytesRead++;
                                                                        currentString.append(cell.getStringCellValue() + "\t");
                                                                        break;
                                                            }
                                                }
                                                currentString.append("\n");
                                    }
                                    is.close();
                        } catch (IOException e) {
                                    LOG.error("IO Exception : File not found " + e);
                        }
                        return currentString.toString();
            }
            public long getBytesRead() {
                        return bytesRead;
            }
}

EXCEL MAPPER 
(HAVING MAPPER LOGIC)

package com.poc;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PocMapper extends Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
try {
if (value.toString().contains("RTL_NAME") && value.toString().contains("TYPE_OF_CRAWLING"))
return;
else {
String[] str = value.toString().split(" ");
String data = "";
for (int i = 0; i < str.length; i++) {
if (str[i] != null || str[i] != " ") {
data += (str[i] + " ");

}
}
String dr1 = data.trim().replaceAll("\\s+", "\t");
String[] str1 = dr1.split("\t");

int id = (int) Double.parseDouble(str1[0]);
int regprice = (int) Double.parseDouble(str1[6]);
int rebate = (int) Double.parseDouble(str1[7]);
int saleprice = (int) Double.parseDouble(str1[5]);
String dr = Integer.toString(id) + "\t" + str1[1] + "\t" + str1[2] + "\t" + str1[3] + "\t" + str1[4]+ "\t" + Integer.toString(saleprice) + "\t" + Integer.toString(regprice) + "\t"+Integer.toString(rebate) + "\t" + str1[8];

context.write(new Text(""), new Text(dr));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

EXCEL REDUCER 
(HAVING REDUCER LOGIC)

package com.poc;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class PocReducer extends Reducer<Text, Text, IntWritable, Text> {
MultipleOutputs<IntWritable, Text> mos;

@Override
public void setup(Context context) {
mos = new MultipleOutputs<IntWritable, Text>(context);
}

@Override
public void reduce(Text k1, Iterable<Text> k2, Context context) throws IOException, InterruptedException {
while (k2.iterator().hasNext()) {
String sr = k2.iterator().next().toString();
String sr1 = sr.trim().replaceAll("\\s+", "\t");

String[] str1 = sr1.split("\t");

int regprice = Integer.parseInt(str1[6]);
int rebate = Integer.parseInt(str1[7]);
int saleprice = Integer.parseInt(str1[5]);
String dr = str1[0] + "\t" + str1[1] + "\t" + str1[2] + "\t" + str1[3] + "\t" + str1[4] + "\t" + str1[5]
+ "\t" + str1[6] + "\t" + str1[7] + "\t" + str1[8];
if (str1[2].equalsIgnoreCase("BS")) {
if (saleprice < 100 && rebate > 50) {
mos.write("HighBuzzProducts", null, new Text(dr), "/Retail/HighBuzzProducts");
} else if (regprice < 150 && rebate > 25 && rebate < 50) {
mos.write("NormalProducts", null, new Text(dr), "/Retail/NormalProducts");

} else if (str1[4].length() > 100) {
mos.write("RareProducts", null, new Text(dr), "/Retail/RareProducts");
} else {
mos.write("OtherProducts", null, new Text(dr), "/Retail/OtherProducts");
}
} else if (str1[2].equalsIgnoreCase("ODC")) {
if (saleprice < 150) {
mos.write("OnDemandCrawlProducts", null, new Text(dr), "/Retail/OnDemandCrawlProducts");
} else if (str1[8].equalsIgnoreCase("IN_STOCK")) {
mos.write("AvailableProducts", null, new Text(dr), "/Retail/AvailableProducts");
} else {
mos.write("OtherProducts", null, new Text(dr), "/Retail/OtherProducts");
}
} else {
mos.write("OtherProducts", null, new Text(dr), "/Retail/OtherProducts");
}

}
}

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
mos.close();
}
}

EXECUTING THE MAP REDUCE CODE

hadoop jar poc.jar com/poc/PocDriver /Input/POC.xls /Poc


Goto Firefox and open name node page by following command:

http://localhost:50070 and browse the file system , then click on HealthCarePOC directory to check the files created.





3.     PIG SCRIPT

A = LOAD '/Retail/' USING PigStorage ('\t') AS (id:int, Name:chararray, crawl:chararray, produrl:chararray, tittle:chararray, sale:int, reg:int, rebate:int, stockinfo:chararray);

B = DISTINCT A;
DUMP B; 





PigScript2.pig

A = LOAD '/Retail/' USING PigStorage ('\t') AS (id:int, Name:chararray, crawl:chararray, produrl:chararray, tittle:chararray, sale:int, reg:int, rebate:int, stockinfo:chararray);

B = DISTINCT A;
C = ORDER B BY id;
STORE C INTO '/RETAILPOC'; 




4.     EXPORT the PIG Output from HDFS to MySQL using SQOOP

sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "create database if not exists RETAIL;";



sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "use RETAIL;";


sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "grant all privileges on RETAIL.* to ‘localhost’@’%’;”;

sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "grant all privileges on RETAIL.* to ‘’@’localhost’;”;



sqoop eval --connect jdbc:mysql://localhost/RETAIL --username root --password root --query "create table retailpoc(id int, name varchar(50), crawl varchar(50), produrl varchar(200), tittle varchar(200), sale int, reg int, rebate int, stockinfo varchar(50));";



sqoop export --connect jdbc:mysql://localhost/RETAIL--table retailpoc --export-dir /RETAILPOC --fields-terminated-by '\t';





5.     STORE THE PIG OUTPUT IN A HIVE EXTERNAL TABLE

goto hive shell using command:

hive

show databases;
create database RetailPOC;
use RetailPOC;



create external table retailpoc(id int, Name string, crawl string, produrl string, tittle string, sale int, reg int, rebate int, stockinfo string)
row format delimited
fields terminated by '\t'
stored as textfile location '/RETAILPOC';





Hope you all understood the procedures... 
Please do notify me for any corrections...
Kindly leave a comment for any queries/clarification...
(Detailed Description of each phase to be added soon).
ALL D BEST...







15 comments:

  1. What a great blog it is..Just need to follow blindly..Any person from any background can grasp the subject..Its great :) Cheers :D

    ReplyDelete
  2. It's a great blog mahesh.... U have provided complete information.... Including input files... Which are provided by very few. Thank you yar.

    ReplyDelete
  3. Thanks All...

    I believe knowledge should be shared....

    But it will be worthy if you try by yourself by understanding each concept...

    ReplyDelete