INDUSTRY: SENSEX LOG
DATA INPUT FORMAT: .xls (my input data is in Excel 97-2003 format)
To read any kind of Excel sheet, use the Excel input format, record reader and Excel parser given in my blog post linked below:
https://hadoop-poc-mahesh.blogspot.in/2017/01/hadoop-excel-input-format-to-read-any.html
I created 3000 records of my own in the format below.
ATTRIBUTES:
1) SENSEX_ID
2) SENSEX_NAME
3) TYPE_OF_TRADING
4) SENSEX_LOCATION
5) OPENING_BAL
6) CLOSING_BAL
7) FLUCTUATION_RATE
EXAMPLE:
100001 BSE SIP NEW_YORK 23486 24876 28
DOWNLOAD MY INPUT FILE FROM BELOW LINK:
https://drive.google.com/file/d/0BzYUKIo7aWL_cHN3Q2lIVHB0NTg/view?usp=sharing
PROBLEM STATEMENT:
1) Load the complete Excel input data onto HDFS.
2) Develop a MapReduce use case to get the filtered results below from the HDFS input data (.xls Excel data).
2.1) If TYPE_OF_TRADING is 'SIP':
2.1.1) If OPENING_BAL > 25000 and FLUCTUATION_RATE > 10 --> store in "HighDemandMarket".
2.1.2) If CLOSING_BAL < 22000 and FLUCTUATION_RATE is between 20 and 30 --> store in "OnGoingMarketStretegy".
2.2) If TYPE_OF_TRADING is 'SHORTTERM':
2.2.1) If OPENING_BAL < 5000 --> store in "WealthyProducts".
2.2.2) If SENSEX_LOCATION is "NewYork" or "California" --> store in "ReliableProducts".
2.3) Else store in "OtherProducts".
NOTE: Only 5 outputs have to be generated, using the file names mentioned above.
Example: "WealthyProducts-r-00000"
3) Develop a Pig script to filter the MapReduce output as follows:
3.1) Provide the unique data.
3.2) Sort the unique data by SENSEX_ID.
4) Export the same Pig output from HDFS to MySQL using Sqoop.
5) Store the same Pig output in a Hive external table.
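The filtering rules in step 2 of the problem statement can be sketched as one plain Java method, which makes them easy to check before wiring them into a reducer. This is only a sketch: the class name SensexRouter and the method route are hypothetical and not part of the POC jar, "between 20 and 30" is assumed to include both endpoints, and locations are assumed to match regardless of case and underscores (so NEW_YORK in the data matches "NewYork" in the rules).

```java
// Hypothetical helper (not part of the original POC): maps one record
// to the name of the output it belongs to, following rules 2.1 to 2.3.
final class SensexRouter {

    static String route(String tradeType, String location,
                        int openBal, int closeBal, int flucRate) {
        if (tradeType.equalsIgnoreCase("SIP")) {
            if (openBal > 25000 && flucRate > 10) {
                return "HighDemandMarket";
            }
            // Assumption: "between 20 and 30" includes 20 and 30.
            if (closeBal < 22000 && flucRate >= 20 && flucRate <= 30) {
                return "OnGoingMarketStretegy";
            }
        } else if (tradeType.equalsIgnoreCase("SHORTTERM")) {
            if (openBal < 5000) {
                return "WealthyProducts";
            }
            // Assumption: ignore case and underscores when comparing locations.
            String loc = location.replace("_", "").toLowerCase();
            if (loc.equals("newyork") || loc.equals("california")) {
                return "ReliableProducts";
            }
        }
        // Rule 2.3: everything that matched no rule above.
        return "OtherProducts";
    }

    public static void main(String[] args) {
        // The example record: 100001 BSE SIP NEW_YORK 23486 24876 28
        System.out.println(route("SIP", "NEW_YORK", 23486, 24876, 28)); // prints OtherProducts
    }
}
```

The example record lands in OtherProducts because it is a SIP record whose opening balance is not above 25000 and whose closing balance is not below 22000.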
NOTE: For this POC I have used a custom input format to read Excel files using an external jar. The corresponding jar files must be added to the build path while coding and to the lib directory of Hadoop for successful execution. For reading .xlsx files (the Excel 2007 onwards format) you can use the poi-ooxml jar.
Below are the steps to make it work:
1. Download and install Ant from the link below:
http://muug.ca/mirror/apache-dist//ant/binaries/apache-ant-1.9.8-bin.tar.gz
2. Extract the archive:
tar -xzvf <apache ant Path>
3. Update bashrc:
nano ~/.bashrc
Add the two lines below (append to PATH rather than overwriting it, otherwise the standard commands stop being found):
export ANT_HOME=${ant_dir}
export PATH=${PATH}:${ANT_HOME}/bin
Now source bashrc with the command:
source ~/.bashrc
4. Then restart the system. (Very important for the change to take effect.)
5. Download the required jar files from the link below:
Add both jar files to the Eclipse build path when compiling, and place only the SNAPSHOT jar in the Hadoop lib directory.
6. If it still does not work, try adding the jars to the CLASSPATH:
export CLASSPATH=.:$CLASSPATH:<Path to the jar file 1>:<Path to jar file 2>
Hopefully it will work now.
POC PROCESSING DETAILS
MAP REDUCE PROCESS IN DETAIL:
1. TO TAKE XLS INPUT DATA ON HDFS
hadoop fs -mkdir /Pocinput
hadoop fs -put SENSEX.xls /Pocinput
jar xvf POC.jar
2. MAP REDUCE CODES:
EXCEL INPUT DRIVER
(MAIN DRIVER CLASS)
package com.poc;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PocDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // Strip generic Hadoop options (-D, -libjars, ...); what remains is <input path> <output path>.
        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        args = parser.getRemainingArgs();

        // Job.getInstance replaces the deprecated "new Job(conf, name)" constructor.
        Job job = Job.getInstance(conf, "Sensex_Log");
        job.setJarByClass(PocDriver.class);
        job.setMapperClass(PocMapper.class);
        job.setReducerClass(PocReducer.class);

        // The mapper emits Text/Text; with no separate map output classes set,
        // these two settings also describe the map output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(ExcelInputFormat.class);
        // LazyOutputFormat creates an output file only when something is written to it,
        // so no empty default part files appear next to the named outputs.
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // One named output per category in the problem statement.
        MultipleOutputs.addNamedOutput(job, "HighDemandMarket", TextOutputFormat.class, IntWritable.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "OnGoingMarketStretegy", TextOutputFormat.class, IntWritable.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "WealthyProducts", TextOutputFormat.class, IntWritable.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "ReliableProducts", TextOutputFormat.class, IntWritable.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "OtherProducts", TextOutputFormat.class, IntWritable.class, Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
EXCEL INPUT FORMAT
(CUSTOM INPUT FORMAT TO READ EXCEL FILES)
package com.poc;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class ExcelInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new ExcelRecordReader();
    }

    // An .xls workbook is a binary file that the record reader parses as a whole,
    // so it must never be divided into several input splits.
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }
}
EXCEL RECORD READER
(TO READ EXCEL FILE AND SEND AS KEY, VALUE FORMAT)
package com.poc;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// ExcelParser from the external jar. Drop this import if you compile the
// ExcelParser listed below (package com.poc) instead.
import com.sreejithpillai.excel.parser.ExcelParser;

public class ExcelRecordReader extends RecordReader<LongWritable, Text> {

    private LongWritable key;          // index of the current row
    private Text value;                // current row as tab-separated text
    private InputStream is;
    private String[] strArrayofLines;  // all rows of the sheet

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        final Path file = split.getPath();
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);
        is = fileIn;
        // Parse the whole sheet at once; each row comes back as one line.
        String line = new ExcelParser().parseExcelData(is);
        this.strArrayofLines = line.split("\n");
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            // First call: emit row 0.
            key = new LongWritable(0);
            value = new Text(strArrayofLines[0]);
            return true;
        }
        int next = (int) key.get() + 1;
        if (next >= strArrayofLines.length) {
            return false; // no more rows
        }
        key.set(next);
        value.set(strArrayofLines[next]);
        return true;
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // Fraction of rows handed out so far.
        if (key == null || strArrayofLines == null || strArrayofLines.length == 0) {
            return 0f;
        }
        return (key.get() + 1) / (float) strArrayofLines.length;
    }

    @Override
    public void close() throws IOException {
        if (is != null) {
            is.close();
        }
    }
}
EXCEL PARSER
(TO PARSE EXCEL SHEET)
package com.poc;

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.poi.hssf.usermodel.HSSFSheet;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;

public class ExcelParser {

    private static final Log LOG = LogFactory.getLog(ExcelParser.class);

    private StringBuilder currentString = null;
    private long bytesRead = 0; // counts cells read, despite the name

    // Reads the first sheet of an .xls workbook and returns it as text:
    // one line per row, cells separated by tabs.
    public String parseExcelData(InputStream is) {
        // Created before the try block so that a failed read returns "" instead of
        // throwing a NullPointerException on the return statement.
        currentString = new StringBuilder();
        try {
            HSSFWorkbook workbook = new HSSFWorkbook(is);
            HSSFSheet sheet = workbook.getSheetAt(0);
            Iterator<Row> rowIterator = sheet.iterator();
            while (rowIterator.hasNext()) {
                Row row = rowIterator.next();
                Iterator<Cell> cellIterator = row.cellIterator();
                while (cellIterator.hasNext()) {
                    Cell cell = cellIterator.next();
                    // The CELL_TYPE_* int constants belong to the POI 3.x API.
                    switch (cell.getCellType()) {
                    case Cell.CELL_TYPE_BOOLEAN:
                        bytesRead++;
                        currentString.append(cell.getBooleanCellValue() + "\t");
                        break;
                    case Cell.CELL_TYPE_NUMERIC:
                        // Numeric cells come back as double, e.g. 100001.0.
                        bytesRead++;
                        currentString.append(cell.getNumericCellValue() + "\t");
                        break;
                    case Cell.CELL_TYPE_STRING:
                        bytesRead++;
                        currentString.append(cell.getStringCellValue() + "\t");
                        break;
                    }
                }
                currentString.append("\n");
            }
            is.close();
        } catch (IOException e) {
            LOG.error("IO exception while parsing the Excel file", e);
        }
        return currentString.toString();
    }

    public long getBytesRead() {
        return bytesRead;
    }
}
EXCEL MAPPER
(HAVING MAPPER LOGIC)
package com.poc;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PocMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // Skip the header row.
        if (line.contains("SENSEX_NAME") && line.contains("TYPE_OF_TRADING")) {
            return;
        }
        try {
            // Split on any run of whitespace. This replaces the original
            // split/re-join loop, whose (str[i] != null || str[i] != " ") test was always true.
            String[] str1 = line.trim().split("\\s+");
            // The parser writes numeric cells as doubles (e.g. 100001.0),
            // so parse as double and cast to int.
            int id = (int) Double.parseDouble(str1[0]);
            int openbal = (int) Double.parseDouble(str1[4]);
            int closebal = (int) Double.parseDouble(str1[5]);
            int flucrate = (int) Double.parseDouble(str1[6]);
            String dr = id + "\t" + str1[1] + "\t" + str1[2] + "\t" + str1[3] + "\t"
                    + openbal + "\t" + closebal + "\t" + flucrate;
            // Every record goes to the same (empty) key, so one reducer call sees them all.
            context.write(new Text(""), new Text(dr));
        } catch (NumberFormatException | ArrayIndexOutOfBoundsException e) {
            // Malformed rows (missing or non-numeric fields) are logged and skipped.
            e.printStackTrace();
        }
    }
}
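The mapper's parsing step can also be tried outside Hadoop. The parser appends numeric cells with getNumericCellValue(), which returns a double, so ids and balances arrive as text such as 100001.0 and have to be cast back to int. The sketch below shows that normalization on a single row; the class name RowNormalizer and the sample row are hypothetical, chosen only for illustration, and the row is assumed to have exactly the 7 attributes listed at the top of this post.

```java
// Hypothetical helper (not part of the original POC): turns one raw row
// produced by the Excel parser into a clean tab-separated record.
final class RowNormalizer {

    static String normalize(String rawRow) {
        // Split on any run of whitespace; trim() drops the trailing tab the parser adds.
        String[] f = rawRow.trim().split("\\s+");
        if (f.length != 7) {
            throw new IllegalArgumentException("expected 7 fields but found " + f.length);
        }
        // Numeric cells arrive as doubles ("100001.0"); cast them back to int.
        int id = (int) Double.parseDouble(f[0]);
        int openBal = (int) Double.parseDouble(f[4]);
        int closeBal = (int) Double.parseDouble(f[5]);
        int flucRate = (int) Double.parseDouble(f[6]);
        return String.join("\t",
                Integer.toString(id), f[1], f[2], f[3],
                Integer.toString(openBal), Integer.toString(closeBal),
                Integer.toString(flucRate));
    }

    public static void main(String[] args) {
        // Sample row in the shape the parser produces (assumed for illustration).
        String raw = "100001.0\tBSE\tSIP\tNEW_YORK\t23486.0\t24876.0\t28.0\t";
        System.out.println(normalize(raw));
    }
}
```

Rejecting rows that do not have exactly 7 fields is a design choice of this sketch; the mapper itself skips such rows instead of failing.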
EXCEL REDUCER
(HAVING REDUCER LOGIC)
package com.poc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class PocReducer extends Reducer<Text, Text, IntWritable, Text> {

    private static final String NEW_YORK = "new_york";
    private static final String CALIFORNIA = "california";

    private MultipleOutputs<IntWritable, Text> mos;

    @Override
    public void setup(Context context) {
        mos = new MultipleOutputs<IntWritable, Text>(context);
    }

    @Override
    public void reduce(Text k1, Iterable<Text> k2, Context context) throws IOException, InterruptedException {
        for (Text record : k2) {
            String[] str1 = record.toString().trim().split("\\s+");
            int id = (int) Double.parseDouble(str1[0]);
            int openbal = (int) Double.parseDouble(str1[4]);
            int closebal = (int) Double.parseDouble(str1[5]);
            int flucrate = (int) Double.parseDouble(str1[6]);
            String dr = id + "\t" + str1[1] + "\t" + str1[2] + "\t" + str1[3] + "\t"
                    + openbal + "\t" + closebal + "\t" + flucrate;
            String loc = str1[3].toLowerCase();

            // Pick the named output according to the problem statement.
            String output = "OtherProducts";
            if (str1[2].equalsIgnoreCase("SIP")) {
                if (openbal > 25000 && flucrate > 10) {
                    output = "HighDemandMarket";
                } else if (closebal < 22000 && flucrate >= 20 && flucrate <= 30) {
                    // Rule 2.1.2 also needs the fluctuation rate check;
                    // "between 20 - 30" is assumed to include 20 and 30.
                    output = "OnGoingMarketStretegy";
                }
            } else if (str1[2].equalsIgnoreCase("SHORTTERM")) {
                if (openbal < 5000) {
                    // Rule 2.2.1 asks for an opening balance below 5000.
                    output = "WealthyProducts";
                } else if (loc.contains(NEW_YORK) || loc.contains(CALIFORNIA)) {
                    output = "ReliableProducts";
                }
            }
            // The key is null, so only the record itself is written.
            // The leading "/" makes the path absolute: files land in /SensexLog on HDFS.
            mos.write(output, null, new Text(dr), "/SensexLog/" + output);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}
EXECUTING THE MAP REDUCE CODE
hadoop jar SensexLog.jar com.poc.PocDriver /Pocinput/SENSEX.xls /Sensex
Open Firefox and go to the NameNode page at http://localhost:50070, browse the file system, then click on the SensexLog directory to check the files created.
3. PIG SCRIPT
To execute a Pig script file from any directory, type the following command in the terminal:
1. For LFS files:
pig -x local <Scriptname.pig>
2. For HDFS files:
pig <Scriptname.pig>
In this case, since our data is in HDFS, the following commands should be used:
pig PigScript1.pig
pig PigScript2.pig
PigScript1.pig
A = LOAD '/SensexLog/' USING PigStorage('\t') AS (id:int, name:chararray, trade:chararray, loc:chararray, openbal:int, closebal:int, flucrate:int);
B = DISTINCT A;
DUMP B;
PigScript2.pig
A = LOAD '/SensexLog/' USING PigStorage('\t') AS (id:int, name:chararray, trade:chararray, loc:chararray, openbal:int, closebal:int, flucrate:int);
B = DISTINCT A;
C = ORDER B BY id;
STORE C INTO '/SensexPOC';
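For readers who think more easily in Java than in Pig Latin, the effect of DISTINCT followed by ORDER on the first column can be sketched with Java streams. This only illustrates the semantics on an in-memory list and is not how Pig executes the script; the class name DistinctSortSketch and the sample rows are hypothetical.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration (not part of the original POC) of what
// "B = DISTINCT A; C = ORDER B BY <first column>;" does to tab-separated rows.
final class DistinctSortSketch {

    static List<String> distinctSortedById(List<String> rows) {
        return rows.stream()
                .distinct() // drop rows that are exact duplicates
                .sorted(Comparator.comparingInt(
                        (String r) -> Integer.parseInt(r.split("\t")[0]))) // order by the id column
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up sample rows, for illustration only.
        List<String> rows = Arrays.asList(
                "100003\tNSE\tSHORTTERM\tCALIFORNIA\t4000\t4100\t5",
                "100001\tBSE\tSIP\tNEW_YORK\t23486\t24876\t28",
                "100003\tNSE\tSHORTTERM\tCALIFORNIA\t4000\t4100\t5");
        distinctSortedById(rows).forEach(System.out::println);
    }
}
```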
4. EXPORT the PIG Output from HDFS to MySQL using SQOOP
sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "create database if not exists SENSEX;"
sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "use SENSEX;"
sqoop eval --connect jdbc:mysql://localhost/SENSEX --username root --password root --query "grant all privileges on SENSEX.* to 'localhost'@'%';"
sqoop eval --connect jdbc:mysql://localhost/SENSEX --username root --password root --query "grant all privileges on SENSEX.* to ''@'localhost';"
sqoop eval --connect jdbc:mysql://localhost/SENSEX --username root --password root --query "create table sensexlogpoc(id int, name varchar(50), trade varchar(50), loc varchar(200), openbal int, closebal int, flucrate int);"
sqoop export --connect jdbc:mysql://localhost/SENSEX --table sensexlogpoc --export-dir /SensexPOC --fields-terminated-by '\t'
5. STORE THE PIG OUTPUT IN A HIVE EXTERNAL TABLE
Go to the Hive shell using the command:
hive
show databases;
create database SensexPOC;
use SensexPOC;
create external table sensexpoc(id int, name string, trading string, loc string, openbal int, closebal int, flucrate int)
row format delimited
fields terminated by '\t'
stored as textfile location '/SensexPOC';
select * from sensexpoc;
I hope the procedure is clear.
Please notify me of any corrections.
Kindly leave a comment for any queries or clarification.
(A detailed description of each phase will be added soon.)
All the best.