INDUSTRY: HEALTHCARE
DATA INPUT FORMAT:- PDF (my input data is in PDF format)
I created 3,000 tab-separated records on my own, like the example below. The attributes are:-
- PATIENT_ID
- PATIENT_NAME
- AGE
- GENDER
- DISEASE_INFO
- HOSPITAL_NAME
- ADMITED_DATE
- ADDRESSOFPATIENT
EXAMPLE:-
100001 Sarath_Sexsena 39 Male Cancer Care Hospital 2015-04-15 2ndCross,BTRoad,Mumbai
DOWNLOAD MY INPUT FILE FROM BELOW LINK:
https://drive.google.com/file/d/0BzYUKIo7aWL_c3ZDZHZGaFhfOUk/view?usp=sharing
PROBLEM STATEMENT:-
1) Take the complete PDF input data onto HDFS.
2) Develop a MapReduce use case to get the below filtered results from the HDFS input data (PDF data):
2.1) If age > 35 && gender is 'Male':
2.1.1) If DISEASE_INFO is Cancer OR TB --> store the results in "EmergencyCare".
2.1.2) If 'ADMITED_DATE' is between 2015-04-01 and 2015-07-31 --> store in "SeasonalCareSection".
2.2) If age > 50 && gender is 'Male' or 'Female':
2.2.1) If address is 'Jharkhand' --> store in "Dengue-Care Section".
2.2.2) If 'ADMITED_DATE' is between 2015-09-01 and 2015-12-31 --> store in "WinderSeasonal-Care Section".
2.3) ELSE store in "General Care Section".
NOTE: Only 5 output files, with the names mentioned above, have to be generated.
3) Develop a Pig script to filter the MapReduce output in the below fashion:
3.1) Provide the unique data.
3.2) Sort the unique data based on PATIENT_ID.
4) EXPORT the same Pig output from HDFS to MySQL using SQOOP.
5) Store the same Pig output in a Hive external table.
NOTE:- For this POC I have used a custom input format to read PDF files using iText. The corresponding jar file (itextpdf-5.1.jar) therefore has to be added to the build path during coding and to the lib directory of Hadoop for successful execution. You can use PDFBox for the same purpose, but the code will be different.
POC Processing Details
MAP REDUCE PROCESS IN DETAIL:-
1. TO TAKE PDF INPUT DATA ONTO HDFS
COMMANDS:-
hadoop fs -mkdir /Pocinput (to create a directory in HDFS)
hadoop fs -put POC.pdf /Pocinput (to put/load the PDF file into HDFS)
jar xvf POC.jar (to see the classes used in the Eclipse jar file and find the driver class)
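To confirm the upload before running the job (an optional check, not in the original steps):
hadoop fs -ls /Pocinput (should list POC.pdf)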
2. MAP REDUCE CODES:-
Below is the main class of the MR code, where the job configuration is declared and the driver jar, mapper and reducer classes are set, along with the input and output formats and the output key/value classes. Since this code uses the MultipleOutputs method to produce output in multiple files, a named output is registered for each section. The MR process starts from here.
PDF INPUT DRIVER (DRIVER CLASS)
package com.poc;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PdfInputDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        args = parser.getRemainingArgs();

        Job job = new Job(conf, "Pdfwordcount");
        job.setJarByClass(PdfInputDriver.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Custom input format that extracts text from the PDF.
        job.setInputFormatClass(PdfInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // LazyOutputFormat suppresses the empty default part files, so only
        // the named outputs that actually receive records are created.
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // job.setNumReduceTasks(0);
        job.setMapperClass(PdfMapper.class);
        job.setReducerClass(PdfReducer.class);

        // One named output per care section.
        MultipleOutputs.addNamedOutput(job, "EmergencyCare", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "SeasonalCareSection", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "DengueCareSection", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "WinderSeasonalCareSection", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "General", TextOutputFormat.class, Text.class, Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
PDF INPUT FORMAT (CUSTOM INPUT FORMAT TO READ PDF FILES)
This is the input format class declared in the driver program, which extends the basic FileInputFormat of MapReduce. It hands each split to the record reader below, which sends the extracted text on as key/value pairs.
package com.poc;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class PdfInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new PdfRecordReader();
    }
}
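One refinement worth considering here (my addition, not in the original post): a PDF cannot be parsed from an arbitrary byte offset, so it is safer to tell the framework never to split the file. A minimal sketch, to be placed inside PdfInputFormat (JobContext comes from org.apache.hadoop.mapreduce):

@Override
protected boolean isSplitable(JobContext context, Path file) {
    // PdfReader needs the whole file, so force a single split per PDF.
    return false;
}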
PDF RECORD READER (TO READ THE PDF FILE AND EMIT KEY/VALUE PAIRS)
Below is the customized program which uses iText to read the data from the PDF file and send it to the input format as key/value pairs: the key is a LongWritable record index and each line of extracted text is the value.
package com.poc;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import com.itextpdf.text.pdf.PdfReader;
import com.itextpdf.text.pdf.parser.PdfReaderContentParser;
import com.itextpdf.text.pdf.parser.SimpleTextExtractionStrategy;
import com.itextpdf.text.pdf.parser.TextExtractionStrategy;

public class PdfRecordReader extends RecordReader<LongWritable, Text> {

    private int index = 0;
    private LongWritable key = null;
    private Text value = null;
    private PdfReader reader;
    private PdfReaderContentParser parser;
    private TextExtractionStrategy strategy;
    private FSDataInputStream fileIn;
    private List<String> records = new ArrayList<String>();

    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration conf = context.getConfiguration();
        final Path file = split.getPath();
        FileSystem fs = file.getFileSystem(conf);
        this.fileIn = fs.open(split.getPath());
        this.reader = new PdfReader(fileIn);
        this.parser = new PdfReaderContentParser(reader);
        // Extract all lines of text up front; nextKeyValue() then
        // simply walks through the list.
        readRecords();
    }

    public synchronized boolean nextKeyValue() throws IOException {
        // Stop once every extracted line has been served; this also
        // handles an empty PDF without throwing.
        if (index >= records.size()) {
            return false;
        }
        if (key == null) {
            key = new LongWritable();
        }
        if (value == null) {
            value = new Text();
        }
        key.set(index);
        value.set(records.get(index));
        index++;
        return true;
    }

    @Override
    public LongWritable getCurrentKey() {
        return key;
    }

    @Override
    public Text getCurrentValue() {
        return value;
    }

    public float getProgress() {
        return records.isEmpty() ? 0 : (float) index / records.size();
    }

    public synchronized void close() throws IOException {
        if (fileIn != null) {
            fileIn.close();
        }
    }

    private void readRecords() throws IOException {
        if (reader != null) {
            for (int i = 1; i <= reader.getNumberOfPages(); i++) {
                strategy = parser.processContent(i, new SimpleTextExtractionStrategy());
                if (strategy != null) {
                    // Each line of extracted page text becomes one record.
                    StringTokenizer tokens = new StringTokenizer(strategy.getResultantText(), "\n");
                    while (tokens.hasMoreTokens()) {
                        records.add(tokens.nextToken());
                    }
                }
            }
            reader.close();
        }
    }
}
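If extraction misbehaves, it can help to test the iText calls outside Hadoop first. Below is a minimal standalone sketch (my own, using the same iText 5 API as the record reader; the class name and local file name are just examples):

import com.itextpdf.text.pdf.PdfReader;
import com.itextpdf.text.pdf.parser.PdfReaderContentParser;
import com.itextpdf.text.pdf.parser.SimpleTextExtractionStrategy;

public class PdfExtractTest {
    public static void main(String[] args) throws Exception {
        // Read a local copy of the input PDF, page by page.
        PdfReader reader = new PdfReader("POC.pdf");
        PdfReaderContentParser parser = new PdfReaderContentParser(reader);
        for (int i = 1; i <= reader.getNumberOfPages(); i++) {
            // Every printed line should be one patient record.
            System.out.println(parser.processContent(i, new SimpleTextExtractionStrategy()).getResultantText());
        }
        reader.close();
    }
}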
PDF MAPPER (HAVING MAPPER LOGIC)
This is the mapper class, which drops the header line if present and normalizes each record into a TSV (tab-separated values) line before sending it to the next phase.
package com.poc;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PdfMapper extends Mapper<LongWritable, Text, Text, Text> {

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        try {
            // Skip the header line.
            if (value.toString().contains("PATIENT")) {
                return;
            }
            // Collapse every run of whitespace into a single tab, so each
            // record reaches the reducer as a clean TSV line.
            String dr = value.toString().trim().replaceAll("\\s+", "\t");
            // A single constant key sends every record to the same reducer.
            context.write(new Text(""), new Text(dr));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
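As a quick illustration of the normalization (a hypothetical snippet, not part of the original post), a record whose fields are separated by runs of spaces comes out as a single TSV line. Note this only lines up correctly when field values themselves contain no spaces:

public class TsvNormalizeDemo {
    public static void main(String[] args) {
        // Illustration only: the same trim/replaceAll call the mapper uses.
        String line = "100001  Sarath_Sexsena  39  Male  Cancer  Care_Hospital  2015-04-15  Mumbai";
        // Every run of whitespace becomes one tab, yielding eight fields.
        System.out.println(line.trim().replaceAll("\\s+", "\t"));
    }
}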
PDF REDUCER (HAVING REDUCER LOGIC)
This is the reducer, which takes each value, splits it on tabs, checks it against the conditions from the problem statement, and, where a condition is satisfied, writes the entire record to the respective output using the MultipleOutputs reference object.
package com.poc;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class PdfReducer extends Reducer<Text, Text, Text, Text> {

    private MultipleOutputs<Text, Text> mos;

    @Override
    public void setup(Context context) {
        mos = new MultipleOutputs<Text, Text>(context);
    }

    @Override
    public void reduce(Text k1, Iterable<Text> k2, Context context) throws IOException, InterruptedException {
        // Admitted dates in the input are in yyyy-MM-dd form; note the
        // upper-case MM (months) -- lower-case mm would mean minutes.
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
        for (Text t : k2) {
            String dr = t.toString();
            String[] str1 = dr.split("\t");
            int age = Integer.parseInt(str1[2]);
            try {
                Date date = formatter.parse(str1[6]);
                Date seasonStart = formatter.parse("2015-04-01");
                Date seasonEnd = formatter.parse("2015-07-31");
                Date winterStart = formatter.parse("2015-09-01");
                Date winterEnd = formatter.parse("2015-12-31");
                if (age > 35 && str1[3].equalsIgnoreCase("Male")) {
                    if (str1[4].equalsIgnoreCase("Cancer") || str1[4].equalsIgnoreCase("TB")) {
                        mos.write("EmergencyCare", null, new Text(dr), "/HealthCarePOC/EmergencyCare");
                    } else if (!date.before(seasonStart) && !date.after(seasonEnd)) {
                        // Inclusive range check: 2015-04-01 to 2015-07-31.
                        mos.write("SeasonalCareSection", null, new Text(dr), "/HealthCarePOC/SeasonalCareSection");
                    } else {
                        mos.write("General", null, new Text(dr), "/HealthCarePOC/GeneralSection");
                    }
                } else if (age > 50) {
                    if (dr.toLowerCase().contains("jharkhand")) {
                        mos.write("DengueCareSection", null, new Text(dr), "/HealthCarePOC/DengueCare");
                    } else if (!date.before(winterStart) && !date.after(winterEnd)) {
                        // Inclusive range check: 2015-09-01 to 2015-12-31.
                        mos.write("WinderSeasonalCareSection", null, new Text(dr), "/HealthCarePOC/WinderSeasonalCareSection");
                    } else {
                        mos.write("General", null, new Text(dr), "/HealthCarePOC/GeneralSection");
                    }
                } else {
                    mos.write("General", null, new Text(dr), "/HealthCarePOC/GeneralSection");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}
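One pitfall fixed above deserves a call-out: in SimpleDateFormat, lower-case mm means minutes while upper-case MM means months, so a "dd/mm/yyyy" pattern silently mis-parses every date. A tiny standalone check (illustrative only):

import java.text.SimpleDateFormat;

public class DatePatternCheck {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat wrong = new SimpleDateFormat("dd/mm/yyyy"); // mm = minutes!
        SimpleDateFormat right = new SimpleDateFormat("dd/MM/yyyy"); // MM = months
        System.out.println(wrong.parse("15/04/2015")); // January 15: the "04" was read as minutes
        System.out.println(right.parse("15/04/2015")); // April 15, as intended
    }
}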
EXECUTING THE MAP REDUCE CODE
To execute an MR program, the basic command is:-
hadoop jar <jar file name with .jar extension> <full package & driver class name> <input file path> <output file path>
If any of the above is missed out, the MR program will give errors. (Make sure all related jar files used to compile the program are placed in the lib directory of Hadoop.)
hadoop jar POC.jar com.poc.PdfInputDriver /Pocinput/POC.pdf /Pdf
Go to Firefox, open the NameNode page at http://localhost:50070, browse the file system, and then click on the HealthCarePOC directory to check the files that have been created.
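The same check can also be done from the terminal (an optional step, not in the original write-up; exact part-file names may vary):
hadoop fs -ls /HealthCarePOC
hadoop fs -cat /HealthCarePOC/EmergencyCare-r-00000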
3. PIG SCRIPT
A Pig script is written in any text editor and saved with the .pig extension.
To execute a Pig script file from any directory in the terminal, type the following command:-
1. For LFS files: pig -x local <Scriptname.pig>
2. For HDFS files: pig <Scriptname.pig>
In this case, since our data is in HDFS, the following command should be used:
pig <PigScript.pig>
PigScript1.pig
-- In this Pig script we load the file according to its columns, remove
-- duplicate entries using DISTINCT, and display the output on the console.
A = LOAD '/HealthCarePOC/' USING PigStorage('\t') AS (id:int, Name:chararray, age:int, gender:chararray, disease:chararray, hospital:chararray, admitdate:chararray, address:chararray);
B = DISTINCT A;
DUMP B;
PigScript2.pig
-- In this Pig script we load the file from HDFS and, after removing
-- duplicates and ordering by id, store it in an output directory.
A = LOAD '/HealthCarePOC/' USING PigStorage('\t') AS (id:int, Name:chararray, age:int, gender:chararray, disease:chararray, hospital:chararray, admitdate:chararray, address:chararray);
B = DISTINCT A;
C = ORDER B BY id;
STORE C INTO '/POCdata';
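To spot-check the stored result (optional; since ORDER BY runs a reduce phase, the output is typically a part-r-* file):
hadoop fs -cat /POCdata/part-r-00000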
4. EXPORT THE PIG OUTPUT FROM HDFS TO MYSQL USING SQOOP
Sqoop is used for making a link to the database and performing tasks such as importing files, exporting files and running queries. In our use case we will create a database and table in MySQL and upload the Pig output to it. Sqoop eval is used for connecting to MySQL and running queries against it.
sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "create database if not exists HEALTHCARE;"
sqoop eval --connect jdbc:mysql://localhost/HEALTHCARE --username root --password root --query "use HEALTHCARE;"
sqoop eval --connect jdbc:mysql://localhost/HEALTHCARE --username root --password root --query "grant all privileges on HEALTHCARE.* to 'localhost'@'%';"
sqoop eval --connect jdbc:mysql://localhost/HEALTHCARE --username root --password root --query "grant all privileges on HEALTHCARE.* to ''@'localhost';"
sqoop eval --connect jdbc:mysql://localhost/HEALTHCARE --username root --password root --query "create table poc(id int primary key, name varchar(50), age int, gender varchar(50), disease varchar(50), hospital varchar(50), admitdate varchar(50), address varchar(250));"
sqoop export --connect jdbc:mysql://localhost/HEALTHCARE --table poc --export-dir /POCdata --fields-terminated-by '\t'
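To verify the export (an optional check I'd suggest, following the same eval pattern), count the exported rows:
sqoop eval --connect jdbc:mysql://localhost/HEALTHCARE --username root --password root --query "select count(*) from poc;"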
5. STORE THE PIG OUTPUT IN A HIVE EXTERNAL TABLE
Hive uses the Hadoop environment to run, so make sure the Hadoop daemons are up and running before executing Hive commands. In our use case, since the data is already stored in an HDFS directory, we use a Hive external table pointing to that directory, so that we don't have to load the data again.
Go to the Hive shell using the command:
hive
After entering the Hive shell, give the following commands one after another:-
show databases;
create database healthcare;
use healthcare;
create external table poc(id int, name string, age int, gender string, disease string, hospital string, admitdate string, address string)
row format delimited
fields terminated by '\t'
stored as textfile
location '/POCdata';
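Once the table is created, a quick sanity check (optional) from the same Hive shell:
select * from poc limit 10;
select count(*) from poc;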
Hope you all understood the procedure...
Please do notify me of any corrections...
Kindly leave a comment for any queries/clarifications...
(A detailed description of each phase is to be added soon.)
ALL THE BEST...