For some data processing we do at work, I wanted to use Hadoop Streaming to run through a file with my ruby mapper code. I’ve used ruby and Hadoop Streaming in the past, but normally just for simple scripts that didn’t require anything beyond the standard packages that come with every installation of ruby.
This time, I had to use some gems, and my code was a bit larger, so I ran into some corner cases of differing ruby versions (e.g. Array#to_h only exists from ruby 2.1). While I could work around the version differences and write my code so it’s compatible with ruby 1.8.7 (which is normally installed by default), I had to use the external packages and wanted to ‘do it right’. No hacks, no manual installations on the cluster and nothing that can’t be easily reproduced on other clusters (we use EMR, so I spin up clusters almost every day).
Thanks go to Ido Hadanny, who did the same for Python and can therefore be considered the ‘spiritual father’ of this post.
UPDATE 23/11/15: Thanks to jaycclee for letting me know that my scripts don’t run well on the new EMR-4.2.0 machines. It’s fixed now. A launch.rb script was added to the repo to start an EMR cluster and submit the example job.
For the purpose of this post, we’ll use the geokit gem, which gives us the distance between two geo locations (long+lat). We’ll run a Hadoop Streaming MR job to find, for each country in the world, the city that is closest to its geographical center (calculated from the max/min long/lat). This is obviously a stupid project that could run on my old cellphone’s CPU, but it’ll do the trick for this post. If you want to follow the steps, clone the GitHub repo.
Notice: you’ll need ruby 2.1 or higher to run this project.
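Just to get a feel for the gem, here is a minimal sketch of what geokit gives us (the coordinates are arbitrary and only for illustration):

```ruby
require 'geokit'

# Two arbitrary points (lat, long)
a = Geokit::LatLng.new(32.0853, 34.7818)
b = Geokit::LatLng.new(31.7683, 35.2137)

# Distance in kilometers
puts a.distance_to(b, units: :kms)
```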
Requirements
1. Enforce a specific ruby version
Replacing Amazon’s image seemed like overkill to me, but I wanted to make sure I knew which ruby version I was running (and since I’m living in the 21st century, I’d like it to be at least 2.1.0…)
2. Cluster should be agnostic to the jobs
First, the ‘cluster creation’ part involves CloudFormation and DataPipeline - both are parts of my system that I don’t really want to change frequently, especially not when application code changes a bit and requires a new package.
Second, the same cluster serves a few jobs (all doing parts of the data processing), and obviously I didn’t want a central place that ‘knows’ about all the dependencies and installs them on the machines.
The MR Job
The mapper code and the whole mr-job directory are also available in the GitHub repo. The input file was generated by the create_input_file.rb script you’ve just run, and has the following fields:
country_code
city
city_lat
city_long
country_center_lat
country_center_long
We’d like to generate the following output fields: country_code, closest_city_to_center, distance.
The mapper output will have the following fields:
country_code
city
distance
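A rough sketch of a mapper that produces these fields (the real one is in the repo; I’m assuming tab-separated input in the field order above):

```ruby
#!/usr/bin/env ruby
require 'geokit'

# For every input record, emit: country_code <TAB> city <TAB> distance-from-center (km)
STDIN.each_line do |line|
  country_code, city, city_lat, city_long, center_lat, center_long = line.chomp.split("\t")

  city_location  = Geokit::LatLng.new(city_lat.to_f, city_long.to_f)
  country_center = Geokit::LatLng.new(center_lat.to_f, center_long.to_f)
  distance       = city_location.distance_to(country_center, units: :kms)

  puts [country_code, city, distance].join("\t")
end
```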
This output can later be processed with a simple reducer to get what we want:
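A sketch of such a reducer (the one actually used may differ; it relies on Hadoop handing the reducer the mapper output sorted by country_code):

```ruby
#!/usr/bin/env ruby

# Mapper output arrives sorted by key (country_code), so we can keep
# the closest city per country and emit it when the country changes.
current_country = nil
best_city       = nil
best_distance   = nil

flush = lambda do
  puts [current_country, best_city, best_distance].join("\t") if current_country
end

STDIN.each_line do |line|
  country_code, city, distance = line.chomp.split("\t")
  distance = distance.to_f

  if country_code != current_country
    flush.call
    current_country = country_code
    best_city       = city
    best_distance   = distance
  elsif distance < best_distance
    best_city     = city
    best_distance = distance
  end
end

flush.call
```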
Installing Ruby On The Cluster
It’s pretty straightforward: I used EMR’s bootstrap actions to run a custom script when the machines boot, and used rbenv with its ruby-build plugin to install ruby.
Yes, I know... this does kind of couple the cluster to the jobs (two different jobs might want to run with different ruby versions), but that seemed like a reasonable compromise to me, and if you really insist, you can overcome this limitation.
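For reference, a bootstrap script along these lines does the job (a sketch: the ruby version, the yum package list and the rbenv paths are assumptions, and the script I actually used may differ):

```bash
#!/bin/bash
set -e

# Build dependencies for compiling ruby (assumed package names for Amazon Linux)
sudo yum install -y git gcc openssl-devel readline-devel zlib-devel

# Install rbenv and its ruby-build plugin for the hadoop user
git clone https://github.com/rbenv/rbenv.git ~/.rbenv
git clone https://github.com/rbenv/ruby-build.git ~/.rbenv/plugins/ruby-build

echo 'export PATH="$HOME/.rbenv/bin:$HOME/.rbenv/shims:$PATH"' >> ~/.bashrc
export PATH="$HOME/.rbenv/bin:$HOME/.rbenv/shims:$PATH"

# Install and pin the ruby version we want on every node
rbenv install 2.1.5
rbenv global 2.1.5

# Bundler is needed to run the packaged jobs
gem install bundler
```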
Packaging code and dependencies using Bundler
Bundler is a great tool for dependency management in the ruby eco-system. It’s used by rails, so most developers are familiar with its concepts even without knowing it (the Gemfile, for example). If you’re not familiar with Bundler, I’d recommend having a look at their website.
Jobs are required to declare their dependencies in their Gemfile. Then we can use Bundler to package the whole job, including its dependencies, and deploy that to the hadoop machines. bundle install takes care of resolving all dependencies and installing the missing ones, and the --deployment flag causes it to install all packages into the ./vendor/bundle directory rather than the global directories. It also adds a ./.bundle directory with a configuration file so bundle exec will use the locally stored gems when running (on the cluster machines). We’ll use this feature to make sure all our dependencies are packaged together with our job when we submit it to the cluster.
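For this example job the Gemfile is as small as it gets; a sketch (the one in the repo may differ slightly):

```ruby
# Gemfile
source 'https://rubygems.org'

gem 'geokit'
```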
Make sure to use the same ruby version on your dev machine as you use in production when packaging gems into ‘vendor/bundle’.
Install all dependencies locally:
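From inside the job directory:

```bash
bundle install --deployment
```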
Package the code, dependencies and configuration together:
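Something along these lines should do; the archive name and the exact file list are illustrative, but .bundle has to be included since it holds the configuration that points bundle exec at vendor/bundle:

```bash
# Run from the job directory; include the code, the lockfile,
# the vendored gems and Bundler's configuration
tar -czf mr-job.tar.gz Gemfile Gemfile.lock .bundle vendor *.rb
```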
Submitting the job to the cluster
Hadoop Streaming supports making archives available to tasks: according to the documentation, it extracts the files from the archives and makes them accessible to the hadoop user when running the jobs. We’ll use this feature with our tar.gz archive so it has our code and all dependencies ready to use.
For some reason, we can’t use a mapper from inside an archive. I’m not sure about the exact reason, but it means we also want to use the -files flag with a small script that just runs our code with bundle exec. Here is a generic one, so something like bundler_run.sh my_app run_me.rb param1 param2 will work:
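One possible version of such a script (a sketch; it assumes the first argument is the directory our archive was extracted into, and that ruby was installed with rbenv as described above):

```bash
#!/bin/bash
set -e

# Usage: bundler_run.sh <archive_dir> <script.rb> [args...]
APP_DIR=$1
shift

# Make sure the rbenv-installed ruby comes first in PATH
export PATH="$HOME/.rbenv/bin:$HOME/.rbenv/shims:$PATH"

# Run the requested script with the gems packaged under vendor/bundle
cd "$APP_DIR"
bundle exec ruby "$@"
```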
To work with EMR itself, I’ll use the AWS Ruby API. You’re free to use any other API/CLI you prefer; they all basically look the same.
First, let’s upload the relevant files to S3:
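With version 2 of the aws-sdk gem it looks roughly like this (the bucket and key names are placeholders):

```ruby
require 'aws-sdk'

bucket = Aws::S3::Resource.new(region: 'us-east-1').bucket('my-bucket')  # placeholder bucket

# The job archive, the wrapper script and the bootstrap script
bucket.object('emr/mr-job.tar.gz').upload_file('mr-job.tar.gz')
bucket.object('emr/bundler_run.sh').upload_file('bundler_run.sh')
bucket.object('emr/install_ruby.sh').upload_file('install_ruby.sh')
```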
Then, create a cluster (for your convenience, the following is also available as a script in the GitHub repo):
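A sketch of what that looks like with the aws-sdk gem; the instance types, count and S3 path are placeholders, and launch.rb in the repo is the authoritative version:

```ruby
require 'aws-sdk'

emr = Aws::EMR::Client.new(region: 'us-east-1')

cluster = emr.run_job_flow(
  name: 'ruby-streaming-example',
  release_label: 'emr-4.2.0',
  applications: [{ name: 'Hadoop' }],
  service_role: 'EMR_DefaultRole',
  job_flow_role: 'EMR_EC2_DefaultRole',
  instances: {
    master_instance_type: 'm3.xlarge',
    slave_instance_type: 'm3.xlarge',
    instance_count: 3,
    keep_job_flow_alive_when_no_steps: true
  },
  bootstrap_actions: [{
    name: 'install ruby',
    script_bootstrap_action: { path: 's3://my-bucket/emr/install_ruby.sh' }
  }]
)

puts cluster.job_flow_id
```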
Notice: the IAM roles (EMR_DefaultRole and EMR_EC2_DefaultRole) can be created automatically with aws emr create-default-roles from the command line, or manually by you for more control over permissions.
And finally, submit the job:
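Roughly like this (S3 paths and script names are placeholders; on EMR-4.x releases the streaming job is launched through command-runner.jar):

```ruby
emr.add_job_flow_steps(
  job_flow_id: cluster.job_flow_id,
  steps: [{
    name: 'closest city to country center',
    action_on_failure: 'CONTINUE',
    hadoop_jar_step: {
      jar: 'command-runner.jar',
      args: [
        'hadoop-streaming',
        '-files',    's3://my-bucket/emr/bundler_run.sh',
        '-archives', 's3://my-bucket/emr/mr-job.tar.gz#my_app',
        '-mapper',   'bundler_run.sh my_app mapper.rb',
        '-reducer',  'bundler_run.sh my_app reducer.rb',
        '-input',    's3://my-bucket/emr/input',
        '-output',   's3://my-bucket/emr/output'
      ]
    }
  }]
)
```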
And just in case you were wondering, here is some of the output:
By the way, the country with the closest city to its center is San Marino. The city of Borgo Maggiore is about 200m from the country center.