Monday, May 9, 2016

Generating GAPLESS SEQUENCE NUMBER in map reduce.

Lets say there is a large data file and using map reduce we would like to add a row number to the data. Following would be the visual representation of the approach


The following code is just representational and is not tested. 

SequenceZKMapReduce.java


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.*;
import java.util.HashMap;


public class SequenceZKMapReduce {

    //Enum for map reduce counters    public static enum SEQ_COUNTER {
        OUTPUT_RECORD    };

    //Mapper class that would output the mapper id as the key and the entire text as value    //Once the mapper finishes the work, using cleanup, its going to report the number of records to zookeper
    public static class SequenceMapper extends Mapper<Object, Text, IntWritable, Text> {

        private final IntWritable ConstKey = new IntWritable();

        //Intial setup        //Set the mapper id as the key for the data to be emmited from this mapper.        protected void setup(Context context){
            ConstKey.set(context.getTaskAttemptID().getTaskID().getId());
        }

        //Write the constant key which is the mapper id, and the value as the entire string        //Increase the counter for every record processed        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            context.write(ConstKey, value);
            context.getCounter(SEQ_COUNTER.OUTPUT_RECORD).increment(1);
        }

        //This method would be called at the close of the map task.        //We are going to use this method to write to zookeeper        protected void cleanup(Context context) {
            try {
                ZooKeeperMgr zk = new ZooKeeperMgr();
                //Connect to zookeeper, I am hardcoding the zookeeper host to localhost                zk.connect("localhost");
                //Get the existing data on the node and deserialize to a hashmap                byte[] dt = zk.readZnode();
                int ver = zk.getVersion();
                HashMap<Integer, Long> h = null;

                //If data is not empty deserialize to get the hashmap else create a new one.                if(dt!=null){
                    ByteArrayInputStream bis = new ByteArrayInputStream(dt);
                    ObjectInput in = new ObjectInputStream(bis);
                    h = (HashMap<Integer, Long>) in.readObject();
                }else{
                    h= new HashMap<Integer, Long>();
                }

                //Add the map key and the counter to the hashmap                h.put(ConstKey.get(), context.getCounter(SEQ_COUNTER.OUTPUT_RECORD).getValue());
                //Write the data back to the znode                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutput out = new ObjectOutputStream(bos);
                out.writeObject(h);
                zk.writeZnode(bos.toByteArray(), ver); // This would throw exception if version changed between read and write
            } catch (Exception e) {
                //If there is version mismatch write exception, call the method again.                //To make it safe, some counter should be established. Otherwise it may get into a loop.                this.cleanup(context);
            }
        }

    }

    //Reducer Class that spits out a sequence number and the text as it came    public static class ReorgReducer extends Reducer<IntWritable, Text, LongWritable, Text> {

        HashMap<Integer, Long> h;
        private long sequence = 0L;
        private LongWritable sequenceHolder = new LongWritable(1);

        //In the setup phase, read the znode and extract all ranges (hashmap)        protected void setup(Context context){
            try {
                ZooKeeperMgr zk = new ZooKeeperMgr();
                //Connect to zookeeper, I am hardcoding the zookeeper host to localhost                zk.connect("localhost");
                //Get the existing data on the node and deserialize to a hashmap                byte[] dt = zk.readZnode();
                ByteArrayInputStream bis = new ByteArrayInputStream(dt);
                ObjectInput in = new ObjectInputStream(bis);
                h = (HashMap<Integer, Long>) in.readObject();
            }catch(Exception e){

            }
        }

        public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            //Get the start -1 position for the given key            sequence = getStartPosition(h, key.get());

            for (Text val : values) {
                sequence=sequence+1;
                sequenceHolder.set(sequence);
                context.write(sequenceHolder , val);
            }
        }

        //Given a key, this method looks for all the keys less than the given and sums all values.        //the idea is if a reducer if processing data from a mapper with id 102, it will first sum up the range for all ids        //less than 102 and start the sequencing after that.        private long getStartPosition(HashMap<Integer, Long> ranges, int currentKey){
            Long startPos=0L;
            for(int key : ranges.keySet()){
                if(key < currentKey) startPos = startPos + ranges.get(key);
            }
            return startPos;
        }


    }

    //Job Runner    public static int run(String inpPath, String outPath) throws Exception{

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Add Sequence Number with help of zoo keeper");

        job.setJarByClass(SequenceMapReduce.class);
        job.setMapperClass(SequenceMapper.class);
        job.setReducerClass(ReorgReducer.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(inpPath));
        FileOutputFormat.setOutputPath(job, new Path(outPath));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    //Main method    public static void main(String args[]) throws Exception{
        System.exit(run(args[0], args[1]));
    }
}


ZooKeeperMgr.java
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;

import java.io.IOException;

public class ZooKeeperMgr {

    String nodePath = "/sequenceapp";
    ZooKeeper zookeeper;
    CountDownLatch connectStatus = new CountDownLatch(1);


    //Connect to the zookeeper and get the handle    //This method should be called from the driver, mapper and reducer setups    public void connect(String host) throws IOException, InterruptedException {
        zookeeper = new ZooKeeper(host, 5000,
                new Watcher() {
                    public void process(WatchedEvent event) {
                        if (event.getState() == Event.KeeperState.SyncConnected) {
                            connectStatus.countDown();
                        }
                    }
                });
        connectStatus.await();
    }

    //Create znode    //This method should only be called from driver    public void createZnode() throws KeeperException, InterruptedException {
        try {
            zookeeper.create(nodePath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch(InterruptedException inex) {
        } catch(KeeperException kex) {
        }
    }

    //Read from znode.    //This method to be used by mapper and reducer    public byte[] readZnode() throws KeeperException, InterruptedException {
        byte[] dt = null;
        try {
            dt = zookeeper.getData(nodePath, true, zookeeper.exists(nodePath, true));
        } catch(Exception ex) {
        }
        return dt;
    }

    //Get the znode data version    //This method to be used by mappers before writing the data    //Important to ensure that data is not overwrittten    public int getVersion() throws KeeperException, InterruptedException {
        Stat stat = zookeeper.exists(nodePath, true);
        return stat.getVersion();
    }

    //Write to znode    //This methoid to be used only by mapper    public void writeZnode(byte[] data, int version) throws KeeperException, InterruptedException {
        zookeeper.setData(nodePath, data, version);
    }

    //Delete znode    //Method to be only called by the driver to delete the node before the app closes    public void deleteNode()throws InterruptedException, KeeperException{
        zookeeper.delete(this.nodePath, -1);
    }


    //Close the connection    public void close() throws InterruptedException, KeeperException {
        zookeeper.close();
    }

    //This method is redundant. Keeping for just in case the zookeeper instance is required.    public ZooKeeper getZooKeeper() {
        if (zookeeper == null || !zookeeper.getState().equals(ZooKeeper.States.CONNECTED)) {
            throw new IllegalStateException("ZooKeeper is not connected.");
        }
        return zookeeper;
    }

}

Hello Hive UDF: Unique Identifier Generator using Hive UDF

In RDBMS world, we could generate unique numbers using sequences and other SQL approaches. In Hive a generated unique number should be unique in a distributed environment.

Trying to generate a unique number across nodes that synchronize data across nodes can be really slow. The alternative is to generate unique numbers on individual nodes in a way that the generated number is confirmed to be unique across nodes.

One of the approach that I have used, is to use a Hive UDF that generates unique numbers on individual nodes by using a combination of

  1. Node IP Address
  2. Job Start Timestamp
  3. Sequence number generated on the node
Following is a sample code. This code is not tested and possibly may be having some typos somewhere. The intent is to provide the approach

UniqueNumberGenerator.java

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;



@UDFType(deterministic = false, stateful = true)
public class UniqueNumberGenerator extends GenericUDF {

    private transient Long sequence;
    private transient String prefix;


    //Gets called once    @Override    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        // per-instance fields initialization        prefix = getPrefix();
        sequence = 0L;
        return PrimitiveObjectInspectorFactory.javaLongObjectInspector;
    }


    //Gets called on each call to function    @Override    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        sequence = sequence + 1;
        return Long.parseLong(prefix + sequence);
    }

    @Override    public String getDisplayString(String[] children) {
        return getStandardDisplayString("unique_number", children);
    }



    //Following are support functions
    //Create number prefix    public String getPrefix() {
        return getStartingTimestamp() + getLocalIPFormatted();
    }


    //Returns the last 2 parts of IP address left padded to 3 number with tota 6 digits    //The last 2 parts are chose to introduce maximum variability    public String getLocalIPFormatted() {
        InetAddress addr = null;
        String ip = null;

        try {
            addr = InetAddress.getLocalHost();
            ip = addr.getHostAddress();
        } catch (UnknownHostException e) {
            ip = "111.111.111.111";
        }

        String[] parts = ip.split("\\.");
        //Create number by reverse concatenation of the ip parts        return String.format("%3s", parts[3]).replace(' ', '0') + String.format("%3s", parts[2]).replace(' ', '0');
    }


    //Get the system time in millis and left pad with zeros to make the number future proof    public String getStartingTimestamp() {
        return String.format("%-13d0", System.currentTimeMillis()).replace(' ', '0');
    }

}





The key things to note are the parameters for UDFType (deterministic = false and stateful = true)

Deterministic : Evaluates once and uses across rows. Setting to false instructs the function to be called for each row
Stateful: Instructs to maintain the state of variables between UDF calls for each row. This is how the sequence number is incremented 


Why the variables are transient? : We don't want any variable value to persist if Hive serialize the UDF object and throws it across nodes.

Wednesday, December 30, 2015

SQL Hints - Map Joins for better performance

Thinking over this one a bit, it may make sense to build a little bit of background before jumping to map join directly.

Imagine if you have to write a program (assume we are in 1990 when no big data existed) to take 2 FILES and ready them and join and then write to another file. There may be better ways, but the most common approach that comes to mind is

  • Sort file A
  • Sort file B
  • Read A and B simultaneously and compare the keys and stall on the key which is lagging(both the files may not have same record count, like customer and order) and keep writing to an output file as keys match.

Key point is unless the files are sorted the join is not possible. This also extends to big data world, unless the files are sorted the join simply can not happen.

Remember: Hive underneath is map reduce.

Lets think of the above scenario in map-reduce world. A schematic representation would look like below


If you notice above Mappers as dealing with only individual files and sort and shuffle phase is where they come together, they get sorted and keys matched.  This is where it gets expensive.

A map join is basically a hint to do things little differently which is situational. i.e. one file is small and can be saved to distributed cache so that the join can happen in map phase. Confusing?... the following digram shows how it would look like


The distributed cache is a hash map that supports faster look up. The mapper 1 and 2 used the hash map in distributed cache to look up the values and join associated attributes to their own dataset before going through the sort and shuffle. This reduced the load on the sort and shuffle.

Thats a 2000ft view of map joins.

Important Note:

Recent versions of hive know to do automatic joins when the table statistics are known to the optimizer. However, if the statistics are not available for tables the optimizer won't know which table can be used as hash map. Also for subquery/intermediate datasets, the optimizer won't know during optimization phase how to map join (as the intermediate data sets are created in run time). This is where a humanly touch of giving map join hints possibly would save a lot of time.

Following is an example of map join

SELECT /*+ MAPJOIN(STORETABLE) */ COUNT(*) FROM
SALESTABLE JOIN STORETABLE ON (SALESTOREKEY = STOREKEY)

Map join is a hint keyword.... similar to hints in RDBMS world. If you are interested in understanding the concepts more in detail, it may be worthwhile to write a map reduce yourself that used distributed cache with a hash map.

Sunday, December 27, 2015

Hive Client Tools : Tools for query

Hive evolving as the simplest way to access the structured data from hadoop ecosystem, analysts often need a simpler interface to run queries and view results. Thankfully there are many FREE tools available to do so. Following are a list of tools that are free but not the entire list. I am not rating these tools either as there is always a possibility of a better one available somewhere that I am not aware of.

Hive Client Library: [Command Line]
This is a non-ui tool i.e. a command line interface and provides all the features that you need in hive environment. Just do a ssh to your cluster and hive command "hive" to start the hive shell.

Beeline: [Command Line]
This is based on SQLLine lib. Given the hive driver is class path and a JDBC URL, it can connect to hiveserver2 and run queries.

beeline> !connect jdbc:hive2://<host>:<port>/<db>;auth=noSasl hiveuser pass org.apache.hive.jdbc.HiveDriver

Hue: [Web Interface]
Hue has gotten really interesting over time. To get a feel of hue, head to the hue demo site.
Hortonworks sandbox comes preinstalled with Hue (though a good old version which can be updated) and runs on port 8000. like http://sandbox.hortoworks.com:8000/

Oracle SQL Developer: [Java Application]
Note: This only works with cloudier driver and with a  specific version. It does not appear that there is long term support to make it work with newer versions. If interested you can refer to this blog about how to setup the environment.

IBM Data Studio: [Java Application]
Built on top of Eclipse, I liked this tool for a short period that I worked with it. If interested to use, please head here

Squirrel: [Java Application]
This is one SQL Editor which supports almost all databases in market that supports JDBC.  Getting to work with Hive needs some amount of effort, buts its worth it considering Squirrel is written in Java and runs on Windows/Mac/Linux. For installation and configuration, I have posted some details here

Aginity Hadoop Workbench: [Windows Application]
This is windows native built application and my favorite. It does not provide connectivity to PIG, Oozie, etc like hue does. But for writing and running queries, it really excels. Aginity workbench also comes with a File shell that can help in managing files on HDFS. Its free, though registration is required. http://www.aginity.com/workbench/hadoop/



Using Squirrel to query Hive


To use squirrel download the following jar files (I am using hive 1.2.1). The first Jar file is the actual driver and others are the dependencies the driver has



Assuming you already have squirrel installed, (if not go to http://squirrel-sql.sourceforge.net/#installation ), Go to driver, click on the + sign to register a new driver.

























Add the downloaded Jar files here.
Click on hive-jdbc jar files and click "list drivers"
Give the example URL as jdbc:hive2://localhost:10000/default

And click Ok.

Now create an alias, and provide the server details and username and connect


















You are good to use Squirrel. I like the tool except for the fact that you can not have the object browser and SQL window side by side. (Or there may be a way, I am not aware of)

Improving Performance for Hive Queries

If hive is used as the interface for accessing TB and PB scale data, it is quite important to optimize the queries to get them to run faster. Optimizing the queries is directly related to infrastructure, size of data, organization of data, storage formats and the data readers/ processors. This opens up thousands of possibilities in terms of how the queries can be optimized.

Following are some suggestions that can increase the query performance without worrying too much about the data organization.


  • Set hive execution engine to spark
  • Set hive execution engine to Tez
  • Set table storage format to ORC
  • Use cost based query optimization
  • Use Vectorization for queries
  • Use Partition on data when possible
  • Force Situational Map Join. (More on this link)
Apart from these options there are tens of other ways to improve the performance that are scenario driven like map joins, bucketing and bucket sort the data, change the VM and YARN memory configurations, etc

Following are the details for some of the above options.


EXECUTION ENGINE: SPARK

Use spark as the query engine for Hive:  Currently the hadoop releases do not support a direct approach to setting the query engine to Spark (some level of effort is required in setting it up). Refer to this article if you are interested. Remember this is somewhat new and there are chances that some expected stuff may not work. So do a POC based on your needs before using in production.

EXECUTION ENGINE: TEZ
If setting up Spark is a difficult route, use tez as the query engine for hive.  Setting the query engine to Tez is as easy as giving the command
set hive.execution.engine=tez

Or you can make a permanent change by changing your hive-site.xml, usually located in [/usr/hdp/current/hive-server2/conf/] or by changing the "hive.execution.engine" property to "Tez" through  Ambari 

STORAGE FORMAT: ORC
ORC (Optimized Row Columnar) is an evolution to RC (Row Columnar) format. Data is stored in stripes and efficient for reading. You can lean more about ORC here.

A summary fo ORC: A collection of rows are stored in a single block of data, and inside the block the columns are stored together. i.e Depending on the row size, a block may have 20,000 rows in a block and within the block, column 1 data for all 20000 rows are stored together, column 2 data for all 20000 rows are stored together and so on... The advantage of this format is based on the columns selected in a query, the data is read only for the required columns in a block than the entire row data. In my experience, ORC format storage does not pose any challenges as against storage as text file in terms of SQL features. ORC just improves the performance.

About setting the storage to ORC, it can done in 3 ways.

>>While creating the table
create table myorctable (
  name string,
  age int

) stored as orc tblproperties ("orc.compress"="NONE");

>>By altering the table
ALTER TABLE myorctable SET FILEFORMAT ORC

>>By setting the default file format to ORC and then creating the tables.
SET hive.default.fileformat=Orc;
Setting the default file format can be done from the client application or can be applied in hive config file. When creating a table stored as ORC there are table properties that get applied like compression method. If you are planning to set the default storage as ORC, you may want to take a look at this link for setting other default table properties and their config names for setting the values in the hive config file.

AUTOMATIC QUERY PLAN OPTIMIZATION
A feature which is borrowed from RDBMS systems or MPP (Massively parallel processing) systems. This is predominantly helpful when the queries have joins or multiple stepped operations. Also commonly referred as COST BASED QUERY OPTIMIZATION

Using this feature has 2 parts.

  • Letting hive know that this feature should be used.
  • Making the table statistics available to hive.
>>Letting hive know that this feature should be used

Set the following parameters through the client or in hive configurations through Ambari.

set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;

set hive.stats.fetch.partition.stats=true;

>> Making the table statistics available to hive
For the tables involved in the queries that you wish to use the CBQO feature, run the following SQL

analyze table myorctable compute statistics for columns name, age;
--OR if using version 0.14 and beyond

analyze table myorctable compute statistics for columns;

Do this for all the tables involved in the query.


USE VECTORIZATION
This option helps only in certain scenarios. Vectorization basically stalls the rows and performs operation on then in batches of 1024 rows instead of processing 1 row at a time.

Vectorization can be enabled through the following settings

set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;

You can either give the above set commands through your client tool or enable them permanently in hive settings through Ambari.


USE DATA PARTITIONS

Hive partitions basically store the data in different folders based on the partition rules defined and accesses only required data when query filter are used. The most common partition used is date.

Unlike relational database, the partition columns being common to a dataset are not part of the columns in the data and as a result also helps in reducing the data volume.

2 important tips:

  • Do not over partition the data just because it reduces data volume.
  • Keep your commonly used/ most frequently used queries in mind while partitioning the data

Following is a sample DDL of how partitions are used.

CREATE TABLE MY_WEBLOG (
    SESSION_ID BIGINT,
    URL STRING,
    LOG_TIMESTAMP STRING
)

PARTITIONED BY (LOG_DATE STRING, LOG_HOUR STRING) ;

To insert into a partitioned tables, the insert statement goes like


INSERT INTO MY_WEBLOG
PARTITION (LOG_DATE = '2016-01-01', LOG_HOUR = '23')
SELECT  SESSION_ID, URL, LOG_TIMESTAMP
FROM    LOG_SOURCE

WHERE   LOG_DATE = '2016-01-01', LOG_HOUR = '23' ;

This is quirky to specify partition values every time there is an insert. Hive provides dynamic partition options in order for inserts to automatically partition the data.

INSERT INTO MY_WEBLOG
PARTITION (LOG_DATE, LOG_HOUR)
SELECT  SESSION_ID, URL, LOG_TIMESTAMP , LOG_DATE, LOG_HOUR
FROM    LOG_SOURCE;


Note: Its required to have the partition columns at the end of the insert columns. Remember, partition columns are not physical columns, they are grouping folders.



Friday, December 25, 2015

Hive External Tables with Multiple Character Delimiter

Hive external tables allow creating external reference tables without moving the data to a new location. I created a sample dataset with delimiter "|^|". However I soon realized that hive external tables only allow a single character as delimiter. i.e the following code does NOT work.


CREATE EXTERNAL TABLE TwitterFlatExternal
(
Column_1 STRING,
Column_2 STRING,
Column_3 STRING,
Column_4 STRING,
Column_5 STRING,
Column_6 STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|^|'
LINES TERMINATED BY '\n'

LOCATION '/user/dsarangi/twitterdataflat';


However it is possible to achieve the same results using RegexSerde. SerDe stands for serializer and deserializer. Read more about it on Hive Serde

The regular expression in regex SerDe basically would have to grab each column on a regex grouping. The following one is an example for 6 column table with delimiter = "|^|"

CREATE EXTERNAL TABLE TwitterFlatExternalReg
(
Column_1 STRING,
Column_2 STRING,
Column_3 STRING,
Column_4 STRING,
Column_5 STRING,
Column_6 STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
"input.regex" = "^(.*)\\|\\^\\|(.*)\\|\\^\\|(.*)\\|\\^\\|(.*)\\|\\^\\|(.*)\\|\\^\\|(.*)$",
"output.format.string" = "%1$s %2$s %3$s %4$s %5$s %6$s"
)
STORED AS TEXTFILE
LOCATION '/user/dsarangi/twitterdataflat';

Using regex is cool but has its coolest pain areas also. Its a non-definitive approach and the regex has to be really solid to make sure every row renders the required number of columns. You have to create the table with the regex and then do a select on the table to know that the regex is actually working to render the columns. I use regex101 site to test my regex

Most important: When using the regex in hive use \\ (double back slash) for escaping. Single does not work. 

So when testing the regex on a online site, convert the escape character from single backslash to double backslash before using in hive query.