INDUSTRY: RETAIL
Data Input Format
:- .xls (my input data is in Excel 97-2003 format)
Kindly check my blog post on reading any kind of Excel sheet, and use the Excel input format, record reader and Excel parser given there. Please find the link to my blog below:
This POC input file and problem statement were shared with me by Mr. Sunil Pashikanti; the input file contains 3,000 records.
ATTRIBUTES:-
1. RETAIL_ID
2. RETAIL_NAME
3. TYPE_OF_CRAWLING
4. PRODUCT_URL
5. TITTLE
6. SALE_PRICE
7. REG_PRICE
8. REBATE_PERCENTAGE
9. STOCK_INFO
Example:
12 Amazon BS http://www.amazon.com/dell/lp Amazon.com:Dell Laptop 100.00 150.00 33 InStock
DOWNLOAD MY INPUT FILE FROM BELOW LINK:
https://drive.google.com/file/d/0BzYUKIo7aWL_Sm5mT2l1cnZSQ0E/view?usp=sharing
PROBLEM STATEMENT:-
1. Take the complete Excel input data onto HDFS.
2. Develop a MapReduce use case to get the below filtered results from the HDFS input data (Excel data):
IF Type_Of_Crawling is -->'BS'
-salePrice < 100.00 & RebatePercent>50 --> store "HighBuzzProducts"
-RegPrice<150.00 & RebatePercent in 25-50 --> store "NormalProducts"
-lengthOf(title)>100 ---> 'rare products'
IF Type_Of_Crawling is -->'ODC'
- salePrice < 150.00 --> store "OnDemandCrawlProducts"
- StockInfo --> "InStock" -->store "AvailableProducts"
ELSE
store in "OtherProducts"
NOTE: Only five filtered outputs have to be generated, under the file names mentioned above (everything else goes to OtherProducts).
3. Develop a PIG Script to filter the Map Reduce Output in the below fashion
- Provide the Unique data
- Sort the Unique data based on RETAIL_ID in DESC order
4. EXPORT the same PIG Output from HDFS to MySQL using SQOOP
5. Store the same PIG Output in a HIVE External Table.
NOTE:- For this POC I have used a custom input format to read Excel files with the help of an external jar, so the corresponding jar files have to be added to the build path during coding and to the lib directory of Hadoop for successful execution. You can use the POI OOXML (poi-ooxml) jar for reading .xlsx files (the Excel 2007-onwards format).
Below are the steps to make it work:
1. Download and install Apache Ant from the link below:
http://muug.ca/mirror/apache-dist//ant/binaries/apache-ant-1.9.8-bin.tar.gz
2. To install, extract the archive with the following command in a terminal:
tar -xzvf <apache ant Path>
3. Update ~/.bashrc:
nano ~/.bashrc
Add the below two lines (ANT_HOME should point to the extracted Ant directory):
export ANT_HOME=${ant_dir}
export PATH=${PATH}:${ANT_HOME}/bin
Now source the bashrc file with the command:
source ~/.bashrc
4. Then restart the system. (Very important for the changes to take effect.)
5. Download the required jar files from the link below:
Add both jar files to the build path during Eclipse compilation, and place only the SNAPSHOT jar in the Hadoop lib directory.
6. If it is still not working, try adding the jars to the CLASSPATH:
export CLASSPATH=.:$CLASSPATH:<Path to the jar file 1>:<Path to jar file 2>
Hope it will work now.
POC Processing Details
MAP REDUCE PROCESS IN DETAILS:-
1. TO TAKE XLS INPUT DATA ON HDFS
hadoop fs -mkdir /Input
hadoop fs -put POC.xls /Input
jar xvf poc.jar
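The poc.jar used above has to be built first from the classes listed in step 2. The NOTE earlier assumes Eclipse compilation with the two external Excel jars on the build path; a rough command-line equivalent, assuming the sources live under src/com/poc and with placeholder names for the two external jars, would be:
mkdir -p classes
javac -classpath "$(hadoop classpath):excel-parser-SNAPSHOT.jar:poi.jar" -d classes src/com/poc/*.java
jar -cvf poc.jar -C classes .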
2. MAP REDUCE CODES:-
EXCEL INPUT DRIVER
(DRIVER CLASS)
package com.poc;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class PocDriver {
static public int count = 0;
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
GenericOptionsParser parser = new GenericOptionsParser(conf, args);
args = parser.getRemainingArgs();
Job job = new Job(conf, "Retail_Poc");
job.setJarByClass(PocDriver.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(ExcelInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// job.setNumReduceTasks(0);
job.setMapperClass(PocMapper.class);
job.setReducerClass(PocReducer.class);
MultipleOutputs.addNamedOutput(job, "HighBuzzProducts", TextOutputFormat.class, IntWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "NormalProducts", TextOutputFormat.class, IntWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "RareProducts", TextOutputFormat.class, IntWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "OnDemandCrawlProducts", TextOutputFormat.class, IntWritable.class,
Text.class);
MultipleOutputs.addNamedOutput(job, "AvailableProducts", TextOutputFormat.class, IntWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "OtherProducts", TextOutputFormat.class, IntWritable.class, Text.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
EXCEL INPUT FORMAT
(CUSTOM INPUT FORMAT TO READ EXCEL FILES)
package com.poc;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class ExcelInputFormat extends FileInputFormat<LongWritable, Text> {
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
return new ExcelRecordReader();
}
}
EXCEL RECORD READER
(TO READ EXCEL FILE AND SEND AS KEY, VALUE FORMAT)
package com.poc;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import com.sreejithpillai.excel.parser.ExcelParser;
public class ExcelRecordReader extends RecordReader<LongWritable, Text> {
private LongWritable key;
private Text value;
private InputStream is;
private String[] strArrayofLines;
@Override
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
final Path file = split.getPath();
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
is = fileIn;
String line = new ExcelParser().parseExcelData(is);
this.strArrayofLines = line.split("\n");
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (key == null) {
key = new LongWritable(0);
value = new Text(strArrayofLines[0]);
} else {
if (key.get() < (this.strArrayofLines.length - 1)) {
long pos = (int) key.get();
key.set(pos + 1);
value.set(this.strArrayofLines[(int) (pos + 1)]);
pos++;
} else {
return false;
}
}
if (key == null || value == null) {
return false;
} else {
return true;
}
}
@Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}
@Override
public void close() throws IOException {
if (is != null) {
is.close();
}
}
}
EXCEL PARSER
(TO PARSE EXCEL SHEET)
package com.poc;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.poi.hssf.usermodel.HSSFSheet;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;
public class ExcelParser {
private static final Log LOG = LogFactory.getLog(ExcelParser.class);
private StringBuilder currentString = null;
private long bytesRead = 0;
public String parseExcelData(InputStream is) {
try {
HSSFWorkbook workbook = new HSSFWorkbook(is);
HSSFSheet sheet = workbook.getSheetAt(0);
Iterator<Row> rowIterator = sheet.iterator();
currentString = new StringBuilder();
while (rowIterator.hasNext()) {
Row row = rowIterator.next();
Iterator<Cell> cellIterator = row.cellIterator();
while (cellIterator.hasNext()) {
Cell cell = cellIterator.next();
switch (cell.getCellType()) {
case Cell.CELL_TYPE_BOOLEAN:
bytesRead++;
currentString.append(cell.getBooleanCellValue() + "\t");
break;
case Cell.CELL_TYPE_NUMERIC:
bytesRead++;
currentString.append(cell.getNumericCellValue() + "\t");
break;
case Cell.CELL_TYPE_STRING:
bytesRead++;
currentString.append(cell.getStringCellValue() + "\t");
break;
}
}
currentString.append("\n");
}
is.close();
} catch (IOException e) {
LOG.error("IO Exception : File not found " + e);
}
return currentString.toString();
}
public long getBytesRead() {
return bytesRead;
}
}
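The parser above uses HSSF classes, which only handle the .xls (Excel 97-2003) format used in this POC. As per the NOTE near the top, reading .xlsx files (Excel 2007 onwards) needs the POI OOXML jar instead; below is a minimal sketch of an equivalent parse loop for that case (the XlsxParser class is my illustration, not part of the POC code):
package com.poc;
import java.io.IOException;
import java.io.InputStream;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.xssf.usermodel.XSSFSheet;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
// Hypothetical .xlsx counterpart of ExcelParser; needs poi-ooxml and its dependencies on the classpath
public class XlsxParser {
public String parseExcelData(InputStream is) throws IOException {
StringBuilder currentString = new StringBuilder();
XSSFWorkbook workbook = new XSSFWorkbook(is); // XSSF reads .xlsx, HSSF reads .xls
XSSFSheet sheet = workbook.getSheetAt(0);
for (Row row : sheet) {
for (Cell cell : row) {
// cell.toString() gives a plain text rendering of the cell, so the per-type switch above is not needed
currentString.append(cell.toString()).append("\t");
}
currentString.append("\n");
}
is.close();
return currentString.toString();
}
}
The ExcelRecordReader would stay the same apart from instantiating this parser instead of ExcelParser.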
EXCEL MAPPER
(HAVING MAPPER LOGIC)
package com.poc;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class PocMapper extends Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
try {
if (value.toString().contains("RTL_NAME") && value.toString().contains("TYPE_OF_CRAWLING"))
return;
else {
String[] str = value.toString().split(" ");
String data = "";
for (int i = 0; i < str.length; i++) {
if (str[i] != null && !str[i].isEmpty()) { // skip empty tokens produced by consecutive spaces
data += (str[i] + " ");
}
}
String dr1 = data.trim().replaceAll("\\s+", "\t");
String[] str1 = dr1.split("\t");
int id = (int) Double.parseDouble(str1[0]);
int regprice = (int) Double.parseDouble(str1[6]);
int rebate = (int) Double.parseDouble(str1[7]);
int saleprice = (int) Double.parseDouble(str1[5]);
String dr = Integer.toString(id) + "\t" + str1[1] + "\t" + str1[2] + "\t" + str1[3] + "\t" + str1[4]+ "\t" + Integer.toString(saleprice) + "\t" + Integer.toString(regprice) + "\t"+Integer.toString(rebate) + "\t" + str1[8];
context.write(new Text(""), new Text(dr));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
EXCEL REDUCER
(HAVING REDUCER LOGIC)
package com.poc;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public class PocReducer extends Reducer<Text, Text, IntWritable, Text> {
MultipleOutputs<IntWritable, Text> mos;
@Override
public void setup(Context context) {
mos = new MultipleOutputs<IntWritable, Text>(context);
}
@Override
public void reduce(Text k1, Iterable<Text> k2, Context context) throws IOException, InterruptedException {
while (k2.iterator().hasNext()) {
String sr = k2.iterator().next().toString();
String sr1 = sr.trim().replaceAll("\\s+", "\t");
String[] str1 = sr1.split("\t");
int regprice = Integer.parseInt(str1[6]);
int rebate = Integer.parseInt(str1[7]);
int saleprice = Integer.parseInt(str1[5]);
String dr = str1[0] + "\t" + str1[1] + "\t" + str1[2] + "\t" + str1[3] + "\t" + str1[4] + "\t" + str1[5]
+ "\t" + str1[6] + "\t" + str1[7] + "\t" + str1[8];
if (str1[2].equalsIgnoreCase("BS")) {
if (saleprice < 100 && rebate > 50) {
mos.write("HighBuzzProducts", null, new Text(dr), "/Retail/HighBuzzProducts");
} else if (regprice < 150 && rebate > 25 && rebate < 50) {
mos.write("NormalProducts", null, new Text(dr), "/Retail/NormalProducts");
} else if (str1[4].length() > 100) {
mos.write("RareProducts", null, new Text(dr), "/Retail/RareProducts");
} else {
mos.write("OtherProducts", null, new Text(dr), "/Retail/OtherProducts");
}
} else if (str1[2].equalsIgnoreCase("ODC")) {
if (saleprice < 150) {
mos.write("OnDemandCrawlProducts", null, new Text(dr), "/Retail/OnDemandCrawlProducts");
} else if (str1[8].equalsIgnoreCase("InStock")) {
mos.write("AvailableProducts", null, new Text(dr), "/Retail/AvailableProducts");
} else {
mos.write("OtherProducts", null, new Text(dr), "/Retail/OtherProducts");
}
} else {
mos.write("OtherProducts", null, new Text(dr), "/Retail/OtherProducts");
}
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
mos.close();
}
}
EXECUTING THE MAP REDUCE CODE
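The job itself is launched with something like the line below, assuming the jar is named poc.jar (as in step 1), the Excel/POI jars are already in Hadoop's lib directory per the NOTE, and /PocOutput is a placeholder for any new HDFS path used as the job's default output directory (args[1]):
hadoop jar poc.jar com.poc.PocDriver /Input /PocOutput
The filtered results themselves are written by MultipleOutputs under /Retail, which is the directory browsed below.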
Go to Firefox and open the name node page at http://localhost:50070, browse the file system, and then click on the Retail directory to check the files created.
3. PIG SCRIPT
PigScript1.pig
A = LOAD '/Retail/' USING PigStorage ('\t') AS (id:int, Name:chararray, crawl:chararray, produrl:chararray, tittle:chararray, sale:int, reg:int, rebate:int, stockinfo:chararray);
B = DISTINCT A;
DUMP B;
PigScript2.pig
A = LOAD '/Retail/' USING PigStorage ('\t') AS (id:int, Name:chararray, crawl:chararray, produrl:chararray, tittle:chararray, sale:int, reg:int, rebate:int, stockinfo:chararray);
B = DISTINCT A;
C = ORDER B BY id DESC;
STORE C INTO '/RETAILPOC';
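To run a script, save it locally and pass the file name to Pig (it executes in MapReduce mode by default), for example:
pig PigScript2.pig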
4. EXPORT the PIG Output from HDFS to MySQL using SQOOP
sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "create database if not exists RETAIL;";
sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "use RETAIL;";
sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "grant all privileges on RETAIL.* to 'localhost'@'%';";
sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "grant all privileges on RETAIL.* to ''@'localhost';";
sqoop eval --connect jdbc:mysql://localhost/RETAIL --username root --password root --query "create table retailpoc(id int, name varchar(50), crawl varchar(50), produrl varchar(200), tittle varchar(200), sale int, reg int, rebate int, stockinfo varchar(50));";
sqoop export --connect jdbc:mysql://localhost/RETAIL --table retailpoc --export-dir /RETAILPOC --fields-terminated-by '\t';
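As an optional sanity check (not part of the original steps), the exported row count can be read back with sqoop eval:
sqoop eval --connect jdbc:mysql://localhost/RETAIL --username root --password root --query "select count(*) from retailpoc;"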
5. STORE THE PIG OUTPUT IN A HIVE EXTERNAL TABLE
Go to the Hive shell using the command:
hive
show databases;
create database RetailPOC;
use RetailPOC;
create external table retailpoc(id int, Name string, crawl string, produrl string, tittle string, sale int, reg int, rebate int, stockinfo string)
row format delimited
fields terminated by '\t'
stored as textfile location '/RETAILPOC';
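Once the external table is created, a quick check from the same Hive shell (optional):
select count(*) from retailpoc;
select * from retailpoc limit 10;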
Hope you all understood the procedures...
Please do notify me of any corrections...
Kindly leave a comment for any queries/clarifications...
(Detailed description of each phase to be added soon.)
ALL THE BEST...