For some data processing we do at work, I wanted to use Hadoop Streaming to run through a file with my ruby mapper code. I’ve used ruby and Hadoop Streaming in the past, but normally just for simple scripts that didn’t require anything but the standard packages that come with every installation of ruby.
This time, I had to use some gems, and my code was a bit larger, so I ran into differences between ruby versions (e.g. Array#to_h only exists from ruby 2.1). While I could work around the ruby versions and write my code so it’s compatible with ruby 1.8.7 (which is normally installed by default), I had to use the external packages and wanted to ‘do it right’. No hacks, no manual installations on the cluster and nothing that can’t be easily reproduced on other clusters (we use EMR so I spin up clusters almost every day).
Thanks go to Ido Hadanny who did the same for Python and therefore can be considered as the ‘spiritual father’ of this post.
UPDATE 23/11/15: Thanks to jaycclee for letting me know that my scripts don’t run well on the new machines. It’s fixed now. A launch.rb script was added to the repo to start an EMR cluster and send the example job.
For the purpose of this post, we’ll use the geokit gem, which gives us the distance between two geo locations (long+lat). We’ll run a Hadoop MR Streaming job to find, for each country in the world, the city that is closest to its geographical center (calculated from the max/min long/lat). This is obviously a stupid project that can run on my old cellphone’s CPU, but it’ll do the trick for this post. If you want to follow the steps, clone the Github repo.
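To make the calculation concrete, here is a minimal pure-ruby sketch of the idea. It is not the code from the repo: it uses a hand-written haversine formula as a stand-in for geokit’s distance calculation, it assumes the ‘center’ is the midpoint of the bounding box of the country’s cities, and it ignores longitude wrap-around. The method names and the cities hash layout are made up for illustration.

```ruby
# Stand-in for geokit: great-circle distance with the haversine formula.
EARTH_RADIUS_KM = 6371.0

# points is an array of [lat, lng] pairs (degrees).
# Returns the midpoint of the bounding box (assumed definition of 'center').
def bounding_box_center(points)
  lats = points.map { |lat, _| lat }
  lngs = points.map { |_, lng| lng }
  [(lats.min + lats.max) / 2.0, (lngs.min + lngs.max) / 2.0]
end

# Distance in kilometers between two [lat, lng] pairs given in degrees.
def haversine_km(a, b)
  lat1, lng1 = a.map { |deg| deg * Math::PI / 180 }
  lat2, lng2 = b.map { |deg| deg * Math::PI / 180 }
  h = Math.sin((lat2 - lat1) / 2)**2 +
      Math.cos(lat1) * Math.cos(lat2) * Math.sin((lng2 - lng1) / 2)**2
  2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(h))
end

# cities is a hash of city name => [lat, lng] for a single country.
# Returns the [name, [lat, lng]] pair closest to the bounding-box center.
def closest_city(cities)
  center = bounding_box_center(cities.values)
  cities.min_by { |_, coords| haversine_km(center, coords) }
end
```

In the real job the distance comes from geokit, so the haversine part above only shows what kind of number we get back.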
Note that you’ll need ruby 2.1 or higher to run this project.
1. Enforce a specific ruby version
Replacing Amazon’s image seemed like overkill to me, but I wanted to make sure I know which ruby version I’m running (and since I’m living in the 21st century, I’d like it to be at least 2.1.0…)
2. Cluster should be agnostic to the jobs
First, the ‘cluster creation’ part involves CloudFormation and DataPipeline. Both are parts of my system that I don’t really want to change frequently, especially not when application code changes a bit and requires a new package.
Second, the same cluster serves several jobs (all doing parts of the data processing) and obviously I didn’t want a central place that ‘knows’ about all dependencies and installs them on the machines.
The MR Job
The mapper code and the whole mr-job directory are also available in the Github repo. The input file was generated by the create_input_file.rb script you’ve just run, and has the following fields:
We’d like to generate the following output fields:
The mapper output will have the following fields:
Which we can later on process with a simple reducer to get what we want:
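As a rough sketch of such a reducer (not the one from the repo), assume the mapper emits tab-separated lines of the form country, city, distance in km. That field layout, the method name and the sample values below are assumptions for illustration only.

```ruby
# Keeps, for every country, the city with the smallest distance.
# lines is any enumerable of "country\tcity\tdistance_km" strings
# (an assumed layout, not necessarily the mapper's real output).
def closest_per_country(lines)
  best = {}
  lines.each do |line|
    country, city, distance = line.chomp.split("\t")
    next if distance.nil? # skip malformed lines
    distance = Float(distance)
    current = best[country]
    best[country] = [city, distance] if current.nil? || distance < current[1]
  end
  best
end

# Illustrative input only; the second distance is a made-up value.
sample = [
  "San Marino\tBorgo Maggiore\t0.2",
  "San Marino\tSerravalle\t3.1"
]
closest_per_country(sample).each do |country, (city, distance)|
  puts [country, city, distance].join("\t")
end
```

When running as an actual streaming reducer, you would pass $stdin.each_line instead of the sample array.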
Installing Ruby On The Cluster
It’s pretty straightforward: I’ve used EMR’s bootstrap actions to run a custom script when machines boot, and used rbenv and its ruby-build plugin to install ruby.
Yes, I know... This does tie the cluster to the jobs to some extent (two different jobs might want to run with different ruby versions), but that seemed to me like a reasonable compromise, and if you really insist, you can overcome this limitation.
Packaging code and dependencies using Bundler
Bundler is a great tool for dependency management in the ruby ecosystem. It’s used by rails, so most developers are familiar with its concepts even without knowing it (the Gemfile file, for example). If you’re not familiar with Bundler I’d recommend having a look at their website.
Jobs are required to declare their dependencies in the Gemfile file. Then, we can use Bundler to package the whole job, including its dependencies, and deploy that to the hadoop machines.
bundle install takes care of resolving all dependencies and installs the missing ones, and the ‘--deployment’ flag causes it to install all packages into the ./vendor/bundle directory instead of the global directories. It also adds a ./.bundle directory with a configuration file, so bundle exec will use the locally stored gems when running (on the cluster machines). We’ll use this feature to make sure all our dependencies are packaged together with our job when we submit it to the cluster.
Make sure to use the same ruby version on your dev machine as you use in production when packaging gems into ‘vendor/bundle’.
Install all dependencies locally:
Package the code, dependencies and configuration together:
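The two steps boil down to a bundle install with the ‘--deployment’ flag followed by a tar of the whole job directory, hidden .bundle directory included. Here is a small sketch of that in ruby. The method names and the idea of driving it from ruby are mine, the archive path is a placeholder, and the archive should live outside the job directory so tar doesn’t try to pack its own output.

```ruby
# Commands to run from inside the job directory (where the Gemfile lives).
def packaging_commands(archive_path)
  [
    # Installs gems into ./vendor/bundle and writes ./.bundle/config
    %w[bundle install --deployment],
    # Packs everything, including the hidden .bundle directory
    ["tar", "-czf", archive_path, "."]
  ]
end

# Runs the commands inside job_dir; returns true only if all succeed.
# runner is injectable so the flow can be checked without running anything.
def package_job(job_dir, archive_path, runner: method(:system))
  archive = File.expand_path(archive_path)
  packaging_commands(archive).all? do |cmd|
    runner.call(*cmd, chdir: job_dir)
  end
end

# Print the commands instead of executing them.
packaging_commands("/tmp/my_app.tar.gz").each { |cmd| puts cmd.join(" ") }
```

Calling package_job("mr-job", "/tmp/my_app.tar.gz") would actually run both commands.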
Submitting the job to the cluster
Hadoop streaming supports making archives available to tasks. According to the documentation, it extracts the files from the archives and makes them accessible to the hadoop user when running the jobs. We’ll use this feature with our tar.gz archive so it has our code and all dependencies ready to use.
For some reason, we can’t use a mapper from inside an archive. I’m not sure about the exact reason, but it means we also need to use the ‘--files’ flag to ship a small script that just runs our code with bundle exec. Here is a generic one, so something like bundler_run.sh my_app run_me.rb param1 param2 will work:
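The wrapper’s logic is tiny: change into the directory where the archive was unpacked and hand over to bundle exec. The real bundler_run.sh is a shell script and may do more (for example, putting rbenv’s ruby on the PATH); the following is only a ruby rendition of the same idea, with method names I made up and the assumption that the archive is unpacked into a directory named after the first argument.

```ruby
# Builds the command line that runs a script through bundler.
def bundle_exec_command(script, *params)
  ["bundle", "exec", "ruby", script, *params]
end

# Changes into the unpacked archive directory and replaces the current
# process with the bundled script, so stdin/stdout pass straight through.
def run_in_app(app_dir, script, *params)
  Dir.chdir(app_dir) do
    exec(*bundle_exec_command(script, *params))
  end
end

# Equivalent of: bundler_run.sh my_app run_me.rb param1 param2
#   run_in_app("my_app", "run_me.rb", "param1", "param2")
```

Using exec rather than system matters for streaming: the mapper has to read the task’s stdin and write to its stdout directly.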
To work with EMR itself, I’ll use the AWS Ruby API. You’re free to use any other API/CLI you prefer; they all basically look the same.
First, let’s upload the relevant files to S3:
Then, create a cluster (for your convenience, the following is also available as a script in the GitHub repo):
Note: The IAM roles (EMR_EC2_DefaultRole) can be created automatically with aws emr create-default-roles from the command line, or manually by you for more control over permissions.
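For reference, a cluster request for the AWS SDK for Ruby is just a nested hash. The sketch below only builds that hash, so it runs without the SDK installed. The cluster name, release label, instance types and counts, and S3 paths are placeholders I picked, not the values from the repo’s script; EMR_DefaultRole is the service role that aws emr create-default-roles creates alongside EMR_EC2_DefaultRole.

```ruby
# Builds the parameters for Aws::EMR::Client#run_job_flow.
# All concrete values here are illustrative placeholders.
def cluster_params(bootstrap_script_s3_path, log_uri)
  {
    name: "ruby-streaming-example",
    release_label: "emr-4.2.0",
    log_uri: log_uri,
    service_role: "EMR_DefaultRole",
    job_flow_role: "EMR_EC2_DefaultRole",
    applications: [{ name: "Hadoop" }],
    instances: {
      master_instance_type: "m3.xlarge",
      slave_instance_type: "m3.xlarge",
      instance_count: 3,
      # Keep the cluster up so steps can be submitted afterwards
      keep_job_flow_alive_when_no_steps: true
    },
    bootstrap_actions: [
      {
        name: "install ruby with rbenv",
        script_bootstrap_action: { path: bootstrap_script_s3_path }
      }
    ]
  }
end

# With the aws-sdk gem installed, the call would look roughly like:
#   emr = Aws::EMR::Client.new(region: "us-east-1")
#   cluster_id = emr.run_job_flow(cluster_params(script, logs)).job_flow_id
```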
And finally, submit the job:
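The interesting part of the step is the list of streaming arguments, which ties together the wrapper script and the archive. Here is a sketch of how such a step could be built; the bucket layout, file names (mapper.rb, my_app.tar.gz) and step name are assumptions, and it relies on command-runner.jar with hadoop-streaming as available on EMR release 4.x and later. The part after ‘#’ in the archive URI is the directory name under which Hadoop exposes the unpacked archive to the task.

```ruby
# Builds one step for Aws::EMR::Client#add_job_flow_steps.
# bucket and app_name are placeholders; adjust to your own layout.
def streaming_step(bucket, app_name: "my_app")
  archive = "s3://#{bucket}/#{app_name}.tar.gz"
  {
    name: "closest city to country center",
    action_on_failure: "CONTINUE",
    hadoop_jar_step: {
      jar: "command-runner.jar",
      args: [
        "hadoop-streaming",
        # Generic options must come before the streaming options
        "-files", "s3://#{bucket}/bundler_run.sh",
        "-archives", archive + "#" + app_name,
        "-mapper", "bundler_run.sh #{app_name} mapper.rb",
        "-input", "s3://#{bucket}/input",
        "-output", "s3://#{bucket}/output"
      ]
    }
  }
end

# With the aws-sdk gem installed, submitting would look roughly like:
#   emr.add_job_flow_steps(job_flow_id: cluster_id,
#                          steps: [streaming_step("my-bucket")])
```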
And just in case you’ve wondered, here is some of the output:
By the way, the country with the closest city to its center is San Marino. The city of Borgo Maggiore is about 200m from the country center.