Tuesday, January 17, 2017

HADOOP POC ON EXCEL DATA WEATHER REPORT ANALYSIS

Hello Friends,


Glad to present this blog on the Weather Report analysis POC, where the input is in Excel format. One of my friends gave me this POC and asked me to complete it.

Most of the time we get data in Excel format, and we have to change our code accordingly. So, in this POC I have modified my previous code to accept Excel data, to make the concept easier for you all to understand.

NOTE:- Though this POC is meant to read EXCEL data, I have not used the Excel Input Format in my code, and it still worked. (I had no idea how or why it happened. Kindly share if you know anything about it.)
I worked on this POC on the same system as my previous POC, so all the jar files required for Excel reading were already in the Hadoop lib folder.
If you face any problem reading the input file, kindly use the EXCEL INPUT FORMAT from my previous blog to read the data.

UPDATE:- CORRECTION:- In this blog the input file is not in Excel format, so it works directly without using the Excel Input Format class. (Please find the Excel Input File HERE and Compiled Coding Jar file HERE)

Problem Statement:


1. The system receives temperatures of various cities, captured at regular intervals on each day, in an input file.

2. Weather information for all cities for a week is fed to the system in a single input file.

3. The system processes the input data file and generates a report with the maximum and minimum temperatures of each day.

4. It generates a separate output report for each month.

Ex: January-r-00000
February-r-00000
March-r-00000

5. Develop a PIG Script to filter the Map Reduce Output in the below fashion
- Provide the Unique data
- Sort the Unique data based on RETAIL_ID in DESC order

6. EXPORT the same PIG Output from HDFS to MySQL using SQOOP

7. Store the same PIG Output in a HIVE External Table.

Input File Format:- .xls (EXCEL Format) in the problem statement; as per the UPDATE above, the file I actually used is plain text.


This POC input file and problem statement were shared with me by Mr. Amol Wani. The file contains temperature statistics with time for multiple months. The schema of the record set is as shown in the picture below :-





DOWNLOAD MY INPUT FILE FROM BELOW LINK:


https://drive.google.com/file/d/0BzYUKIo7aWL_WkFYdWU5QWdJLTA/view?usp=sharing

1. TO TAKE INPUT DATA ON HDFS


hadoop fs -mkdir /InputData
hadoop fs -put WeatherReport.txt /InputData
jar xvf WeatherPoc.jar 

(Please find my jar file HERE)



2.     MAP REDUCE CODES:-


WEATHER REPORT PROCESSOR 
(DRIVER CLASS)

NOTE:- If you face any problem in reading the input file kindly uncomment the following and add necessary class path & jar files.
// job.setInputFormatClass(ExcelInputFormat.class);
// job.setOutputFormatClass(TextOutputFormat.class);
// LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

Please go through my previous blog on Any Excel Data reading.

package com.poc.weather;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

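// Uncomment together with setInputFormatClass below; it compiles only if the Excel input format jar is on the class path.
// import com.poc.ExcelInputFormat;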

public class WeatherReportProcessor {

public static String January = "January";
public static String February = "February";
public static String March = "March";
public static String April = "April";
public static String May = "May";
public static String June = "June";
public static String July = "July";
public static String August = "August";
public static String September = "September";
public static String October = "October";
public static String November = "November";
public static String December = "December";

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Weather Report");
job.setJarByClass(WeatherReportProcessor.class);

job.setMapperClass(WeatherMapper.class);
job.setReducerClass(WeatherReducer.class);

// job.setInputFormatClass(ExcelInputFormat.class);
// job.setOutputFormatClass(TextOutputFormat.class);
// LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

MultipleOutputs.addNamedOutput(job, January, TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, February, TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, March, TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, April, TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, May, TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, June, TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, July, TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, August, TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, September, TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, October, TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, November, TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, December, TextOutputFormat.class, Text.class, Text.class);
// job.setNumReduceTasks(0);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}

WEATHER MAPPER 
(HAVING MAPPER LOGIC)

In the Mapper, after reading the input data, I skip the first two lines, which don't contain any relevant data. I then split each record and take only the date and the temperatures as Mapper output, which will be used as input for the Reducer.

package com.poc.weather;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WeatherMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // Skip the two header lines, which contain "ID" or "mm".
        if (line.contains("ID") || line.contains("mm")) {
            return;
        }
        // Collapse runs of whitespace and split the record into columns.
        String[] fields = line.trim().split("\\s+");
        if (fields.length < 11) {
            return; // not a complete data record
        }
        // Columns 1 to 5 form the date key; columns 9 and 10 hold the temperatures.
        String date = fields[1] + fields[2] + fields[3] + fields[4] + fields[5];
        String temp = fields[9] + "\t" + fields[10];
        context.write(new Text(date), new Text(temp));
    }
}
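
To see which columns the mapper picks without running Hadoop, here is a small plain-Java sketch. It is only an illustration under assumptions: the class and method names are my own, and the sample line uses placeholder tokens c0 to c10 instead of real weather data, because the actual record layout is only shown in the picture above.

```java
final class WeatherFieldSketch {

    // Header rows are recognised the same way the mapper does it.
    static boolean isHeader(String line) {
        return line.contains("ID") || line.contains("mm");
    }

    // Collapse any run of spaces or tabs and split the record into columns.
    static String[] columns(String line) {
        return line.trim().split("\\s+");
    }

    // Columns 1 to 5 are concatenated into the key, as in the mapper.
    static String dateKey(String[] c) {
        return c[1] + c[2] + c[3] + c[4] + c[5];
    }

    // Columns 9 and 10 carry the two temperature values.
    static String temps(String[] c) {
        return c[9] + "\t" + c[10];
    }

    public static void main(String[] args) {
        // Placeholder tokens, not real weather data.
        String line = "  c0 c1  c2 c3\tc4 c5 c6 c7 c8 c9   c10 ";
        String[] c = columns(line);
        System.out.println("columns: " + c.length);
        System.out.println("key:     " + dateKey(c));
        System.out.println("value:   " + temps(c).replace("\t", "<TAB>"));
    }
}
```

Uneven spacing and stray tabs in the line do not matter, since splitting on "\\s+" treats any run of whitespace as one separator.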

WEATHER REDUCER 
(HAVING REDUCER LOGIC)

In the Reducer phase, taking the output from the Mapper, I split each value to get its max and min temperature and compare them with the readings from other hours of the same day to get the max and min temperature of that day.
After getting the max and min temperature, I check the date to sort the results into different months.
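
The month check can also be pictured as a small lookup instead of a long if/else chain. The sketch below is plain Java with no Hadoop involved. The class name, the method name and the sample key format (15/03/2016) are my own assumptions for illustration; the only thing taken from this POC is that the key contains the month as "/MM/".

```java
import java.time.Month;
import java.time.format.TextStyle;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class MonthRoutingSketch {

    // Two digits between slashes, e.g. the "/03/" in a key such as 15/03/2016.
    private static final Pattern MONTH_IN_KEY = Pattern.compile("/(\\d{2})/");

    // Returns the English month name for the first "/MM/" in the key,
    // or null when the key has no valid month.
    static String monthFor(String dateKey) {
        Matcher m = MONTH_IN_KEY.matcher(dateKey);
        if (!m.find()) {
            return null;
        }
        int month = Integer.parseInt(m.group(1));
        if (month < 1 || month > 12) {
            return null;
        }
        return Month.of(month).getDisplayName(TextStyle.FULL, Locale.ENGLISH);
    }

    public static void main(String[] args) {
        // Hypothetical keys, only to show the lookup.
        System.out.println(monthFor("15/03/2016"));
        System.out.println(monthFor("not a date"));
    }
}
```

Returning null for an unknown month lets the caller skip the record rather than ask for an output file with an empty name.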

package com.poc.weather;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class WeatherReducer extends Reducer<Text, Text, Text, Text> {

    private MultipleOutputs<Text, Text> mos;

    @Override
    public void setup(Context context) {
        mos = new MultipleOutputs<Text, Text>(context);
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Start from the widest possible bounds so that any reading replaces them.
        float dayMax = Float.NEGATIVE_INFINITY;
        float dayMin = Float.POSITIVE_INFINITY;

        for (Text value : values) {
            String[] temps = value.toString().split("\t");
            float max = Float.parseFloat(temps[0]);
            float min = Float.parseFloat(temps[1]);
            // Two independent checks: a single reading may update both bounds.
            if (max > dayMax) {
                dayMax = max;
            }
            if (min < dayMin) {
                dayMin = min;
            }
        }

        // Written as min<TAB>max to match the (date, mintemp, maxtemp) schema used in Pig, MySQL and Hive.
        Text result = new Text(Float.toString(dayMin) + "\t" + Float.toString(dayMax));

        // Pick the named output from the month part of the date key.
        String date = key.toString();
        String fileName = "";
        if (date.contains("/01/")) {
            fileName = WeatherReportProcessor.January;
        } else if (date.contains("/02/")) {
            fileName = WeatherReportProcessor.February;
        } else if (date.contains("/03/")) {
            fileName = WeatherReportProcessor.March;
        } else if (date.contains("/04/")) {
            fileName = WeatherReportProcessor.April;
        } else if (date.contains("/05/")) {
            fileName = WeatherReportProcessor.May;
        } else if (date.contains("/06/")) {
            fileName = WeatherReportProcessor.June;
        } else if (date.contains("/07/")) {
            fileName = WeatherReportProcessor.July;
        } else if (date.contains("/08/")) {
            fileName = WeatherReportProcessor.August;
        } else if (date.contains("/09/")) {
            fileName = WeatherReportProcessor.September;
        } else if (date.contains("/10/")) {
            fileName = WeatherReportProcessor.October;
        } else if (date.contains("/11/")) {
            fileName = WeatherReportProcessor.November;
        } else if (date.contains("/12/")) {
            fileName = WeatherReportProcessor.December;
        }
        if (fileName.isEmpty()) {
            return; // no month found in the key, so there is no named output to write to
        }
        mos.write(fileName, key, result);
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }

}
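
The per-day comparison in the reducer can be tried outside Hadoop as well. This is a minimal sketch, assuming each value arrives as a "max<TAB>min" string the way the reducer reads it. The class name, method name and sample readings are made up for illustration. The point to notice is that the two comparisons are independent, and that the bounds start at infinity rather than at fixed numbers like 0 and 50.

```java
import java.util.Arrays;
import java.util.List;

final class DailyExtremesSketch {

    // Folds "max<TAB>min" readings of one day into {dayMin, dayMax}.
    static float[] extremes(List<String> values) {
        float dayMax = Float.NEGATIVE_INFINITY;
        float dayMin = Float.POSITIVE_INFINITY;
        for (String v : values) {
            String[] parts = v.split("\t");
            float max = Float.parseFloat(parts[0]);
            float min = Float.parseFloat(parts[1]);
            // Not "else if": one reading can move both bounds.
            if (max > dayMax) {
                dayMax = max;
            }
            if (min < dayMin) {
                dayMin = min;
            }
        }
        return new float[] {dayMin, dayMax};
    }

    public static void main(String[] args) {
        // Made-up readings for one day.
        float[] r = extremes(Arrays.asList("10.5\t2.0", "8.0\t-3.5", "12.0\t1.0"));
        System.out.println("min=" + r[0] + " max=" + r[1]);

        // A single warm reading: with fixed starting bounds of 0 and 50 and an
        // "else if", the minimum would be reported as 50 instead of 55.
        float[] warm = extremes(Arrays.asList("60.0\t55.0"));
        System.out.println("min=" + warm[0] + " max=" + warm[1]);
    }
}
```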



3. EXECUTING THE MAP REDUCE CODE


hadoop jar WeatherPoc.jar com.poc.weather.WeatherReportProcessor /InputData/WeatherReport.txt /WeatherOutput



We can clearly see that the number of input records is 8986 but the output is 365. The data has been reduced to one record per day of the year, and each record is kept in the file for its month as specified in the code.







4.     PIG SCRIPT

PigScript1.pig

A = LOAD '/WeatherOutput/' USING PigStorage ('\t') AS (date:chararray, mintemp:float, maxtemp:float);

B = DISTINCT A;
DUMP B; 





PigScript2.pig

A = LOAD '/WeatherOutput/' USING PigStorage ('\t') AS (date:chararray, mintemp:float, maxtemp:float);

B = DISTINCT A;
C = ORDER B BY date DESC;
STORE C INTO '/WeatherPOC'; 







5.     EXPORT the PIG Output from HDFS to MySQL using SQOOP

sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "create database if not exists WEATHERPOC;";


sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "use WEATHERPOC;";



sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "grant all privileges on WEATHERPOC.* to 'localhost'@'%';";

sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "grant all privileges on WEATHERPOC.* to ''@'localhost';";


sqoop eval --connect jdbc:mysql://localhost/WEATHERPOC --username root --password root --query "create table weatherpoc(date varchar(50), mintemp float, maxtemp float);";


sqoop export --connect jdbc:mysql://localhost/WEATHERPOC --table weatherpoc --export-dir /WeatherPOC --fields-terminated-by '\t';



6.     STORE THE PIG OUTPUT IN A HIVE EXTERNAL TABLE

Go to the Hive shell using the command:

hive

show databases;
create database WeatherPOC;
use WeatherPOC;



create external table weatherpoc(report_date string, mintemp float, maxtemp float)
row format delimited
fields terminated by '\t'
stored as textfile location '/WeatherPOC';






Hope you all understood the procedures... 
Please do notify me for any corrections...
Kindly leave a comment for any queries/clarification...
(Detailed Description of each phase to be added soon).

ALL D BEST...






Monday, January 16, 2017

HADOOP (PROOF OF CONCEPTS) WEATHER REPORT ANALYSIS

Hello Friends,


Welcome back... This blog covers the Weather Report POC, which one of my friends gave me and asked me to complete. While searching for it I came across a very good website, which I just can't wait to share with you all. In this POC I have modified and used both, to make the concept easier for you all to understand.

Problem Statement:


1. The system receives temperatures of various cities, captured at regular intervals on each day, in an input file.

2. Weather information for all cities for a week is fed to the system in a single input file.

3. The system processes the input data file and generates a report with the maximum and minimum temperatures of each day.

4. It generates a separate output report for each city.

Ex: California-r-00000
Newjersy-r-00000
Newyork-r-00000

5. Develop a PIG Script to filter the Map Reduce Output in the below fashion
- Provide the Unique data
- Sort the Unique data based on RETAIL_ID in DESC order

6. EXPORT the same PIG Output from HDFS to MySQL using SQOOP

7. Store the same PIG Output in a HIVE External Table.

Input File Format:- .txt


This POC input file and problem statement were shared with me by Mr. Amol Wani. The file contains temperature statistics with time for multiple cities. Schema of the record set:-

CA_25-Jan-2014 00:12:345 15.7 01:19:345 23.1 02:34:542 12.3 03:12:187 16 04:00:093 -14 05:12:345 35.7 06:19:345 23.1 07:34:542 12.3 08:12:187 16 09:00:093 -7 10:12:345 15.7 11:19:345 23.1 12:34:542 -22.3 13:12:187 16 14:00:093 -7 15:12:345 15.7 16:19:345 23.1 19:34:542 12.3 20:12:187 16 22:00:093 -7

CA is the city code (here it stands for California), followed by the date. After that, each pair of values represents a time and a temperature.



DOWNLOAD MY INPUT FILE FROM BELOW LINK:



1. TO TAKE INPUT DATA ON HDFS


hadoop fs -mkdir /InputData
hadoop fs -put weather_report.txt /InputData
jar xvf WeatherReportPoc.jar 




2.     MAP REDUCE CODES:-


WEATHER REPORT PROCESSOR 
(DRIVER CLASS)

package com.poc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WeatherReportProcessor {

public static String caOutputName = "California";
public static String nyOutputName = "Newyork";
public static String njOutputName = "Newjersy";
public static String ausOutputName = "Austin";
public static String bosOutputName = "Boston";
public static String balOutputName = "Baltimore";

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Weather Report");
job.setJarByClass(WeatherReportProcessor.class);

job.setMapperClass(WeatherMapper.class);
job.setReducerClass(WeatherReducer.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FloatWritable.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

MultipleOutputs.addNamedOutput(job, caOutputName, TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, nyOutputName, TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, njOutputName, TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, bosOutputName, TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, ausOutputName, TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, balOutputName, TextOutputFormat.class, Text.class, Text.class);
// job.setNumReduceTasks(0);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}

WEATHER MAPPER 
(HAVING MAPPER LOGIC)

package com.poc;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WeatherMapper extends Mapper<Object, Text, Text, FloatWritable> {

    @Override
    public void map(Object key, Text dayReport, Context context)
            throws IOException, InterruptedException {
        StringTokenizer st2 = new StringTokenizer(dayReport.toString(), "\t");

        int counter = 0;
        String cityDateString = "";
        // The times of the extremes are tracked here but not emitted.
        String maxTempTime = "";
        String minTempTime = "";
        String curTime = "";
        float minTemp = Float.POSITIVE_INFINITY;
        // Float.MIN_VALUE is the smallest positive float, so it cannot seed a maximum.
        float maxTemp = Float.NEGATIVE_INFINITY;

        while (st2.hasMoreTokens()) {
            String token = st2.nextToken();
            if (counter == 0) {
                cityDateString = token; // e.g. CA_25-Jan-2014
            } else if (counter % 2 == 1) {
                curTime = token; // odd positions hold the time
            } else {
                float curTemp = Float.parseFloat(token); // even positions hold the temperature
                // Two independent checks, so the first reading seeds both bounds.
                if (curTemp < minTemp) {
                    minTemp = curTemp;
                    minTempTime = curTime;
                }
                if (curTemp > maxTemp) {
                    maxTemp = curTemp;
                    maxTempTime = curTime;
                }
            }
            counter++;
        }

        if (counter < 3) {
            return; // no temperature reading on this line
        }

        // Emit the day's maximum and minimum under the same city/date key.
        Text cityDate = new Text(cityDateString);
        context.write(cityDate, new FloatWritable(maxTemp));
        context.write(cityDate, new FloatWritable(minTemp));
    }
}


WEATHER REDUCER 
(HAVING REDUCER LOGIC)

package com.poc;

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class WeatherReducer extends Reducer<Text, FloatWritable, Text, Text> {

    private MultipleOutputs<Text, Text> mos;

    @Override
    public void setup(Context context) {
        mos = new MultipleOutputs<Text, Text>(context);
    }

    @Override
    public void reduce(Text key, Iterable<FloatWritable> values, Context context)
            throws IOException, InterruptedException {
        // The mapper emits the day's max and min for each key; their order is not guaranteed.
        float min = Float.POSITIVE_INFINITY;
        float max = Float.NEGATIVE_INFINITY;
        for (FloatWritable value : values) {
            min = Math.min(min, value.get());
            max = Math.max(max, value.get());
        }
        Text result = new Text(Float.toString(min) + "\t" + Float.toString(max));

        // Keys look like CA_25-Jan-2014: city code, underscore, date.
        String cityDate = key.toString();
        String fileName = "";
        if (cityDate.startsWith("CA")) {
            fileName = WeatherReportProcessor.caOutputName;
        } else if (cityDate.startsWith("NY")) {
            fileName = WeatherReportProcessor.nyOutputName;
        } else if (cityDate.startsWith("NJ")) {
            fileName = WeatherReportProcessor.njOutputName;
        } else if (cityDate.startsWith("AUS")) {
            fileName = WeatherReportProcessor.ausOutputName;
        } else if (cityDate.startsWith("BOS")) {
            fileName = WeatherReportProcessor.bosOutputName;
        } else if (cityDate.startsWith("BAL")) {
            fileName = WeatherReportProcessor.balOutputName;
        }

        String[] strArr = cityDate.split("_");
        if (fileName.isEmpty() || strArr.length < 2) {
            return; // unknown city code or malformed key
        }
        // Keep only the date part as the output key.
        key.set(strArr[1]);
        mos.write(fileName, key, result);
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }

}

3. EXECUTING THE MAP REDUCE CODE


hadoop jar WeatherReportPoc.jar com.poc.WeatherReportProcessor /InputData/weather_report.txt /WeatherOutput






Explanation:- 


In the map method, we parse each input line and keep a counter to tell the fields apart. The first token (counter == 0) is the city and date. After that, tokens at odd positions (1, 3, 5, ...) are times and tokens at even positions are temperatures. Each temperature is compared with the running minimum and maximum, and the time of each extreme is remembered in minTempTime and maxTempTime. Once the while loop finishes for a line, the mapper writes the maximum and the minimum temperature, each keyed by the city and date.
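
The counter logic can be tried on the sample record without Hadoop. Below is a hedged plain-Java sketch: the class and field names are my own, and it assumes the fields are tab-separated as the mapper expects. It uses the first five readings of the CA sample line shown earlier. One detail worth knowing: Float.MIN_VALUE is the smallest positive float, not the most negative one, so the sketch starts the maximum at negative infinity.

```java
final class DayScanSketch {

    // Holds the extreme temperatures of one line and the times they were read.
    static final class Extremes {
        float minTemp = Float.POSITIVE_INFINITY;
        float maxTemp = Float.NEGATIVE_INFINITY;
        String minTime = "";
        String maxTime = "";
    }

    static Extremes scan(String dayReport) {
        String[] tokens = dayReport.split("\t");
        Extremes e = new Extremes();
        String curTime = "";
        // Token 0 is the city/date key; after that odd = time, even = temperature.
        for (int counter = 1; counter < tokens.length; counter++) {
            if (counter % 2 == 1) {
                curTime = tokens[counter];
            } else {
                float curTemp = Float.parseFloat(tokens[counter]);
                if (curTemp < e.minTemp) {
                    e.minTemp = curTemp;
                    e.minTime = curTime;
                }
                if (curTemp > e.maxTemp) {
                    e.maxTemp = curTemp;
                    e.maxTime = curTime;
                }
            }
        }
        return e;
    }

    public static void main(String[] args) {
        String line = "CA_25-Jan-2014\t00:12:345\t15.7\t01:19:345\t23.1"
                + "\t02:34:542\t12.3\t03:12:187\t16\t04:00:093\t-14";
        Extremes e = scan(line);
        System.out.println("min " + e.minTemp + " at " + e.minTime);
        System.out.println("max " + e.maxTemp + " at " + e.maxTime);
        // Shows why Float.MIN_VALUE is a poor starting maximum.
        System.out.println("Float.MIN_VALUE > 0 is " + (Float.MIN_VALUE > 0));
    }
}
```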

In the reduce method: for each reduce task, the setup method runs once and creates the MultipleOutputs object. For a given key we have two entries, the day's maximum and minimum temperature. The reducer iterates over the values, works out which one is smaller, builds the output string as min<TAB>max, and writes it to the named output file for that city.
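
The routing part of the reducer (city code to output file, and key to date) can be sketched as a lookup table in plain Java. This is only an illustration: the class and method names are hypothetical, while the output names are the ones registered in the driver class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class CityRoutingSketch {

    // City code -> named output, same names as in WeatherReportProcessor.
    private static final Map<String, String> OUTPUT_BY_CODE = new LinkedHashMap<>();
    static {
        OUTPUT_BY_CODE.put("CA", "California");
        OUTPUT_BY_CODE.put("NY", "Newyork");
        OUTPUT_BY_CODE.put("NJ", "Newjersy");
        OUTPUT_BY_CODE.put("AUS", "Austin");
        OUTPUT_BY_CODE.put("BOS", "Boston");
        OUTPUT_BY_CODE.put("BAL", "Baltimore");
    }

    // Splits "CA_25-Jan-2014" into {outputName, date}; null for unknown or malformed keys.
    static String[] route(String key) {
        int sep = key.indexOf('_');
        if (sep < 0) {
            return null;
        }
        String name = OUTPUT_BY_CODE.get(key.substring(0, sep));
        if (name == null) {
            return null;
        }
        return new String[] {name, key.substring(sep + 1)};
    }

    // Orders the two values emitted by the mapper as "min<TAB>max".
    static String minMaxLine(float a, float b) {
        return Math.min(a, b) + "\t" + Math.max(a, b);
    }

    public static void main(String[] args) {
        String[] r = route("CA_25-Jan-2014");
        System.out.println(r[0] + " <- " + r[1] + "\t" + minMaxLine(23.1f, -14f));
    }
}
```

Matching on the exact code before the underscore avoids accidental hits that a contains() check could give if a city code ever showed up inside another part of the key.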

In the main method, a Job instance is created from a Configuration object. The job is configured with the mapper and reducer classes and with the key and value types. The MultipleOutputs entries registered on the job define the file names and the output format used for each city.

4.     PIG SCRIPT

PigScript1.pig

A = LOAD '/WeatherOutput/' USING PigStorage ('\t') AS (date:chararray, mintemp:float, maxtemp:float);

B = DISTINCT A;
DUMP B; 



(In the output we can clearly see that it reads all the files and, as we have given the DISTINCT command, it removes duplicate entries)

PigScript2.pig

A = LOAD '/WeatherOutput/' USING PigStorage ('\t') AS (date:chararray, mintemp:float, maxtemp:float);

B = DISTINCT A;
C = ORDER B BY date DESC;
STORE C INTO '/WEATHERPOC'; 


NOTE:- Don't use the DISTINCT command if you want to export all entries.
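
To get a feel for what DISTINCT followed by ORDER ... DESC does to the rows, here is a rough plain-Java equivalent. It is a sketch, not how Pig works internally: the class and method names are mine, and the sample rows are made up apart from the -22.3 and 35.7 taken from the CA sample line. As far as I know Pig compares chararray fields as text, so the date column is sorted alphabetically and not by calendar date; the sketch shows the same effect.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;

final class DistinctOrderSketch {

    // Drops duplicate rows, then sorts by the first tab-separated field,
    // compared as text, in descending order.
    static List<String> distinctSortedDesc(List<String> rows) {
        List<String> out = new ArrayList<>(new LinkedHashSet<>(rows));
        out.sort(Comparator.comparing((String r) -> r.split("\t")[0]).reversed());
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList(
                "25-Jan-2014\t-22.3\t35.7",
                "03-Feb-2014\t1.0\t2.0",
                "25-Jan-2014\t-22.3\t35.7", // duplicate, removed by the distinct step
                "26-Jan-2014\t0.0\t5.0");
        for (String row : distinctSortedDesc(rows)) {
            System.out.println(row);
        }
    }
}
```

Here 03-Feb-2014 comes last even though it is the latest date, because "03" sorts below "25" and "26" as text. If true date order matters, the date would have to be written in a sortable form such as yyyy-MM-dd first.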






5.     EXPORT the PIG Output from HDFS to MySQL using SQOOP


sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "create database if not exists WEATHER;";




sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "use WEATHER;";



sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "grant all privileges on WEATHER.* to 'localhost'@'%';";

sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "grant all privileges on WEATHER.* to ''@'localhost';";



sqoop eval --connect jdbc:mysql://localhost/WEATHER --username root --password root --query "create table weatherpoc(date varchar(50), mintemp float, maxtemp float);";


sqoop export --connect jdbc:mysql://localhost/WEATHER --table weatherpoc --export-dir /WEATHERPOC --fields-terminated-by '\t';



6.     STORE THE PIG OUTPUT IN A HIVE EXTERNAL TABLE

Go to the Hive shell using the command:

hive

show databases;
create database WeatherPOC;
use WeatherPOC;


create external table weatherpoc(report_date string, mintemp float, maxtemp float)
row format delimited
fields terminated by '\t'
stored as textfile location '/WEATHERPOC';



Hope you all understood the procedures... 
Please do notify me for any corrections...
Kindly leave a comment for any queries/clarification...
(Detailed Description of each phase to be added soon).
ALL D BEST...