Feed on
Posts
Comments

I’m sure you’ve heard by now, Firefox 4 is officially released.  The Metrics team has done our part by working with webdev to release a new real-time download visualization:

World map visualizing real-time Firefox 4 downloads

http://glow.mozilla.org/

 

The basic backend flow is like this:

  1. The various load balancing clusters that host download.mozilla.org are configured to log download requests to a remote syslog server.
  2. The remote server is running rsyslog and has a config that specifically filters those remote syslog events into a dedicated file that rolls over hourly
  3. SQLStream is installed on that server and it is tailing those log files as they appear.
  4. The SQLStream pipeline does the following for each request:
    1. filtering out anything other than valid download requests
    2. uses MaxMind GeoIP to get a geographic location from the IP address
    3. uses a streaming group by to aggregate the number of downloads by product, location, and timestamp
    4. every 10 seconds, sends a stream of counter increments to HBase for the timestamp row with the column qualifiers being each distinct location that had downloads in that time interval
  5. The glow backend is a python app that pulls the data out of HBase using the Python Thrift interface and writes a file containing a JSON representation of the data every minute.
  6. That JSON file can be cached on the front-end forever since each minute of data has a distinct filename
  7. The glow website pulls down that data and plays back the downloads or allows you to browse the geographic totals in the arc chart view

Some links for people interested in the code:

 

The Background

During my work as metrics liaison with the Firefox Input team, an exciting requirement has come up: scalable online clustering of the millions of feedback items that the users of Firefox share with us.

When designing a service at the metrics team, besides functional requirements (accept text messages, produce clusters) we consider scalability and durability. In fact, scalability concerns play a major role in wanting to replace the current solution (which has done a fine job so far) and not picking another powerful existing tool: We expect the influx of messages (already heading towards 2 million) to increase up to 50x once Firefox 4 is released.

On to Architecture

There is a slide outlining what the system (called Grouperfish) is planned to look like. As this service is to be developed quickly and in iterations, even major parts of the system might be replaced in the future though. This is the rationale for our first version, to be released sometime around the Firefox 4 release:

Concurrency

We want to be able to handle tens of thousands of GET’s and thousands of POST’s per second, provided we have enough commodity hardware at our disposal.

To accept incoming documents and queue them for clustering, Node.JS fits the bill. Its event-based concurrency model dominates thread- and process-based designs in IO-bound tasks such as this. Also, depending on the storage you pick, requests might pause to wait on garbage collection or to rewrite store files. Node can handle a lot of waiting requests because it does not use system level threads (or even processes) for concurrency.

Storage

Grouperfish must store millions of documents in hundreds of thousands of collections. The generated clusters may reference thousands of documents each, each ranging from a few bytes to about a megabyte. Also, we want to store processing data for clustering.

When planning for more data than fits into your collective RAM, you usually have two options (SQL not being one of them since RAM has become pretty big):

Dynamo-style key/value stores like Riak and Cassandra allow to store replicated values with high write rates, and also to quickly retrieve individual items from disk. You do not need to worry about one machine getting too much attention (e.g. when one of your services gets slashdotted), thanks to consistent hashing. Riak even has a notion of buckets, keys and values: We would intuitively use buckets for collections of documents (and of clusters), and values for individual documents (and clusters). No wonder we looked at this more closely. Unfortunately though, Riak’s buckets are more of a namespacing device than anything else. It is expensive to get all elements of a bucket, since they are neither indexed by a common key nor stored together on disk. The Riak design can be a bit misleading in this regard, as buckets are in fact spread throughout the key space. To retrieve all keys in a bucket, Riak will check every single key — possibly scanning gigabytes of main memory (for the very recent Riak search to help, you’d need to blow up your values quite a bit). And you still only have the keys. To get possibly millions of associated values, you need to move your little disk heads a lot. This is not always as bad as it sounds because Riak gives you streaming access to the data as it comes in. But in general, the smaller your buckets in relation to the entire key space, the higher the cost of retrieving many of them.

The other major contender are column-oriented data stores of the BigTable family, the most prominent of which is Apache HBase (the aforementioned Cassandra is actually somewhat in-between, having properties from both worlds). The two main differences for users of HBase vs. Dynamo style stores as far as we are concerned: 1. Data is stored per column family: to retrieve the vector representations of a million documents, we do not have to scan through a million document texts. 2. Records are sorted by key, much like in a traditional database (but optimized for fast inserts, using LSM trees). This is a blessing and a curse. A blessing, because we can scan over contiguous collections of documents. A curse, because we are vulnerable to hotspotting on popular collections. To counter this, we need to make sure that there are random parts in our row keys, e.g. using UUID’s. Because HBase divides tables into regions as they grow and hands them off to other nodes, this method avoids hotspots. And we do not lose the streaming advantage as long as we use common prefixes per collection.

Given our access patterns (insert documents, update clusters, re-process entire collections, fetch lists of clusters), efficient sequential access to selected parts of the data is very important. Sorted, column oriented storage seems to be the way to go. There are other pros and cons (single point of failure, write throughput, hardware requirements), but if we don’t cater to our use case, those won’t ever matter.

Clustering

Grouperfish must be able to handle small numbers of large corpora (millions of documents), as well as large numbers of small corpora (millions of collections). The generated clusters may contain thousands of messages each.

This is practically a no-brainer: Apache Mahout supports in-memory operation (for smaller clusters) as well as distributed clustering (using Apache hadoop, for larger clusters). Mahout can update existing clusters with new documents and generate labels for our clusters. Of course, Mahout is a java-library, so we need to run it within a JVM. To simplify management and introspection, we will run our clustering workers in jetty web containers.

Scheduling

We need to be able to add workers to increase clustering frequency. When there are more new messages than can be clustered right away, we want them to be queued. Also, we have Node.JS and we have Java/Mahout. We want our queue to bridge the gap.

Messaging has become a big topic as systems have become larger and more distributed. We want to use messages to decouple write requests from processing them. There is a very elegant solution to maintain queues, offered by the in-memory data store Redis. Redis is somewhat like a developers dream of shared memory. No encoding and decoding of lists, maps and values as they enter and leave the stores — just operate on your data structures within shared memory. Unfortunately, Redis queues are really just a linked list with a blocking POP operation. While that is very nice, we want to track and resubmit failed tasks when a worker node falls victim to rampaging rodents.

The considerations of choosing RabbitMQ to realize a task queue are worth an article of their own. Suffice to say, it has Node- and Java-bindings, and it supports message Acknowledgement from workers. We still want to use Redis to keep track of collection size, to cache the actual incoming data (no need to ask hbase if we use it right away), and for locking, so that every collection is only modified by one worker at a time. We also might use it to cache frequently requested clusters.

More Thoughts

Selecting these components, I learned that it is important to choose technologies in an unbiased fashion, and to reconsider decisions when a technology has no answer for your requirement. For example, I originally wanted to use just Riak for storage — I like its simplicity and style, and the bucket metaphor — but the enumeration of large buckets would be too slow for an online system. It might be fine for a batch-only system, or a system that just does not operate on collections of varying size as much.

For a Message queue, ØMQ sounded awesome, offering low latency and powerful constructs, but I quickly realized that it is not really what I understand a message queue to be, but rather a very smart abstraction over traditional sockets. Probably someone will eventually build a distributed task queue on top of it though.

We recently had a situation where we needed to copy a lot of HBase data while migrating from our old datacenter to our new one. The old cluster was running Cloudera’s CDH2 with HBase 0.20.6 and the new one is running CDH3b3. Usually I would use Hadoop’s distcp utility for such a job. As it turned out we were unable to use distcp while HBase was still running on the source cluster. Part of the reason for this is that the HFTP will throw XML errors due to HBase modifying files (particularly the case if HBase removes a directory). And to transfer our entire dataset at the time was going to take well over a day. This presented a serious problem because we couldn’t accept that kind of downtime. We were also about 75% full in the source cluster so doing HBase export was out as well. Thus I created a utility called Backup.

Backup is designed to essentially do the same work as distcp with a few differences. The first being that Backup would be designed move beyond failures. Since we’re still running HBase on the source cluster we can actually expect quite a few failures as a matter of fact. So inside Backup’s MapReduce job will by design catch generic exceptions. This is probably a bit over-zealous, but I really needed it not to fail no matter what. Especially after a few hours in.

One of the other differences is that I designed Backup to always use relative paths. It does this by generating a common path between the source and destination via regular expression. Distcp on the other hand will do some really interesting things depending on what options you’ve enabled.  If you use the -f flag for providing a file list, it will take all the files and write them directly to the target directory, rather than putting them in their respective sub-directories based on the source path. If you run with the -update flag it seems to put the source directory inside the destination rather than realizing that I want these two directories to look the same.

The last major difference is that Backup is designed to run in update mode always.  This was found because our network connection could only push about 200 MB/s between datacenters. We later found that a firewall was the bottleneck, but we didn’t want to drop our pants to the world either. Distcp would take hours just to stat and compare the files. For context we had something on the order of 300K-400K files we were looking to transfer. This is because distcp currently does this in a single-thread before it runs its MapReduce job. This actually makes sense when considering that distcp is only a single MapReduce job and it wants to distribute the copy evenly. Since we needed to minimize downtime, the first thing I did was distribute the file stat comparisons. In exchange we currently take a hit on not being able to evenly distribute the copy work. Backup uses a hack to attempt to get better distribution, but it’s nowhere near ideal. Currently it looks at the top-level directories just under the main source directory.  It then splits that list of directories into mapred.map.tasks number of files. Since the data is small (remember this is paths and not the actual data) you’re pretty much guaranteed MapReduce will take your suggestion for once. This splits up the copy pretty well especially for the first run. On subsequent runs however you’ll get bottlenecked by a few nodes doing all the work. You can always up the mapred.map.tasks even higher, but really I need to split it out into two MapReduce jobs. I also added a -f flag so that we could specify file lists. I’ll explain later on why this was really useful for us.

So back to our situation. I ran the first Backup job while HBase was running. This copied the bulk of our 28 TB dataset obviously with a bunch of a failures because HBase had deleted some directories. Now that we had most of the data we could do subsequent Backup’s within a smaller time window. We ingest about 300 GB/day so our skinny pipe between datacenters was able to make subsequent transfers in hours and not days. During scheduled downtime we would shutdown the source HBase. Then we copied the data to a secondary cluster in the new datacenter. As soon as the transfer was finished we would verify the source and destination matched. If so then we were all good to start up the source cluster again and resume normal production operation. Meanwhile we would copy the data from the secondary cluster to the new production cluster. The reason for doing this was because HBase 0.89+ would change the region directories, and we also needed to allow Socorro web developers to do their testing. So having the two separate clusters was a real blessing. It allowed us to keep a pristine backup at all times on secondary while testing against the new production cluster. So we did this a number of times the week before launch. Always trying to keep everything as up to date as we could before we threw the switch to cut over.

It was during this last week I added the -f flag which allowed giving Backup a source file list. We would run “hadoop fs -lsr /hbase” on both the source and the destination cluster. I wrote a simple python utility (lsr_diff) to compare these two files and figure out what needed to be copied and what needed to be deleted. The files to copy could be given to the Backup job while the deletes could be handled with a short shell script (Backup doesn’t have delete functionality). The process looked something like this:


RUN ON SOURCE CLUSTER:
hadoop fs -lsr /hbase > source_hbase.txt
RUN ON TARGET CLUSTER:
hadoop fs -lsr /hbase > target_hbase.txt
scp source_host:./source_hbase.txt .
python lsr_diff.py source_hbase.txt target_hbase.txt
sort copy-paths.txt -o copy-paths.sorted
sudo -u hdfs hadoop fs -put copy-paths.sorted copy-paths.sorted
nohup sudo -u hdfs hadoop jar akela-job.jar com.mozilla.hadoop.Backup -Dmapred.map.tasks=112 -f hdfs://target_host:8020/user/hdfs/copy-paths.sorted hftp://source_host:50070/hbase hdfs://target_host:8020/hbase

The number of map tasks I refined over time, but I started the initial run with (# of hosts * # of map task slots). On subsequent runs I ended up doubling that number. After the backup job completed each time we would run “hadoop fs -lsr” and diff again to make sure that everything copied over. I saw a lot of times that wasn’t the case when the source was HFTP from one datacenter to another. However when copying files from an HDFS source within our new datacenter I never saw an issue with copying.

Due to other issues (there always are right?) we had a pretty tight timeline and this system was pretty hacked together, but it worked for us. In the future I would love to see some modifications made to distcp. Here’s my wishlist based on our experiences:

1.) Distribute the file stat comparisons and then run a second MapReduce job to do the actual copying.
2.) Do proper relative path copies.
3.) Distribute deletes too.

To be honest though I found the existing distcp code a bit overly complex otherwise I might have made the modifications myself. Perhaps the best thing is that someone take a crack at a fresh rewrite of distcp altogether. I would love to hear people’s feedback.

Few months ago, at Hadoop World 2010, the metrics team gave a talk on Flume + Hive integration and how we plan to integrate it with other projects. As we were nearing production date, the BuildBot/TinderBox team came with an interesting, albeit pragmatic requirement. “Flume + Hive really solves our needs, but we would ideally like a solution that indexes our data and can be queried in real-time“.

Flume does get us data in real-time from point A to point B, but once inside HDFS, Hive isn’t suited for low-latency queries. One viable option was to use HBase (metrics team already uses it for Socorro – https://wiki.mozilla.org/Socorro:HBase) and then integrate it with a Hive-HBase plugin for SQLish queries. Possible in theory, the process is bit tedious to support faceted queries, wild-card searches etc. inside HBase. Solr-Cloud was the next obvious choice, however, Daniel Einspanjer (@deinspanjer) had other ideas when he suggested ElasticSearch (http://www.elasticsearch.com). Frankly, I hadn’t heard of ElasticSearch before and SOLR is synonymous when it comes to building an indexing solution, we had our doubts. Now, Solr-Cloud and ElasticSearch use the same underlying technology, Lucene, for indexing data, support REST API querying, can handle millions of documents and are a cloud solution. We had few days to experiment and decided to spin few machines for ElasticSearch, and it turned out to be a great investment!

New technologies are really cool to implement, but once you past the initial honeymoon phase, complexities start surfacing up and one either enters the “cul-de-sac” mode to fall-back on the more traditional methods. Determined to not go through this ordeal, we decided to stress test the ElasticSearch cluster to atleast 5x of expected load.

Integrating with Flume + ElasticSearch + Hive

The Flume-Hive patch (FLUME-74) had given us enough insights in writing Flume plugins, so we decided to extend it and support ElasticSearch. ElasticSearch has a nice Java plugin written by @tallpsmith. While it would solve the purpose, Flume and ES needed to be on same network to be discovered via multicast and it relied on certain ES Java API’s. If you are building an asynchronous systems, better interface them with a web-based API than traditional Java APIs, and ElasticSearch has a real nice REST API.

While ES provides all functionalities of a traditional search engine, we wanted to maintain the ability to run complex queries via Hive. This meant, forking each log event to two different locations, HDFS (eventually Hive) and ElasticSearch. The new sequence of events looked something like this:
BuildBot -> Flume agent -> Flume collector -> REST API for realtime indexing | Write to HDFS and eventually Hive

The CustomDFSSink.Java appends every event to the filestream object as it is received, this turned out to be a perfect place to insert the REST API. The data already exists in the “Event e” object, might as well marshal it to an equivalent JSON for ES.

'hiveElasticSearchCollectorSink("<hdfs_location>","<folder-prefix-for-hdfs>","<log-type-for-hdfs>","<elastic-server-url>","<elasticsearch-index>","<elasticsearch-type>")'

The first 3 arguments to ‘hiveElasticSearchCollectorSink‘ are exactly similar to the original hiveCollectorSink(), the latter 3 are specific to ElasticSearch. On very high level, an ES index is like a database and type is like a table. You can read more about ElasticSearch at http://www.elasticsearch.com/docs/elasticsearch

Failover scenario

Things fail when you least expect them to, especially on a long weekend or when you are in midst of crucial demo. Flume offers recovery from certain fail-over scenarios, however, its recovery system is based understandably on “acks” and follows a specific protocol. While its prudent to use existing code, the current system seemed bit overkill for us and we already had a “marker-folder” failover system in place with Hive. The marker-folder scenario fitted ES quite nicely. On a 10,000 foot level, each event makes a single attempt to POST request to ES, if it fails, writes data to a “marker-folder” with corresponding meta-data. Data inside marker-folder is run at specific intervals by a custom sink that runs the queries inside marker-folder in addition to its mundane Hive and ES tasks.

Problem with dfs.close()

DFS isn’t a traditional file system and while it’s good at a lot of stuff, atomicity isn’t its strength. While stress testing the new setup, EOFException (stack trace here) would intermittently appear in our logs. After spending a couple of days in trying to debug this problem, it suddenly struck us that dfs.close() is void in nature and the call isn’t atomic in nature; meaning its possible that even after a .close() returns, HDFS might still be writing data to disk, usually happens when you are writing large files to disk with gzip compression. As a recap, after every .close(), HiveCollectorSink() used to move each HDFS file to its own partition, thereby causing a race condition between actually finishing the .close() operation and “ALTER TABLE” performing the internal .move() operation to its own partition. One way to fix this problem was not move the file, continue writing to the original HDFS location and use that location as a partition with following HQL “ALTER TABLE CREATE IF NOT EXISTS PARTITION“. The “IF NOT EXISTS” clause allows us to add data to the same hourly partition and not worry about overwriting existing data or creating multiple partitions for a 10-minute data rollover inside Flume.

Need for a Custom Marker Sink

The marker-logic reads each file inside the “marker-folder“, performs its operations and then “deletes” the file. This creates a race-problem when you have multiple sinks, each reading and deleting the file. Duplicity isn’t an issue out here and the system will continue to work except for few innocuous errors related to race condition of multiple sinks trying to delete a file while other sink has already deleted the file. Using a special sink() to perform the “marker-folder” operations helped us get rid of the errors and added the much needed role abstraction.

BuildBot setup

Sample flow listed below:
connect node1.research.hadoop.sjc1.mozilla.com

exec spawn cm-metricsetl02.mozilla.org rawlogs
exec spawn node2-research-collector rawlogscollector
wait 10000
exec config rawlogs ‘rawlogsflow’ ‘exec(“/home/aphadke/buildparsers/installed-logparser/logparser/bin/scrape-gen.sh”,false,true,600000)’ ‘autoDFOChain’
exec config rawlogscollector ‘rawlogsflow’ ‘autoCollectorSource’ ‘HiveCollectorSink(“hdfs://admin1.research.hadoop.sjc1.mozilla.com/flume/%{host}/raw-logs/%Y-%m-%d-%H/”,”logs”,”build_logs”)’

The current architecture consists of four flows, rawlogs, buildlogs, testruns and testfailures; Each flow outputting a specific JSON to STDOUT on the flume agent-node. The agent-node reads STDOUT data making its way to the flume collector-node via DFO sink. DFO sink makes sure that data gets transferred reliably to its intended destination. The four-flows write data to their own directory structure and to their respective Hive and ElasticSearch end-points.

Pitfalls

The “non-ACTIVE” problem: Flume is an awesome product but it isn’t perfect and there are instances wherein certain flows though they appear “ACTIVE” aren’t spewing out any data. One way to circumvent the “non-ACTIVE” problem is monitor ElasticSearch or HDFS for new files every ‘x’ hours. If the setup isn’t working as expected, Flume offers a nice option to “kick-start” the entire configuration:
<flume_location>/bin/flume shell -c <flume-master> -e “exec refreshAll”

The “refreshAll” command internally refreshes all the existing flows, one might notice “ERROR” state for a few flows via web-ui but things should resume to normalcy in about a minute.

Data integrity problem

DFO is reliable, however, its a catch-22 situation in relying on an existing code base and hoping it works as advertised or writing your own checksum stuff. If data integrity is vital, please write your own integrity-checking scripts. We have our own custom scripts that query data from source and confirm its availability inside ES, data is re-inserted at the beginning of pipeline if ES returns zero results.

Next Steps

The entire flume-elasticsearch-Hive code can be downloaded from https://github.com/anuragphadke/Flume-Hive. While we make efforts to keep the code in sync with Flume’s-trunk, there might be some delay in syncing it to the head.

Why isn’t this part of Flume code-base?

Fair question, we are in the process of merging the two together, however, its not possible to have a traditional plugin as the data needs to be forked to two different locations (Hive & ES). Writing code to fork the same data in two different locations is fairly complex and will involve a lot of duplicate code. The current code interferes with the core functionality of CustomDfsSink and few other classes. This isn’t ideal, Hive and ES are plugins and the corresponding code + jars shouldn’t be part of the core Flume codebase, especially for users who don’t need this functionality. That said, we are working closely with Flume community and hope to have an integrated solution sooner than later.

Feel free to let us know your thoughts, brickbats, flowers via comments or email: <first letter of my name followed by my last name> @ <mozilla>-dot-<com>

Find this cool and cutting edge? There’s ton more happening under the hood, We are actively hiring and would love to have you talk to us. Please visit http://www.mozilla.com/en-US/about/careers.html and help us make the web a better place.

Credits

BuildBot is an extremely complex project and it wouldn’t have been possible with the dedicated efforts of the following members and open-source community:
Daniel Eisenpanjer, Xavier Stevens, Anurag Phadke (metrics team)
Jonathan Griffin, Jeff Hammel, Clint Talbert (Automation Tools team)
Jonathan Hsieh, Eric Sammer, Patrick Angels (Flume/cloudera)
Shay Bannon (Elastic Search)

As documented in THRIFT-601, sending random data to Thrift can cause it to leak memory.
At Mozilla, we use a web load balancer to distribute traffic to our Thrift machines, and the default liveness check it uses is a simple TCP connect. We also had Nagios performing TCP connect checks on these nodes for general alerting.

All these connects were causing the Thrift servers to start generating OOM errors sometimes as quickly as a few days after being started.

I wrote a test utility that performs a legitimate Thrift API call (it actually tries to get the schema of the .META. table) and returns a success if it can execute the call.

The utility can either run from the command line, or it can use the lightweight HTTP server class that is part of the Sun JRE 6 and it will listen for a request to /thrift/health and report back the status.


$ java -jar HbaseThriftTester.jar
usage: HbaseThriftTester [-timeout <ms>] <mode> <host:port>...
-check Immediately checks the following host:port
combinations and returns a summary message with an
exit value of the number of failures.
-listen <port> Run as an HTTP daemon listening on port. Checks the
hosts every time /thrift/health URL is requested.
-timeout <seconds> Number of seconds to wait for Thrift call to
complete

The app is bundled up using one-jar so it is simple and easy to call from within a Nagios script or some-such. Maybe it will be useful to someone else. Just pull down the project then build with ant.

HBase Thrift Tester project

Introduction: Using A Riak Cluster for the Mozilla Test Pilot Project

As part of integrating Test Pilot into the Firefox 4.0 beta, we needed a production-worthy back-end for storing the experiment results and performing analysis on them. As discussed in the previous blog post, Riak and Cassandra and Hbase, oh my!, we decided on Riak as that back-end.

Some of the preliminary work and a lot of the initial implementation involved conducting benchmarking studies that would verify the fitness of the solution and give us a solid understanding of when and how we would need to scale. Mozilla worked with Basho (the stewards of the Riak project) to perform this benchmarking, and this blog post details the results.

Continue Reading »

Exponential growth, one of the few problems every organization loves, is usually alleviated by scaling out using clustered computing (Hadoop), CDN, EC2 and myriad of other solutions. While a lot of cycles are spent in making sure each scaled out machine contains requisite libraries, latest code deployments, matching configs, and the whole nine yards, very little time is spent in collecting the log files + data from these machines and analyzing them.

Few reasons why log collection is usually at tail of priorities:

  1. Nagios alerts usually do a good job of monitoring for critical situations. The scripts make sure the app’s always online by grep’ing for “ERROR, WARN” and other magic terms in logs, but what about errors that occur often but don’t bring down the app completely?
  2. Web-analytics give us all information we need. -Yes on a macroscopic view, but it’s really hard for an analytical software to provide fine granularity, such as how many hits did we receive pertaining to a given country for a given page for a given time-period?
  3. Ganglia graphs help us find out what machine/s are under heavy load – Absolutely, but trying to figure what triggered the load in first place is not always easy.

Chukwa, Scribe and Flume are headed in the right direction, but the final piece of puzzle of analyzing the data still remained unsolved, until few weeks back as we, at Mozilla, started integrating Flume with Hive.

Merge everything - Image courtesy Wikipedia.org

Flume is an open-source distributed log collection software that can be installed on multiple machines for monitoring log files with data slurped to a single HDFS location. The out of box solution only solved part of our problem of collecting data, but we needed a way to query it and thereby make intelligent decisions based on the results.

The teams first foray was to add a gzip patch that compressed the log data before transferring the files to HDFS. Once the data was transferred, we needed a way to query it. Our current production Hadoop cluster consists of modest 20 machines, has excellent monitoring in terms of nagios and ganglia, but the question of what might we be missing always lingered on our heads. A list of basic things needed to be taken care of while integrating Flume with Hive was created:

  1. How do we handle fail-overs when the HIVE metastore service, a possible single point of failure for HIVE goes down?
  2. How to query data by the day and hour.
  3. Can separate tables be used for different log locations?
  4. Can we split a single log line in its respective columns?

1. Handling fail-overs: We are currently running HIVE metastore in remote mode using MySQL. More information on metastore setup can be found at http://wiki.apache.org/hadoop/Hive/AdminManual/MetastoreAdmin. Flume node-agents reliable handle fail-overs by maintaining checksums at regular intervals and making sure data isn’t inserted twice. The same principle was extended by adding marker points. i.e. a file containing HQL query and the location of data will be written to HDFS after every successful FLUME roll-over. Flume agents would look at a common location for pending HIVE writes before writing any log data to HDFS, attempt to move the data inside HIVE and only delete the marker file if successful. In situations where two or more flume agents attempt to move files to HIVE partition, one of them will encounter an innocuous HDFS file not found error and proceed as usual.

2. Appending to sub-partitions: Flume supports rollover where data is written to disk every ‘x’ millis. This is particularly useful as data is available inside HDFS at regular intervals and can be queried by the hour or minute granularity. While whole data can be written to a single partition, partitioning data inside HIVE is a huge performance benefit as it only siphons through a specific range rather than whole data set. This was achieved by having two partitions for a table – by date and hour. An equivalent HIVE query looks something like:

LOAD DATA INPATH ‘” + dstPath + “‘ INTO TABLE ” + hiveTableName + ” PARTITION (ds='” + dateFormatDay.format(cal.getTime()) +
“‘, ts='” + dateFormatHourMinute.format(cal.getTime()));

3. Using separate tables for different log locations: We wanted to use separate tables for Hadoop and HBase log locations. Our initial approach was to add a config setting in flume-site.xml, but half way down that road we realized that config is wrong place, as it needs to exist on each node-agent and mapping different folders to tables will be a logistical nightmare.

A new sink named hiveCollectorSink(hdfs_path, prefix, table_name) was added to the existing family (http://archive.cloudera.com/cdh/3/flume/UserGuide.html#_output_bucketing). This allowed us to add hive tables on the fly for each log folder location, thereby giving a separate placeholder for Hadoop and Hbase logs.

4. Splitting a single log line in respective columns (a.k.a. regex): Log4J is a standard log file convention used by quite a few applications including Hadoop and HBase. A sample line looks something like this:

2010-08-15 12:36:59,850 INFO org.apache.hadoop.hdfs.server.datanode.DataBlockScanner: Verification succeeded for blk_-1857716372571578738_336272

Given the above structure, we decided to split the line in 5 columns:
date, time, message_type, class_name and message; the table definition given below -

CREATE TABLE cluster_logs (
line_date STRING,
line_time STRING,
message_type STRING,
classname STRING,
message STRING
)
PARTITIONED BY (ds STRING, ts STRING, hn STRING)

ROW FORMAT SERDE ‘org.apache.hadoop.hive.contrib.serde2.RegexSerDe’
WITH SERDEPROPERTIES (
“input.regex” =
“^(?>(\\d{4}(?>-\\d{2}){2})\\s((?>\\d{2}[:,]){3}\\d{3})\\s([A-Z]+)\\s([^:]+):\\s)?(.*)”
)
STORED AS TEXTFILE;

NOTE: The “hn” (hostname) partition was added so we could query the data based on individual hostnames, enabling us to know what hostname has biggest chunk of ERROR, WARN messages.

The above framework has allowed us to reliably collect logs from our entire cluster to a single location and then query the data from a SQLish interface.

Future Steps:

  • Flume + Hive patch is still a work in progress and will be committed to the trunk in a couple of weeks.
  • Socorro (Mozilla’s crash reporting system) 1.9 and above will be using processors in a distributed mode and we plan to insert the processor’s log data inside HIVE thereby helping us better understand throughput, avg. time to process each crash-data and other metrics. Watch this space for more related posts.

The developers of Flume + Hive usually hang on IRC (irc.freenode.net) in the following channels: #flume, #hive, #hbase
Feel free to ask questions/thoughts/suggestions and I will reply to them below.

-Anurag Phadke (email: first letter of my firstname followed by last name at – the – rate  mozilla dot com)

Socorro: Mozilla’s Crash Reporting System

Laura just posted this fantastic article on the Mozilla WebDev blog talking about the past and future for Socorro.  It covers all the points that I was wanting to blog about here regarding what our integration of HBase brings to the table for Socorro.  Please, if you haven’t yet read that post, click the link.  I’ll wait…

Okay.  So now you know why I’m so excited about HBase.  We are going to be storing a non-trivial amount of information with it.  We are going to be doing some really interesting Map/Reduce work both as part of minidump stackwalk processing and as an end-user oriented ad-hoc query mechanism.  Socorro 2.0 should be delivering a brave new world of features to the developers tasked with using crash reports to hunt and squash bugs in Firefox 4.

Pentaho announced this morning that they were going to be adding some features to Pentaho Data Integration (Kettle) and to their BI suite to make it easy for people to use Kettle to retrieve, manipulate, and store data in Hadoop, and to integrate Hadoop communication into the reporting and analysis layer.

They posted a nice five minute screencast on their Hadoop landing page demonstrating a couple of pieces of Hive integration.  In it, they retrieve data using Hive, and they also use a Hive user defined function that is implemented as an embedded Kettle transformation.

I’m very excited to see this announcement.  Besides the significant work we’ve been doing on the Metrics team to integrate HBase into the Socorro project, we also have major plans for our Hadoop clusters for general data storage and processing.

Right now, we have Kettle jobs and transformations that manipulate gigabytes of data per hour, loading it into our data warehouse.  One of the things I love about Kettle is the ability to quickly and easily define, review, and extend complex jobs such as our end-of-day data aggregation:

In the future, as we have more data stored in Hadoop, I want to be able to run transformations on that data.  Sometimes, if the transformations involve lots of RDBMS work, I’ll want to be streaming the data out of HDFS.  For other types of transformations that involve mostly business logic and text transformations, being able to run that code directly in a Hadoop Map Reduce job will be a fantastic feature.

My personal feeling is that people in the Hadoop community really need something visual and flexible like the Kettle interface for defining and manipulating this type of business logic.  Great strides have been made with projects such as Cascading, but it is still raw code, and I feel that excludes a lot of people who could be getting work done faster and better if they had a good tool to help them adapt to the world of Map Reduce.

Currently, someone can start up Kettle’s GUI and start constructing jobs and transformations simply by piecing together steps of work such as reading a set of text files, performing a regex on them, doing some value lookups, then aggregating the data.  If they could then save that transformation and execute it as a Hadoop Map Reduce job, I think it will be revolutionary for both worlds of ETL and Hadoop.

When Mozilla Metrics starts tackling some of the Hadoop data processing jobs that we have scheduled, we’ll be making significant open source contributions to both communities to realize this vision, and I really hope that it will help widen the accessibility of Hadoop to a new group of potential users.

We are marching along in our integration of HBase with the Socorro Crash Stats project, but I wanted to take a minute away from that to talk about a separate project the Metrics team has also been involved with.

Mozilla Labs Test Pilot is a project to experiment and analyze data from real world Firefox users to discover quantifiable ways to improve our user experience.  I was very interested and excited about the project because of the care they take to protect the user’s privacy.  They have a very user focused privacy policy that is easy to read, which always makes me happy.  Every step of the way they make sure the user is aware and comfortable with the data they are sending by making it easy for the user to see their data before they submit it and providing the user the choice to submit it or not when the data is ready. The data is always very general in nature, not containing any sensitive information like URLs and it is not associated with any personally identifying information at any time.

In the pre 1.0 releases of Test Pilot, the data that is submitted from the add-on is received by a simple script transforms the data into a flat file that is stored on an NFS server.

We are planning on making a huge drive to ramp up the volume of users and the number of experiments, and that means that this simple storage mechanism will not survive.  Here are some of the most important requirements we’ve hashed out in our planning:

  • Expected minimum users: 1 million.  Design to accommodate 10 million by the end of the year and have a plan for scaling out to tens of millions. (This is the 1x 10x 100x rule of estimation of which I am a fan)
  • Expected amount of data stored per experiment: 1.2 TB
  • Expected peak traffic: approximately 75 GB per hour for two 8 hour periods following the conclusion of an experiment window.  This two day period will result in collection of approximately 90% of the total data.
  • Remain highly available under load
  • Provide necessary validation and security constraints to prevent bad data from polluting the experiment or damaging the application
  • Provide a flexible and easy-to-use way for data analysts to explore the data.  While all of these guys are great with statistics and thinking about data, not all of them have a programming background, so higher-level APIs are a plus.
  • Do it fast.

I am a technology nut.  I love to research technologies to keep abreast of the state-of-the-art and also potential tools.  While I’ve always been a SQL aficionado, I am also a big fan of the “NoSQL” technologies because I feel there is a great role that they serve.

When I looked at the characteristics of this project, I felt that a key-value or column-store solution was the best fit, so I started digging through my research bookmarks and doing some technology cost/benefit analysis.

Eventually, our team came down to three primary contenders:

We recently had a meeting wherein we hashed out a lot of the pros and cons of each of these solutions.  I wanted to share that discussion with everyone, not because I was looking forward to being set-upon by the two contenders that I didn’t feel were the best fit, but rather for two reasons:

  1. Crowd-sourcing — I believe that laying out the thoughts and assumptions in the open is the best way to ensure that we receive the broadest set of feedback from the experts in each of the varying technologies.  I further believe that it is better to be aware of the over-looked features and warnings raised by these experts and consider what can be done to mitigate them rather than hiding from them.
  2. Sharing of knowledge — Even if it turns out that we didn’t get all the answers right or that we didn’t come up with the ideal solution, I believe that we asked a lot of good questions here and I believe that listing these questions might help some other team who has to make a similar decision.

So let’s get down to the discussion points:

  • Scalability — Deliver a solution that can handle the expected starting load and that can easily scale out as that load goes up.
  • Elasticity — Because the peak traffic periods are relatively short and the non-peak hours are almost idle, it is important to consider ways to ensure the allocated hardware is not sitting idle, and that you aren’t starved for resources during the peak traffic periods.
  • Reliability — Stability and high availability is important.  It isn’t as critical as it might be in certain other projects, but if we were down for several hours during the peak traffic period, the client layer needs to be able to retain the data and resubmit at a later date.
  • Storage — Need enough room to store active experiments and also recent experiments that are being analyzed.  It is expected that data will become stale over time and can be archived off of the active cluster.
  • Analysis — What do we have to put together to provide a friendly system to the analysts?
  • Cost — Actual cost of the additional hardware needed to deploy the initial solution and to scale through at least the end of the year.
  • Manpower — How much time and effort will it take us to deliver the first critical stage of the project and the subsequent stages?  Also consider ongoing maintenance and ownership of the code.
  • Security — Because we will be accepting data from an outside, untrusted source, we need to consider what steps are necessary to ensure the health of the system and the privacy of users.
  • Extensibility — delivering a platform that can readily evolve to meet the future needs of the project and hopefully other projects as well.
  • Disaster Recovery / Migration — If the original system fails to meet the requirements after going live, what options do we have to recover from that situation?  If we decide to switch to another technology, how do we move the data?

Now we iterate those points again, but this time we have the points made by the team regarding each of the three solutions being considered:

  • Elasticity– Machines can be added as load increases.  Machines can be turned off and reconfigured to remove them. There is the ever-present the risk of a bug resulting in a lack of replication or corruption causing data loss.  In all three solutions, re-balancing the existing data incurs an additional load penalty as data is shifted around the cluster.  We need to consider how much time and manual administration is required, how much can be automated, how risky rebalancing is, and how long until we begin to see the benefit of the additional nodes.
    • HBase
      In HBase, the data is split into “regions”.  The backing data files for regions are stored in HDFS and hence replicated out to multiple nodes in the cluster.  Every RegionServer owns a set of regions.  Normally, the RegionServer will own regions that exist on the local HDFS DataNode.
      If you add a new node, HDFS will begin considering that node for the purposes of replication. When a region file is split, HBase will determine which machines should be the owners of the newly split files.  Eventually, the new node will store a reasonable portion of the new and newly split data.
      Re-balancing the data involves both re-balancing HDFS and then ensuring that HBase reasses the ownership of regions.
    • Cassandra
      In Cassandra, nodes claim ranges of data.  By default, when a new machine is added, it will receive half of the largest range of data.  There are configuration options during node start-up to change that behavior.  There are certain configuration requirements to ensure safe and easy balancing, and there is a rebalance command that can perform the work throughout all the data ranges.  There is also a monitoring tool that allows you to track the progress of the re-balancing.
    • Riak
      In Riak, the data is divided into partitions that are distributed among the nodes.  When a node is added, the distribution of partition ownership is changed and both old and new data will immediately begin migrating over to the new data.
  • Cost — Regardless of solution, we should be able to use commodity server hardware with Linux OS.
    • HBase — Because of the heavy peak traffic periods, it is very likely that we would need a dedicated cluster. Otherwise, other projects such as Socorro might be negatively impacted.  Also, a scheduled maintenance window would affect both projects instead of just one.
      HBase is memory-hungry.  Our current nodes are dual quad core hyper-threaded boxes with 4TB of disk and 24 GB of memory.  It is unlikely that we would want to go less than that.  We would need at least two highly available master nodes, and by the end of the year we’d likely need 12 machines for a single cluster solution.
    • Cassandra — Much lighter on the memory requirements, especially if you don’t need to keep a lot of data in cache.  We would likely want to double the amount of CPU on the four nodes currently allocated to the Test Pilot project. We’d also want to order 8 more machines.  To perform analysis with Cassandra, we’ll have to leverage our Hadoop cluster.
    • Riak — Also much lighter on memory requirements.  The existing four nodes (quad core 8 GB) allocated for the project should be enough to kick it off, and we’d expect to add at least two more equivalent machines to that cluster.  We’d also set up a second cluster of 6 to 8 less powerful machines for the analysis cluster.  Because of the elasticity of Riak, we could temporarily re-purpose N-3 of those machines to the write cluster to accommodate expected peak traffic windows.
  • Manpower
    • HBase — Need a front-end layer to accept experiment submissions from the client.  The fewer changes required for the client, the better.  Thrift or a roll-our-own Java are the two most likely options.  The application needs to be heavily tested for capacity and stability.  Likely two weeks for development and two weeks for testing.  Estimate is dependent on the amount of security code, sanity checks, and cluster communication fail-over that has to be implemented.  Additional maintenance burden of supporting a separate service.
      Schema design needs to be reflected in the front-end code to allow data to be parsed out and stored in the proper column families.
    • Cassandra — Mostly the same as HBase. Thrift or Java application hand developed and tested.  Schema design to accommodate storage by the front-end.
    • Riak — Built in REST server.  Already heavily tested and production ready.  Minimal schema design and no specific hooking in of the schema to the REST server should be needed.
  • Security — We can’t expect to hide any sort of handshake protocol or authentication token.  If we wanted to require an authentication token, extensive changes would have to be made to the client add-on which would delay the project.  SSL doesn’t seem to gain us much because we aren’t transmitting potentially sensitive data, and it has overhead penalties.  Our firewall and proxy/load-balancer layer is our most important line of defense.  It should reject URL hacks, unusual payload sizes, and potentially be able to blacklist repeated submissions from the same IP.  Ideally, if the payload inspection could communicate IP addresses or payload signatures to blacklist, we’d be pretty well equipped to prevent degradation of the cluster health.
    • HBase/Cassandra — We would need the custom built front-end layer to be responsible for inspecting the payload to look for invalid/incomplete data and reject it.  This adds to the requirements and implementation time of the custom front-end layer.
    • Riak — We can use Webmachine pre-commit hooks to allow inclusion of business logic to perform payload inspection.
  • Extensibility — When changes are made to the data stored, all three solutions will potentially require modification of the payload inspection routines and potentially the analysis entry-point to reflect the schema changes.
    • HBase — Schema changes involving adding or altering column families require disabling the table. This means a maintenance window.  Creation of new tables can be performed on the fly.
    • Cassandra — Schema changes require a rolling restart of the nodes.
    • Riak — New buckets and schema changes are completely dynamic.
  • Data Migration — All three solutions make it pretty easy to replicate, export, or MapReduce data out of the system.
  • Disaster Recovery  — In all three solutions, it would be best for the client add-on to have enough intelligence to be able to back-off if the cluster load is too high, and to retry submission later if it fails.
    • HBase — Custom front-end could incorporate fail-over code to locally spool submissions until cluster is back online.  A second cluster would be the most viable DR option.
    • Cassandra — Same as HBase
    • Riak — Could temporarily reassign the entire reporting cluster to handle incoming submissions. Because there is no custom front-end, if we were unable to make the Riak cluster available for client connections, we would have no buffer in place on the server side to spool submissions.
  • Reliability — Small periods of downtime should not be a major issue, especially if the client add-on has retry capability and/or if the front-end layer can spool.
    • HBase — Until subsequent versions provide better High Availability options, the Hadoop NameNode and HBase Master are still a single point of failure.  Certain types of administration and upgrades require restart of the entire cluster with a maintenance window required to modify the NameNode or HBase Master.  Rolling restarts are an option for many types of maintenance, but some HBase experts discourage them.
    • Cassandra — No single point of failure. Most configuration changes can be handled via rolling restarts.
    • Riak — Same as Cassandra.
  • Analysis
    • HBase — Can provide a HIVE based interface (possibly with JDBC connectivity).  Can provide a simplified MapReduce framework to allow analysts to submit certain types of common, simple jobs.
    • Cassandra — Uses Hadoop, answer same as HBase.
    • Riak — Map Reduce jobs can be written in JavaScript and submitted through the REST API.  A light-weight web interface can be created to allow submission of those jobs.

Based on the evaluation of these discussion points, and also on the availability of some Basho experts to deliver a nearly turn-key solution, we have decided to go with Riak for the implementation of the Test Pilot back-end.  While it feels a little odd to be using a technology that is similar in many ways to HBase which we are investing heavily in, I think it is the best choice for us and I actually see several areas that we could potentially consider using Riak for other projects.

If you have any questions, concerns, or clarifications, please feel free to submit them as comments and I will respond or update the post where applicable.

Next »