Feed on

The Background

During my work as metrics liaison with the Firefox Input team, an exciting requirement has come up: scalable online clustering of the millions of feedback items that the users of Firefox share with us.

When designing a service at the metrics team, besides functional requirements (accept text messages, produce clusters) we consider scalability and durability. In fact, scalability concerns play a major role in wanting to replace the current solution (which has done a fine job so far) and not picking another powerful existing tool: We expect the influx of messages (already heading towards 2 million) to increase up to 50x once Firefox 4 is released.

On to Architecture

There is a slide outlining what the system (called Grouperfish) is planned to look like. As this service is to be developed quickly and in iterations, even major parts of the system might be replaced in the future though. This is the rationale for our first version, to be released sometime around the Firefox 4 release:


We want to be able to handle tens of thousands of GET’s and thousands of POST’s per second, provided we have enough commodity hardware at our disposal.

To accept incoming documents and queue them for clustering, Node.JS fits the bill. Its event-based concurrency model dominates thread- and process-based designs in IO-bound tasks such as this. Also, depending on the storage you pick, requests might pause to wait on garbage collection or to rewrite store files. Node can handle a lot of waiting requests because it does not use system level threads (or even processes) for concurrency.


Grouperfish must store millions of documents in hundreds of thousands of collections. The generated clusters may reference thousands of documents each, each ranging from a few bytes to about a megabyte. Also, we want to store processing data for clustering.

When planning for more data than fits into your collective RAM, you usually have two options (SQL not being one of them since RAM has become pretty big):

Dynamo-style key/value stores like Riak and Cassandra allow to store replicated values with high write rates, and also to quickly retrieve individual items from disk. You do not need to worry about one machine getting too much attention (e.g. when one of your services gets slashdotted), thanks to consistent hashing. Riak even has a notion of buckets, keys and values: We would intuitively use buckets for collections of documents (and of clusters), and values for individual documents (and clusters). No wonder we looked at this more closely. Unfortunately though, Riak’s buckets are more of a namespacing device than anything else. It is expensive to get all elements of a bucket, since they are neither indexed by a common key nor stored together on disk. The Riak design can be a bit misleading in this regard, as buckets are in fact spread throughout the key space. To retrieve all keys in a bucket, Riak will check every single key — possibly scanning gigabytes of main memory (for the very recent Riak search to help, you’d need to blow up your values quite a bit). And you still only have the keys. To get possibly millions of associated values, you need to move your little disk heads a lot. This is not always as bad as it sounds because Riak gives you streaming access to the data as it comes in. But in general, the smaller your buckets in relation to the entire key space, the higher the cost of retrieving many of them.

The other major contender are column-oriented data stores of the BigTable family, the most prominent of which is Apache HBase (the aforementioned Cassandra is actually somewhat in-between, having properties from both worlds). The two main differences for users of HBase vs. Dynamo style stores as far as we are concerned: 1. Data is stored per column family: to retrieve the vector representations of a million documents, we do not have to scan through a million document texts. 2. Records are sorted by key, much like in a traditional database (but optimized for fast inserts, using LSM trees). This is a blessing and a curse. A blessing, because we can scan over contiguous collections of documents. A curse, because we are vulnerable to hotspotting on popular collections. To counter this, we need to make sure that there are random parts in our row keys, e.g. using UUID’s. Because HBase divides tables into regions as they grow and hands them off to other nodes, this method avoids hotspots. And we do not lose the streaming advantage as long as we use common prefixes per collection.

Given our access patterns (insert documents, update clusters, re-process entire collections, fetch lists of clusters), efficient sequential access to selected parts of the data is very important. Sorted, column oriented storage seems to be the way to go. There are other pros and cons (single point of failure, write throughput, hardware requirements), but if we don’t cater to our use case, those won’t ever matter.


Grouperfish must be able to handle small numbers of large corpora (millions of documents), as well as large numbers of small corpora (millions of collections). The generated clusters may contain thousands of messages each.

This is practically a no-brainer: Apache Mahout supports in-memory operation (for smaller clusters) as well as distributed clustering (using Apache hadoop, for larger clusters). Mahout can update existing clusters with new documents and generate labels for our clusters. Of course, Mahout is a java-library, so we need to run it within a JVM. To simplify management and introspection, we will run our clustering workers in jetty web containers.


We need to be able to add workers to increase clustering frequency. When there are more new messages than can be clustered right away, we want them to be queued. Also, we have Node.JS and we have Java/Mahout. We want our queue to bridge the gap.

Messaging has become a big topic as systems have become larger and more distributed. We want to use messages to decouple write requests from processing them. There is a very elegant solution to maintain queues, offered by the in-memory data store Redis. Redis is somewhat like a developers dream of shared memory. No encoding and decoding of lists, maps and values as they enter and leave the stores — just operate on your data structures within shared memory. Unfortunately, Redis queues are really just a linked list with a blocking POP operation. While that is very nice, we want to track and resubmit failed tasks when a worker node falls victim to rampaging rodents.

The considerations of choosing RabbitMQ to realize a task queue are worth an article of their own. Suffice to say, it has Node- and Java-bindings, and it supports message Acknowledgement from workers. We still want to use Redis to keep track of collection size, to cache the actual incoming data (no need to ask hbase if we use it right away), and for locking, so that every collection is only modified by one worker at a time. We also might use it to cache frequently requested clusters.

More Thoughts

Selecting these components, I learned that it is important to choose technologies in an unbiased fashion, and to reconsider decisions when a technology has no answer for your requirement. For example, I originally wanted to use just Riak for storage — I like its simplicity and style, and the bucket metaphor — but the enumeration of large buckets would be too slow for an online system. It might be fine for a batch-only system, or a system that just does not operate on collections of varying size as much.

For a Message queue, ØMQ sounded awesome, offering low latency and powerful constructs, but I quickly realized that it is not really what I understand a message queue to be, but rather a very smart abstraction over traditional sockets. Probably someone will eventually build a distributed task queue on top of it though.

4 Responses to “Scalable Text Clustering for the Web”

  1. […] Next stop is 3.4 with a ramped adult mobile dashboard and grouperfish clustering […]

  2. on 10 Mar 2011 at 10:35 pm Otis Gospodnetic


    Have you considered using Kafka (another fine thing from SNA guys)?

  3. on 11 Mar 2011 at 7:25 am Michael Kurze

    The design of Kafka does support our requirements (queue semantics, at-least-once delivery). I am sure Node bindings will crop up in the future (to my knowledge currently there are none). RabbitMQ has the additional advantage of talking AMQP, which is nice for interoperability — although the AMQP spec development seems to have been somewhat complicated in the past. Also there is blocking polling, but the Kafka team is working on that if I understand correctly.

  4. […] Scale to cluster over hundreds of millions of pieces of feedback. Actually, we’re already doing this. […]

Trackback URI | Comments RSS

Leave a Reply

You must be logged in to post a comment.