Dear Friends,
Welcome back, after a long time. One of my friends asked me to explain XML processing in Hadoop.
I went through many articles, web links, etc. in search of the answer, and now I am ready to showcase it in this blog.
PROBLEM
Working with XML is painful. XML structure is variable by design, which means no universal mapping to native Pig data structures. This is the price we pay for such a flexible, robust markup, but, as Software Developers, we can’t continue to ignore this problem. There’s XML data everywhere just waiting for us to crack it open and extract value for analysis.
XML processing is quite different from processing other formats such as Word, Excel or PDF, because XML contains tags that differ from file to file, along with a varying number of subtags. XML is a semi-structured format, and since its structure is variable by design, we cannot have a predefined mapping. Thus, to process XML in Hadoop, you need to know the tags required to extract the data, and you have to define them each time according to the data contents.
SOLUTION
There are 3 ways of processing XML files in Hadoop:-
1. PIG:- Using classes from the Piggybank jar file.
2. HIVE:- Using the SerDe (Serializer/Deserializer) method.
3. MapReduce coding:- Lengthy coding using a custom XML input format and record reader.
In my use case I will be using the simple.xml file.
<breakfast-menu>
<food>
<name>Belgian Waffles</name>
<price>$5.95</price>
<description>two of our famous Belgian Waffles with plenty of real maple syrup</description>
<calories>650</calories>
</food>
<food>
<name>Strawberry Belgian Waffles</name>
<price>$7.95</price>
<description>light Belgian waffles covered with strawberrys and whipped cream</description>
<calories>900</calories>
</food>
<food>
<name>Berry-Berry Belgian Waffles</name>
<price>$8.95</price>
<description>light Belgian waffles covered with an assortment of fresh berries and whipped cream</description>
<calories>900</calories>
</food>
<food>
<name>French Toast</name>
<price>$4.50</price>
<description>thick slices made from our homemade sourdough bread</description>
<calories>600</calories>
</food>
<food>
<name>Homestyle Breakfast</name>
<price>$6.95</price>
<description>two eggs, bacon or sausage, toast, and our ever-popular hash browns</description>
<calories>950</calories>
</food>
</breakfast-menu>
1. XML processing using PIG
Apache Pig is a tool that can be used to analyse XML; it represents the processing as data flows. Pig Latin is a scripting language in which Extract, Transform, Load (ETL) operations, ad hoc data analysis and iterative processing can be easily achieved. Pig is an abstraction over MapReduce: all Pig scripts are internally converted into Map and Reduce tasks to get the work done. Pig was built to make programming MapReduce applications easier. Pig scripts are procedural and implement lazy evaluation, i.e., unless an output is required, the steps aren’t executed.
The XMLLoader currently included with Pig allows us to specify the tags that delimit our documents. This is nice. What it returns, though, is raw XML, which will invariably require a pile of custom UDFs to handle parsing and semantics. To process XMLs in Pig, piggybank.jar is essential. This jar contains a UDF called XMLLoader() that will be used to load the XML document.
Below is the flow diagram to describe the complete flow.
Step 1:- Download and register Jar
To use the Piggybank jar for XML processing, first download the jar and register its path in Pig.
(Download the Piggybank jar file from HERE)
Use the following command for registering the jar file:
Command > register piggybank.jar;
Step 2:- Load the XML file
Load the document using XMLLoader() into a chararray. Specify the parent tag to be extracted. If all the elements are defined under the root element without a parent tag, then the root element is loaded using XMLLoader().
In the simple.xml file, breakfast-menu is the root element and food is the repeating tag under it. The command below passes the root element to XMLLoader(), so the whole menu is loaded as one record.
Command > A = LOAD '/xmlfile/simple.xml' using org.apache.pig.piggybank.storage.XMLLoader ('breakfast-menu') as (x:chararray);
Step 3:- Extracting data from the tags
To extract data from XML tags in Pig, there are two methods:
1. Using regular expressions
2. Using XPath
1. Using regular expressions
Use regular expressions to extract the data between the tags. Regular expressions work well for simple tags in the document, such as <name>.
For nested tags, writing the regular expression is tedious: if even one character is missed in the expression, the output is null.
Because the whole menu was loaded as a single record, each expression below returns the values of the first <food> element only.
Command > B = FOREACH A GENERATE FLATTEN (REGEX_EXTRACT_ALL(x,'(?s)<breakfast-menu>.*?<name>([^>]*?)</name>.*?</breakfast-menu>'));
OR
Command > B = FOREACH A GENERATE FLATTEN (REGEX_EXTRACT_ALL(x,'(?s)<breakfast-menu>.*?<price>(.*?)</price>.*?<calories>(.*?)</calories>.*?</breakfast-menu>'));
Then DUMP B to see Results.
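The first expression above can be tried outside Pig before running a job. The sketch below applies the same pattern with the JDK's java.util.regex, assuming that the whole record must match the expression (full-match semantics). The class name RegexTagDemo and the shortened sample record are made up for this illustration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class RegexTagDemo {
    // Same pattern as the first Pig command; (?s) lets '.' match newlines too.
    static final Pattern FIRST_NAME = Pattern.compile(
        "(?s)<breakfast-menu>.*?<name>([^>]*?)</name>.*?</breakfast-menu>");

    // Returns the captured name, or null when the whole record does not match.
    static String firstName(String record) {
        Matcher m = FIRST_NAME.matcher(record);
        return m.matches() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // Hypothetical, shortened version of simple.xml.
        String record = "<breakfast-menu>\n"
            + "<food><name>Belgian Waffles</name><price>$5.95</price></food>\n"
            + "<food><name>French Toast</name><price>$4.50</price></food>\n"
            + "</breakfast-menu>";
        System.out.println(firstName(record));
        // One missing character in a tag is enough to lose the match.
        System.out.println(firstName("<breakfast-menu><nam>x</name></breakfast-menu>"));
    }
}
```

The first call prints Belgian Waffles and the second prints null, which is the "null output" behaviour described above when a character is missed.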
2. Using XPath
XPath uses path expressions to access a node.
The XPath UDF lives in the package org.apache.pig.piggybank.evaluation.xml, so its fully qualified name is long. Define a short alias for it for simplicity and ease of use.
To access a particular element, start from the parent node and navigate to the required tag.
Note that repeating parent nodes become separate rows and their child nodes become columns. In the above file the <food> tag repeats, so when <food> is the tag given to XMLLoader(), each <food> element becomes a new row and the tags under it become columns.
If your piggybank.jar does not contain the XPath class, download the piggybank-0.15.0 jar file HERE.
Register the above jar file.
Load the XML file.
Define the alias:
Command > DEFINE XPath org.apache.pig.piggybank.evaluation.xml.XPath();
Use the XPath function to extract the required elements. The path starts at the root element because the record loaded above is the whole <breakfast-menu> document.
Command > B = FOREACH A GENERATE XPath(x, 'breakfast-menu/food/name'), XPath(x, 'breakfast-menu/food/price'), XPath(x, 'breakfast-menu/food/calories');
Then DUMP or STORE the result.
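To see how a path expression walks from a parent node down to the required tag, here is a small plain-Java sketch. It uses the JDK's javax.xml.xpath rather than the Piggybank UDF, so it only illustrates the path semantics. The class name XPathDemo, the helper selectAll and the shortened sample document are assumptions of this example, and the paths are written from the root element.

```java
import java.io.StringReader;
import java.util.Arrays;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

class XPathDemo {
    // Evaluates the expression against the XML string and returns the text of every matching node.
    static String[] selectAll(String xml, String expression) {
        try {
            NodeList nodes = (NodeList) XPathFactory.newInstance().newXPath().evaluate(
                expression, new InputSource(new StringReader(xml)), XPathConstants.NODESET);
            String[] out = new String[nodes.getLength()];
            for (int i = 0; i < out.length; i++) {
                out[i] = nodes.item(i).getTextContent();
            }
            return out;
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("Bad XPath or XML: " + expression, e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical, shortened version of simple.xml.
        String xml = "<breakfast-menu>"
            + "<food><name>Belgian Waffles</name><price>$5.95</price><calories>650</calories></food>"
            + "<food><name>French Toast</name><price>$4.50</price><calories>600</calories></food>"
            + "</breakfast-menu>";
        System.out.println(Arrays.toString(selectAll(xml, "breakfast-menu/food/name")));
        System.out.println(Arrays.toString(selectAll(xml, "breakfast-menu/food/calories")));
    }
}
```

A relative path is resolved from the top of the document, so in this sketch a path that skips the root element, such as food/name, selects nothing.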
NOTE:- In both of the above methods you have to name each tag that you want displayed or stored; to extract more fields, add the additional tag names in the same way.
2. XML Processing Using HIVE
To process XML files in Hive there are two methods: 1. Using UDFs. 2. Using a SerDe.
In this blog I am using the SerDe method to process the XML file.
The Hive SerDe method uses a class from the hivexmlserde jar file, which can be downloaded from HERE.
Step 1: Add the SerDe Jar file to Hive Path.
Command > add jar /home/gopal/Desktop/hivexmlserde-1.0.5.3.jar;
The ADD JAR statement ensures the document reader is available for job execution. On successful addition you will get the message below:
Added /home/gopal/Desktop/hivexmlserde-1.0.5.3.jar to class path
Added resource: /home/gopal/Desktop/hivexmlserde-1.0.5.3.jar
Step 2:- Create an external table using the SerDe class and give it the location of the existing XML file.
Command > CREATE EXTERNAL TABLE xml (name string, price string, description string, calories string)
> ROW FORMAT SERDE 'com.ibm.spss.hive.serde2.xml.XmlSerDe'
> WITH SERDEPROPERTIES ("column.xpath.name"="/food/name/text()",
> "column.xpath.price"="/food/price/text()",
> "column.xpath.description"="/food/description/text()",
> "column.xpath.calories"="/food/calories/text()")
> STORED AS INPUTFORMAT 'com.ibm.spss.hive.serde2.xml.XmlInputFormat'
> OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
> LOCATION '/xmlfile/'
> TBLPROPERTIES ("xmlinput.start"="<food","xmlinput.end"="</food>");
Explanation:
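The INPUTFORMAT option allows for the definition of the required document reader. The OUTPUTFORMAT specified is the default for Hive. The xmlinput.start and xmlinput.end table properties mark where each record begins and ends, and each column.xpath.<name> property gives the XPath expression that fills that column. In this example I have defined an EXTERNAL table over the directory containing the XML file, which was copied to the Hadoop cluster separately.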
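The column mapping can be checked on a single record before creating the table. The following is a rough plain-Java sketch, not the SerDe's own code: it evaluates the same four column XPath expressions against one <food> record using the JDK's javax.xml.xpath. The class name ColumnXPathDemo and its helper methods are hypothetical.

```java
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

class ColumnXPathDemo {
    // Column name -> XPath, mirroring the column.xpath.* properties of the table.
    static Map<String, String> columnXPaths() {
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("name", "/food/name/text()");
        columns.put("price", "/food/price/text()");
        columns.put("description", "/food/description/text()");
        columns.put("calories", "/food/calories/text()");
        return columns;
    }

    // Evaluates every column XPath against one <food> record and returns the resulting row.
    static Map<String, String> toRow(String record) {
        XPath xpath = XPathFactory.newInstance().newXPath();
        Map<String, String> row = new LinkedHashMap<>();
        for (Map.Entry<String, String> column : columnXPaths().entrySet()) {
            try {
                row.put(column.getKey(),
                    xpath.evaluate(column.getValue(), new InputSource(new StringReader(record))));
            } catch (XPathExpressionException e) {
                throw new IllegalArgumentException("Bad XPath or record: " + column.getValue(), e);
            }
        }
        return row;
    }

    public static void main(String[] args) {
        // One record, as delimited by the start tag "<food" and the end tag "</food>".
        String record = "<food><name>French Toast</name><price>$4.50</price>"
            + "<description>thick slices made from our homemade sourdough bread</description>"
            + "<calories>600</calories></food>";
        System.out.println(toRow(record));
    }
}
```

In this sketch a tag that is missing from a record yields an empty string for that column, because the XPath matches no node.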
3. XML Processing Using MapReduce
XML processing using MapReduce needs a custom XML input format that reads XML files through a custom XML RecordReader. XML records are delimited by a start tag and an end tag; you set these in the driver class, and inside the mapper you name the sub-tags and the elements to extract from them.
(Kindly change the tag names accordingly in the Driver and Mapper classes.)
Please find below the code for the custom input format and record reader for XML processing.
FIRST LOAD THE DATA IN HDFS
To load the data in HDFS use the following command:-
Command > hadoop fs -mkdir /xmlfile
Command > hadoop fs -put simple.xml /xmlfile
XML INPUT FORMAT
package com.poc.xml;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class XmlInputFormat extends TextInputFormat {
    // Hand every split to the tag-based record reader instead of the default line reader.
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        return new XmlRecordReader();
    }
}
XML RECORD READER
package com.poc.xml;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class XmlRecordReader extends RecordReader<LongWritable, Text> {
    private byte[] startTag;
    private byte[] endTag;
    private long start;
    private long end;
    private FSDataInputStream fsin;
    private DataOutputBuffer buffer = new DataOutputBuffer();
    public static final String START_TAG_KEY = "xmlinput.start";
    public static final String END_TAG_KEY = "xmlinput.end";
    private LongWritable key = new LongWritable();
    private Text value = new Text();

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        startTag = conf.get(START_TAG_KEY).getBytes("utf-8");
        endTag = conf.get(END_TAG_KEY).getBytes("utf-8");
        FileSplit fileSplit = (FileSplit) split;
        // open the file and seek to the start of the split
        start = fileSplit.getStart();
        end = start + fileSplit.getLength();
        Path file = fileSplit.getPath();
        FileSystem fs = file.getFileSystem(conf);
        fsin = fs.open(fileSplit.getPath());
        fsin.seek(start);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (fsin.getPos() < end) {
            if (readUntilMatch(startTag, false)) {
                try {
                    buffer.write(startTag);
                    if (readUntilMatch(endTag, true)) {
                        key.set(fsin.getPos());
                        value.set(buffer.getData(), 0, buffer.getLength());
                        return true;
                    }
                } finally {
                    buffer.reset();
                }
            }
        }
        return false;
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public void close() throws IOException {
        fsin.close();
    }

    @Override
    public float getProgress() throws IOException {
        return (fsin.getPos() - start) / (float) (end - start);
    }

    private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
        int i = 0;
        while (true) {
            int b = fsin.read();
            // end of file:
            if (b == -1)
                return false;
            // save to buffer:
            if (withinBlock)
                buffer.write(b);
            // check if we're matching:
            if (b == match[i]) {
                i++;
                if (i >= match.length)
                    return true;
            } else
                i = 0;
            // see if we've passed the stop point:
            if (!withinBlock && i == 0 && fsin.getPos() >= end)
                return false;
        }
    }
}
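The tag-matching loop in the record reader can be tried without a cluster. Below is a simplified sketch of the same byte-by-byte scan over an in-memory stream. It leaves out the split boundaries and the Hadoop types, and the class name TagScanDemo and the method records are hypothetical names for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class TagScanDemo {
    // Reads bytes until 'match' has been seen; copies them into 'out' when 'out' is not null.
    static boolean readUntilMatch(InputStream in, byte[] match, ByteArrayOutputStream out) throws IOException {
        int i = 0;
        while (true) {
            int b = in.read();
            if (b == -1) {
                return false;            // end of input before the tag was found
            }
            if (out != null) {
                out.write(b);            // inside a record: keep every byte
            }
            if (b == match[i]) {
                i++;
                if (i >= match.length) {
                    return true;         // whole tag matched
                }
            } else {
                i = 0;                   // mismatch: start matching the tag again
            }
        }
    }

    // Returns every chunk of the input that starts with startTag and ends with endTag.
    static List<String> records(String xml, String startTag, String endTag) {
        byte[] start = startTag.getBytes(StandardCharsets.UTF_8);
        byte[] end = endTag.getBytes(StandardCharsets.UTF_8);
        InputStream in = new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8));
        List<String> result = new ArrayList<>();
        try {
            while (readUntilMatch(in, start, null)) {
                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                buffer.write(start);
                if (!readUntilMatch(in, end, buffer)) {
                    break;               // unterminated record: drop it
                }
                result.add(new String(buffer.toByteArray(), StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        String xml = "<m><food><name>A</name></food><food><name>B</name></food></m>";
        records(xml, "<food>", "</food>").forEach(System.out::println);
    }
}
```

With "<food>" and "</food>" as the tags, the sample input yields two records, one per food element. With the root tags, as in the driver class, the whole document would come back as a single record.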
XML PROCESSING
Driver class
package com.poc.xml;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class XMLProcessing {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Tags that delimit one record; XmlRecordReader reads them from the configuration.
        conf.set("xmlinput.start", "<breakfast-menu>");
        conf.set("xmlinput.end", "</breakfast-menu>");

        Job job = Job.getInstance(conf);
        job.setJarByClass(XMLProcessing.class);
        // Output types match the mapper: Mapper<LongWritable, Text, Text, NullWritable>.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Map-only job, so no reducer class is set.
        job.setNumReduceTasks(0);
        job.setMapperClass(Map.class);
        job.setInputFormatClass(XmlInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
XML MAPPER CLASS
package com.poc.xml;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

public class Map extends Mapper<LongWritable, Text, Text, NullWritable> {
    private static final Log LOG = LogFactory.getLog(Map.class);

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        try {
            // Parse the record handed over by XmlRecordReader as a small XML document.
            InputStream is = new ByteArrayInputStream(value.toString().getBytes(StandardCharsets.UTF_8));
            DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
            DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
            Document doc = dBuilder.parse(is);
            doc.getDocumentElement().normalize();
            NodeList nList = doc.getElementsByTagName("food");
            for (int temp = 0; temp < nList.getLength(); temp++) {
                Node nNode = nList.item(temp);
                if (nNode.getNodeType() == Node.ELEMENT_NODE) {
                    Element eElement = (Element) nNode;
                    String name = eElement.getElementsByTagName("name").item(0).getTextContent();
                    String price = eElement.getElementsByTagName("price").item(0).getTextContent();
                    String calories = eElement.getElementsByTagName("calories").item(0).getTextContent();
                    // One CSV line per <food> element.
                    context.write(new Text(name + "," + price + "," + calories), NullWritable.get());
                }
            }
        } catch (ParserConfigurationException | SAXException e) {
            // The parser methods throw checked exceptions that map() cannot declare.
            LOG.error("Skipping record that could not be parsed as XML", e);
        }
    }
}
NOTE:- I am using only the mapper class to convert the XML to CSV format (I used "," as the separator; you can use a tab instead). After that, based on the requirement, you can add a reducer class for the reducer logic.
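The XML-to-CSV step of the mapper can be tested locally without Hadoop. The sketch below repeats the same DOM logic in plain Java under a hypothetical class name, FoodCsvDemo. As an assumption of this sketch, a missing tag becomes an empty field instead of causing an error.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

class FoodCsvDemo {
    // Text of the first <tag> under parent, or "" when the tag is absent.
    static String text(Element parent, String tag) {
        NodeList nodes = parent.getElementsByTagName(tag);
        return nodes.getLength() == 0 ? "" : nodes.item(0).getTextContent();
    }

    // Turns every <food> element into one "name,price,calories" line.
    static List<String> toCsvLines(String xml) {
        try {
            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            Document doc = builder.parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            NodeList foods = doc.getElementsByTagName("food");
            List<String> lines = new ArrayList<>();
            for (int i = 0; i < foods.getLength(); i++) {
                Element food = (Element) foods.item(i);
                lines.add(text(food, "name") + "," + text(food, "price") + "," + text(food, "calories"));
            }
            return lines;
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new IllegalArgumentException("Input is not well-formed XML", e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical, shortened version of simple.xml.
        String xml = "<breakfast-menu>"
            + "<food><name>Belgian Waffles</name><price>$5.95</price><calories>650</calories></food>"
            + "<food><name>French Toast</name><price>$4.50</price><calories>600</calories></food>"
            + "</breakfast-menu>";
        toCsvLines(xml).forEach(System.out::println);
    }
}
```

For the two-item sample this prints Belgian Waffles,$5.95,650 and French Toast,$4.50,600, one line per food element.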
Now go ahead and feed XML to your PIG and Store XML honey in your HIVE.
Hope you all understood the procedures...
Please do notify me for any corrections...
Kindly leave a comment for any queries/clarification...
(Detailed Description of each phase to be added soon).
ALL D BEST...