Saturday, June 24, 2017

CRUNCH YOUR WAY IN HADOOP

 Welcome Back Friends...


Sorry.... It took me some time for posting my blog & deleting few of my previous blogs. But don't worry I am sure you will like this blog as I am sure this will create a new view towards working out MR (MapReduce) programming in Hadoop.

In this blog we will learn about Apache Crunch and will workout a use-case for more understanding.

The Apache Crunch Java library provides a framework for writing, testing, and running MapReduce pipelines. Its goal is to make pipelines that are composed of many user-defined functions simple to write, easy to test, and efficient to run.

As a Hadoop Developer and working with different Companies, I found that a wide range of industries use Apache's, Cloudera's or Hortonwork's Hadoop to solve their business problems (Though some of them uses Spark Programming on top of YARN). For any of our data processing from many of the solutions we create involves a multi-stage pipelines of MapReduce jobs that joins, cleans, aggregates, and analyzes enormous amounts of data. When working with log files or relational database tables, we use high-level tools like Pig and Hive for their convenient and powerful support for creating pipelines over structured and semi-structured records. 

For all Newbies Apache Crunch User Guide (Link HERE)  provides a base understanding on how to develop a Pipeline program, use of DoFn's, Joins, etc... We will also see a small use-case here to have a small understanding on the same. Though it provides different library functions, which is much easier to write than MR programs, yet it works like same as hadoop command but far more faster.

Why we use Crunch???


1. Type-safety makes it much less likely to make mistakes in your code, which are very costly when running across a large Hadoop cluster
2. High performance, especially compared to the performance of Python over Hadoop Streaming
3. Higher-level abstractions such as filters, joins and aggregations instead of having to think of everything in terms of MapReduce
4. First-class Avro support lets us work with strongly-typed data files with the ability to evolve schemas over time
5. Pluggable execution engines such as MapReduce and Spark which let us keep up to date with new technology advancements in the big data space without having to rewrite all of our pipelines with each increment
6. Simple powerful testing using the supplied MemPipline for fast in-memory unit tests

I hope you all enjoy working on crunch.

Below is the Hadoop wordcount program which you can get from almost all website. Though this is the same program but still it is used for an ease of understanding the basic working of CRUNCH...

POM file for Maven Dependencies:-

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>crunch</groupId>
    <artifactId>crunch</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>crunch</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.crunch</groupId>
            <artifactId>crunch-core</artifactId>
            <version>0.6.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.crunch</groupId>
            <artifactId>crunch-test</artifactId>
            <version>0.6.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.crunch</groupId>
            <artifactId>crunch-archetype</artifactId>
            <version>0.6.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.crunch</groupId>
            <artifactId>crunch-contrib</artifactId>
            <version>0.6.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>0.20.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.8.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.0.5-alpha</version>
        </dependency>

    </dependencies>
</project>

HADOOP WORDCOUNT PROGRAM


import com.cloudera.crunch.DoFn;
import com.cloudera.crunch.Emitter;
import com.cloudera.crunch.PCollection;
import com.cloudera.crunch.PTable;
import com.cloudera.crunch.Pipeline;
import com.cloudera.crunch.impl.mr.MRPipeline;
import com.cloudera.crunch.lib.Aggregate;
import com.cloudera.crunch.type.writable.Writables;

public class WordCount {
  public static void main(String[] args) throws Exception {
    // Create an object to coordinate pipeline creation and execution.
    Pipeline pipeline = new MRPipeline(WordCount.class);
    // Reference a given text file as a collection of Strings.
    PCollection<String> lines = pipeline.readTextFile(args[0]);

    // Define a function that splits each line in a PCollection of Strings into a
    // PCollection made up of the individual words in the file.
    PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
      public void process(String line, Emitter<String> emitter) {
        for (String word : line.split("\\s+")) {
          emitter.emit(word);
        }
      }
    }, Writables.strings()); // Indicates the serialization format

    // The Aggregate.count method applies a series of Crunch primitives and returns
    // a map of the unique words in the input PCollection to their counts.
    // Best of all, the count() function doesn't need to know anything about
    // the kind of data stored in the input PCollection.
    PTable<String, Long> counts = Aggregate.count(words);
   // Instruct the pipeline to write the resulting counts to a text file.
    pipeline.writeTextFile(counts, args[1]);
    // Execute the pipeline as a MapReduce.
    pipeline.done();
  }
}

Now Let's understand what is happening in the above code...

public class WordCount {
  public static void main(String[] args) throws Exception {
    // Create an object to coordinate pipeline creation and execution.
    Pipeline pipeline = new MRPipeline(WordCount.class);
    // Reference a given text file as a collection of Strings.
    PCollection<String> lines = pipeline.readTextFile(args[0]);

The following line Pipeline pipeline = new MRPipeline(WordCount.class); creates an object to coordinate pipeline creation and execution.

MRPipeline is the oldest implementation of the Pipeline interface and compiles and executes the DAG of PCollections into a series of MapReduce jobs.

SparkPipeline is the newest implementation of the Pipeline interface which takes a Spark connection for processing.

MemPipeline implementation of Pipeline has a few interesting properties. First, unlike MRPipeline, MemPipeline is a singleton; you don't create a MemPipeline, you just get a reference to it via the static MemPipeline.getInstance() method. Second, all of the operations in the MemPipeline are executed completely in-memory, there is no serialization of data to disk by default, and PType usage is fairly minimal.
  PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
      public void process(String line, Emitter<String> emitter) {
        for (String word : line.split("\\s+")) {
          emitter.emit(word);
        }
      }
    }, Writables.strings()); // Indicates the serialization format


A PCollection<T> represents a distributed, immutable collection of elements of type T. For example, we represent a text file as a PCollection<String> object. PCollection<T> provides a method, parallelDo, that applies a DoFn to each element in the PCollection<T> in parallel, and returns a new PCollection<U> as its result.

Here in above code we are using this to split the line and emit (return) each word.

    PTable<String, Long> counts = Aggregate.count(words);

Now we are doing count for each words which will give us our desired results and use the following code for saving it back.
    pipeline.writeTextFile(counts, args[1]);
    // Execute the pipeline as a MapReduce.
    pipeline.done();


HOW TO RUN THE PROGRAM IN HADOOP


This Crunch MapReduce is ran as any other normal MapReduce job is executed using the hadoop jar command.

   hadoop jar path/to/jar/crunch-example-hadoop.jar \
   com.hadoop.crunch.Driver \
   /examples/word_data.txt \CountOut

SPARK WORDCOUNT PROGRAM


As said earlier that for running the crunch program in Spark we have to mention sparkPipeline while instantiating the pipeline function. This will be more clear when we will see the program below:-

package com.bigdata.crunch;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.spark.SparkPipeline;
import org.apache.crunch.lib.Aggregate;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * A word count example for Apache Crunch, based on Crunch's example projects.
 */
public class WordCount extends Configured implements Tool {

public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new WordCount(), args);
}

@SuppressWarnings("serial")
public int run(String[] args) throws Exception {

if (args.length != 2) {
System.err
.println("Usage: hadoop jar crunch-demo-1.0-SNAPSHOT-job.jar" + " [generic options] input output");
System.err.println();
GenericOptionsParser.printGenericCommandUsage(System.err);
return 1;
}

String inputPath = args[0];
String outputPath = args[1];

// Create an object to coordinate pipeline creation and execution.
String master = getConf().get("spark.master", "local");
System.out.println("Connecting to spark at = " + master);
Pipeline pipeline = new SparkPipeline(master, "sparktest", WordCount.class);

// Reference a given text file as a collection of Strings.

// Define a function that splits each line in a PCollection of Strings
// into
// a PCollection made up of the individual words in the file.
// The second argument sets the serialization format.
PCollection<String> lines = pipeline.readTextFile(inputPath);

// Define a function that splits each line in a PCollection of Strings
// into a
// PCollection made up of the individual words in the file.
PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
public void process(String line, Emitter<String> emitter) {
for (String word : line.split("\\s+")) {
emitter.emit(word);
}
}
}, Writables.strings()); // Indicates the serialization format

// The Aggregate.count method applies a series of Crunch primitives and
// returns
// a map of the unique words in the input PCollection to their counts.
// Best of all, the count() function doesn't need to know anything about
// the kind of data stored in the input PCollection.
PTable<String, Long> counts = Aggregate.count(words);
// Instruct the pipeline to write the resulting counts to a text file.
pipeline.writeTextFile(counts, outputPath);
// Execute the pipeline as a MapReduce.
pipeline.done();
return 0;
}
}

HOW TO RUN THE CRUNCH_SPARK WORDCOUNT PROGRAM


The project can be launched on the cluster using spark-submit.
If you do not have a cluster but want to quickly test or run in local mode you can use the following:

bin/spark-submit --class com.mkwhitacre.spark.scala.LegoCalculationSparkDriver \
--master local /path/to/spark-crunch-example-1.0-SNAPSHOT-hadoop2.jar /path/to/lego-data/*/*


References

https://crunch.apache.org/user-guide.html
https://labs.spotify.com/2014/11/27/crunch/
http://blog.cloudera.com/blog/2011/10/introducing-crunch/
https://github.com/mkwhitacre/spark-examples/tree/master/spark-crunch-example

Hope you all understood the procedures... 
Please do notify me for any corrections...
Kindly leave a comment for any queries/clarification...
(Detailed workout with screenshots of each program to be added soon).


ALL D BEST...