Gratuitous Hadoop: Stress Testing on the Cheap with Hadoop Streaming and EC2

January 6th, 2011 by Boris Shimanovsky

Things have a funny way of working out. A couple of features were pushed back from a previous release, some last-minute improvements were thrown in, and suddenly we found ourselves rolling out a lot more fresh code in our release than usual. All this the night before one of our heavy API users was launching something of their own. They were expecting to hit us thousands of times a second, and most of their calls touched some piece of code that hadn’t been tested in the wild. Ordinarily, we would soft launch and put the system through its paces, but now we had no time for that. We really wanted to hammer the entire stack, yesterday, and so we couldn’t rely on internal compute resources.

Typically, people turn to a commercial service for this sort of thing, but for the load we wanted, those services charge many hundreds of dollars. It was also short notice and after hours, so there was going to be some sort of premium to boot.

At first, I thought we should use something like JMeter from some EC2 machines. However, we didn’t have anything set up for that. What were we going to do with the stats? How would we synchronize starting and stopping? It just seemed like this path was going to take a while.

I wanted to go to bed. Soon.

We routinely run Hadoop jobs on EC2, so everything was already baked to launch a bunch of machines and run jobs. Initially, it seemed like a silly idea, but when the hammer was sitting right there, I saw nails. I Googled it to see if anyone had tried. Of course not. Why would they? And if they did, why admit it? Perhaps nobody else found themselves on the far right side of the Sleep vs Sensitivity-to-Shame scale.
So, it was settled — Hadoop it is! I asked my colleagues to assemble the list of test URLs. Along with some static stuff (no big deal) and a couple dozen basic pages, we had a broad mixture of API requests and AJAX calls we needed to simulate. For the AJAX stuff, we simply grabbed URLs from the Firebug console. Everything else was already in tests or right on the surface, so we’d have our new test set in less than an hour. I figured a few dozen lines of ruby code using Hadoop streaming could probably do what I had in mind.

I’ve read quite a few post-mortems that start off like this, but read on, it turns out alright.

Hadoop Streaming
Hadoop Streaming is a utility that ships with Hadoop and works with pretty much any language. Any executable that reads from stdin and writes to stdout can be a mapper or reducer. By default, they read line-by-line, with the bytes up to the first tab character representing the key and any remainder becoming the value. It’s a great resource that bridges the power of Hadoop with the ease of quick scripts. We use it a lot when we need to scale out otherwise simple jobs.
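
As a toy illustration (not part of the stress test), a complete streaming mapper can be this small in ruby; reading stdin and writing tab-separated key/value pairs to stdout is the whole contract:
#!/usr/bin/env ruby1.9
# toy word-count mapper: every word on stdin becomes a key with a count of 1
STDIN.each_line do |line|
  line.split.each { |word| puts "#{word}\t1" }
end
Hadoop handles splitting the input, shipping the script to the workers, and sorting the mapper output by key before any reducer sees it.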

Designing Our Job
We had just two basic requirements for our job:
• Visit URLs quickly
• Produce response time stats

Input File
The only input in this project is a list of URLs: only keys, no values. The job would have to run millions of URLs through the process to sustain the desired load for the desired time, but we only had hundreds of calls that needed testing. First, we wanted to skew the URL frequency towards the most common calls. To do that, we just put those URLs in multiple times. Then we wanted to shuffle them for better distribution. Finally, we just needed lots of copies.
for i in {1..10000}; do sort -R < sample_urls_list.txt; done > full_urls_list.txt
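
If the weighting ever gets fancier than pasting URLs in multiple times, a few lines of ruby could expand a weights file into the repeated list before the shuffle above. This is only a sketch, and the url<TAB>weight input format is hypothetical rather than something we actually used:
#!/usr/bin/env ruby1.9
# expand_weights.rb (hypothetical helper): reads "url<TAB>weight" lines on stdin
# and prints each url `weight` times so the common calls show up more often
STDIN.each_line do |line|
  url, weight = line.chomp.split("\t", 2)
  next if url.nil? || url.empty?
  weight.to_i.times { puts url }
end
Run as ./expand_weights.rb < weighted_urls.txt > sample_urls_list.txt and the rest of the pipeline stays the same.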

Mapper
The mapper was going to do most of the work for us. It needed to fetch URLs as quickly as possible and record the elapsed time for each request. Hadoop processes definitely have overhead, and even though each EC2 instance could likely be fetching hundreds of URLs at once, it couldn’t possibly run hundreds of mappers. To get past this issue, we had two options: 1) just launch more machines and under-utilize them, or 2) fetch lots of URLs concurrently with each mapper. We’re trying not to needlessly waste money, so #1 is out.

I had used the curb gem (libcurl bindings for ruby) on several other projects and it worked really well. It turned out to be especially helpful here since it has a Multi class that can run concurrent requests, each with blocks that function essentially as callbacks. With a little hackery, it could be turned into a poor/lazy/sleep-deprived man’s thread pool.

The main loop:
@curler.perform do
  flush_buffer!
  STDIN.take(@concurrent_requests-@curler.requests.size).each do |url|
    add_url(url.chomp)
  end
end


Blocks for success and failure:
curl.on_success do
  if errorlike_content?(curl.body_str)
    log_error(curl.url,'errorlike content')
  else
    @response_buffer<<[curl.url,Time.now-start_time]
  end
end
curl.on_failure do |curl_obj,args|
  error_type = args.first.to_s if args.is_a?(Array)
  log_error(curl.url,error_type)
end


As you can see, each completion calls a block that outputs the URL and the number of seconds it took to process the request. A little healthy paranoia about thread safety resulted in the extra steps of buffering and flushing output — this would ensure we don’t interleave output coming from multiple callbacks.

If there is an error, the mapper will just emit the URL without an elapsed time as a hint to the stats aggregator. In addition, it uses the ruby “warn” method to write a line to stderr. Hadoop watches stderr for messages in the following format and increments a built-in counter accordingly:
reporter:counter:[group],[counter],[amount]
In this case the line is:
reporter:counter:error,[errortype],1
This is a handy way to report what’s happening while a job is in progress, and the counts are surfaced through the standard Hadoop job web interface.
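
In ruby, bumping a counter is nothing more than a write to stderr; for example (the "timeout" label here is just illustrative):
# increments the "timeout" counter in the "error" group by one
warn "reporter:counter:error,timeout,1"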

Mapper-Only Implementation
The project could actually stop here if all we wanted was raw data to analyze with some stats software or a database. One could simply cat together the part files from HDFS and start crunching.

In this case, the whole job would look like this:
hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
  -D mapred.map.tasks=100 \
  -D mapred.reduce.tasks=0 \
  -D mapred.job.name="Stress Test - Mapper Only" \
  -D mapred.speculative.execution=false \
  -input "/mnt/hadoop/urls.txt" \
  -output "/mnt/hadoop/stress_output" \
  -mapper "$MY_APP_PATH/samples/stress/get_urls.rb 100"

and then when it finishes:
hadoop dfs -cat /mnt/hadoop/stress_output/part* > my_combined_data.txt
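
From there, even a throwaway script is enough for a first look at the numbers. Here is a rough sketch, assuming the combined file from above (URLs that only ever errored are skipped for brevity):
#!/usr/bin/env ruby1.9
# quick crunch of the raw mapper output: per-URL request count,
# average response time, and error count (lines with no elapsed time are errors)
times  = Hash.new { |h, k| h[k] = [] }
errors = Hash.new(0)
ARGF.each_line do |line|
  url, elapsed = line.chomp.split("\t", 2)
  if elapsed && elapsed != ''
    times[url] << elapsed.to_f
  else
    errors[url] += 1
  end
end
times.each do |url, list|
  puts [url, list.size, list.inject(:+) / list.size, errors[url]].join("\t")
end
Run it as ruby crunch.rb my_combined_data.txt, or pipe the hadoop dfs -cat output straight into it.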

Reducer
In our case, I wanted to use the reducer to compute the stats as part of the job. In Hadoop Streaming, a reducer can expect to receive lines through stdin sorted by key, and a given key will never find its way to multiple reducers. This eliminates the requirement that the reducer code track state for more than one key at a time; it can simply do whatever it does to the values associated with a key (e.g. aggregate them) and move on when a new key arrives. This is a good time to mention the aggregate package, which could be used as the reducer to accumulate stats. In our case, I wanted to control my mapper output as well as retain the flexibility to skip the reducer altogether and just get raw data.
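
For completeness, going the aggregate route would mean tagging each mapper output line with an aggregator name and launching the job with the built-in reducer; roughly (a sketch of the package's conventions, not something we ran):
# inside the mapper's success callback, emit a pre-tagged record,
# e.g. a request count per URL for the aggregate reducer to sum:
puts "LongValueSum:#{curl.url}\t1"
# and the streaming job would use the built-in reducer:
#   -reducer aggregate
We stuck with our own reducer so the mapper output stays plain and the reduce step stays optional.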

The streaming job command looks like this:
hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
  -D mapred.map.tasks=100 \
  -D mapred.reduce.tasks=8 \
  -D mapred.job.name="Stress Test - Full" \
  -D mapred.speculative.execution=false \
  -input "/mnt/hadoop/urls.txt" \
  -output "/mnt/hadoop/stress_output" \
  -mapper "$MY_APP_PATH/samples/stress/get_urls.rb 100" \
  -reducer "$MY_APP_PATH/samples/stress/stats_aggregator.rb --reducer"


For each key (URL) and value (elapsed time), the following variables get updated:
• sum – total time elapsed for all requests
• min – fastest response
• max – slowest response
• count – number of requests processed

key,val = l.chomp.split("\t",2)
if last_key.nil? || last_key!=key
  write_stats(curr_rec) unless last_key.nil?
  curr_rec = STATS_TEMPLATE.dup.update(:key=>key)
  last_key=key
end

if val && val!=''
  val=val.to_f
  curr_rec[:sum]+=val
  curr_rec[:count]+=1
  curr_rec[:min]=val if curr_rec[:min].nil? || val<curr_rec[:min]
  curr_rec[:max]=val if curr_rec[:max].nil? || val>curr_rec[:max]
else
  curr_rec[:errors]+=1
end


Finally, as we flush, we compute the overall average for the key.
def write_stats(stats_hash)
  stats_hash[:average]=stats_hash[:sum]/stats_hash[:count] if stats_hash[:count]>0
  puts stats_hash.values_at(*STATS_TEMPLATE.keys).join("\t")
end


Final Stats (optional)
When the job completes, it produces as many part files as there are reducers. Before this data can be loaded into, say, a spreadsheet, it needs to be merged and converted into a friendlier format. A few more lines of code get us a CSV file that can easily be dropped into your favorite spreadsheet/charting software:
hadoop dfs -cat /mnt/hadoop/stress_output/part* | $MY_APP_PATH/samples/stress/stats_aggregator.rb --csv > final_stats.csv

Our CSV converter looks like this:
class CSVConverter
  def print_stats
    puts STATS_TEMPLATE.keys.to_csv
    STDIN.each_line do |l|
      puts l.chomp.split("\t").to_csv
    end
  end
end


Source Code
The mapper, get_urls.rb:
#!/usr/bin/env ruby1.9
require 'rubygems'
require 'curb'

class MultiCurler
  DEFAULT_CONCURRENT_REQUESTS = 100

  def initialize(opts={})
    @concurrent_requests = opts[:concurrent_requests] || DEFAULT_CONCURRENT_REQUESTS
    @curler = Curl::Multi.new
    @response_buffer=[]
  end

  def start
    while !STDIN.eof?
      STDIN.take(@concurrent_requests).each do |url|
        add_url(url.chomp)
      end
      run
    end
  end

  private

  def run
    @curler.perform do
      flush_buffer!
      STDIN.take(@concurrent_requests-@curler.requests.size).each do |url|
        add_url(url.chomp)
      end
    end
    flush_buffer!
  end

  def flush_buffer!
    while output = @response_buffer.pop
      puts output.join("\t")
    end
  end

  def add_url(u)
    #skip really obvious input errors
    return log_error(u,'missing url') if u.nil?
    return log_error(u,'invalid url') unless u=~/^http:\/\//i

    c = Curl::Easy.new(u) do |curl|
      start_time = Time.now
      curl.follow_location = true
      curl.enable_cookies=true
      curl.on_success do
        if errorlike_content?(curl.body_str)
          log_error(curl.url,'errorlike content')
        else
          @response_buffer<<[curl.url,Time.now-start_time]
        end
      end
      curl.on_failure do |curl_obj,args|
        error_type = args.first.to_s if args.is_a?(Array)
        log_error(curl.url,error_type)
      end
    end
    @curler.add(c)
  end

  def errorlike_content?(page_body)
    page_body.nil? || page_body=='' || page_body=~/(unexpected error|something went wrong|Api::Error)/i
  end

  def log_error(url,error_type)
    @response_buffer<<[url,nil]
    warn "reporter:counter:error,#{error_type||'unknown'},1"
  end

end


concurrent_requests = ARGV.first ? ARGV.first.to_i : nil

runner=MultiCurler.new(:concurrent_requests=>concurrent_requests)
runner.start


The reducer and postprocessing script, stats_aggregator.rb:
#!/usr/bin/env ruby1.9
require 'rubygems'
require 'csv'

module Stats
  STATS_TEMPLATE={:key=>nil,:sum=>0,:average=>nil,:max=>nil,:min=>nil,:errors=>0,:count=>0}

  class Reducer
    def run
      last_key=nil
      curr_rec=nil

      STDIN.each_line do |l|
        key,val = l.chomp.split("\t",2)
        if last_key.nil? || last_key!=key
          write_stats(curr_rec) unless last_key.nil?
          curr_rec = STATS_TEMPLATE.dup.update(:key=>key)
          last_key=key
        end

        if val && val!=''
          val=val.to_f
          curr_rec[:sum]+=val
          curr_rec[:count]+=1
          curr_rec[:min]=val if curr_rec[:min].nil? || val<curr_rec[:min]
          curr_rec[:max]=val if curr_rec[:max].nil? || val>curr_rec[:max]
        else
          curr_rec[:errors]+=1
        end
      end
      write_stats(curr_rec) if curr_rec
    end

    private

    def write_stats(stats_hash)
      stats_hash[:average]=stats_hash[:sum]/stats_hash[:count] if stats_hash[:count]>0
      puts stats_hash.values_at(*STATS_TEMPLATE.keys).join("\t")
    end
  end

  class CSVConverter
    def print_stats
      puts STATS_TEMPLATE.keys.to_csv
      STDIN.each_line do |l|
        puts l.chomp.split("\t").to_csv
      end
    end
  end

end


mode = ARGV.shift

case mode
when '--reducer' #hadoop mode
  Stats::Reducer.new.run
when '--csv' #csv converter; run with: hadoop dfs -cat /mnt/hadoop/stress_output/part* | stats_aggregator.rb --csv
  Stats::CSVConverter.new.print_stats
else
  abort 'Invalid mode specified for stats aggregator. Valid options are --reducer, --csv'
end


Reckoning (Shame Computation)
In a moment of desperation, we used Hadoop to solve a problem for which it is very tenuously appropriate, but it actually turned out great. I wrote very little code, it worked with almost no iteration, and a couple of up-front hours gave us an easily repeatable process. Our last-minute stress test exposed a few issues that we were able to quickly correct, and the launch went smoothly. All this, and it only cost us about $10 of EC2 time.

Hadoop Streaming is a powerful tool that every Hadoop shop, even the pure Java shops, should consider part of their toolbox. Lots of big data jobs are actually simple except for scale, so your local python, ruby, bash, perl, or whatever coder, along with some EC2 dollars, can give you access to some pretty powerful stuff.
