Sane Concurrency with Go

Glyph Lefkowitz wrote an enlightening post recently in which he expounds in some detail about the challenges of writing highly concurrent software. If you write software and you haven’t read it yet I recommend you do. It’s a very well written piece, full of wisdom that no modern software engineer should be without.

There are many tidbits to extract, but if I may be so bold as to provide a summary of its main point, it would be this: The combination of preemptive multitasking and shared state generally leads to unmanageable complexity, and should be avoided by those who would prefer to maintain some amount of their own sanity. Preemptive scheduling is fine for truly parallel activities, but explicit cooperative multitasking is much preferred when mutable state is being shared across multiple concurrent threads.

With cooperative multitasking, your code will still likely be complex, it just has a chance at staying manageably complex. When control transfer is explicit a reader of the code at least has some visible indication of where things might go off the rails. Without explicit markers every new statement is a potential landmine of “What if this operation isn’t atomic with the last?” The spaces between each command become endless voids of darkness from which frightening Heisenbugs arise.

For the last year-plus while working on Heka (a high performance data, log, and metrics processing engine) I’ve been mostly programming in Go. One of Go’s selling points is that there are some very useful concurrency primitives baked right into the language. But how does Go’s approach to concurrency fare when viewed through the lens of encouraging code that supports local reasoning?

Not very well, I’m afraid. Goroutines all have access to the same shared memory space, state is mutable by default, and Go’s scheduler makes no guarantees about when, exactly, context switching will occur. In a single core setting I’d say Go’s runtime falls into the “implicit coroutines” category, option 4 in Glyph’s list of oft-presented async programming patterns. When goroutines can be run in parallel across multiple cores, all bets are off.

Go may not protect you, but that doesn’t mean you can’t take steps to protect yourself. By using some of the primitives that Go provides it’s possible to write code that minimizes unexpected behavior related to preemptive scheduling. Consider the following Go port of Glyph’s example account transfer code (ignoring that floats aren’t actually a great choice for storing fixed point decimal values):

    func Transfer(amount float64, payer, payee *Account,
        server SomeServerType) error {

        if payer.Balance() < amount {
            return errors.New("Insufficient funds")
        }
        log.Printf("%s has sufficient funds", payer)
        payee.Deposit(amount)
        log.Printf("%s received payment", payee)
        payer.Withdraw(amount)
        log.Printf("%s made payment", payer)
        server.UpdateBalances(payer, payee) // Assume this is magic and always works.
        return nil
    }

This is clearly unsafe to be called from multiple goroutines, because they might concurrently get the same result from the Balance calls, and then collectively ask for more than the balance available with the Withdraw calls. It’d be better if we made it so the dangerous part of this code can’t be executed from multiple goroutines. Here’s one way to accomplish that:

    type transfer struct {
        payer *Account
        payee *Account
        amount float64
    }

    var xferChan = make(chan *transfer)
    var errChan = make(chan error)
    func init() {
        go transferLoop()
    }

    func transferLoop() {
        for xfer := range xferChan {
            if xfer.payer.Balance < xfer.amount {
                errChan <- errors.New("Insufficient funds")
                continue
            }
            log.Printf("%s has sufficient funds", xfer.payer)
            xfer.payee.Deposit(xfer.amount)
            log.Printf("%s received payment", xfer.payee)
            xfer.payer.Withdraw(xfer.amount)
            log.Printf("%s made payment", xfer.payer)
            errChan <- nil
        }
    }

    func Transfer(amount float64, payer, payee *Account,
        server SomeServerType) error {

        xfer := &transfer{
            payer: payer,
            payee: payee,
            amount: amount,
        }

        xferChan <- xfer
        err := <-errChan
        if err == nil  {
            server.UpdateBalances(payer, payee) // Still magic.
        }
        return err
    }

There’s quite a bit more code here, but we’ve eliminated the concurrency problem by implementing a trivial event loop. When the code is first executed, it spins up a goroutine running the loop. Transfer requests are passed into the loop over a channel created for that purpose. Results are returned back to outside of the loop via an error channel. Because the channels are unbuffered, they block, and no matter how many concurrent transfer requests come in via the Transfer function, they will be served serially by the single running event loop.

The code above is a little bit awkward, perhaps. A mutex would be a better choice for such a simple case, but I’m trying to demonstrate the technique of isolating state manipulation to a single goroutine. Even with the awkwardness, it performs more than well enough for most requirements, and it works, even with the simplest of Account struct implementations:

    type Account struct {
        balance float64
    }

    func (a *Account) Balance() float64 {
        return a.balance
    }

    func (a *Account) Deposit(amount float64) {
        log.Printf("depositing: %f", amount)
        a.balance += amount
    }

    func (a *Account) Withdraw(amount float64) {
        log.Printf("withdrawing: %f", amount)
        a.balance -= amount
    }

It seems silly that the Account implementation would be so naive, however. It might make more sense to have the Account struct itself provide some protection, by not allowing any withdrawals that are greater than the current balance. What if we changed the Withdraw function to the following?:

    func (a *Account) Withdraw(amount float64) {
        if amount > a.balance {
            log.Println("Insufficient funds")
            return
        }
        log.Printf("withdrawing: %f", amount)
        a.balance -= amount
    }

Unfortunately, this code suffers from the same issue as our original Transfer implementation. Parallel execution or unlucky context switching means we might end up with a negative balance. Luckily, the internal event loop idea applies equally well here, perhaps even more neatly because an event loop goroutine can be nicely coupled with each individual Account struct instance. Here’s an example of what that might look like:

    type Account struct {
        balance float64
        deltaChan chan float64
        balanceChan chan float64
        errChan chan error
    }
    func NewAccount(balance float64) (a *Account) {
        a = &Account{
            balance:     balance,
            deltaChan:   make(chan float64),
            balanceChan: make(chan float64),
            errChan:     make(chan error),
        }
        go a.run()
        return
    }

    func (a *Account) Balance() float64 {
        return <-a.balanceChan
    }

    func (a *Account) Deposit(amount float64) error {
        a.deltaChan <- amount
        return <-a.errChan
    }

    func (a *Account) Withdraw(amount float64) error {
        a.deltaChan <- -amount
        return <-a.errChan
    }

    func (a *Account) applyDelta(amount float64) error {
        newBalance := a.balance + amount
        if newBalance < 0 {
            return errors.New("Insufficient funds")
        }
        a.balance = newBalance
        return nil
    }

    func (a *Account) run() {
        var delta float64
        for {
            select {
            case delta = <-a.deltaChan:
                a.errChan <- a.applyDelta(delta)
            case a.balanceChan <- a.balance:
                // Do nothing, we've accomplished our goal w/ the channel put.
            }
        }
    }

The API is slightly different, with both the Deposit and Withdraw methods now returning errors. Rather than directly handling their requests, though, they put the account balance adjustment amount onto a deltaChan, which feeds into the event loop running in the run method. Similarly, the Balance method requests data from the event loop by blocking until it receives a value over the balanceChan.

The important point to note in the code above is that all direct access to and mutation of the struct’s internal data values is done *within* code that is triggered by the event loop. If the public API calls play nicely and only use the provided channels to interact with the data, then no matter how many concurrent calls are being made to any of the public methods, we know that only one of them is being handled at any given time. Our event loop code is much easier to reason about.

This pattern is central to Heka’s design. When Heka starts, it reads the configuration file and launches each plugin in its own goroutine. Data is fed into the plugins via channel, as are time ticks, shutdown notices, and other control signals. This encourages plugin authors to implement their functionality with an event looptype structure like the example above.

Again, Go doesn’t protect you from yourself. It’s entirely possible to write a Heka plugin (or any struct) that is loose with its internal data management and subject to race conditions. But with a bit of care, and liberal application of Go’s race detector, you can write code that behaves predictably even in the face of preemptive scheduling.

Rob Miller

25 responses

  1. Kenji wrote on :

    There’s sane concurrency? I’ll be damned, I didn’t expect to witness this day ;D

  2. Ralph C. wrote on :

    I implemented a similar technique with the following example: https://gist.github.com/deckarep/7685352 to demonstrate to a friend how you can pull off using channels to synchronize mutating state while nice abstracting it away.

  3. Owen wrote on :

    I don’t think your use of channels to return the errors is safe here. There is nothing that ensures the error generated from a particular requests will go back to the instance that made the request.

    1. Rob Miller wrote on :

      Actually, because the channels are all unbuffered, it’s impossible for more than one incoming request to be waiting on the errChan at a time, and since every operation that uses the deltaChan always generates an errChan response, you know that the error you get back corresponds to the request that you just made. You’re right to look at that carefully, though; if the channels were buffered, or if some requests expected an error while others didn’t, there would be issues.

      1. Marty wrote on :

        You could also pass in a response channel as a field of transfer and return the error on that. Then it doesn’t matter how the channels are configured, but it makes the calling code a bit more complex because it has to set up a channel to receive responses on.

        1. Rob Miller wrote on :

          Ssshhhh!! You’re giving away the stuff I plan on covering in future posts in this series. 😉

  4. Anonymous coward wrote on :

    Mutexes? Race condition detector? I’ll stick with Erlang, thanks.

    1. Rob Miller wrote on :

      Indeed. Unlike Go, Erlang is a language that *does* provide protection against these sorts of issues. As is Haskell, and Rust, and many others. This post has little value to anyone familiar with any of those languages; if you know Erlang, you’re certainly already familiar with the Actor pattern.

      Go may provide less safety and correctness, but it also drastically reduces the learning curve to get started. Its rapid growth in popularity show that it’s definitely hitting a sweet spot of some sort. My little screed is aimed at folks who aren’t already familiar with the lessons that Erlang has to offer.

  5. Richard wrote on :

    Interesting and even a little fun. But you probably do not want to do this sort of math with floating point numbers.

    1. Rob Miller wrote on :

      Hey, you’re supposed to be *ignoring* the fact that floating point numbers are a poor choice here! It says so right in the text, just before the first code snippet. 😉

  6. Alexandre Fiori wrote on :

    Shouldn’t the balance be initialized with 0 in the the `NewAccount` function?

    1. Rob Miller wrote on :

      Oops, actually I just left out the `balance` argument that `NewAccount` was supposed to expect. Fixed, thanks.

      1. Michalis wrote on :

        You don’t need to use the “variable: value” pattern since you initialize all the struct fields.

        So it basically can be boiled down to this:

        a = &Account{balance, make(chan float64), make(chan float64), make(chan error)}

        Also you missed the return statement.

        Heka sounds really interesting, i really want to study the codebase if i will ever find that free time.

        1. Rob Miller wrote on :

          True that the var names aren’t actually needed, another artifact of code evolution. Either spelling is fine, though, not sure it’s worth changing. Facepalm re: leaving out the return, though… next time around I’ll be sure to actually run the code. 😛 I’ve fixed it, thx.

  7. Dustin Sallings wrote on :

    Your first transfer example is a little strange:

    1. Your reinvention of for/range on a channel is an anti-pattern that causes bugs regularly (but no, wouldn’t here, it’s just longer than necessary).
    2. Using a Once for lazy initialization on first call is unnecessary and distracting.
    3. Style-wise, we like to error first and exit when all’s well idiomatically.

    See the following: http://play.golang.org/p/pH8Zy3NL1l

    That’s slightly shorter than yours, but more importantly, I think it makes a better example of go code for strangers on the internet.

    love,
    Internet go policeman who regularly tries to explain people’s programs they’ve copied and pasted from various parts of the internet.

    1. Rob Miller wrote on :

      You’re right, I’m a bit embarrassed to say. I’ll edit the code to match yours. To make meager excuse, I’ll say I’m familiar with (and have used extensively) the idioms in your version, but due to the way the code in my post evolved over a number of revisions (originally the sync.Once was in the `Start` method of a struct, and the for loop contained a select instead of a single channel pull) I ended up with something a bit different than if I had written it straightforwardly. Mea culpa.

  8. pron wrote on :

    When we implemented the Go and Erlang lightweight-thread approach for the JVM in Quasar[1], we quickly realized that CSP/actors, while going a long way towards sane concurrency, are not sufficient. Every application eventually needs some shared mutable state (even if it’s implemented at the database layer). One approach is to have a single fiber (or goroutine) in charge of managing state (Clojure does that with agents), but that doesn’t scale for multiple writers. Neither does a global lock over the entire data store.

    Good concurrent data structures that allow multiple concurrent writers are necessary. Java has some good basic ones like ConcurrentHashMap, ConcurrentSkipListMap, and ConcurrentLinkedQueue. Clojure takes that a step further with transactional refs. For more interesting kinds of data it’s better to have a good in-memory transactional database.

    So for truly “sane concurrency” you need both lightweight threads with CSP and/or actors, plus a transactional, concurrent (preferably in-memory) data store[2].

    [1]: https://github.com/puniverse/quasar

    [2]: http://blog.paralleluniverse.co/2013/10/16/spaceships2/

  9. sesm wrote on :

    It would be interesting to see a comparison between Heka and Apache Storm (http://storm.incubator.apache.org/)

  10. John Graham-Cumming wrote on :

    @Owen: that’s not correct. The run() function ensures a strict receive then transmit ordering. For example, suppose that two goroutines call Withdraw(). Only one will get to transmit on the delta channel, the other will be blocked waiting to transmit. That means that the first one to transmit will also be the one to receive on the error channel because the other goroutine cannot have got to the receive yet.

  11. Brian Stengaard wrote on :

    Wouldn’t your first transferLoop example deadlock if you don’t add a “continue” after that balance check has gone wrong?

    http://play.golang.org/p/eT9EzSru4o (based on Dustin’s proposal)

    1. Brian Stengaard wrote on :

      In all events: excellent article – will definitely have a closer look at Heka.

    2. Rob Miller wrote on :

      Right you are. Clearly I should have written tests for my code.

      Fixed.

  12. yachris wrote on :

    I’m interested in the resulting performance.

    I had a similar thought to Owen above, but I was thinking of two withdrawals and how they’d interact.

    I think what you’ve done for protecting a single number is good, but of course, the implementation serializes all the threads… if a lot of time is spent updating a single account, all the CPUs will be waiting on that one channel.

    So are channels light-weight enough that every account can have one? Do they perform well enough that all the CPUs can get work done when updating multiple accounts simultaneously?

    Also, does Go provide things like the previously mentioned ConcurrentHashMap, which allow for multiple simultaneous writers, or is this something you have to come up with on your own?

    Thanks!

    1. Rob Miller wrote on :

      This is definitely a contrived example, based on extending the contrived example in Glyph’s original post. As some have pointed out, with unbuffered channels there’s not really any benefit to using channels vs. using a mutex. In future posts I plan on building on these ideas to show how hand-rolled event loops and channels can help solve problems where mutexes become awkward.

      That said, channels and goroutines are lightweight, it’s expected that you might have many thousands of them in play at any given time. As with any event loop approach, though, reasonable performance depends on the operations within the loop happening quickly enough that there’s not too much back-pressure on the code that’s waiting for the loop’s response.

      And no, Go itself doesn’t provide anything akin to a ConcurrentHashMap, you just get simpler primitives. A blocking implementation would be trivial to build using locks or an event loop, see the link in Ralph C’s above comment. Real transactionality would be considerably more work, of course. There may be some libraries out there tackling that problem, but I haven’t looked into it to be able to give any recommendations.

  13. Michael wrote on :

    Nice post. Encouraged me to write some ideas in python arising from this post – http://www.sparkslabs.com/michael/blog/2014/03/16/readable-concurrency-in-python
    (based on Guild my rewrite of Kamaelia with an aim for read/writability)