Use Ruby Gems With Hadoop Streaming

For some data processing we do at work, I wanted to use Hadoop Streaming to run through a file with my ruby mapper code. I've used ruby and Hadoop Streaming in the past, but normally just for simple scripts that didn't require anything but the standard packages that come with every installation of ruby.

This time, I had to use some gems, and my code was a bit larger, so I ran into differences between ruby versions (e.g. Array#to_h only exists from ruby 2.1). While I could work around the ruby versions and write my code so it's compatible with ruby 1.8.7 (which is normally installed by default), I had to use the external packages and wanted to 'do it right'. No hacks, no manual installations on the cluster and nothing that can't be easily reproduced on other clusters (we use EMR so I spin up clusters almost every day).

Thanks go to Ido Hadanny who did the same for Python and therefore can be considered as the 'spiritual father' of this post.

UPDATE 23/11/15: Thanks to jaycclee for letting me know that my scripts don't run well on the new EMR-4.2.0 machines. It's fixed now. A launch.rb script was added to the repo to start an EMR cluster and send the example job.


For the purpose of this post, we'll use the geokit gem, which gives us the distance between two geo locations (long+lat). We'll run a Hadoop MR Streaming job to find, for each country in the world, the city that is closest to its geographical center (calculated from the max/min long/lat). This is obviously a stupid project that can run on my old cellphone's CPU, but it'll do the trick for this post. If you want to follow the steps, clone the GitHub repo.

Note that you'll need ruby 2.1 or higher to run this project.

$ git clone git@github.com:zachmoshe/zachmoshe.com-use-ruby-gems-with-hadoop-streaming.git
...
$ cd zachmoshe.com-use-ruby-gems-with-hadoop-streaming
$ ./download_datasources.sh
...
$ ./create_input_file.rb
Done. input.txt is ready

Requirements

1. Enforce a specific ruby version

Replacing Amazon's image seemed like overkill to me, but I wanted to be sure which ruby version I'm running with (and since I'm living in the 21st century, I'd like it to be at least 2.1.0...)
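A job can also protect itself against running on an unexpected interpreter. The following is a minimal sketch, not something from the repo: ruby_at_least? is a hypothetical helper name, and 2.1.0 is simply the minimum mentioned above. It relies on Gem::Version, which ships with RubyGems, because comparing version strings directly would rank '2.10.0' below '2.2.0'.

```ruby
#! /usr/bin/env ruby
# Hypothetical guard for the top of a mapper or reducer script.
require 'rubygems' # already loaded on modern rubies; needed on 1.8.7

# True when `current` is at least `minimum`.
# Gem::Version compares version segments numerically, not as strings.
def ruby_at_least?(minimum, current = RUBY_VERSION)
  Gem::Version.new(current) >= Gem::Version.new(minimum)
end

# Stop early with a readable message instead of failing later on a missing method.
abort "ruby >= 2.1.0 required, found #{RUBY_VERSION}" unless ruby_at_least?('2.1.0')
```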

2. Cluster should be agnostic to the jobs

First, the 'cluster creation' part involves CloudFormation and DataPipeline - both are parts of my system that I don't really want to change frequently, especially not when application code changes a bit and requires a new package.

Second, the same cluster serves a few jobs (all doing parts of the data processing) and obviously I didn't want a central place that 'knows' about all dependencies and installs them on the machines.

The MR Job

The mapper code and the whole mr-job directory are also available in the GitHub repo. The input file was generated by the create_input_file.rb script you've just run, and has the following fields:

  1. country_code
  2. city
  3. city_lat
  4. city_long
  5. country_center_lat
  6. country_center_long
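The last two fields are produced by create_input_file.rb. The post only says the center is calculated from the max/min long/lat; assuming that means the midpoint of the bounding box around a country's cities, the calculation would look roughly like this. The script itself is not reproduced here, and bounding_box_center is a hypothetical name.

```ruby
# Hypothetical helper: midpoint of the bounding box around a set of cities.
# coords is an array of [lat, long] pairs (floats) for a single country.
def bounding_box_center(coords)
  lats  = coords.map { |lat, _long| lat }
  longs = coords.map { |_lat, long| long }
  [(lats.min + lats.max) / 2.0, (longs.min + longs.max) / 2.0]
end
```

This naive midpoint ignores countries that straddle the 180° meridian, which is acceptable for a demonstration but not for real geography.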

We'd like to generate the following output fields: country_code, closest_city_to_center, distance

mapper.rb:

#! /usr/bin/env ruby
gem 'geokit', '1.5.0'  # later versions have some problems with OpenSSL that are unrelated to this demonstration
require 'geokit'

ARGF.each_line do |line|
    country_code, city, city_lat, city_long, country_center_lat, country_center_long = line.split("\t")
    distance = GeoKit::LatLng.distance_between [city_lat, city_long], [country_center_lat, country_center_long], {units: :kms}
    puts "#{country_code}\t#{city}\t#{distance}"
end
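For intuition about what the distance call returns, here is a self-contained great-circle distance using the haversine formula. This is not geokit's implementation, and the 6371 km mean Earth radius is an assumption, so the numbers may differ slightly from the gem's.

```ruby
# Great-circle distance in kilometers between two points given in degrees.
# radius_km defaults to an assumed mean Earth radius.
def haversine_km(lat1, long1, lat2, long2, radius_km: 6371.0)
  to_rad = Math::PI / 180
  dlat  = (lat2 - lat1) * to_rad
  dlong = (long2 - long1) * to_rad
  a = Math.sin(dlat / 2)**2 +
      Math.cos(lat1 * to_rad) * Math.cos(lat2 * to_rad) * Math.sin(dlong / 2)**2
  2 * radius_km * Math.asin(Math.sqrt(a))
end
```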

The mapper output will have the following fields:

  1. country_code
  2. city
  3. distance

We can then process this with a simple reducer (reducer.rb in the mr-job directory) to get what we want.
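The reducer is not reproduced in this post. As a minimal sketch, assuming the tab-separated mapper output described above, the core of such a reducer could look like the following. closest_cities is a hypothetical name, and the repo's reducer.rb may be written differently.

```ruby
# Hypothetical reducer core: keep the closest city for every country.
# lines is any enumerable of "country_code\tcity\tdistance" strings.
def closest_cities(lines)
  best = {} # country_code => [city, distance]
  lines.each do |line|
    country_code, city, distance = line.chomp.split("\t")
    next if distance.nil? # skip blank or malformed lines
    distance = distance.to_f
    if !best.key?(country_code) || distance < best[country_code][1]
      best[country_code] = [city, distance]
    end
  end
  # One output line per country: country_code, closest_city_to_center, distance
  best.map { |country_code, (city, distance)| "#{country_code}\t#{city}\t#{distance}" }
end
```

Ending a reducer script with puts closest_cities(ARGF.each_line) would wire it to standard input. The sketch keeps one entry per country in memory, which is fine here; a reducer for a much larger key space would instead rely on Hadoop delivering lines sorted by key and emit a result whenever the key changes.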

Installing Ruby On The Cluster

It's pretty straightforward: I've used EMR's bootstrap actions to run a custom script when machines boot, and used rbenv with the ruby-build plugin to install ruby.

bootstrap.sh:

#! /bin/bash

# first, install git and clone rbenv and ruby-build
sudo yum install git readline-devel -y
git clone https://github.com/sstephenson/rbenv.git ~/.rbenv
git clone https://github.com/sstephenson/ruby-build.git ~/.rbenv/plugins/ruby-build

echo '' >> ~/.bash_profile
echo 'export PATH="$HOME/.rbenv/bin:$PATH"' >> ~/.bash_profile
echo 'eval "$(rbenv init -)"' >> ~/.bash_profile

source ~/.bash_profile

# install ruby and bundler
rbenv install 2.2.3
rbenv global 2.2.3

gem install bundler

Yes, I know. This does tie the cluster to the jobs to some degree (two different jobs might want to run with different ruby versions), but that seemed like a reasonable compromise to me, and if you really insist, you can overcome this limitation.

Packaging code and dependencies using Bundler

Bundler is a great tool for dependency management in the ruby ecosystem. It's used by Rails, so most developers are familiar with its concepts even without knowing it (the Gemfile, for example). If you're not familiar with Bundler, I'd recommend having a look at their website.

Jobs are required to declare their dependencies in the Gemfile. We can then use Bundler to package the whole job, including its dependencies, and deploy that to the hadoop machines. bundle install resolves all dependencies and installs the missing ones, and the '--deployment' flag makes it install all packages into the ./vendor/bundle directory rather than the global directories. It also adds a ./.bundle directory with a configuration file, so bundle exec will use the locally stored gems when running (on the cluster machines). We'll use this feature to make sure all our dependencies are packaged together with our job when we submit it to the cluster.
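For illustration, a Gemfile for this job could be as small as the sketch below. It is an assumption based on the gem line in mapper.rb rather than a copy of the repo's file. The ruby line is optional and asks Bundler to refuse to run under a different ruby version than the one installed by bootstrap.sh.

```ruby
# Gemfile (illustrative sketch; the repo's actual Gemfile may differ)
source 'https://rubygems.org'

# Optional: make Bundler fail fast if the running ruby differs from the cluster's
ruby '2.2.3'

# Same version the mapper pins with its own `gem` call
gem 'geokit', '1.5.0'
```

The '--deployment' flag expects an up-to-date Gemfile.lock next to the Gemfile, so run a plain bundle install once and keep the lock file with the job.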

Make sure to use the same ruby version on your dev machine as you use in production when packaging gems into 'vendor/bundle'.

Install all dependencies locally:

$ cd mr-job
$ bundle install --deployment
 Using builder 3.2.2
 Using multi_json 1.10.1
 ... (more gems)
 Your bundle is complete!
 It was installed into ./vendor/bundle

Package the code, dependencies and configuration together:

(inside mr-job directory)
$ tar zcf ../mr-job.tar.gz .

Submitting the job to the cluster

Hadoop streaming supports making archives available to tasks. According to the documentation, it extracts the files from the archives and makes them accessible to the hadoop user when running the jobs. We'll use this feature with our tar.gz archive so it has our code and all dependencies ready to use.

For some reason, we can't use a mapper from inside an archive. I'm not sure about the exact reason, but it means we also need the '-files' flag with a small script that just runs our code with bundle exec. Here is a generic one, so something like bundler_run.sh my_app run_me.rb param1 param2 will work:

bundler_run.sh:

#! /bin/bash

# For some reason the HOME env variable is set to '/home/' (while user is 'hadoop')
# It also seems that something has changed since past versions and .bash_profile isn't run
HOME=/home/$USER
. $HOME/.bash_profile

APP_NAME=$1
EXEC=$2
shift 2

# Quote everything so paths and parameters containing spaces survive intact
cd "$APP_NAME"
bundle exec "./$EXEC" "$@"

To work with EMR itself, I'll use the AWS SDK for Ruby. You're free to use any other API/CLI you prefer; they all basically look the same.

First, let's upload the relevant files to S3:

$ aws s3 cp bootstrap.sh s3://<YOUR BUCKET>/
$ aws s3 cp bundler_run.sh s3://<YOUR BUCKET>/
$ aws s3 cp mr-job.tar.gz s3://<YOUR BUCKET>/
$ aws s3 cp input.txt s3://<YOUR BUCKET>/

Then, create a cluster (for your convenience, the following is also available as a script in the GitHub repo):

Note: The IAM roles (EMR_DefaultRole and EMR_EC2_DefaultRole) can be created automatically with aws emr create-default-roles from the command line, or manually for more control over permissions.

require 'aws-sdk-core'

...SET VARIABLES...

Aws.config[:region] = 'eu-west-1'
Aws.config[:credentials] = ...

emr = Aws::EMR::Client.new

res = emr.run_job_flow(
    name: 'my-test-cluster',
    service_role: 'EMR_DefaultRole',
    job_flow_role: 'EMR_EC2_DefaultRole',
    release_label: 'emr-4.2.0',
    log_uri: LOG_URI,

    instances: {
        keep_job_flow_alive_when_no_steps: true,
        instance_count: 1,
        master_instance_type: 'm3.xlarge',
        slave_instance_type: 'm3.xlarge',
        ec2_key_name: EC2_KEY_NAME,
    },
    bootstrap_actions: [
        {
            name: 'Installing Ruby',
            script_bootstrap_action: { path: "s3://#{S3_BUCKET}/bootstrap.sh", args: [] },
        }
    ],
    steps: [
        {
            name: 'Setup Hadoop Debugging',
            action_on_failure: 'TERMINATE_CLUSTER',
            hadoop_jar_step: {
                jar: 'command-runner.jar',
                args: [ "state-pusher-script" ]
            },
        }
    ],
)
cluster_id = res.job_flow_id

And finally, submit the job:

emr.add_job_flow_steps(
    job_flow_id: cluster_id,
    steps: [
        {
            name: 'Running mr-job',
            action_on_failure: 'CONTINUE',
            hadoop_jar_step: {
                jar: '/usr/lib/hadoop/hadoop-streaming.jar',
                args:[
                    "-files", "s3://#{S3_BUCKET}/bundler_run.sh",
                    "-archives", "s3://#{S3_BUCKET}/mr-job.tar.gz#app",
                    "-mapper", "bash bundler_run.sh app ./mapper.rb",
                    "-reducer", "bash bundler_run.sh app ./reducer.rb",
                    "-input", "s3://#{S3_BUCKET}/input.txt",
                    "-output", "s3://#{S3_BUCKET}/output.txt"
                ],
            },
        },
    ]
)
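add_job_flow_steps returns as soon as the step is queued. To block until it finishes, one option is to poll describe_step, as in the sketch below. wait_for_step is a hypothetical helper, not part of the SDK or the repo; it accepts any client object that responds to describe_step the way Aws::EMR::Client does, and the 30-second polling interval is an arbitrary choice.

```ruby
# Hypothetical helper: poll an EMR step until it reaches a terminal state.
# Returns the final state string (e.g. 'COMPLETED' or 'FAILED').
TERMINAL_STEP_STATES = %w[COMPLETED CANCELLED FAILED INTERRUPTED].freeze

def wait_for_step(emr, cluster_id, step_id, poll_seconds: 30)
  loop do
    resp = emr.describe_step(cluster_id: cluster_id, step_id: step_id)
    state = resp.step.status.state
    return state if TERMINAL_STEP_STATES.include?(state)
    sleep poll_seconds
  end
end
```

The step id comes from the response of the call above, e.g. step_id = emr.add_job_flow_steps(...).step_ids.first, followed by wait_for_step(emr, cluster_id, step_id).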

And just in case you were wondering, here is some of the output:

AE  english,Abu Dhabi   41.746498331523206
AT  latin,Sankt Wolfgang    8.356556799708905
AW  latin,Oranjestad    7.424982490226065
AZ  latin,Zərdab   7.420578362453282
BD  latin,Munshiganj    6.89075419081171
BG  latin,Kazanlŭk 14.128393348556308
BJ  latin,Parakou   35.20297750381964
BM  latin,Brackish Pond 2.101755422455864
...

By the way, the country with the closest city to its center is San Marino. The city of Borgo Maggiore is about 200m from the country center.
