Tuesday, May 27, 2014

Map Reduce Program: Loading data from HDFS to HBase


This blog post shows how to load data from HDFS into HBase using the MapReduce API.

Prerequisites:
1) Dataset used in this example: http://files.grouplens.org/datasets/movielens/ml-100k/u.user
2) Download the u.user file and upload it to HDFS.
3) Create an HBase table named User with the column family usr: create 'User', 'usr'
4) Required jars: hadoop-core-2.2.0.jar and hbase-0.96.0.jar

This program loads the user profiles from HDFS into the HBase table User. The file u.user stores demographic information about each user as a pipe-separated (|) list:

user id | age | gender | occupation | zip code

Sample Data:
1|24|M|technician|85711
2|24|M|Scientist|85712
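Before wiring anything into MapReduce, it can help to check the parsing logic on its own. The sketch below is plain Java with no Hadoop or HBase classes; the class name UserRecord and its methods are hypothetical and not part of the job in this post. It keeps the zip code as a String, assuming (as appears to be the case in the MovieLens 100k file) that some zip codes are not purely numeric and that leading zeros must be preserved.

```java
/**
 * Hypothetical helper (not part of the original job): parses one line of
 * u.user into a typed record so the parsing can be checked without a cluster.
 */
final class UserRecord {
    final int userId;
    final int age;
    final String gender;
    final String occupation;
    final String zipCode; // kept as text: assumed to contain non-numeric codes and leading zeros

    UserRecord(int userId, int age, String gender, String occupation, String zipCode) {
        this.userId = userId;
        this.age = age;
        this.gender = gender;
        this.occupation = occupation;
        this.zipCode = zipCode;
    }

    /** Parses "user id|age|gender|occupation|zip code"; throws on malformed input. */
    static UserRecord parse(String line) {
        // split() takes a regular expression, so the pipe must be escaped
        String[] f = line.trim().split("\\|");
        if (f.length != 5) {
            throw new IllegalArgumentException("Expected 5 fields but got " + f.length + ": " + line);
        }
        return new UserRecord(Integer.parseInt(f[0]), Integer.parseInt(f[1]), f[2], f[3], f[4]);
    }

    @Override
    public String toString() {
        return userId + "/" + age + "/" + gender + "/" + occupation + "/" + zipCode;
    }

    public static void main(String[] args) {
        System.out.println(parse("1|24|M|technician|85711")); // 1/24/M/technician/85711
        System.out.println(parse(" 2|24|M|Scientist|85712")); // 2/24/M/Scientist/85712
    }
}
```

Throwing on a wrong field count makes bad input visible early; inside a mapper you would more likely count and skip such lines.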

In the Mapper class, we read each line as org.apache.hadoop.io.Text, split it on the | character, and use the Put API (org.apache.hadoop.hbase.client.Put) to write the fields to the HBase table.
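Splitting on the pipe character has a pitfall worth checking in isolation: String.split takes a regular expression, and an unescaped "|" is an alternation of two empty patterns, so a call such as line.split("|") splits between every character instead of on the pipe. The small demo below (the class PipeSplitDemo is hypothetical) compares the two forms; Pattern.quote("|") and "\\|" are equivalent ways to split on the literal character.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

/**
 * Shows why line.split("|") does not split on the pipe character:
 * String.split takes a regular expression, and an unescaped "|" matches
 * the empty string, i.e. between every pair of characters.
 */
final class PipeSplitDemo {

    /** Splits a u.user line on the literal '|' character. */
    static String[] splitFields(String line) {
        return line.split(Pattern.quote("|")); // equivalent to line.split("\\|")
    }

    public static void main(String[] args) {
        String line = "1|24|M|technician|85711";
        System.out.println(Arrays.toString(line.split("|")));   // splits between every character
        System.out.println(Arrays.toString(splitFields(line))); // [1, 24, M, technician, 85711]
    }
}
```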

Mapper Code:

package com.test.HBaseLoadFromHDFS;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Maps one line of u.user to an HBase Put on the 'usr' column family.
 * With TextInputFormat the input key is the byte offset of the line,
 * so it must be declared as LongWritable (IntWritable fails with a ClassCastException).
 */
public class HBaseLoadFromHDFSMapper extends
        Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    private static final byte[] FAMILY = Bytes.toBytes("usr");

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String line = value.toString().trim();
        if (line.isEmpty()) {
            return; // skip blank lines
        }

        // Sample line is - 1|24|M|technician|85711 (user id | age | gender |
        // occupation | zip code)
        // split() takes a regex, so the pipe must be escaped; split("|") splits on every character
        String[] fields = line.split("\\|");
        if (fields.length != 5) {
            // Count and skip malformed lines instead of failing the whole task
            context.getCounter("HBaseLoadFromHDFS", "MALFORMED_LINES").increment(1);
            return;
        }

        // Row key: user id encoded as a 4-byte int
        byte[] row = Bytes.toBytes(Integer.parseInt(fields[0]));

        Put put = new Put(row);
        put.add(FAMILY, Bytes.toBytes("age"), Bytes.toBytes(Integer.parseInt(fields[1])));
        put.add(FAMILY, Bytes.toBytes("gender"), Bytes.toBytes(fields[2]));
        put.add(FAMILY, Bytes.toBytes("occupation"), Bytes.toBytes(fields[3]));
        // Zip code stored as a string: some entries are not numeric, and leading zeros must be kept
        put.add(FAMILY, Bytes.toBytes("zipCode"), Bytes.toBytes(fields[4]));

        context.write(new ImmutableBytesWritable(row), put);
    }
}
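The mapper builds its row key with Bytes.toBytes(int), which in HBase produces a 4-byte big-endian array, and HBase orders row keys by comparing their bytes as unsigned values from left to right. The sketch below (the class RowKeyOrderDemo is hypothetical) reproduces that byte layout with java.nio.ByteBuffer so it runs without HBase jars, and shows why binary int keys keep user ids in numeric order while string keys such as "10" and "9" do not.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * Illustrates how int row keys sort in HBase. ByteBuffer.putInt writes the
 * same 4-byte big-endian layout as HBase's Bytes.toBytes(int).
 */
final class RowKeyOrderDemo {

    /** Same byte layout as Bytes.toBytes(int): 4 bytes, big-endian. */
    static byte[] intKey(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    /** Lexicographic comparison treating each byte as unsigned (0..255). */
    static int compareKeys(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int x = a[i] & 0xff;
            int y = b[i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return a.length - b.length; // a shorter prefix sorts first
    }

    public static void main(String[] args) {
        // Binary int keys: 9 sorts before 10, matching numeric order
        System.out.println(compareKeys(intKey(9), intKey(10)) < 0); // true
        // String keys: "10" sorts before "9" because '1' < '9'
        byte[] s10 = "10".getBytes(StandardCharsets.UTF_8);
        byte[] s9 = "9".getBytes(StandardCharsets.UTF_8);
        System.out.println(compareKeys(s10, s9) < 0); // true
        // Caveat: negative ints start with 0xff and sort after all non-negative ones
        System.out.println(compareKeys(intKey(-1), intKey(1)) > 0); // true
    }
}
```

Since MovieLens user ids are positive, the int encoding used in the mapper keeps rows in user-id order.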
We do not need a Reducer for this use case: each Put emitted by the mapper is written directly to the HBase table. The HDFS path of the u.user file and the target HBase table are specified in the Main code.

Main Code:

package com.test.HBaseLoadFromHDFS;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

/**
 * Loads the pipe-delimited u.user file from HDFS into the HBase table 'User'
 *
 * @author Nisanth Simon
 *
 */
public class HBaseLoadFromHDFSMain {

    public static void main(String[] args) throws Exception {

        // HDFS path of u.user; the first command-line argument overrides the default
        String hdfsInputPath = args.length > 0 ? args[0] : "/home/usr1/u.user";

        // HBaseConfiguration.create() also loads hbase-site.xml (ZooKeeper quorum, etc.)
        Configuration config = HBaseConfiguration.create();
        Job job = Job.getInstance(config, "Loading the User Profile");
        job.setJarByClass(HBaseLoadFromHDFSMain.class);

        FileInputFormat.setInputPaths(job, new Path(hdfsInputPath));
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(HBaseLoadFromHDFSMapper.class);

        // Configures TableOutputFormat with 'User' as the target table;
        // no reducer class is passed, and no separate output format is needed
        TableMapReduceUtil.initTableReducerJob("User", null, job);

        // Map-only job: each Put goes straight from the mapper to TableOutputFormat
        job.setNumReduceTasks(0);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
In my next blog post, I will cover how to copy data from one HBase table to another.


Wednesday, May 21, 2014

Flume Errors and Fixes

This blog post covers errors related to Flume and their fixes.

Issue 1: Loading data into HBase 0.96.x with Flume 1.3.1 throws "The znode for the -ROOT- region doesn't exist!"

When we use Flume 1.3.1 to ingest data into HBase with org.apache.flume.sink.hbase.AsyncHBaseSink, we get the following error:

20 Apr 2014 10:08:50,069 ERROR [lifecycleSupervisor-1-1-EventThread] (org.hbase.async.HBaseClient$ZKClient$2.processResult:2403) - The znode for the -ROOT- region doesn't exist!

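Fix: Upgrade asynchbase to 1.5.0. HBase 0.96 removed the -ROOT- catalog table, so older asynchbase releases that look for the -ROOT- znode in ZooKeeper fail with this error; asynchbase 1.5.0 supports HBase 0.96.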