Monday, May 9, 2016

Generating a GAPLESS SEQUENCE NUMBER in MapReduce.

Let's say there is a large data file, and using MapReduce we would like to add a row number to the data. Following is a visual representation of the approach.


The following code is just representational and is not tested. 
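To make the idea concrete, here is a minimal standalone sketch (the class name and counts are hypothetical) of the arithmetic the reducer relies on: every mapper reports how many records it emitted, keyed by its task id, and the reducer handling a given mapper's records starts its sequence right after the sum of the counts of all smaller mapper ids.

import java.util.HashMap;

//Hypothetical illustration of the offset arithmetic; not part of the job code
public class OffsetIllustration {
    public static void main(String[] args) {
        //Mapper id -> number of records that mapper emitted
        HashMap<Integer, Long> counts = new HashMap<Integer, Long>();
        counts.put(0, 3L);
        counts.put(1, 4L);
        counts.put(2, 2L);

        //For mapper id 2, sum the counts of all smaller ids: 3 + 4 = 7,
        //so its two records get sequence numbers 8 and 9
        long startPos = 0L;
        for (int id : counts.keySet()) {
            if (id < 2) startPos = startPos + counts.get(id);
        }
        System.out.println("Mapper 2 records are numbered starting at " + (startPos + 1));
    }
}

The actual job below does the same thing, except the per-mapper counts travel through a ZooKeeper znode instead of a local map.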

SequenceZKMapReduce.java


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.*;
import java.util.HashMap;


public class SequenceZKMapReduce {

    //Enum for map reduce counters
    public static enum SEQ_COUNTER {
        OUTPUT_RECORD
    };

    //Mapper class that would output the mapper id as the key and the entire text as value
    //Once the mapper finishes the work, using cleanup, it's going to report the number of records to ZooKeeper
    public static class SequenceMapper extends Mapper<Object, Text, IntWritable, Text> {

        private final IntWritable ConstKey = new IntWritable();

        //Initial setup
        //Set the mapper id as the key for the data to be emitted from this mapper.
        protected void setup(Context context){
            ConstKey.set(context.getTaskAttemptID().getTaskID().getId());
        }

        //Write the constant key which is the mapper id, and the value as the entire string
        //Increase the counter for every record processed
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            context.write(ConstKey, value);
            context.getCounter(SEQ_COUNTER.OUTPUT_RECORD).increment(1);
        }

        //This method would be called at the close of the map task.
        //We are going to use this method to write to zookeeper
        protected void cleanup(Context context) {
            try {
                ZooKeeperMgr zk = new ZooKeeperMgr();
                //Connect to zookeeper, I am hardcoding the zookeeper host to localhost
                zk.connect("localhost");
                //Get the existing data on the node and deserialize to a hashmap
                byte[] dt = zk.readZnode();
                int ver = zk.getVersion();
                HashMap<Integer, Long> h = null;

                //If data is not empty deserialize to get the hashmap else create a new one.
                if(dt!=null){
                    ByteArrayInputStream bis = new ByteArrayInputStream(dt);
                    ObjectInput in = new ObjectInputStream(bis);
                    h = (HashMap<Integer, Long>) in.readObject();
                }else{
                    h= new HashMap<Integer, Long>();
                }

                //Add the map key and the counter to the hashmap
                h.put(ConstKey.get(), context.getCounter(SEQ_COUNTER.OUTPUT_RECORD).getValue());
                //Write the data back to the znode
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutput out = new ObjectOutputStream(bos);
                out.writeObject(h);
                zk.writeZnode(bos.toByteArray(), ver); // This would throw exception if version changed between read and write
            } catch (Exception e) {
                //If there is a version-mismatch write exception, call the method again.
                //To make it safe, some retry limit should be established. Otherwise it may get into a loop.
                this.cleanup(context);
            }
        }

    }

    //Reducer class that spits out a sequence number and the text as it came
    public static class ReorgReducer extends Reducer<IntWritable, Text, LongWritable, Text> {

        HashMap<Integer, Long> h;
        private long sequence = 0L;
        private LongWritable sequenceHolder = new LongWritable(1);

        //In the setup phase, read the znode and extract all ranges (hashmap)
        protected void setup(Context context){
            try {
                ZooKeeperMgr zk = new ZooKeeperMgr();
                //Connect to zookeeper, I am hardcoding the zookeeper host to localhost
                zk.connect("localhost");
                //Get the existing data on the node and deserialize to a hashmap
                byte[] dt = zk.readZnode();
                ByteArrayInputStream bis = new ByteArrayInputStream(dt);
                ObjectInput in = new ObjectInputStream(bis);
                h = (HashMap<Integer, Long>) in.readObject();
            }catch(Exception e){
                //Representational code: exception handling omitted
            }
        }

        public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            //Get the position just before the start (start - 1) for the given key
            sequence = getStartPosition(h, key.get());

            for (Text val : values) {
                sequence=sequence+1;
                sequenceHolder.set(sequence);
                context.write(sequenceHolder , val);
            }
        }

        //Given a key, this method looks for all the keys less than the given key and sums all values.
        //The idea is if a reducer is processing data from a mapper with id 102, it will first sum up the range for all ids
        //less than 102 and start the sequencing after that.
        private long getStartPosition(HashMap<Integer, Long> ranges, int currentKey){
            Long startPos=0L;
            for(int key : ranges.keySet()){
                if(key < currentKey) startPos = startPos + ranges.get(key);
            }
            return startPos;
        }


    }

    //Job Runner
    public static int run(String inpPath, String outPath) throws Exception{

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Add Sequence Number with help of zoo keeper");

        job.setJarByClass(SequenceZKMapReduce.class);
        job.setMapperClass(SequenceMapper.class);
        job.setReducerClass(ReorgReducer.class);

        //The mapper emits IntWritable/Text while the final output is LongWritable/Text,
        //so the map output classes are declared explicitly
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(inpPath));
        FileOutputFormat.setOutputPath(job, new Path(outPath));

        //Create the znode from the driver before the job runs and delete it afterwards
        //(per ZooKeeperMgr, create/delete are meant to be called only by the driver)
        ZooKeeperMgr zk = new ZooKeeperMgr();
        zk.connect("localhost");
        zk.createZnode();

        int result = job.waitForCompletion(true) ? 0 : 1;

        zk.deleteNode();
        zk.close();

        return result;
    }

    //Main method
    public static void main(String args[]) throws Exception{
        System.exit(run(args[0], args[1]));
    }
}


ZooKeeperMgr.java
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;

import java.io.IOException;

public class ZooKeeperMgr {

    String nodePath = "/sequenceapp";
    ZooKeeper zookeeper;
    CountDownLatch connectStatus = new CountDownLatch(1);


    //Connect to the zookeeper and get the handle
    //This method should be called from the driver, mapper and reducer setups
    public void connect(String host) throws IOException, InterruptedException {
        zookeeper = new ZooKeeper(host, 5000,
                new Watcher() {
                    public void process(WatchedEvent event) {
                        if (event.getState() == Event.KeeperState.SyncConnected) {
                            connectStatus.countDown();
                        }
                    }
                });
        connectStatus.await();
    }

    //Create znode
    //This method should only be called from driver
    public void createZnode() throws KeeperException, InterruptedException {
        try {
            zookeeper.create(nodePath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch(InterruptedException inex) {
        } catch(KeeperException kex) {
            //Ignored; for example, the node may already exist
        }
    }

    //Read from znode.
    //This method to be used by mapper and reducer
    public byte[] readZnode() throws KeeperException, InterruptedException {
        byte[] dt = null;
        try {
            dt = zookeeper.getData(nodePath, true, zookeeper.exists(nodePath, true));
        } catch(Exception ex) {
        }
        return dt;
    }

    //Get the znode data version
    //This method to be used by mappers before writing the data
    //Important to ensure that data is not overwritten
    public int getVersion() throws KeeperException, InterruptedException {
        Stat stat = zookeeper.exists(nodePath, true);
        return stat.getVersion();
    }

    //Write to znode
    //This method to be used only by the mapper
    public void writeZnode(byte[] data, int version) throws KeeperException, InterruptedException {
        zookeeper.setData(nodePath, data, version);
    }

    //Delete znode
    //Method to be only called by the driver to delete the node before the app closes
    public void deleteNode() throws InterruptedException, KeeperException{
        zookeeper.delete(this.nodePath, -1);
    }


    //Close the connection
    public void close() throws InterruptedException, KeeperException {
        zookeeper.close();
    }

    //This method is redundant. Keeping it just in case the zookeeper instance is required.
    public ZooKeeper getZooKeeper() {
        if (zookeeper == null || !zookeeper.getState().equals(ZooKeeper.States.CONNECTED)) {
            throw new IllegalStateException("ZooKeeper is not connected.");
        }
        return zookeeper;
    }

}

Unique Identifier Generator using Hive UDF

In the RDBMS world, we can generate unique numbers using sequences and other SQL approaches. In Hive, a generated number must be unique across a distributed environment.

Generating a unique number by synchronizing data across nodes can be really slow. The alternative is to generate unique numbers on individual nodes in a way that the generated number is guaranteed to be unique across nodes.

One of the approaches I have used is a Hive UDF that generates unique numbers on individual nodes by using a combination of:

  1. Node IP Address
  2. Job Start Timestamp
  3. Sequence number generated on the node
Following is sample code. It is not tested and may have some typos; the intent is to illustrate the approach.

UniqueNumberGenerator.java

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;



@UDFType(deterministic = false, stateful = true)
public class UniqueNumberGenerator extends GenericUDF {

    private transient Long sequence;
    private transient String prefix;


    //Gets called once
    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        // per-instance fields initialization
        prefix = getPrefix();
        sequence = 0L;
        return PrimitiveObjectInspectorFactory.javaLongObjectInspector;
    }


    //Gets called on each call to the function
    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        sequence = sequence + 1;
        //Note: with a 14-digit timestamp prefix and a 6-digit IP part, the concatenated digits
        //can exceed the range of a long, so in practice the identifier may need to be returned
        //as a string or the prefix shortened
        return Long.parseLong(prefix + sequence);
    }

    @Override
    public String getDisplayString(String[] children) {
        return getStandardDisplayString("unique_number", children);
    }



    //Following are support functions
    //Create number prefix
    public String getPrefix() {
        return getStartingTimestamp() + getLocalIPFormatted();
    }


    //Returns the last 2 parts of the IP address, each left padded to 3 digits, for a total of 6 digits
    //The last 2 parts are chosen to introduce maximum variability
    public String getLocalIPFormatted() {
        InetAddress addr = null;
        String ip = null;

        try {
            addr = InetAddress.getLocalHost();
            ip = addr.getHostAddress();
        } catch (UnknownHostException e) {
            ip = "111.111.111.111";
        }

        String[] parts = ip.split("\\.");
        //Create number by reverse concatenation of the ip parts        return String.format("%3s", parts[3]).replace(' ', '0') + String.format("%3s", parts[2]).replace(' ', '0');
    }


    //Get the system time in millis and left pad with zeros to make the number future proof
    public String getStartingTimestamp() {
        return String.format("%014d", System.currentTimeMillis());
    }

}
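
For illustration, here is a rough sketch of how a generated value is composed, using a hypothetical class name, a fixed timestamp, and the IP address 10.20.30.40:

//Hypothetical illustration of the identifier composition; not part of the UDF
public class PrefixIllustration {
    public static void main(String[] args) {
        //Timestamp part: millis since epoch, zero padded to 14 digits
        String timestampPart = String.format("%014d", 1462838400000L); //"01462838400000"
        //IP part: last two octets of 10.20.30.40 reversed and padded to 3 digits each
        String ipPart = "040" + "030";
        long sequence = 1;
        //The combined value has 21 digits here, so it is printed as text rather than parsed to a long
        System.out.println(timestampPart + ipPart + sequence); //"014628384000000400301"
    }
}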





The key things to note are the UDFType parameters (deterministic = false and stateful = true).

Deterministic: a deterministic UDF is evaluated once and the result is reused across rows. Setting it to false instructs Hive to call the function for each row.
Stateful: instructs Hive to maintain the state of the instance variables between UDF calls across rows. This is how the sequence number keeps incrementing.


Why are the variables transient? We don't want any variable value to persist if Hive serializes the UDF object and ships it across nodes.