Saturday, June 24, 2017

CRUNCH YOUR WAY IN HADOOP

 Welcome Back Friends...


Sorry.... It took me some time to post this blog and to delete a few of my previous ones. But don't worry, I am sure you will like this post, as it will give you a new view of writing MR (MapReduce) programs in Hadoop.

In this blog we will learn about Apache Crunch and work through a use case for better understanding.

The Apache Crunch Java library provides a framework for writing, testing, and running MapReduce pipelines. Its goal is to make pipelines that are composed of many user-defined functions simple to write, easy to test, and efficient to run.

As a Hadoop developer who has worked with different companies, I have found that a wide range of industries use Apache, Cloudera or Hortonworks Hadoop distributions to solve their business problems (though some of them run Spark on top of YARN). Much of the data processing in the solutions we create involves multi-stage pipelines of MapReduce jobs that join, clean, aggregate and analyze enormous amounts of data. When working with log files or relational database tables, we use high-level tools like Pig and Hive for their convenient and powerful support for creating pipelines over structured and semi-structured records.

For all newbies, the Apache Crunch User Guide (link HERE) provides a solid base for understanding how to develop a pipeline program, how to use DoFns, joins, etc. We will also walk through a small use case here to build that same understanding. Crunch's library functions are much easier to write than raw MR programs, yet the resulting job is submitted with the same hadoop command and often runs considerably faster.

Why do we use Crunch?


1. Type-safety makes it much less likely to make mistakes in your code, which are very costly when running across a large Hadoop cluster
2. High performance, especially compared to the performance of Python over Hadoop Streaming
3. Higher-level abstractions such as filters, joins and aggregations instead of having to think of everything in terms of MapReduce
4. First-class Avro support lets us work with strongly-typed data files with the ability to evolve schemas over time
5. Pluggable execution engines such as MapReduce and Spark which let us keep up to date with new technology advancements in the big data space without having to rewrite all of our pipelines with each increment
6. Simple, powerful testing using the supplied MemPipeline for fast in-memory unit tests

I hope you all enjoy working with Crunch.

Below is the Hadoop wordcount program, which you can find on almost every website. Although it is the same familiar program, it is used here to make the basic workings of CRUNCH easy to understand...

POM file for Maven Dependencies:-

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>crunch</groupId>
    <artifactId>crunch</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>crunch</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.crunch</groupId>
            <artifactId>crunch-core</artifactId>
            <version>0.6.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.crunch</groupId>
            <artifactId>crunch-test</artifactId>
            <version>0.6.0</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.crunch</groupId>
            <artifactId>crunch-contrib</artifactId>
            <version>0.6.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.8.0</version>
        </dependency>

    </dependencies>
</project>
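
Assuming the POM above, the job jar referenced later with the hadoop jar command can be built with a standard Maven build (the exact jar name follows the artifactId and version declared above):

mvn clean package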

HADOOP WORDCOUNT PROGRAM


import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.lib.Aggregate;
import org.apache.crunch.types.writable.Writables;

public class WordCount {
  public static void main(String[] args) throws Exception {
    // Create an object to coordinate pipeline creation and execution.
    Pipeline pipeline = new MRPipeline(WordCount.class);
    // Reference a given text file as a collection of Strings.
    PCollection<String> lines = pipeline.readTextFile(args[0]);

    // Define a function that splits each line in a PCollection of Strings into a
    // PCollection made up of the individual words in the file.
    PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
      public void process(String line, Emitter<String> emitter) {
        for (String word : line.split("\\s+")) {
          emitter.emit(word);
        }
      }
    }, Writables.strings()); // Indicates the serialization format

    // The Aggregate.count method applies a series of Crunch primitives and returns
    // a map of the unique words in the input PCollection to their counts.
    // Best of all, the count() function doesn't need to know anything about
    // the kind of data stored in the input PCollection.
    PTable<String, Long> counts = Aggregate.count(words);
   // Instruct the pipeline to write the resulting counts to a text file.
    pipeline.writeTextFile(counts, args[1]);
    // Execute the pipeline as a MapReduce.
    pipeline.done();
  }
}

Now let's understand what is happening in the above code...

public class WordCount {
  public static void main(String[] args) throws Exception {
    // Create an object to coordinate pipeline creation and execution.
    Pipeline pipeline = new MRPipeline(WordCount.class);
    // Reference a given text file as a collection of Strings.
    PCollection<String> lines = pipeline.readTextFile(args[0]);

The line Pipeline pipeline = new MRPipeline(WordCount.class); creates an object to coordinate pipeline creation and execution.

MRPipeline is the oldest implementation of the Pipeline interface and compiles and executes the DAG of PCollections into a series of MapReduce jobs.

SparkPipeline is the newest implementation of the Pipeline interface which takes a Spark connection for processing.

The MemPipeline implementation of Pipeline has a few interesting properties. First, unlike MRPipeline, MemPipeline is a singleton; you don't create a MemPipeline, you just get a reference to it via the static MemPipeline.getInstance() method. Second, all of the operations in the MemPipeline are executed completely in memory: there is no serialization of data to disk by default, and PType usage is fairly minimal.
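
To make the MemPipeline point concrete, here is a minimal in-memory sketch of the same word count (the class name and the sample strings are mine, not part of the original example). Because nothing touches HDFS, this style is handy for fast unit tests of DoFn logic:

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.lib.Aggregate;
import org.apache.crunch.types.writable.Writables;

public class WordCountMemExample {
  public static void main(String[] args) {
    // Build an in-memory PCollection instead of reading a text file from HDFS.
    PCollection<String> lines = MemPipeline.collectionOf("hello crunch", "hello hadoop");

    // Reuse the same word-splitting DoFn as in the MapReduce pipeline above.
    PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
      public void process(String line, Emitter<String> emitter) {
        for (String word : line.split("\\s+")) {
          emitter.emit(word);
        }
      }
    }, Writables.strings());

    // Everything executes in memory; materialize() hands the results straight back.
    PTable<String, Long> counts = Aggregate.count(words);
    for (Pair<String, Long> entry : counts.materialize()) {
      System.out.println(entry.first() + " : " + entry.second());
    }
  }
}
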
  PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
      public void process(String line, Emitter<String> emitter) {
        for (String word : line.split("\\s+")) {
          emitter.emit(word);
        }
      }
    }, Writables.strings()); // Indicates the serialization format


A PCollection<T> represents a distributed, immutable collection of elements of type T. For example, we represent a text file as a PCollection<String> object. PCollection<T> provides a method, parallelDo, that applies a DoFn to each element in the PCollection<T> in parallel, and returns a new PCollection<U> as its result.

Here, in the above code, we use parallelDo to split each line and emit (return) each word.

    PTable<String, Long> counts = Aggregate.count(words);

Now we count each word, which gives us our desired result, and use the following code to save it back.
    pipeline.writeTextFile(counts, args[1]);
    // Execute the pipeline as a MapReduce.
    pipeline.done();


HOW TO RUN THE PROGRAM IN HADOOP


This Crunch MapReduce job is run exactly like any other MapReduce job, using the hadoop jar command.

   hadoop jar path/to/jar/crunch-example-hadoop.jar \
       com.hadoop.crunch.Driver \
       /examples/word_data.txt CountOut

SPARK WORDCOUNT PROGRAM


As said earlier, to run a Crunch program on Spark we have to instantiate a SparkPipeline instead of an MRPipeline. This will become clearer in the program below:-

package com.bigdata.crunch;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.spark.SparkPipeline;
import org.apache.crunch.lib.Aggregate;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * A word count example for Apache Crunch, based on Crunch's example projects.
 */
public class WordCount extends Configured implements Tool {

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new WordCount(), args);
  }

  @SuppressWarnings("serial")
  public int run(String[] args) throws Exception {

    if (args.length != 2) {
      System.err.println("Usage: hadoop jar crunch-demo-1.0-SNAPSHOT-job.jar" + " [generic options] input output");
      System.err.println();
      GenericOptionsParser.printGenericCommandUsage(System.err);
      return 1;
    }

    String inputPath = args[0];
    String outputPath = args[1];

    // Create an object to coordinate pipeline creation and execution.
    String master = getConf().get("spark.master", "local");
    System.out.println("Connecting to spark at = " + master);
    Pipeline pipeline = new SparkPipeline(master, "sparktest", WordCount.class);

    // Reference a given text file as a collection of Strings.
    PCollection<String> lines = pipeline.readTextFile(inputPath);

    // Define a function that splits each line in a PCollection of Strings into a
    // PCollection made up of the individual words in the file.
    PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
      public void process(String line, Emitter<String> emitter) {
        for (String word : line.split("\\s+")) {
          emitter.emit(word);
        }
      }
    }, Writables.strings()); // Indicates the serialization format

    // The Aggregate.count method applies a series of Crunch primitives and returns
    // a map of the unique words in the input PCollection to their counts.
    // Best of all, the count() function doesn't need to know anything about
    // the kind of data stored in the input PCollection.
    PTable<String, Long> counts = Aggregate.count(words);

    // Instruct the pipeline to write the resulting counts to a text file.
    pipeline.writeTextFile(counts, outputPath);

    // Execute the pipeline on Spark.
    pipeline.done();
    return 0;
  }
}

HOW TO RUN THE CRUNCH_SPARK WORDCOUNT PROGRAM


The project can be launched on the cluster using spark-submit.
If you do not have a cluster but want to quickly test or run in local mode you can use the following:

bin/spark-submit --class com.mkwhitacre.spark.scala.LegoCalculationSparkDriver \
--master local /path/to/spark-crunch-example-1.0-SNAPSHOT-hadoop2.jar /path/to/lego-data/*/*


References

https://crunch.apache.org/user-guide.html
https://labs.spotify.com/2014/11/27/crunch/
http://blog.cloudera.com/blog/2011/10/introducing-crunch/
https://github.com/mkwhitacre/spark-examples/tree/master/spark-crunch-example

Hope you all understood the procedures... 
Please do notify me for any corrections...
Kindly leave a comment for any queries/clarification...
(Detailed workout with screenshots of each program to be added soon).


ALL D BEST...

Tuesday, April 4, 2017

HIVE INTERVIEW RELATED PREPARATION

Dear Friends....

I recently spent a few days preparing for and attending interviews for a job change in Hadoop, and a few Hive questions were common to almost every interview I faced. Some of these questions were also asked by my friends, so I decided to showcase them here for practical understanding.

Here, with my practicals, I am showing various concepts in HIVE.

1. Difference between External & Managed Tables:-


Ans:> Almost all interviewers ask you to explain external and managed tables. Though it is a very common question, what I observed is that everything depends on how you answer it in the interview.

Answering that an external table is on HDFS and a managed table is in Hive's data warehouse is simply not enough to crack the interview. From a brief discussion with my friends, I found that to really impress the interviewers you have to step beyond your comfort zone and explain a bit more.

Here is how I suggest answering it.

First of all, understand the concept of both. Since both tables ultimately keep their data in HDFS, here are the basic differences:-

1. Managed Tables:- A managed table is the default table type in Hive. Its data is stored inside Hive's warehouse directory, and its schema is tracked in the Hive metastore (which is backed by a database such as Derby, MySQL, etc.). You don't have to give any extra command or keyword to create this table.
Now the most discussed and practical part: let's take an example where the table is dropped by mistake. The schema is dropped, but along with the schema the data that you loaded is also deleted from the warehouse. So retrieving the data in real time after dropping the table is difficult.

Command> create table Test (Id int, Name string, Salary int)
                > row format delimited
                > fields terminated by '\t'
                > stored as textfile;


2. External Table:- An external table created in Hive always reads/stores its data from/in an HDFS location instead of Hive's warehouse. Thus while creating an external table we have to mention the keyword 'EXTERNAL', and for the storage location we give the keyword 'LOCATION' with an HDFS path.
Now for the practical part: when an external table is dropped by mistake, the schema of the table is deleted from the Hive metastore whereas the actual data in HDFS remains. So retrieving the data, and even creating another table with a different schema but the same table name, is possible.

Command> create EXTERNAL table Test (Id int, Name string, Salary int)
                > row format delimited
                > fields terminated by '\t'
                > stored as textfile LOCATION '/HDFS PATH';

2. Different JOINS in HIVE:-


Ans:> This is a bit of a tricky question... By this question they don't mean the simple joins like outer join, inner join, etc. They are actually looking for performance optimization through different join strategies...

Hive joins are executed as MapReduce jobs, or through other execution engines such as Tez or Spark. A join, even of multiple tables, can be achieved by one job only. Many optimizations have been added to Hive, giving users various options for improving join queries.

Understanding how joins are implemented with MapReduce helps to recognize the different optimization techniques in Hive today. Below are details of a few joins used for performance optimization in HIVE:-

2.1. MAP JOIN (Broadcast Join):-


> A simple join (a shuffle join) always follows the basic MR flow: a map phase, then sort & shuffle, and finally a reduce phase that produces the join output, which is time-consuming for a simple query. A MAP JOIN, in contrast, uses only the map phase to perform the join, thus removing the other two phases of MR and producing the output much faster. As no reducers are necessary, map joins are way faster than regular joins.


The constraint on this join is that one table must be small enough to be loaded into memory, so that a (very fast) join can be performed entirely within a mapper without having to run the full Map/Reduce cycle. If your queries frequently rely on small-table joins (e.g. cities or countries, etc.) you might see a very substantial speed-up from using map joins. But for large tables this will not work efficiently.

Hive supports MAPJOINs, which are well suited for this scenario – at least for dimensions small enough to fit in memory. A MAPJOIN could be invoked either through an optimizer hint:

The C-style comment " /*+ MAPJOIN(aliasname)*/ " should be placed immediately following the SELECT. It directs Hive to load aliasname (which is a table or alias of the query) into memory.

Command > SELECT /*+ MAPJOIN(c) */ * FROM orders o JOIN cities c ON (o.city_id = c.id);

OR via auto join conversion:

Command > set hive.auto.convert.join=true;                        ( before performing join query)
                  > SELECT * FROM orders o JOIN cities c ON (o.city_id = c.id);

OR activate the same from the conf file by setting hive.auto.convert.join to true in hive-site.xml in the conf directory.

2.2 Hive Bucket Map Join:-


> As the name suggests, it is performed on buckets of a Hive table. Bucketing is a performance enhancer in Hive where a large dataset is divided into buckets; a bucket map join not only runs in the map phase alone but also joins only the matching buckets, thus reducing latency.

The limitation of this join is that both tables must be bucketed, and before bucketing don't forget to set "hive.enforce.bucketing=true" prior to inserting data. The number of buckets in one table must be a multiple of the number of buckets in the other table, so that we can apply a MAPJOIN on the smaller table by taking its buckets into memory.

For example:-

create table Test1(Id int,Name string,Address string,Salary int)
clustered by (Id) into 4 buckets;
create table Test2(Id int, State string,Pf int, bonus int)
clustered by (Id) into 2 buckets;

set hive.enforce.bucketing = true; 
Insert OVERWRITE  table Test1 select * From Empdata;
Insert OVERWRITE  table Test2 select * From Pfdata;

Now to perform JOIN operation:-

command > set hive.optimize.bucketmapjoin=true;
                > select /*+ MAPJOIN(Test2) */ Test1.* from Test1,Test2 where Test1.Id=Test2.Id;

2.3 Sort Merge Bucket(SMB) Map Join:-


Similar to the bucket map join, except that here both tables must have the same number of buckets. The tables need to be created bucketed and sorted on the same join columns, and the data needs to be bucketed when inserting.


For example:-

create table Test1(Id int,Name string,Address string,Salary int)
clustered by (Id) sorted by (Id) into 4 buckets;
create table Test2(Id int, State string,Pf int, bonus int)
clustered by (Id) sorted by (Id) into 4 buckets;

set hive.enforce.bucketing = true; 
Insert OVERWRITE  table Test1 select * From Empdata;
Insert OVERWRITE  table Test2 select * From Pfdata;

Now to perform JOIN operation:-

set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;

select /*+ MAPJOIN(Test2) */ Test1.* from Test1,Test2 where Test1.Id=Test2.Id;

2.4 Skew Join:-


When our data has a very large number of records associated with one (or more) key, the data is said to be skewed on that key; in simple words, one key holds a disproportionately large share of the dataset. When we perform a join on such data, reducers are assigned keys, and the reducer handling the skewed key takes much longer to finish while the other reducers finish early and sit idle, waiting for the skewed reducer before the results can be displayed/stored.

In this case, using a skew join, we can specify the skewed key, which is then handled by a map join while the rest of the keys go through a single regular MapReduce operation.

To perform this join we have to set the following parameters:

set hive.optimize.skewjoin=true;
 (To enable skew join optimization and let hive server optimize the join where there is skew. We need to set it to true.)

set hive.skewjoin.key = 100000
(The value of this property determines which key is a skew key. During join for any particular key if the number of rows is more than this specified number, then that key is considered as a skew join key.)

set hive.skewjoin.mapjoin.map.tasks = 10000
To set the number of map tasks to use in the map join job to perform the skew join. This property needs to be used in association with the hive.skewjoin.mapjoin.min.split.

set hive.skewjoin.mapjoin.min.split = 33554432
To set the minimum split size and hence calculate the maximum number of mappers to be used for the map join job for a skew join. This property should be used with hive.skewjoin.mapjoin.map.tasks for an efficient control.

For Example:-

CREATE TABLE A (ID int,Name string, Salary int)
  SKEWED BY (ID) ON (1,5,6);

Suppose we have table A with a key column, "id" which has values 1, 2, 3 and 4, and table B with a similar column, which has values 1, 2 and 3.
We want to do a JOIN corresponding to the following query
select A.id from A join B on A.id = B.id
A set of Mappers read the tables and gives them to Reducers based on the keys. e.g., rows with key 1 go to Reducer R1, rows with key 2 go to Reducer R2 and so on. These Reducers do a cross product of the values from A and B, and write the output. The Reducer R4 gets rows from A, but will not produce any results.
Now let's assume that A was highly skewed in favor of id = 1. Reducers R2 and R3 will complete quickly but R1 will continue for a long time, thus becoming the bottleneck. If the user has information about the skew, the bottleneck can be avoided manually as follows:
Do two separate queries
select A.id from A join B on A.id = B.id where A.id <> 1;
select A.id from A join B on A.id = B.id where A.id = 1 and B.id = 1;
The first query will not have any skew, so all the Reducers will finish at roughly the same time. If we assume that B has only few rows with B.id = 1, then it will fit into memory. So the join can be done efficiently by storing the B values in an in-memory hash table. This way, the join can be done by the Mapper itself and the data do not have to go to a Reducer. The partial results of the two queries can then be merged to get the final results.



3. CAN WE UPDATE DATA IN HIVE?


Ans> You must be thinking: has the interviewer gone mad by asking such a silly question, or is this a trick question? Believe me, if you are a newbie then you would answer like most of my friends: that Hive is a component of Hadoop which uses HDFS, and since data in HDFS cannot be updated, we can't update data in Hive either....

All Wrong.... 

Hive has the concept of UPSERT (UPdate & inSERT), and from version 0.14 onwards an UPDATE feature has been added to Hive.

In UPSERT we first create a new table from the old table using CTAS (Create Table As Select), where we remove/skip the data we want to update, and then insert the new data into the new table. This is a long-winded method, but with the UPDATE feature we can now directly change any value in a Hive table.
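
As a rough sketch of the CTAS-style approach (the table emp, the Id value and the new salary below are only illustrative):

command> CREATE TABLE emp_new AS SELECT * FROM emp WHERE Id <> 101;
command> INSERT INTO TABLE emp_new SELECT Id, Name, 75000 FROM emp WHERE Id = 101;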

Standard Command:

command> UPDATE tablename SET column = value [, column = value ...] [WHERE expression] 

Parameters to be set before performing UPDATE:-

set hive.support.concurrency=true;

The limitation of this feature is that the columns by which the table is partitioned or bucketed can't be updated.
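
For reference, here is a hedged sketch of what an update-capable (ACID) table typically requires from Hive 0.14 onwards: the table must be bucketed, stored as ORC and marked transactional, and the transaction manager must be enabled. Table and column names are illustrative, and the exact settings can vary by version/distribution.

set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

CREATE TABLE emp_txn (Id int, Name string, Salary int)
CLUSTERED BY (Id) INTO 4 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');

UPDATE emp_txn SET Salary = 80000 WHERE Id = 101;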

4. INDEXING IN HIVE:


Ans> Hive indexing improves the speed of query lookups on certain columns of a table. Without an index, queries with predicates like 'WHERE tab1.col1 = 10' load the entire table or partition and process all the rows. But if an index exists for col1, then only a portion of the file needs to be loaded and processed. An index acts as a reference to the records: instead of searching all the records, we can refer to the index to find a particular record with minimum overhead.

Types of Indexes in Hive:-

1. Compact Indexing
2. Bitmap Indexing

Bitmap indexing is commonly used for columns with few distinct values (low cardinality).

The main difference between the two lies in how the mapping from column values to rows is stored. Since the data of a Hive table is stored in HDFS by default, it is distributed in blocks across the nodes of the cluster, and there needs to be a way to identify which rows are present in which block. The index holds exactly this information, so that when a query is triggered Hive can first check the index and then go directly to the relevant blocks instead of scanning everything.

Compact indexing stores the pair of indexed column’s value and its blockid.

Bitmap indexing stores the combination of indexed column value and list of rows as a bitmap.

Syntax/Command:-

CREATE INDEX index_name ON TABLE base_table_name (col_name, ...)
AS 'index.handler.class.name'
[WITH DEFERRED REBUILD]
[IDXPROPERTIES (property_name=property_value, ...)]
[IN TABLE index_table_name]
[PARTITIONED BY (col_name, ...)]
[
   [ ROW FORMAT ...] STORED AS ...
   | STORED BY ...
]
[LOCATION hdfs_path]
[TBLPROPERTIES (...)]

For example:-

> CREATE INDEX table02_index ON TABLE table02 (column3) AS 'COMPACT' WITH DEFERRED REBUILD;
> ALTER INDEX table02_index ON table02 REBUILD;
> SHOW FORMATTED INDEX ON table02;
> DROP INDEX table02_index ON table02;

The following link describes indexing with examples.


5. Windowing Functions In Hive:


Ans> Windowing allows you to define a window over a set of rows and compute aggregations over that window. Windowing was introduced in Hive 0.11.

Windowing in Hive includes the following functions

5.1 Lead

The number of rows to lead can optionally be specified. If the number of rows to lead is not specified, the lead is one row.

Returns null when the lead for the current row extends beyond the end of the window.

5.2 Lag

The number of rows to lag can optionally be specified. If the number of rows to lag is not specified, the lag is one row.

Returns null when the lag for the current row extends before the beginning of the window.

FIRST_VALUE

LAST_VALUE

The OVER clause

OVER with standard aggregates:
COUNT
SUM
MIN
MAX
AVG
OVER with a PARTITION BY statement with one or more partitioning columns.

OVER with PARTITION BY and ORDER BY with one or more partitioning and/or ordering columns.
Analytics functions

RANK
ROW_NUMBER
DENSE_RANK
CUME_DIST
PERCENT_RANK
NTILE

Refer to the below link for more description and example on the same:-

https://acadgild.com/blog/windowing-functions-in-hive/
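
As a quick illustration, here is a hedged sketch over a hypothetical emp table (Id, Dept, Salary) combining a few of the functions above:

SELECT Id, Dept, Salary,
       LAG(Salary)  OVER (PARTITION BY Dept ORDER BY Salary) AS prev_salary,
       LEAD(Salary) OVER (PARTITION BY Dept ORDER BY Salary) AS next_salary,
       RANK()       OVER (PARTITION BY Dept ORDER BY Salary DESC) AS salary_rank,
       SUM(Salary)  OVER (PARTITION BY Dept) AS dept_total_salary
FROM emp;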

6. QUERY VECTORIZATION:- 

(Only the ORC file format is supported in the current implementation.)

Ans> Vectorized query execution is a Hive feature that greatly reduces the CPU usage for typical query operations like scans, filters, aggregates, and joins. Vectorization allows Hive to process a batch of rows together instead of processing one row at a time.

A standard query execution system processes one row at a time. This involves long code paths and significant metadata interpretation in the inner loop of execution. Vectorized query execution streamlines operations by processing a block of 1024 rows at a time. Within the block, each column is stored as a vector (an array of a primitive data type). Simple operations like arithmetic and comparisons are done by quickly iterating through the vectors in a tight loop, with no or very few function calls or conditional branches inside the loop. These loops compile in a streamlined way that uses relatively few instructions and finishes each instruction in fewer clock cycles, on average, by effectively using the processor pipeline and cache memory.

Enabling vectorized execution

To use vectorized query execution, you must store your data in ORC format, and set the following variable as shown in Hive:-

set hive.vectorized.execution.enabled = true;

Vectorized execution is off by default, so your queries only utilize it if this variable is turned on. To disable vectorized execution and go back to standard execution, do the following:

set hive.vectorized.execution.enabled = false;
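
Putting it together, a hedged sketch (the table name and columns are illustrative); the data must first sit in an ORC table before the setting has any effect:

CREATE TABLE sales_orc (Id int, Region string, Amount double)
STORED AS ORC;

set hive.vectorized.execution.enabled = true;

SELECT Region, SUM(Amount) AS total_amount
FROM sales_orc
GROUP BY Region;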

I will update and refine this post as I come across more interview-related questions in Hive. Feel free to send me more questions/suggestions on the same, and I will include them in this blog....


Hope you all understood the procedures... 
Please do notify me for any corrections...
Kindly leave a comment for any queries/clarification...
ALL D BEST...