Friday, December 30, 2016

HADOOP (PROOF OF CONCEPT) HEALTHCARE POC BY MAHESH CHANDRA MAHARANA

INDUSTRY: HEALTHCARE

DATA INPUT FORMAT :- PDF (My Input Data is in PDF Format)
Like this below created 3000 tab separated records on my own:-

ATTRIBUTES are like:-
  1.  PATIENT_ID
  2. PATINENT_NAME 
  3. AGE   
  4. GENDER      
  5. DISEASE_INFO
  6. HOSPITAL_NAME
  7. ADMITED_DATE
  8. ADDRESSOFPATIENT

EXAMPLE:-

100001            Sarath_Sexsena           39 Male Cancer Care Hospital 2015-15-04    2ndCross,BTRoad ,Mumbai


DOWNLOAD MY INPUT FILE FROM BELOW LINK:

https://drive.google.com/file/d/0BzYUKIo7aWL_c3ZDZHZGaFhfOUk/view?usp=sharing

PROBLEM STATEMENT: -

      1)      Take the complete PDF Input data on HDFS.
      2)      Develop a Map Reduce Use Case to get the below filtered results from the HDFS Input data (PDF data).
2.1)            If age > 35 && Gender is ‘Male’.
2.1.1)      If DISEASE_INFO is Cancer OR TB  --> store the results in  "EmergencyCare".
2.1.2)      If ‘ADMITED_DATE’ IN BETWEEN 2015-04-01 to2015-07-31  --> store "SeasonalCareSection”.
2.2)            If age >50  && Gender ‘Male’ or ‘Female’.
2.2.1)      If address is ‘Jharkhand’; --> store "Dengue-Care Section".
2.2.2)      If ‘ADMITED_DATE’ IN BETWEEN 2015-09-01 to2015-12-31 --> ‘WinderSeasonal-Care Section’
    ELSE
2.3)            Store in "General Care Section"
  NOTE: In the mentioned file names, only 5 outputs have to be generated
       3)      Develop a PIG Script to filter the Map Reduce Output in the below fashion.
3.1)            Provide the Unique data
3.2)            Sort the Unique data based on Patient_ID.
      4.)    EXPORT the same PIG Output from HDFS to MySQL using SQOOP.
      5.)    Store the same PIG Output in a HIVE External Table

NOTE:- For this POC I have used custom input format to read PDF files using itextpdf. So the corresponding jar files itextpdf 5.1.jar to be added during coding and to the lib directory of hadoop for successful execution. You can use pdfbox for the same but the coding will be different.

POC Processing Details



MAP REDUCE PROCESS IN DETAILS:-




1.     TO TAKE PDF INPUT DATA ON HDFS

COMMANDS:-

hadoop fs -mkdir /Pocinput           (To create a directory in HDFS)
hadoop fs -put POC.pdf /Pocinput       (To put/load the PDF file in HDFS)
jar xvf POC.jar                               (To see the classes used in eclipse jar file and use the driver class)


1.     MAP REDUCE CODES:-

Below is the main class of the MR coding where a job configuration is declared, driver class jar are set along with mapper & reducer classes. Both Input file format and Output file format along with keys are set. Since this coding is using multiple output method to produce output in multiple file we are using multiple output object variable. The MR Process will start from here.

PDF INPUT DRIVER (DRIVER CLASS)


package com.poc;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PdfInputDriver {
            static public int count = 0;
            public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
                        Configuration conf = new Configuration();
                        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
                        args = parser.getRemainingArgs();
                        Job job = new Job(conf, "Pdfwordcount");
                        job.setJarByClass(PdfInputDriver.class);
                        job.setOutputKeyClass(Text.class);
                        job.setOutputValueClass(Text.class);

                        job.setInputFormatClass(PdfInputFormat.class);
                        job.setOutputFormatClass(TextOutputFormat.class);

                        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
                        FileInputFormat.addInputPath(job, new Path(args[0]));
                        FileOutputFormat.setOutputPath(job, new Path(args[1]));
                        // job.setNumReduceTasks(0);

                        job.setMapperClass(PdfMapper.class);
                        job.setReducerClass(PdfReducer.class);

                        MultipleOutputs.addNamedOutput(job, "EmergencyCare", TextOutputFormat.class, Text.class, Text.class);
                         MultipleOutputs.addNamedOutput(job, "SeasonalCareSection", TextOutputFormat.class, Text.class, Text.class);
                        MultipleOutputs.addNamedOutput(job, "DengueCareSection", TextOutputFormat.class, Text.class, Text.class);
                        MultipleOutputs.addNamedOutput(job, "WinderSeasonalCareSection", TextOutputFormat.class, Text.class, Text.class);
                        MultipleOutputs.addNamedOutput(job, "General", TextOutputFormat.class, Text.class, Text.class);
                        System.exit(job.waitForCompletion(true) ? 0 : 1);
            }
}


PDF INPUT FORMAT (CUSTOM INPUT FORMAT TO READ PDF FILES)


This is the Input format class declared in the Driver program which extends the basic File Input Format of MapReduce. This program reads the data from Record reader which send the values in Key & Value format.

package com.poc;
import java.io.IOException;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class PdfInputFormat extends FileInputFormat {
            @Override
            public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException,
                                    InterruptedException {
                        return new PdfRecordReader();
            }
}

PDF RECORD READER (TO READ PDF FILE AND SEND AS KEY, VALUE FORMAT)


Below is the customized program which uses Itext to read the data from the PDF file and send it to the File Input Format in the form of Key,Value format. the Key is the ByteOffSet Longwriteable key and all the text in a line is the value.

package com.poc;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import com.itextpdf.text.pdf.PdfReader;
import com.itextpdf.text.pdf.parser.PdfReaderContentParser;
import com.itextpdf.text.pdf.parser.SimpleTextExtractionStrategy;
import com.itextpdf.text.pdf.parser.TextExtractionStrategy;

public class PdfRecordReader extends RecordReader<LongWritable, Text> {
            private int flag = 0;
            private LongWritable key = null;
            private Text value = null;
            private PdfReader reader;
            private PdfReaderContentParser parser;
            private TextExtractionStrategy strategy;
            private FSDataInputStream fileIn;
            private List<String> records = new ArrayList<String>();
            public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
                        FileSplit split = (FileSplit) genericSplit;
                        Configuration conf = context.getConfiguration();
                        final Path file = split.getPath();
                        FileSystem fs = file.getFileSystem(conf);
                        this.fileIn = fs.open(split.getPath());
                        this.reader = new PdfReader(fileIn);
                        this.parser = new PdfReaderContentParser(reader);
                        readRecords();
            }
            public synchronized boolean nextKeyValue() throws IOException {
                        System.out.println("Executing nextKey........Total Records : " + records.size() + "; Flag : " + (flag++));
                        int index = 0;
                        if (key == null) {
                                    key = new LongWritable(index);
                        } else {
                                    index = (int) key.get();
                                    key.set(++index);
                        }
                        if (value == null) {
                                    value = new Text(records.get(index));
                        } else {
                                    value.set(records.get(index));
                        }
                        if (flag == records.size()) {
                                    return false;
                        } else {
                                    return true;
                        }
            }
            @Override
            public LongWritable getCurrentKey() {
                        return key;
            }
            @Override
            public Text getCurrentValue() {
                        return value;
            }
            public float getProgress() {
                        return 0;
            }
            public synchronized void close() throws IOException {
                        if (fileIn != null) {
                                    fileIn.close();
                        }
            }
            private void readRecords() throws IOException {
                        if (reader != null) {
                                    for (int i = 1; i <= reader.getNumberOfPages(); i++) {
                                                strategy = parser.processContent(i, new SimpleTextExtractionStrategy());
                                                if (strategy != null) {
                                                            StringTokenizer tokens = new StringTokenizer(strategy.getResultantText(), "\n");
                                                            while (tokens.hasMoreTokens()) {
                                                                        records.add(tokens.nextToken());
                                                            }
                                                }
                                    }
                                    reader.close();
                        }
                        return;
            }
}

PDF MAPPER (HAVING MAPPER LOGIC)

This is the Mapper class of the programming which checks the first line and if it is a Header then removes it and splits the entire line in TSV (Tab Separated Value) before sending it to next phase.

package com.poc;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PdfMapper extends Mapper<LongWritable, Text, Text, Text> {
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                        try {
                                    if (value.toString().contains("PATIENT"))
                                                return;
                                    else {
                                                String[] str = value.toString().split(" ");
                                                String data = "";
                                                for (int i = 0; i < str.length; i++) {
                                                            if (str[i] != null || str[i] != " ") {
                                                                        data += (str[i] + " ");
                                                            }
                                                }
                                                String dr = data.trim().replaceAll("\\s+", "\t");
                                                context.write(new Text(""), new Text(dr));
                                    }
                        } catch (Exception e) {
                                    e.printStackTrace();
                        }
            }
}

PDF REDUCER (HAVING REDUCER LOGIC)

This is the reducer programming which takes the value, splits it and checks according to the condition and if the condition is satisfied then places the entire value to respective output by using the multiple output format reference object.

package com.poc;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class PdfReducer extends Reducer<Text, Text, IntWritable, Text> {
            MultipleOutputs<IntWritable, Text> mos;
            @Override
            public void setup(Context context) {
                        mos = new MultipleOutputs<IntWritable, Text>(context);
            }
            @Override
            public void reduce(Text k1, Iterable<Text> k2, Context context) throws IOException, InterruptedException {
                        while (k2.iterator().hasNext()) {
                                    String dr = k2.iterator().next().toString();
                                    String[] str1 = dr.split("\t");
                                    int age = Integer.parseInt(str1[2]);
                                    String s = "Jharkhand";
                                    SimpleDateFormat formatter = new SimpleDateFormat("dd/mm/yyyy");
                                    String dateInString = str1[6];
                                    String date1 = "01/04/2015";
                                    String date2 = "31/07/2015";
                                    String date3 = "01/09/2015";
                                    String date4 = "31/12/2015";

                                    try {
                                                Date date = formatter.parse(dateInString);
                                                Date dateTime1 = formatter.parse(date1);
                                                Date dateTime2 = formatter.parse(date2);
                                                Date dateTime3 = formatter.parse(date3);
                                                Date dateTime4 = formatter.parse(date4);
                                                if (age > 35 && str1[3].equalsIgnoreCase("Male")) {
                                                            if (str1[4].equalsIgnoreCase("Cancer") || str1[4].equalsIgnoreCase("TB")) {
                        mos.write("EmergencyCare", null, new Text(dr), "/HealthCarePOC/EmergencyCare");
                                                            } else if (date.after(dateTime1) && (date.before(dateTime2))) {
                        mos.write("SeasonalCareSection", null, new Text(dr), "/HealthCarePOC/SeasonalCareSection");
                                                            } else {
                        mos.write("General", null, new Text(dr), "/HealthCarePOC/GeneralSection");
                                                            }
                                                } else if (age > 50) {
                                                            if (dr.toLowerCase().contains(s.toLowerCase())) {
                        mos.write("DengueCareSection", null, new Text(dr), "/HealthCarePOC/DenguCare");
                                                            } else if (date.after(dateTime3) && (date.before(dateTime4))) {
mos.write("WinderSeasonalCareSection", null, new Text(dr),
                                                                                                "/HealthCarePOC/WinderSeasonalCareSection");
                                                            } else {
                        mos.write("General", null, new Text(dr), "/HealthCarePOC/GeneralSection");
                                                            }
                                                } else {
                        mos.write("General", null, new Text(dr), "/HealthCarePOC/GeneralSection");
                                                }
                                    } catch (Exception e) {
                                                // TODO Auto-generated catch block
                                                e.printStackTrace();
                                    }
                        }
            }

            @Override
            protected void cleanup(Context context) throws IOException, InterruptedException {
                        mos.close();
            }
}

EXECUTING THE MAP REDUCE CODE

To execute a MR program the basic command is:-
hadoop jar <Jar File Name with extesion .jar> <Full package name & Driver class name without extension> <Input File Path> <Output File Path>

If any of the above is missed out then the MR program will giver errors. (Make sure to place all related jar files used in Program compilation is placed in lib directory of Hadoop).

hadoop jar POC.jar com.poc.PdfInputDriver /Pocinput/POC.pdf /Pdf



Goto Firefox and open name node page by following command:

http://localhost:50070 and browse the file system , then click on HealthCarePOC directory to check the files that have been created.




3.     PIG SCRIPT

Pig Script is written in any Text editor and is saved with the extension as .pig.

To execute Pig Script file from any directory in terminal type the following command:-

1. For LFS file:
pig -x local <Scriptname.pig>

2. For HDFS files:
pig <Scriptname.pig>

In this case since our data is in HDFS the following command should be used:

pig <PigScript.pig>
 

PigScript1.pig
# In this pig script we are loading the file according the column and removing duplicate entries by using DISTINCT command and displaying the output in console.

A = LOAD '/ HealthCarePOC /' USING PigStorage ('\t') AS (id:int, Name:chararray, age:int, gender:chararray, disease:chararray,hospital:chararray,admitdate:chararray,address:chararray);
B = DISTINCT A;
DUMP B; 



PigScript2.pig

# In this pig script we are loading the file from HDFS and storing it in an output directory after removing duplicates and ordering it by Id. 

A = LOAD '/ HealthCarePOC /' USING PigStorage ('\t') AS (id:int, Name:chararray, age:int, gender:chararray, disease:chararray, hospital:chararray, admitdate:chararray, address:chararray);
B = DISTINCT A;
C = ORDER B by Sid;
STORE C INTO '/POCdata';






4.     EXPORT the PIG Output from HDFS to MySQL using SQOOP

Sqoop is used for making a link to the Database and performing tasks such as Import of files, Export of files and perform queries. In our use case we will create a database and table in MySQL and upload the Pig output to the same. Sqoop Eval is used for linking to the MySQL for performing Queries on MySQL.

sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "create database if not exists HEALTHCARE;";



sqoop eval --connect jdbc:mysql://localhost/HEALTHCARE --username root --password root query "use HEALTHCARE;";

sqoop eval --connect jdbc:mysql://localhost/HEALTHCARE --username root --password root --query "grant all privileges on HEALTHCARE.* to ‘localhost’@’%’;”;



sqoop eval --connect jdbc:mysql://localhost/HEALTHCARE --username root --password root --query "grant all privileges on HEALTHCARE.* to ‘’@’localhost’;”;


sqoop eval --connect jdbc:mysql://localhost/ HEALTHCARE username root password root query "create table poc(id int primary key, name varchar(50), age int, gender varchar(50), disease varchar(50), hospital varchar(50), admitdate varchar(50), address varchar(250));”;



sqoop export --connect jdbc:mysql://localhost/HEALTHCARE --table poc --export-dir /POCData --fields-terminated-by '\t';


5.     STORE THE PIG OUTPUT IN A HIVE EXTERNAL TABLE

Hive uses Hadoop environment to run, so make sure that Hadoop daemons are up and running before executing Hive commands. In our usecase since the data is already stored in HDFS directory, we are using Hive external table to point it to that directory, so that we don't have to load the data additionally.

Go to hive shell using command:

hive

After entering the hive shell give the following commands one after another:-

show databases;
create database healthcare;
use healthcare;

create external table poc(id int,Name string,age int,gender string,disease string,hospital string,admitdate string,address string)
row format delimited
fields terminated by '\t'
stored as textfile location '/POCdata';




Hope you all understood the procedures... 
Please do notify me for any corrections...
Kindly leave a comment for any queries/clarification...
(Detailed Description of each phase to be added soon).
ALL D BEST...


20 comments:

  1. Good one! Thanks for sharing. By the way What's the benifit of investing in funds over the individual stocks and bonds?

    Sensex
    Sensitive Index
    BSE Sensex

    ReplyDelete
  2. It's looks very awesome blog,keep sharing more posts with us.
    thank you....

    hadoop admin online course

    ReplyDelete
  3. hi, facing problems with the compilation of jar file. How do i get the dependency jar for hadoop mapreduce client core which is required for the MultipleOutputs.

    ReplyDelete