
A few months ago, at Hadoop World 2010, the metrics team gave a talk on Flume + Hive integration and how we plan to integrate it with other projects. As we were nearing the production date, the BuildBot/TinderBox team came to us with an interesting, albeit pragmatic, requirement: “Flume + Hive really solves our needs, but we would ideally like a solution that indexes our data and can be queried in real time.”

Flume does get our data from point A to point B in real time, but once the data is inside HDFS, Hive isn’t suited for low-latency queries. One viable option was to use HBase (the metrics team already uses it for Socorro – https://wiki.mozilla.org/Socorro:HBase) and then integrate it with a Hive-HBase plugin for SQL-ish queries. While possible in theory, supporting faceted queries, wildcard searches etc. inside HBase is a bit tedious. Solr-Cloud was the next obvious choice; however, Daniel Einspanjer (@deinspanjer) had other ideas when he suggested ElasticSearch (http://www.elasticsearch.com). Frankly, I hadn’t heard of ElasticSearch before, and since Solr is practically synonymous with building an indexing solution, we had our doubts. Still, Solr-Cloud and ElasticSearch use the same underlying technology, Lucene, for indexing data; both support querying via a REST API, can handle millions of documents and are built to run as distributed cloud solutions. We had a few days to experiment and decided to spin up a few machines for ElasticSearch, and it turned out to be a great investment!

New technologies are really cool to implement, but once you get past the initial honeymoon phase, complexities start surfacing and you either end up in a cul-de-sac or fall back on the more traditional methods. Determined not to go through this ordeal, we decided to stress test the ElasticSearch cluster at no less than 5x the expected load.

Integrating Flume + ElasticSearch + Hive

The Flume-Hive patch (FLUME-74) had given us enough insight into writing Flume plugins, so we decided to extend it to support ElasticSearch. ElasticSearch has a nice Java plugin written by @tallpsmith. While it would serve the purpose, Flume and ES needed to be on the same network for ES to be discovered via multicast, and the plugin relied on certain ES Java APIs. If you are building asynchronous systems, it is better to interface them through a web-based API than through traditional Java APIs, and ElasticSearch has a really nice REST API.

While ES provides all the functionality of a traditional search engine, we wanted to keep the ability to run complex queries via Hive. This meant forking each log event to two different locations: HDFS (and eventually Hive) and ElasticSearch. The new sequence of events looked something like this:
BuildBot -> Flume agent -> Flume collector -> REST API for real-time indexing | write to HDFS and eventually Hive

CustomDfsSink.java appends every event to the file stream as soon as it is received, which turned out to be the perfect place to insert the REST API call. The data already exists in the “Event e” object, so we might as well marshal it into equivalent JSON for ES.
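The marshalling step can be sketched in Python as follows. This is a minimal sketch under stated assumptions, not the actual Java code in CustomDfsSink: `FlumeEvent` is a hypothetical stand-in for Flume’s event (body, host, timestamp, attributes), and the JSON field names are our own choice. It builds, but does not send, a POST to `/<index>/<type>` so that ES assigns the document id; that URL layout matches the ES releases of that era, while newer ES releases have dropped mapping types.

```python
import json
import urllib.request
from dataclasses import dataclass, field


@dataclass
class FlumeEvent:
    """Hypothetical stand-in for Flume's Event: raw body plus metadata."""
    body: bytes
    host: str
    timestamp_ms: int
    attrs: dict = field(default_factory=dict)


def event_to_json(e: FlumeEvent) -> str:
    """Marshal one event into the JSON document we would index in ES."""
    doc = {
        "body": e.body.decode("utf-8", errors="replace"),
        "host": e.host,
        "timestamp": e.timestamp_ms,
    }
    doc.update(e.attrs)  # assumption: attributes become top-level fields
    return json.dumps(doc, sort_keys=True)


def build_index_request(es_url: str, index: str, doc_type: str,
                        e: FlumeEvent) -> urllib.request.Request:
    """Build (but do not send) a POST to /<index>/<type>; ES picks the id."""
    url = f"{es_url.rstrip('/')}/{index}/{doc_type}"
    return urllib.request.Request(
        url,
        data=event_to_json(e).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Sending the request would be a single `urllib.request.urlopen(req)` call; keeping construction separate from sending makes it easy to hand the same request to a retry or failover path.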


The first 3 arguments to ‘hiveElasticSearchCollectorSink’ are identical to those of the original hiveCollectorSink(); the last 3 are specific to ElasticSearch. At a very high level, an ES index is like a database and a type is like a table. You can read more about ElasticSearch at http://www.elasticsearch.com/docs/elasticsearch

Failover scenario

Things fail when you least expect them to, especially on a long weekend or in the midst of a crucial demo. Flume offers recovery from certain failover scenarios; however, its recovery system is, understandably, based on “acks” and follows a specific protocol. While it’s prudent to reuse existing code, that system seemed a bit of an overkill for us, and we already had a “marker-folder” failover system in place for Hive. The marker-folder approach fit ES quite nicely. At a 10,000-foot level, each event makes a single attempt to POST to ES; if that fails, the data is written to a “marker-folder” along with the corresponding metadata. At specific intervals, a custom sink replays the queries stored in the marker-folder, in addition to its mundane Hive and ES tasks.
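The single-attempt-then-park logic can be sketched like this. It is a minimal Python illustration, not the team’s actual code: the `post` callable (standing in for the REST call to ES), the marker file naming and the metadata fields are all assumptions. Each marker is written under a temporary name and then renamed, so a reader polling the folder never picks up a half-written file.

```python
import json
import tempfile
import time
import uuid
from pathlib import Path
from typing import Callable


def index_or_mark(doc: str, index: str, doc_type: str,
                  post: Callable[[str, str, str], None],
                  marker_dir: str) -> bool:
    """Make ONE attempt to index doc; on any failure, park it in marker_dir.

    Returns True if the POST succeeded, False if the doc was parked.
    """
    try:
        post(index, doc_type, doc)  # hypothetical hook for the ES REST call
        return True
    except Exception as exc:  # single attempt only: no retry loop here
        marker = {
            "index": index,
            "type": doc_type,
            "doc": doc,
            "error": repr(exc),
            "failed_at": time.time(),
        }
        folder = Path(marker_dir)
        folder.mkdir(parents=True, exist_ok=True)
        final = folder / f"{uuid.uuid4().hex}.json"
        tmp = final.with_name(final.name + ".tmp")
        tmp.write_text(json.dumps(marker), encoding="utf-8")
        tmp.replace(final)  # rename into place once fully written
        return False


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        # Simulate ES being down: the POST hook raises, so the doc is parked.
        ok = index_or_mark('{"msg": "hi"}', "buildlogs", "rawlogs",
                           lambda i, t, doc: 1 / 0, d)
        print(ok, [p.name for p in Path(d).iterdir()])
```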

Problem with dfs.close()

DFS isn’t a traditional file system, and while it’s good at a lot of things, atomicity isn’t its strength. While stress testing the new setup, an EOFException (stack trace here) would intermittently appear in our logs. After spending a couple of days trying to debug this problem, it suddenly struck us that dfs.close() returns void and the call isn’t atomic, meaning it’s possible that even after .close() returns, HDFS is still writing data to disk. This usually happens when you are writing large files with gzip compression. As a recap, after every .close(), HiveCollectorSink() used to move each HDFS file into its own partition, causing a race condition between the .close() operation actually finishing and “ALTER TABLE” performing its internal .move() into the partition. One way to fix this problem was not to move the file at all: keep writing to the original HDFS location and use that location as the partition, with the HQL “ALTER TABLE … ADD IF NOT EXISTS PARTITION … LOCATION …”. The “IF NOT EXISTS” clause allows us to add data to the same hourly partition without worrying about overwriting existing data or creating a separate partition for every 10-minute data rollover inside Flume.
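Here is a small Python sketch of that fix, which registers the directory Flume is already writing to as the hour’s partition instead of moving files after close(). It assumes the hourly layout used by the HiveCollectorSink config in the BuildBot setup (`/flume/%{host}/raw-logs/%Y-%m-%d-%H/`) and a hypothetical partition column named `ds`; inputs are assumed trusted, since nothing is escaped.

```python
from datetime import datetime


def hourly_partition_hql(table: str, base_uri: str, host: str, ts: datetime,
                         subdir: str = "raw-logs", part_col: str = "ds") -> str:
    """HQL that maps the hour's existing HDFS directory onto a Hive partition.

    Files are never moved after close(); IF NOT EXISTS makes it safe to issue
    this statement again on every 10-minute rollover within the same hour.
    """
    hour = ts.strftime("%Y-%m-%d-%H")
    location = f"{base_uri.rstrip('/')}/{host}/{subdir}/{hour}/"
    return (f"ALTER TABLE {table} ADD IF NOT EXISTS "
            f"PARTITION ({part_col}='{hour}') LOCATION '{location}'")
```

Because every rollover within the same hour yields the identical statement, re-running it is a no-op once the partition exists.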

Need for a Custom Marker Sink

The marker logic reads each file inside the “marker-folder”, performs its operations and then deletes the file. This creates a race condition when you have multiple sinks, each reading and deleting the same files. Duplication isn’t an issue here, and the system continues to work, apart from a few innocuous errors raised when one sink tries to delete a file that another sink has already deleted. Using a dedicated sink to perform the “marker-folder” operations got rid of those errors and added a much-needed separation of roles.
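A replay pass for such a dedicated marker sink might look like the Python sketch below. It assumes markers are JSON files carrying `index`, `type` and `doc` fields, and that `post` is a hypothetical hook for the REST call to ES; neither detail comes from the original code. Only fully written `*.json` files are picked up, a marker whose replay fails stays in place for the next interval, and the defensive `FileNotFoundError` handling should never fire when a single sink owns the folder.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable


def replay_markers(marker_dir: str,
                   post: Callable[[str, str, str], None]) -> int:
    """Replay parked events; meant to run in ONE dedicated marker sink.

    Returns how many markers were replayed successfully and deleted.
    """
    replayed = 0
    for path in sorted(Path(marker_dir).glob("*.json")):  # skips *.tmp files
        try:
            marker = json.loads(path.read_text(encoding="utf-8"))
        except FileNotFoundError:
            continue  # vanished underneath us; harmless
        try:
            post(marker["index"], marker["type"], marker["doc"])
        except Exception:
            continue  # keep the marker for the next scheduled pass
        path.unlink(missing_ok=True)  # Python 3.8+
        replayed += 1
    return replayed


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        Path(d, "m1.json").write_text(
            json.dumps({"index": "buildlogs", "type": "rawlogs", "doc": "{}"}))
        print(replay_markers(d, lambda i, t, doc: print("POST", i, t, doc)))
```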

BuildBot setup

Sample flow listed below:
connect node1.research.hadoop.sjc1.mozilla.com

exec spawn cm-metricsetl02.mozilla.org rawlogs
exec spawn node2-research-collector rawlogscollector
wait 10000
exec config rawlogs 'rawlogsflow' 'exec("/home/aphadke/buildparsers/installed-logparser/logparser/bin/scrape-gen.sh",false,true,600000)' 'autoDFOChain'
exec config rawlogscollector 'rawlogsflow' 'autoCollectorSource' 'HiveCollectorSink("hdfs://admin1.research.hadoop.sjc1.mozilla.com/flume/%{host}/raw-logs/%Y-%m-%d-%H/","logs","build_logs")'

The current architecture consists of four flows: rawlogs, buildlogs, testruns and testfailures. Each flow outputs a specific JSON to STDOUT on the Flume agent node. The agent node reads the STDOUT data, which then makes its way to the Flume collector node via the DFO (disk failover) sink. The DFO sink makes sure that data gets transferred reliably to its intended destination. The four flows write data to their own directory structures and to their respective Hive and ElasticSearch endpoints.
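On the agent side, each flow’s script only has to print JSON to STDOUT. A hedged Python sketch of that framing: with `aggregate` set to false (as in the rawlogs config above), Flume’s exec source should turn each line of output into one event, so one compact JSON object per line is the natural shape. The `flow` field and the record contents are hypothetical; the real parsers live in the logparser scripts referenced in the config.

```python
import json
import sys
from typing import Iterable, Optional, TextIO


def to_json_line(rec: dict, flow: str) -> str:
    """One compact JSON object; json.dumps escapes embedded newlines."""
    doc = dict(rec, flow=flow)  # hypothetical field tagging the flow name
    return json.dumps(doc, separators=(",", ":"), sort_keys=True)


def emit(records: Iterable[dict], flow: str,
         out: Optional[TextIO] = None) -> int:
    """Write one JSON line per record and flush; returns the record count."""
    out = out or sys.stdout
    n = 0
    for rec in records:
        out.write(to_json_line(rec, flow) + "\n")
        n += 1
    out.flush()  # don't leave events sitting in a buffer
    return n


if __name__ == "__main__":
    emit([{"build": "b1", "status": "ok"}], "buildlogs")
```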


The “non-ACTIVE” problem: Flume is an awesome product, but it isn’t perfect, and there are instances where certain flows appear “ACTIVE” but aren’t spewing out any data. One way to circumvent this is to monitor ElasticSearch or HDFS for new data every ‘x’ hours. If the setup isn’t working as expected, Flume offers a nice option to “kick-start” the entire configuration:
<flume_location>/bin/flume shell -c <flume-master> -e "exec refreshAll"

The “refreshAll” command internally refreshes all the existing flows. You might notice an “ERROR” state for a few flows in the web UI, but things should return to normal within about a minute.
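That watchdog can be sketched in Python as below. How the newest data timestamp is obtained (an ES query or an HDFS listing) is deliberately left to the caller, and the 6-hour default for ‘x’ is an arbitrary assumption. The command mirrors the `flume shell -c <flume-master> -e "exec refreshAll"` invocation above and is only executed when `dry_run` is turned off.

```python
import subprocess
import time
from typing import List, Optional


def is_stale(latest_data_ts: Optional[float], now: float,
             max_age_hours: float) -> bool:
    """True if no new data arrived within max_age_hours (or none at all)."""
    if latest_data_ts is None:
        return True
    return now - latest_data_ts > max_age_hours * 3600


def refresh_all_cmd(flume_home: str, master: str) -> List[str]:
    """Argument list for: <flume_home>/bin/flume shell -c <master> -e "exec refreshAll"."""
    return [f"{flume_home.rstrip('/')}/bin/flume", "shell",
            "-c", master, "-e", "exec refreshAll"]


def check_and_kick(latest_data_ts: Optional[float], flume_home: str,
                   master: str, max_age_hours: float = 6.0,
                   dry_run: bool = True) -> Optional[List[str]]:
    """Kick-start all flows if data looks stalled; returns the command used, else None."""
    if not is_stale(latest_data_ts, time.time(), max_age_hours):
        return None
    cmd = refresh_all_cmd(flume_home, master)
    if not dry_run:
        subprocess.run(cmd, check=True)
    return cmd
```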

Data integrity problem

DFO is reliable; however, it’s a catch-22: either you rely on an existing code base and hope it works as advertised, or you write your own checksumming. If data integrity is vital, please write your own integrity-checking scripts. We have custom scripts that query data from the source and confirm its availability inside ES; if ES returns zero results, the data is re-inserted at the beginning of the pipeline.
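Such an integrity check boils down to the Python sketch below. It is a sketch, not our actual scripts: `count_in_es` and `reinsert` are hypothetical hooks, and the `build_id` field in the usage is an assumption. The helper builds a URL for ES’s `_count` API with a Lucene query-string filter, one plausible way to ask ES how many documents match a source record.

```python
from typing import Callable, Iterable, List
from urllib.parse import urlencode


def es_count_url(es_url: str, index: str, field: str, value: str) -> str:
    """URL for the ES _count API, filtered by a query-string match on one field."""
    query = urlencode({"q": f'{field}:"{value}"'})
    return f"{es_url.rstrip('/')}/{index}/_count?{query}"


def reconcile(source_ids: Iterable[str],
              count_in_es: Callable[[str], int],
              reinsert: Callable[[str], None]) -> List[str]:
    """Re-insert at the head of the pipeline every id ES reports zero hits for."""
    missing = []
    for doc_id in source_ids:
        if count_in_es(doc_id) == 0:
            reinsert(doc_id)
            missing.append(doc_id)
    return missing
```

For example, `es_count_url("http://localhost:9200", "buildlogs", "build_id", "b2")` yields a URL whose `q` parameter is the percent-encoded form of `build_id:"b2"`.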

Next Steps

The entire Flume-ElasticSearch-Hive code can be downloaded from https://github.com/anuragphadke/Flume-Hive. While we make every effort to keep the code in sync with Flume’s trunk, there may be some delay before it catches up with the head.

Why isn’t this part of Flume code-base?

Fair question. We are in the process of merging the two; however, it’s not possible to write this as a traditional plugin, because the data needs to be forked to two different locations (Hive & ES). Writing code to fork the same data to two different locations is fairly complex and would involve a lot of duplicate code. The current code interferes with the core functionality of CustomDfsSink and a few other classes. This isn’t ideal: Hive and ES are plugins, and the corresponding code and jars shouldn’t be part of the core Flume codebase, especially for users who don’t need this functionality. That said, we are working closely with the Flume community and hope to have an integrated solution sooner rather than later.

Feel free to let us know your thoughts, brickbats, flowers via comments or email: <first letter of my name followed by my last name> @ <mozilla>-dot-<com>

Find this cool and cutting edge? There’s a ton more happening under the hood. We are actively hiring and would love to have you talk to us. Please visit http://www.mozilla.com/en-US/about/careers.html and help us make the web a better place.


BuildBot is an extremely complex project, and it wouldn’t have been possible without the dedicated efforts of the following members and the open-source community:
Daniel Einspanjer, Xavier Stevens, Anurag Phadke (metrics team)
Jonathan Griffin, Jeff Hammel, Clint Talbert (Automation Tools team)
Jonathan Hsieh, Eric Sammer, Patrick Angels (Flume/Cloudera)
Shay Banon (ElasticSearch)

10 Responses to “Flume, Hive and realtime indexing via ElasticSearch”

  1. [...] This post was mentioned on Twitter by Planet Mozilla, anurag, anurag and others. anurag said: CORRECTED Flume, Hive and ElasticSearch at Mozilla – http://bit.ly/flume-es-hive WP cache seems to have an old copy, sorry for the bad link [...]

  2. on 02 Jan 2011 at 6:23 am Rich Kroll

    Thanks for the great writeup! I was just wondering if you have used the fanout collector instead of combining the two sinks in one?

  3. on 02 Jan 2011 at 8:50 am deinspanjer

    We have done a bit of exploration with fanout, but currently, fanout is incompatible with DFO. If the DFO gets an ack before any failures, it will not handle failures from any other sinks. Conversely, if the very last sink to report in is a failure, then it will resend to all the sinks.

    I wanted to try to patch this, but we didn’t have enough time in our schedule, and Cloudera devs mentioned that there was some re-architecture work to be done in that area so I was afraid that we might be patching soon-to-be dead code.

  4. on 02 Jan 2011 at 10:10 am Jonathan Hsieh

    Great post, guys! We’re working on making Flume’s agent/collector topologies more flexible, to make doing what you have done easier in the future!

  5. on 02 Jan 2011 at 11:07 am aphadke

    Thanks for the compliment! Excited to see what Flume has to offer in 2011 :-)


  6. on 04 Jan 2011 at 5:47 am Axel Hecht

    Is there information on what exactly you store, and how folks beyond WOO could get to that/benefit from it?

  7. on 04 Jan 2011 at 6:35 am deinspanjer

    There is some documentation, but I don’t have the link handy at the moment. The A-team will be putting a front-end on this that should be accessible to anyone.
    The data source is tinderbox logs, and if the UI is too specific then we can certainly look at providing access to the data in elasticsearch.

  8. [...] when you star an orange, the data is pushed into our database.  This database is hosted by the metrics team and stores log files and tbpl data.  Actually we pull from that directly and can calculate by test [...]

  9. on 11 Apr 2011 at 7:26 pm Rich Kroll

    @deinspanjer I’ve done some experimenting and thought you may be interested in some of the findings. I am interested in using ElasticSearch in a similar way to what you outlined in this post, but really wanted to have ES live in its own decorator and leverage a fan-out sink to write to both ES and Hive. This would allow for additional decorators over ES or have (think batching).

    What I ended up doing was to add a UUID/GUID to the flume event when it is created. This is later used as the ID in ES when indexing the log event and allows for an event to be idempotent. The drawback to this design is that each event failure causes an additional write to ES. As there is no queue in front of ES, this could be a problem in the event of a large bulk of failed messages, but could be mitigated with other decorators.


  10. [...] guy lives and dies by the data. In late 2010, the metrics team gave a small talk on how we collect log data (click here for the video ppt). While that project has gone multiple iterations over time, the [...]
