Let's say there is a large data file and we would like to use MapReduce to add a row number to every record. The following is a visual representation of the approach:
SequenceZKMapReduce.java
ZooKeeperMgr.java
The following code is only illustrative and has not been tested.
SequenceZKMapReduce.java
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.*; import java.util.HashMap; public class SequenceZKMapReduce { //Enum for map reduce counters public static enum SEQ_COUNTER { OUTPUT_RECORD }; //Mapper class that would output the mapper id as the key and the entire text as value //Once the mapper finishes the work, using cleanup, its going to report the number of records to zookeper public static class SequenceMapper extends Mapper<Object, Text, IntWritable, Text> { private final IntWritable ConstKey = new IntWritable(); //Intial setup //Set the mapper id as the key for the data to be emmited from this mapper. protected void setup(Context context){ ConstKey.set(context.getTaskAttemptID().getTaskID().getId()); } //Write the constant key which is the mapper id, and the value as the entire string //Increase the counter for every record processed public void map(Object key, Text value, Context context) throws IOException, InterruptedException { context.write(ConstKey, value); context.getCounter(SEQ_COUNTER.OUTPUT_RECORD).increment(1); } //This method would be called at the close of the map task. //We are going to use this method to write to zookeeper protected void cleanup(Context context) { try { ZooKeeperMgr zk = new ZooKeeperMgr(); //Connect to zookeeper, I am hardcoding the zookeeper host to localhost zk.connect("localhost"); //Get the existing data on the node and deserialize to a hashmap byte[] dt = zk.readZnode(); int ver = zk.getVersion(); HashMap<Integer, Long> h = null; //If data is not empty deserialize to get the hashmap else create a new one. 
if(dt!=null){ ByteArrayInputStream bis = new ByteArrayInputStream(dt); ObjectInput in = new ObjectInputStream(bis); h = (HashMap<Integer, Long>) in.readObject(); }else{ h= new HashMap<Integer, Long>(); } //Add the map key and the counter to the hashmap h.put(ConstKey.get(), context.getCounter(SEQ_COUNTER.OUTPUT_RECORD).getValue()); //Write the data back to the znode ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutput out = new ObjectOutputStream(bos); out.writeObject(h); zk.writeZnode(bos.toByteArray(), ver); // This would throw exception if version changed between read and write } catch (Exception e) { //If there is version mismatch write exception, call the method again. //To make it safe, some counter should be established. Otherwise it may get into a loop. this.cleanup(context); } } } //Reducer Class that spits out a sequence number and the text as it came public static class ReorgReducer extends Reducer<IntWritable, Text, LongWritable, Text> { HashMap<Integer, Long> h; private long sequence = 0L; private LongWritable sequenceHolder = new LongWritable(1); //In the setup phase, read the znode and extract all ranges (hashmap) protected void setup(Context context){ try { ZooKeeperMgr zk = new ZooKeeperMgr(); //Connect to zookeeper, I am hardcoding the zookeeper host to localhost zk.connect("localhost"); //Get the existing data on the node and deserialize to a hashmap byte[] dt = zk.readZnode(); ByteArrayInputStream bis = new ByteArrayInputStream(dt); ObjectInput in = new ObjectInputStream(bis); h = (HashMap<Integer, Long>) in.readObject(); }catch(Exception e){ } } public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { //Get the start -1 position for the given key sequence = getStartPosition(h, key.get()); for (Text val : values) { sequence=sequence+1; sequenceHolder.set(sequence); context.write(sequenceHolder , val); } } //Given a key, this method looks for all the keys less than the 
given and sums all values. //the idea is if a reducer if processing data from a mapper with id 102, it will first sum up the range for all ids //less than 102 and start the sequencing after that. private long getStartPosition(HashMap<Integer, Long> ranges, int currentKey){ Long startPos=0L; for(int key : ranges.keySet()){ if(key < currentKey) startPos = startPos + ranges.get(key); } return startPos; } } //Job Runner public static int run(String inpPath, String outPath) throws Exception{ Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "Add Sequence Number with help of zoo keeper"); job.setJarByClass(SequenceMapReduce.class); job.setMapperClass(SequenceMapper.class); job.setReducerClass(ReorgReducer.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(inpPath)); FileOutputFormat.setOutputPath(job, new Path(outPath)); return job.waitForCompletion(true) ? 0 : 1; } //Main method public static void main(String args[]) throws Exception{ System.exit(run(args[0], args[1])); } }
ZooKeeperMgr.java
import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.util.concurrent.CountDownLatch; import java.io.IOException; public class ZooKeeperMgr { String nodePath = "/sequenceapp"; ZooKeeper zookeeper; CountDownLatch connectStatus = new CountDownLatch(1); //Connect to the zookeeper and get the handle //This method should be called from the driver, mapper and reducer setups public void connect(String host) throws IOException, InterruptedException { zookeeper = new ZooKeeper(host, 5000, new Watcher() { public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { connectStatus.countDown(); } } }); connectStatus.await(); } //Create znode //This method should only be called from driver public void createZnode() throws KeeperException, InterruptedException { try { zookeeper.create(nodePath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch(InterruptedException inex) { } catch(KeeperException kex) { } } //Read from znode. 
//This method to be used by mapper and reducer public byte[] readZnode() throws KeeperException, InterruptedException { byte[] dt = null; try { dt = zookeeper.getData(nodePath, true, zookeeper.exists(nodePath, true)); } catch(Exception ex) { } return dt; } //Get the znode data version //This method to be used by mappers before writing the data //Important to ensure that data is not overwrittten public int getVersion() throws KeeperException, InterruptedException { Stat stat = zookeeper.exists(nodePath, true); return stat.getVersion(); } //Write to znode //This methoid to be used only by mapper public void writeZnode(byte[] data, int version) throws KeeperException, InterruptedException { zookeeper.setData(nodePath, data, version); } //Delete znode //Method to be only called by the driver to delete the node before the app closes public void deleteNode()throws InterruptedException, KeeperException{ zookeeper.delete(this.nodePath, -1); } //Close the connection public void close() throws InterruptedException, KeeperException { zookeeper.close(); } //This method is redundant. Keeping for just in case the zookeeper instance is required. public ZooKeeper getZooKeeper() { if (zookeeper == null || !zookeeper.getState().equals(ZooKeeper.States.CONNECTED)) { throw new IllegalStateException("ZooKeeper is not connected."); } return zookeeper; } }