Friday, February 10, 2017

MULTIPLE OUTPUT WITH MULTIPLE INPUT FILE NAME

Dear Friends,


I was asked how to process several files in one job and store each result under the name of its input file. It is a real-world problem: say you have log files from different places, and you have to apply the same logic to all of them but store each result under a different file name. How do you do this?

In this blog, I will take you through how to do this using the MultipleOutputs class in a MapReduce program. Here I am using the wordcount program logic.

The problem statement is as below.
1. N input files will be in HDFS. Each input file has a list of sentences/words.
2. Write a MapReduce program which gives the wordcount of each input file in a corresponding part-r file, where the part-r file name has to be <input file name>-r-00000.
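
To make the expected file names concrete, here is a minimal sketch of the naming pattern. The helper reducerOutputName and the file names Input1.txt and Input2.txt are my own assumptions for illustration; the helper is not a Hadoop API. It only mirrors the usual pattern of the name, then "-r-", then the reduce task number padded to five digits.

```java
import java.util.Locale;

// Hypothetical helper, not part of Hadoop: it only mirrors the usual naming pattern.
class OutputNameSketch {

    // Builds "<input file name>-r-<reduce task number padded to 5 digits>".
    static String reducerOutputName(String inputFileName, int reduceTask) {
        return String.format(Locale.ROOT, "%s-r-%05d", inputFileName, reduceTask);
    }

    public static void main(String[] args) {
        // Made-up input file names; with a single reducer the task number is 0.
        System.out.println(reducerOutputName("Input1.txt", 0)); // Input1.txt-r-00000
        System.out.println(reducerOutputName("Input2.txt", 0)); // Input2.txt-r-00000
    }
}
```

With more than one reducer, the same input file could end up with several such files, one per reduce task that received its words.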

The problem statement looks difficult, yet it is very easy to understand and implement. (Just think simply and logically.)

Solution:-

The simple logical solution is:-
1. Extract the name of each input file using the FileSplit class.
2. After processing, write the output of each file under the name extracted from FileSplit, using the MultipleOutputs class.



DOWNLOAD MY INPUT FILE FROM BELOW LINK:

https://drive.google.com/file/d/0BzYUKIo7aWL_M0s2UFRKS2xoMVE/view?usp=sharing




1. TO TAKE INPUT DATA ON HDFS


hadoop fs -mkdir /Input
hadoop fs -put Input* /Input
jar xvf mulout.jar 






2. MAP REDUCE CODES:-


DRIVER CLASS


package com.mulout.wordcount;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Multiwordcnt {

public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

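Configuration conf = new Configuration();
// Parse the generic options (-D, -files, ...) before creating the Job, because the Job copies conf.
args = new GenericOptionsParser(conf, args).getRemainingArgs();
Job myJob = Job.getInstance(conf, "Multiwordcnt");
// Delete the results folder of an earlier run, if any, so that it can be written again.
FileSystem fs = FileSystem.get(conf);
fs.delete(new Path("/NewOut/"), true);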

myJob.setJarByClass(Multiwordcnt.class);
myJob.setMapperClass(MyMapper.class);
myJob.setReducerClass(MyReducer.class);
myJob.setMapOutputKeyClass(Text.class);
myJob.setMapOutputValueClass(IntWritable.class);
// myJob.setNumReduceTasks(0);
myJob.setOutputKeyClass(Text.class);
myJob.setOutputValueClass(IntWritable.class);
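// LazyOutputFormat wraps TextOutputFormat, so a file is created only when a record is written to it.
// Do not call myJob.setOutputFormatClass(TextOutputFormat.class) afterwards: it would replace the lazy wrapper.
LazyOutputFormat.setOutputFormatClass(myJob, TextOutputFormat.class);

myJob.setInputFormatClass(TextInputFormat.class);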

FileInputFormat.addInputPath(myJob, new Path(args[0]));
FileOutputFormat.setOutputPath(myJob, new Path(args[1]));

System.exit(myJob.waitForCompletion(true) ? 0 : 1);
}

}


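EXPLANATION:- In the driver class, LazyOutputFormat wraps TextOutputFormat so that an output file is created only when a record is actually written to it. Without it, the job also creates empty default part-r-00000 files. The <input file name>-r-00000 naming itself comes from MultipleOutputs in the reducer.
(Here I have used fs.delete to delete the /NewOut folder if it already exists in HDFS.)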

MAPPER CLASS


package com.mulout.wordcount;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

Text emitkey = new Text();
IntWritable emitvalue = new IntWritable(1);

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

// Name of the file this split belongs to (file name only, without the directory).
String filePathString = ((FileSplit) context.getInputSplit()).getPath().getName();
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
// The key is "<file name>*<word>", so the reducer can tell the files apart.
String filepathword = filePathString + "*" + tokenizer.nextToken();
emitkey.set(filepathword);
context.write(emitkey, emitvalue);
}
}
}

EXPLANATION:- In the Mapper class we took the input file name using the FileSplit class, combined it with each individual word, and used the result as the output key. Then we assigned 1 to each word as the output value for further processing in the reducer.
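
To see what the map step emits, here is a minimal plain-Java sketch of the same key-building idea, without any Hadoop classes. The class name CompositeKeySketch, the method compositeKeys and the file name Input1.txt are assumptions made up for this illustration. Each returned key stands for one (key, 1) pair written by the mapper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Plain-Java stand-in for the map step; no Hadoop classes are used here.
class CompositeKeySketch {

    // Returns one "<file name>*<word>" key per token, in the order the tokens appear.
    static List<String> compositeKeys(String fileName, String line) {
        List<String> keys = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            keys.add(fileName + "*" + tokenizer.nextToken());
        }
        return keys;
    }

    public static void main(String[] args) {
        // "Input1.txt" is a made-up file name for illustration.
        System.out.println(compositeKeys("Input1.txt", "hello hadoop hello"));
        // prints [Input1.txt*hello, Input1.txt*hadoop, Input1.txt*hello]
    }
}
```

Because the file name is part of the key, the same word coming from two different files reaches the reducer as two different keys and is counted separately.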

REDUCER CLASS


package com.mulout.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
Text emitkey = new Text();
IntWritable emitvalue = new IntWritable();
private MultipleOutputs<Text, IntWritable> multipleoutputs;

public void setup(Context context) throws IOException, InterruptedException {
multipleoutputs = new MultipleOutputs<Text, IntWritable>(context);
}

public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;

for (IntWritable value : values) {
sum = sum + value.get();
}
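// The key is "<file name>*<word>". Split at the first "*" so that a word containing "*" stays whole.
String pathandword = key.toString();
int separator = pathandword.indexOf('*');
String path = pathandword.substring(0, separator);
String word = pathandword.substring(separator + 1);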
emitkey.set(word);
emitvalue.set(sum);
System.out.println("word:" + word + "\t" + "sum:" + sum + "\t" + "path:  " + path);
multipleoutputs.write(emitkey, emitvalue, ("/NewOut/"+path));
}

public void cleanup(Context context) throws IOException, InterruptedException {
multipleoutputs.close();
}
}

EXPLANATION:- In the reducer class we split the key to separate the input file name from the word, added up all the 1s to get the number of times the word occurred, and then used the MultipleOutputs write method with 3 parameters <Key, Value, Path> to store the result in a file named after the individual input file.
(Here I have used an additional output folder "/NewOut/" for storing my results.)
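
The split-and-sum logic can be tried outside Hadoop with a minimal plain-Java sketch. The class name SplitAndSumSketch, the method sumPerFile and the sample keys are assumptions made up for this illustration. In the real job the framework groups equal keys before calling reduce; here a nested map plays that role.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java stand-in for the reduce step; no Hadoop classes are used here.
class SplitAndSumSketch {

    // Groups "<file name>*<word>" keys into file name -> (word -> count).
    // Assumes every key contains the "*" separator, as the mapper always adds it.
    static Map<String, Map<String, Integer>> sumPerFile(List<String> compositeKeys) {
        Map<String, Map<String, Integer>> perFile = new TreeMap<>();
        for (String key : compositeKeys) {
            int separator = key.indexOf('*');            // the "*" between file name and word
            String file = key.substring(0, separator);
            String word = key.substring(separator + 1);
            perFile.computeIfAbsent(file, f -> new TreeMap<>())
                   .merge(word, 1, Integer::sum);        // each key stands for a count of 1
        }
        return perFile;
    }

    public static void main(String[] args) {
        // Made-up keys, shaped like the ones the mapper emits.
        List<String> keys = Arrays.asList(
                "Input1.txt*hello", "Input1.txt*hadoop", "Input1.txt*hello", "Input2.txt*hello");
        System.out.println(sumPerFile(keys));
        // prints {Input1.txt={hadoop=1, hello=2}, Input2.txt={hello=1}}
    }
}
```

Each inner map corresponds to the content of one output file: the outer key plays the role of the path passed to MultipleOutputs, and the inner entries are the word and count pairs written under it.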



3. EXECUTING THE MAP REDUCE CODE


Command > hadoop jar mulout.jar com/mulout/wordcount/Multiwordcnt /Input /Out1







That's all.

Now you can take N input files, process them, and store each result under the name of its input file.



Hope you all understood the procedure.
Please notify me of any corrections.
Kindly leave a comment for any queries or clarifications.
(Detailed description of each phase to be added soon.)
All the best!

