Hadoop through Examples : Finding User Access Info from Logs

In most Machine Learning problems we have to deal with large data-sets, which generally requires some kind of Distributed Computing platform. One of the leaders in this space is Hadoop. In brief, we can consider Hadoop an ecosystem that provides a layer of abstraction for performing large-scale distributed computing easily and reliably. Though the Hadoop infrastructure consists of several components, here I would like to introduce you to two of its core components, HDFS and MapReduce, and show how we can use them to solve complex problems in an elegant manner.

Problem Context We have a social web application accessed by a large number of users. We need to determine the number of users and the number of times these users accessed the system within a given time period. Our log files record every successful user login along with the time of access. For easy understanding, let’s take this sample log file:

user1 123
user2 123
user3 123
user4 124
user5 124
user6 125
user7 125
user8 125
....

Every line in the log file represents a successful login by a given user, along with the system time in milliseconds. How exactly the information is stored in the log files is not really important; as we will see a bit later, adjusting to a different format is just a matter of a few lines of code changes.

Expected Output We provide a startTime and an endTime between which we want the desired information. An expected output would be something like:

user4	2
user6	2
user7	1
user8	1
user9	1
....

Note: All the steps and code mentioned below have been tested on Hadoop 1.0.3, the latest stable release at the time of writing this blog. Changes may be required when working with other versions.

Step1 Download and install Hadoop. You can find complete instructions here.

Step2 Verify the installation and run a simple wordcount example by following the instructions available here.

Step3 Copy your log files to the Hadoop HDFS file system. Let’s say you have the files accesslog1 and accesslog2, which you want processed, in your current working directory.

hadoop fs -ls #Lists the contents of your HDFS home directory
-rw-r--r--   1 nkumar supergroup         29 2012-07-19 17:43 /user/nkumar/README
drwxr-xr-x   - nkumar supergroup          0 2012-07-20 10:48 /user/nkumar/id.out
drwxr-xr-x   - nkumar supergroup          0 2012-07-19 13:46 /user/nkumar/input
-rw-r--r--   1 nkumar supergroup         10 2012-08-09 09:53 /user/nkumar/test.txt
drwxr-xr-x   - nkumar supergroup          0 2012-07-19 17:42 /user/nkumar/test2

hadoop fs -mkdir /user/nkumar/mapreduce #Creates a directory under top level folder of HDFS file system. /user/nkumar will need to be changed according to your directory structure

hadoop fs -mkdir /user/nkumar/mapreduce/useraccesslog #Creates a folder for storing information related to this program

hadoop fs -mkdir /user/nkumar/mapreduce/useraccesslog/input #Creates a folder for storing your accesslog files to be processed later

hadoop fs -put accesslog1 /user/nkumar/mapreduce/useraccesslog/input/ #Copies file accesslog1 from current working directory to HDFS at specified directory

hadoop fs -put accesslog2 /user/nkumar/mapreduce/useraccesslog/input/ #Copies file accesslog2 from current working directory to HDFS at specified directory

hadoop fs -ls /user/nkumar/mapreduce/useraccesslog/input #Should look something like below

-rw-r--r--   1 nkumar supergroup        233 2012-08-01 19:53 /user/nkumar/mapreduce/useraccesslog/input/accesslog1
-rw-r--r--   1 nkumar supergroup        100 2012-08-01 19:53 /user/nkumar/mapreduce/useraccesslog/input/accesslog2

Step4: Implement Mapper As you might be aware, the MapReduce paradigm consists of breaking down the problem into two broad steps: Map and Reduce. As this blog is not an explanation of MapReduce itself, I will just share how the different components are implemented here. For a detailed introduction to MapReduce, the best place to get started is the tutorial here.

  public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> {
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
      // Each value is one line of the log file: "<username> <accessTimeInMillis>"
      String line = value.toString();
      String[] inputArray = line.split("\\s+");
      // Emit the username as the key and the access time as the value
      word.set(inputArray[0]);
      output.collect(word, new LongWritable(Long.parseLong(inputArray[1])));
    }
  }

Mapper is the first step of any MapReduce program. It gets the contents to work upon from Hadoop and emits key-value pairs as output. Distributing the files, ensuring reliability and handling fail-overs are the responsibility of the platform. This is the real power of Hadoop: it lets us concentrate on implementing the business logic without worrying about the underlying details of distribution and scalability. In our case, the Mapper gets the contents of the uploaded files one line at a time. As you might recall, each line has a user name and his/her access time separated by white space. We split the line on white space and provide it to the OutputCollector in the form of key-value pairs. So typical output of our Map program would be:

user1 123
user2 123
user18 127
....

Step5: Implement Reducer The Reducer receives the output of one or more Map tasks as input and produces key-value pairs as its output. Here is our implementation of the Reducer:


  public static class Reduce extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable> {

    public static long startTime;
    public static long endTime;

    @Override
    public void configure(JobConf jobConf) {
      // Read the time window passed in through the job configuration
      startTime = Long.parseLong(jobConf.get("startTime"));
      endTime = Long.parseLong(jobConf.get("endTime"));
    }

    public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
      // Count how many of this user's access times fall within [startTime, endTime]
      long sum = 0;
      while (values.hasNext()) {
        long userLoggedInTime = values.next().get();
        if (userLoggedInTime >= startTime && userLoggedInTime <= endTime) {
          ++sum;
        }
      }
      // Only emit users who accessed the system at least once in the window
      if (sum > 0) {
        output.collect(key, new LongWritable(sum));
      }
    }
  }

For a given key, which in our case is the username, we can have one or more values, i.e. the time-stamps at which that user accessed the system. As the user-access information can be spread across multiple files, and thus multiple Map outputs, it is again the responsibility of the Hadoop platform to group this information together and hand it to the Reducer. Imagine user1 has accessed the system 3 times, at 123, 126 and 127. The Reducer will receive:

user1 123, 126, 127

As you can see from the code snippet above, we reject values that are less than startTime or greater than endTime. startTime and endTime are parameters passed to the program, which the Reducer retrieves through its configure method. So typical output of the Reducer would be:

user1 2
user3 1
user5 7
...

As mentioned earlier, depending on the way information is stored in our log files, we can easily change the implementation to parse the incoming values differently, e.g. a Date string instead of the system time in milliseconds, and the entire application will behave accordingly; see the sketch below.
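
For instance, if the log lines contained a formatted date such as "user1 2012-08-01 19:53:07" instead of epoch milliseconds, a small change in the Mapper would be enough to convert the date back to a timestamp before emitting it, leaving the Reducer untouched. The sketch below is one possible way to do this; the DateMap class name, the log format and the date pattern are my assumptions, not part of the original code, and the snippet additionally needs java.text.SimpleDateFormat and java.text.ParseException imports.

  // A minimal sketch of an alternative Mapper, assuming log lines of the (hypothetical) form
  // "<username> <yyyy-MM-dd HH:mm:ss>" instead of epoch milliseconds.
  public static class DateMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> {
    private final Text word = new Text();
    // SimpleDateFormat is not thread-safe, but each map task uses its own Mapper instance
    private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
      // Split into the username and the rest of the line (the date portion)
      String[] parts = value.toString().split("\\s+", 2);
      try {
        long accessTime = dateFormat.parse(parts[1]).getTime();
        word.set(parts[0]);
        // Emit the same key-value types as before, so the Reducer stays unchanged
        output.collect(word, new LongWritable(accessTime));
      } catch (ParseException e) {
        // Skip malformed lines rather than failing the whole task
      }
    }
  }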

Step6: Job Configuration Everything in the MapReduce paradigm is defined in terms of jobs and tasks. A job can consist of one or multiple Map/Reduce tasks. We need to configure a job appropriately by passing the relevant parameters. Typical configuration parameters of a job are the Map/Reduce classes that will execute the tasks, the input/output formats those classes will work on, and the key-value pair types they will produce. In addition, we can pass parameters to our Mapper/Reducer through the job configuration. In our case, we pass startTime and endTime, which are used in our Reducer implementation. The code snippet of our job configuration is:


    JobConf conf = new JobConf(UserAccessCount.class);

    if (args.length != 4) {
      System.err.println("Usage: UserAccessCount <input path> <output path> <startTime> <endTime>");
      System.exit(1);
    }

    // Make startTime and endTime available to the Reducer via the job configuration
    String startTime = args[2];
    String endTime = args[3];

    conf.set("startTime", startTime);
    conf.set("endTime", endTime);
    conf.setJobName("useraccesscount");

    // Key-value types produced by the job
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(LongWritable.class);

    conf.setMapperClass(Map.class);
    conf.setReducerClass(Reduce.class);

    // Plain text input and output, one record per line
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);

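For reference, here is a minimal sketch of how the pieces above could be assembled into a single driver class using the old mapred API of Hadoop 1.0.3. The class name UserAccessCount comes from the job configuration snippet; the imports, the jar name in the comment and the exact packaging are my assumptions, so check the GitHub repo linked below for the actual source.

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

// Sketch of the complete program: Mapper (Step 4), Reducer (Step 5) and job setup (Step 6).
// Once compiled and packaged (e.g. into useraccesscount.jar), it could be run with something like:
//   hadoop jar useraccesscount.jar UserAccessCount \
//     /user/nkumar/mapreduce/useraccesslog/input /user/nkumar/mapreduce/useraccesslog/output 124 125
public class UserAccessCount {

  public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> {
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
      String[] inputArray = value.toString().split("\\s+");
      word.set(inputArray[0]);
      output.collect(word, new LongWritable(Long.parseLong(inputArray[1])));
    }
  }

  public static class Reduce extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable> {
    public static long startTime;
    public static long endTime;

    @Override
    public void configure(JobConf jobConf) {
      startTime = Long.parseLong(jobConf.get("startTime"));
      endTime = Long.parseLong(jobConf.get("endTime"));
    }

    public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
      long sum = 0;
      while (values.hasNext()) {
        long userLoggedInTime = values.next().get();
        if (userLoggedInTime >= startTime && userLoggedInTime <= endTime) {
          ++sum;
        }
      }
      if (sum > 0) {
        output.collect(key, new LongWritable(sum));
      }
    }
  }

  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(UserAccessCount.class);

    if (args.length != 4) {
      System.err.println("Usage: UserAccessCount <input path> <output path> <startTime> <endTime>");
      System.exit(1);
    }

    conf.set("startTime", args[2]);
    conf.set("endTime", args[3]);
    conf.setJobName("useraccesscount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(LongWritable.class);
    conf.setMapperClass(Map.class);
    conf.setReducerClass(Reduce.class);
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
  }
}
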
Step7: Execution and Getting Results We now need to compile all these components together (as sketched in the complete class above), package them into a jar and execute the program on our Hadoop environment. After successful execution, you will get the results in a text file, again stored in the HDFS file system; you can view them with hadoop fs -cat on the part files in the output directory. It would look something like:

user1	2
user10	1
user11	1
user12	1
user13	1
user14	1
user15	2
...

The entire code, along with sample user access log files and a README containing instructions for executing the program, is provided in the GitHub repo here. Feel free to check it out and let me know your comments or if you face any issues.

Conclusion Hadoop is a very powerful platform that allows us to solve problems involving large data in a simple and elegant manner. Understanding its different components and concepts does involve a steep learning curve for newcomers to Distributed Computing, but it is a powerful tool for dealing with several Machine Learning problems. There is another very useful library, Apache Mahout, which has been designed with Hadoop and distributed computing in mind: it provides implementations of different Machine Learning algorithms and leverages Hadoop’s distributed computing capabilities.
