An Introduction to Asynchronous Programming and Twisted

Part 6: And Then We Took It Higher

This continues the introduction started here. You can find an index to the entire series here.

Poetry for Everyone

We’ve made a lot of progress with our poetry client. Our last version (2.0) uses Transports, Protocols, and Protocol Factories, the workhorses of Twisted networking. But there are more improvements to make. Client 2.0 (and also 2.1) can only be used for downloading poetry at the command line. This is because the PoetryClientFactory is not only in charge of getting poetry, but also in charge of shutting down the program when it’s finished. That’s an odd job for something called “PoetryClientFactory”; it really ought to do nothing beyond making PoetryProtocols and collecting finished poems.

We need a way to send a poem to the code that requested the poem in the first place. In a synchronous program we might make an API like this:

def get_poetry(host, port):
    """Return a poem from the poetry server at the given host and port."""

But of course, we can’t do that here. The above function necessarily blocks until the poem is received in its entirety; otherwise it couldn’t work the way the documentation claims. But this is a reactive program, so blocking on a network socket is out of the question. We need a way to tell the calling code when the poem is ready, without blocking while the poem is in transit. This is the same sort of problem that Twisted itself has. Twisted needs to tell our code when a socket is ready for I/O, when some data has been received, when a timeout has occurred, and so on. We’ve seen that Twisted solves this problem using callbacks, so we can use callbacks too:

def get_poetry(host, port, callback):
    """
    Download a poem from the given host and port and invoke

      callback(poem)

    when the poem is complete.
    """
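To make the shape of this API concrete without a network, here is a minimal sketch that swaps Twisted’s reactor for a hypothetical ToyReactor (just a queue of pending calls) and fakes the server with a dictionary. Everything except the get_poetry signature is an assumption for illustration. What it shows is the essential property of the callback style: get_poetry returns before the poem is delivered.

```python
from collections import deque


class ToyReactor:
    """Hypothetical stand-in for Twisted's reactor: it runs queued calls in
    order and performs no real I/O."""

    def __init__(self):
        self.pending = deque()

    def call_soon(self, func, *args):
        self.pending.append((func, args))

    def run(self):
        while self.pending:
            func, args = self.pending.popleft()
            func(*args)


reactor = ToyReactor()

# Hypothetical canned data standing in for a real poetry server.
FAKE_POEMS = {("localhost", 10000): "Roses are red"}


def get_poetry(host, port, callback):
    """Arrange for callback(poem) to run later, then return immediately."""
    reactor.call_soon(callback, FAKE_POEMS[(host, port)])


events = []
get_poetry("localhost", 10000, lambda poem: events.append(("poem", poem)))
events.append("get_poetry returned")  # recorded before the callback runs
reactor.run()
print(events)  # ['get_poetry returned', ('poem', 'Roses are red')]
```

The caller never receives the poem as a return value; it only learns about it when the loop gets around to invoking the callback.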

Now we have an asynchronous API we can use with Twisted, so let’s go ahead and implement it.

As I said before, we will at times be writing code in ways a typical Twisted programmer wouldn’t. This is one of those times and one of those ways. We’ll see in Parts 7 and 8 how to do this the “Twisted way” (surprise, it uses an abstraction!) but starting out simply will give us more insight into the finished version.

Client 3.0

You can find version 3.0 of our poetry client in twisted-client-3/get-poetry.py. This version has an implementation of the get_poetry function:

def get_poetry(host, port, callback):
    from twisted.internet import reactor
    factory = PoetryClientFactory(callback)
    reactor.connectTCP(host, port, factory)

The only new wrinkle here is passing the callback function to the PoetryClientFactory. The factory uses the callback to deliver the poem:

class PoetryClientFactory(ClientFactory):

    protocol = PoetryProtocol

    def __init__(self, callback):
        self.callback = callback

    def poem_finished(self, poem):
        self.callback(poem)

Notice the factory is much simpler than in version 2.1 since it’s no longer in charge of shutting the reactor down. It’s also missing the code for detecting failures to connect, but we’ll fix that in a little bit. The PoetryProtocol itself doesn’t need to change at all so we just re-use the one from client 2.1:

class PoetryProtocol(Protocol):

    poem = ''

    def dataReceived(self, data):
        self.poem += data

    def connectionLost(self, reason):
        self.poemReceived(self.poem)

    def poemReceived(self, poem):
        self.factory.poem_finished(poem)

With this change, the get_poetry function, and the PoetryClientFactory and PoetryProtocol classes, are now completely re-usable. They are all about downloading poetry and nothing else. All the logic for starting up and shutting down the reactor is in the main function of our script:

def poetry_main():
    addresses = parse_args()

    from twisted.internet import reactor

    poems = []

    def got_poem(poem):
        poems.append(poem)
        if len(poems) == len(addresses):
            reactor.stop()

    for address in addresses:
        host, port = address
        get_poetry(host, port, got_poem)

    reactor.run()

    for poem in poems:
        print poem

So if we wanted, we could take the re-usable parts and put them in a shared module that anyone could use to get their poetry (as long as they were using Twisted, of course).

By the way, when you’re actually testing client 3.0 you might re-configure the poetry servers to send the poetry faster or in bigger chunks. Now that the client is less chatty in terms of output it’s not as interesting to watch while it downloads the poems.

Discussion

We can visualize the callback chain at the point when a poem is delivered in Figure 11:

Figure 11: the poem callbacks

Figure 11 is worth contemplating. Up until now we have depicted callback chains that terminate with a single call to “our code”. But when you are programming with Twisted, or any single-threaded reactive system, these callback chains might well include bits of our code making callbacks to other bits of our code. In other words, the reactive style of programming doesn’t stop when it reaches code we write ourselves. In a reactor-based system, it’s callbacks all the way down.
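The chain in Figure 11 can be traced in a few lines. The sketch below is a hypothetical, Twisted-free model: ToyProtocol and ToyFactory mirror the shape of client 3.0’s classes, and toy_reactor_deliver plays the reactor’s role by feeding data and then reporting the closed connection. Each hop records itself in trace, so you can see the “reactor” call into our protocol, which calls our factory, which calls our got_poem.

```python
trace = []


class ToyProtocol:
    """Hypothetical stand-in for PoetryProtocol (no Twisted involved)."""

    def __init__(self, factory):
        self.factory = factory
        self.poem = ""

    def dataReceived(self, data):
        self.poem += data

    def connectionLost(self, reason):
        trace.append("protocol.connectionLost")
        self.poemReceived(self.poem)

    def poemReceived(self, poem):
        trace.append("protocol.poemReceived")
        self.factory.poem_finished(poem)


class ToyFactory:
    """Hypothetical stand-in for PoetryClientFactory."""

    def __init__(self, callback):
        self.callback = callback

    def poem_finished(self, poem):
        trace.append("factory.poem_finished")
        self.callback(poem)


def got_poem(poem):
    trace.append("got_poem: " + poem)


def toy_reactor_deliver(protocol, chunks):
    """Plays the reactor: feed each chunk, then report the closed connection."""
    trace.append("reactor")
    for chunk in chunks:
        protocol.dataReceived(chunk)
    protocol.connectionLost(reason=None)


toy_reactor_deliver(ToyProtocol(ToyFactory(got_poem)), ["Roses are red, ", "violets are blue"])
for step in trace:
    print(step)
```

Only the first hop belongs to the “framework”; every hop after that is our own code calling more of our own code.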

Keep that fact in mind when choosing Twisted for a project. When you make this decision:

I’m going to use Twisted!

You are also making this decision:

I’m going to structure my program as a series of asynchronous callback chain invocations powered by a reactor loop!

Now maybe you won’t exclaim it out loud the way I do, but it is nevertheless the case. That’s how Twisted works.

It’s likely that most Python programs are synchronous and most Python modules are synchronous too. If we were writing a synchronous program and suddenly realized it needed some poetry, we might use the synchronous version of our get_poetry function by adding a few lines of code to our script like these:

...
import poetrylib # I just made this module name up
poem = poetrylib.get_poetry(host, port)
...

And continue on our way. If, later on, we decided we didn’t really want that poem after all then we’d just snip out those lines and no one would be the wiser. But if we were writing a synchronous program and then decided to use the Twisted version of get_poetry, we would need to re-architect our program in the asynchronous style using callbacks. We would probably have to make significant changes to the code. Now, I’m not saying it would necessarily be a mistake to rewrite the program. It might very well make sense to do so given our requirements. But it won’t be as simple as adding an import line and an extra function call. Simply put, synchronous and asynchronous code do not mix.
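Here is a small, hypothetical illustration of why the two styles don’t mix. get_poetry_sync and get_poetry_async are made-up stand-ins (no network, no Twisted), and a plain deque plays the reactor. The synchronous caller can simply return a word count. The asynchronous caller cannot, because the poem doesn’t exist yet when get_poetry_async returns, so it has to accept a callback of its own, and the same requirement then ripples up to whoever calls it.

```python
from collections import deque

pending = deque()  # hypothetical toy event queue standing in for the reactor


def get_poetry_sync(host, port):
    """Hypothetical blocking version: returns the poem directly."""
    return "the quick brown poem"


def get_poetry_async(host, port, callback):
    """Hypothetical callback version: schedules callback(poem) for later."""
    pending.append(lambda: callback("the quick brown poem"))


# Synchronous caller: the result comes back as a return value.
def word_count_sync():
    poem = get_poetry_sync("localhost", 10000)
    return len(poem.split())


# Asynchronous caller: it cannot return the count, because the poem does not
# exist yet when get_poetry_async returns. So it must take a callback too.
def word_count_async(callback):
    def got_poem(poem):
        callback(len(poem.split()))
    get_poetry_async("localhost", 10000, got_poem)


results = []
print(word_count_sync())          # 4
word_count_async(results.append)  # nothing has been delivered yet
print(results)                    # []
while pending:                    # drain the toy event queue
    pending.popleft()()
print(results)                    # [4]
```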

If you are new to Twisted and asynchronous programming, I might recommend writing a few Twisted programs from scratch before you attempt to port an existing codebase. That way you will get a feel for using Twisted without the extra complexity of trying to think in both modes at once as you port from one to the other.

If, however, your program is already asynchronous, then using Twisted might be much easier. Twisted integrates relatively smoothly with PyGTK and PyQt, the Python APIs for two reactor-based GUI toolkits.

When Things Go Wrong

In client 3.0 we no longer detect a failure to connect to a poetry server, an omission which causes even more problems than it did in client 1.0. If we tell client 3.0 to download a poem from a non-existent server then instead of crashing it just waits there forever. The clientConnectionFailed callback still gets called, but the default implementation in the ClientFactory base class doesn’t do anything at all. So the got_poem callback is never called, the reactor is never stopped, and we’ve got another do-nothing program like the ones we made in Part 2.

Clearly we need to handle this error, but where? The information about the failure to connect is delivered to the factory object via clientConnectionFailed so we’ll have to start there. But this factory is supposed to be re-usable, and the proper way to handle an error will depend on the context in which the factory is being used. In some applications, missing poetry might be a disaster (No poetry?? Might as well just crash). In others, maybe we just keep on going and try to get another poem from somewhere else.

In other words, the users of get_poetry need to know when things go wrong, not just when they go right. In a synchronous program, get_poetry would raise an Exception and the calling code could handle it with a try/except statement. But in a reactive program, error conditions have to be delivered asynchronously, too. After all, we won’t even find out the connection failed until after get_poetry returns. Here’s one possibility:

def get_poetry(host, port, callback):
    """
    Download a poem from the given host and port and invoke

      callback(poem)

    when the poem is complete. If there is a failure, invoke:

      callback(None)

    instead.
    """

By testing the callback argument (i.e., if poem is None) the client can determine whether we actually got a poem or not. This would suffice for our client to avoid running forever, but that approach still has some problems. First of all, using None to indicate failure is somewhat ad-hoc. Some asynchronous APIs might want to use None as a default return value instead of an error condition. Second, a None value carries a very limited amount of information. It can’t tell us what went wrong, or include a traceback object we can use in debugging. Ok, second try:

def get_poetry(host, port, callback):
    """
    Download a poem from the given host and port and invoke

      callback(poem)

    when the poem is complete. If there is a failure, invoke:

      callback(err)

    instead, where err is an Exception instance.
    """

Using an Exception is closer to what we are used to with synchronous programming. Now we can look at the exception to get more information about what went wrong and None is free for use as a regular value. Normally, though, when we encounter an exception in Python we also get a traceback we can analyze or print to a log for debugging at some later date. Tracebacks are extremely useful so we shouldn’t give them up just because we are using asynchronous programming.

Keep in mind we don’t want a traceback object for the point where our callback is invoked; that’s not where the problem happened. What we really want is both the Exception instance and the traceback from the point where that exception was raised (assuming it was raised and not simply created).

Twisted includes an abstraction called a Failure that wraps up both an Exception and the traceback, if any, that went with it. The Failure docstring explains how to create one. By passing Failure objects to callbacks we can preserve the traceback information that’s so handy for debugging.

There is some example code that uses Failure objects in twisted-failure/failure-examples.py. It shows how Failures can preserve the traceback information from a raised exception, even outside the context of an except block. We won’t dwell too much on making Failure instances. In Part 7 we’ll see that Twisted generally ends up making them for us.
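Failure is Twisted’s tool for this job, but the underlying idea can be shown with the standard library alone. In Python 3 (unlike the Python 2 used in this series), an exception object carries its traceback in its __traceback__ attribute, so an exception caught in one place can be handed to an errback that runs later, outside any except block, and the formatted traceback still points at the original raise. This is a stdlib analogue for illustration only, not Twisted’s Failure API; parse_poem and errback are hypothetical names.

```python
import traceback


def parse_poem(text):
    """Hypothetical helper that fails on empty input."""
    if not text:
        raise ValueError("empty poem")
    return text


captured = []


def errback(exc):
    # Runs outside any except block, yet the traceback still leads to the
    # raise statement inside parse_poem.
    captured.append("".join(traceback.format_exception(type(exc), exc, exc.__traceback__)))


try:
    parse_poem("")
except ValueError as e:
    saved = e  # keep the exception object; Python 3 attaches __traceback__ to it

errback(saved)
print(captured[0])
```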

Alright, third try:

def get_poetry(host, port, callback):
    """
    Download a poem from the given host and port and invoke

      callback(poem)

    when the poem is complete. If there is a failure, invoke:

      callback(err)

    instead, where err is a twisted.python.failure.Failure instance.
    """

With this version we get both an Exception and possibly a traceback record when things go wrong. Nice.

We’re almost there, but we’ve got one more problem. Using the same callback for both normal results and failures is kind of odd. In general, we need to do quite different things on failure than on success. In a synchronous Python program we generally handle success and failure with two different code paths in a try/except statement like this:

try:
    attempt_to_do_something_with_poetry()
except RhymeSchemeViolation:
    pass  # the code path when things go wrong
else:
    pass  # the code path when things go so, so right baby

If we want to preserve this style of error-handling, then we need to use a separate code path for failures. In asynchronous programming a separate code path means a separate callback:

def get_poetry(host, port, callback, errback):
    """
    Download a poem from the given host and port and invoke

      callback(poem)

    when the poem is complete. If there is a failure, invoke:

      errback(err)

    instead, where err is a twisted.python.failure.Failure instance.
    """
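A minimal sketch of this two-callback API under toy assumptions: a deque stands in for the reactor, FAKE_SERVERS stands in for running poetry servers, and ConnectionRefused is a hypothetical stand-in for Twisted’s real connection error (a real implementation would pass a Failure, as the docstring says). The property to notice is that each call to get_poetry schedules exactly one of callback or errback, never both, and neither fires before get_poetry returns.

```python
from collections import deque

pending = deque()  # hypothetical toy event queue standing in for the reactor

# Hypothetical canned servers; any other port "refuses" the connection.
FAKE_SERVERS = {10000: "poem from 10000"}


class ConnectionRefused(Exception):
    """Hypothetical stand-in for Twisted's connection-refused error."""


def get_poetry(host, port, callback, errback):
    """Schedule exactly one of callback(poem) or errback(err) for later."""
    if port in FAKE_SERVERS:
        pending.append(lambda: callback(FAKE_SERVERS[port]))
    else:
        err = ConnectionRefused("%s:%d refused" % (host, port))
        pending.append(lambda: errback(err))


poems, errors = [], []
for port in (10000, 10004):
    get_poetry("localhost", port, poems.append, errors.append)

while pending:  # drain the toy event queue
    pending.popleft()()

print(poems)                        # ['poem from 10000']
print([str(e) for e in errors])     # ['localhost:10004 refused']
```

Keeping success and failure in separate functions mirrors the separate except and else branches of the synchronous version.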

Client 3.1

Now that we have an API with reasonable error-handling semantics we can implement it. Client 3.1 is located in twisted-client-3/get-poetry-1.py. The changes are pretty straightforward. The PoetryClientFactory gets both a callback and an errback, and now it implements clientConnectionFailed:

class PoetryClientFactory(ClientFactory):

    protocol = PoetryProtocol

    def __init__(self, callback, errback):
        self.callback = callback
        self.errback = errback

    def poem_finished(self, poem):
        self.callback(poem)

    def clientConnectionFailed(self, connector, reason):
        self.errback(reason)

Since clientConnectionFailed already receives a Failure object (the reason argument) that explains why the connection failed, we just pass that along to the errback.

The other changes are all of a piece so I won’t bother posting them here. You can test client 3.1 by using a port with no server like this:

python twisted-client-3/get-poetry-1.py 10004

And you’ll get some output like this:

Poem failed: [Failure instance: Traceback (failure with no frames): : Connection was refused by other side: 111: Connection refused.
]

That’s from the print statement in our poem_failed errback. In this case, Twisted has simply passed us an Exception rather than raising it, so we don’t get a traceback here. But a traceback isn’t really needed since this isn’t a bug, it’s just Twisted informing us, correctly, that we can’t connect to that address.

Summary

Here’s what we’ve learned in Part 6:

  • The APIs we write for Twisted programs will have to be asynchronous.
  • We can’t mix synchronous code with asynchronous code.
  • Thus, we have to use callbacks in our own code, just like Twisted does.
  • And we have to handle errors with callbacks, too.

Does that mean every API we write with Twisted has to include two extra arguments, a callback and an errback? That doesn’t sound so nice. Fortunately, Twisted has an abstraction we can use to eliminate both those arguments and pick up a few extra features in the bargain. We’ll learn about it in Part 7.

Suggested Exercises

  1. Update client 3.1 to timeout if the poem isn’t received after a given period of time. Invoke the errback with a custom exception in that case. Don’t forget to close the connection when you do.
  2. Study the trap method on Failure objects. Compare it to the except clause in the try/except statement.
  3. Use print statements to verify that clientConnectionFailed is called after get_poetry returns.
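For Exercise 1, the subtle part is making sure only one of the callback or errback ever fires: closing the connection from a timeout handler triggers connectionLost, which would otherwise deliver a partial poem as a success. Below is a minimal sketch, assuming hypothetical GuardedPoetryProtocol, ToyTransport, and PoetryTimeout classes instead of real Twisted objects. A finished flag is set before anything fires. In real Twisted you would schedule timeOut with reactor.callLater and cancel the returned delayed call when the poem arrives; here the calls are made by hand so the ordering is easy to follow.

```python
class PoetryTimeout(Exception):
    """Hypothetical custom exception for the timeout case."""


class ToyTransport:
    """Hypothetical transport: closing it reports connectionLost immediately.
    (Real Twisted reports it later, but the guard works either way.)"""

    def __init__(self, protocol):
        self.protocol = protocol

    def loseConnection(self):
        self.protocol.connectionLost(reason=None)


class GuardedPoetryProtocol:
    """Delivers exactly one result: a poem via callback, or an error via errback."""

    def __init__(self, callback, errback):
        self.callback = callback
        self.errback = errback
        self.poem = ""
        self.finished = False  # set before firing, so nothing fires twice
        self.transport = ToyTransport(self)

    def dataReceived(self, data):
        self.poem += data

    def connectionLost(self, reason):
        if not self.finished:
            self.finished = True
            self.callback(self.poem)

    def timeOut(self):
        if self.finished:
            return
        self.finished = True             # mark first...
        self.transport.loseConnection()  # ...so this close cannot fire the callback
        self.errback(PoetryTimeout("poem not received in time"))


# Scenario 1: the timeout wins.
results = []
p = GuardedPoetryProtocol(lambda poem: results.append(("poem", poem)),
                          lambda err: results.append(("error", str(err))))
p.dataReceived("Roses are red, ")
p.timeOut()
print(results)  # [('error', 'poem not received in time')]

# Scenario 2: the poem wins; a late timeout is ignored.
ok = []
q = GuardedPoetryProtocol(ok.append, ok.append)
q.dataReceived("A whole poem")
q.connectionLost(reason=None)
q.timeOut()
print(ok)  # ['A whole poem']
```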

36 thoughts on “An Introduction to Asynchronous Programming and Twisted”

  1. Hi Dave, first of all, thanks for your tutorial; it’s the best I’ve found for really beginning to understand the way Twisted works. There are some other tutorials around, but they are all totally obsolete and don’t explain the concepts as you do; they’re just an exposition of how to do a specific task.

    And my question: looking at the PoetryProtocol code, I’d like to know why you need poemReceived. Why not just add self.factory.poem_finished(poem) to the connectionLost code? Is it done to make the code more elegant, or is there a real need behind it?

    Thanks

    1. Hey José, glad you like the tutorial, and that’s a great question. And you are right: there isn’t strictly a need for poemReceived in this case, since the protocol is so simple and our use case is, too. But I like poemReceived for a couple of reasons.

      First, it’s a kind of documentation. It tells the person reading the Protocol implementation about the sort of ‘higher-level’ messages the Protocol is generating from the low-level stream of bytes.

      And second, it provides a clean place for someone who wants to sub-class our Protocol to post-process the poems as they come in. And by ‘clean’ I mean the method name won’t change even if we update the wire-level protocol implementation.

      This is a common pattern in the Protocols you find in the Twisted source code. For example, the Protocol that can receive netstrings has a stringReceived method and the protocol that can receive line-oriented text data has a lineReceived method. In both cases the methods raise NotImplementedError to signify that you are expected to sub-class them and override their respective methods.

  2. Hi Dave,

    The timeout I have implemented using callLater() but I was intrigued by the Failure.trap() method. Here’s a stripped version of the twisted-intro/twisted-client-3/get-poetry-1.py where I have wrapped the Exception as a Failure and passed it to callLater().

    May I ask if what I did is okay?

    class PoetryProtocol(Protocol):

        def timeOut(self, failure):
            self.factory.errback(failure)

    class PoetryClientFactory(ClientFactory):

        def buildProtocol(self, address):
            proto = ClientFactory.buildProtocol(self, address)
            proto.address = address

            import random
            from twisted.internet import reactor

            # randomize timeouts for now
            timeout = random.randint(1,3)
            msg = '==> TIMEOUT for %s' % (proto.address,)
            exc = PoetryException(msg)
            f = Failure(exc)
            proto.callID = reactor.callLater(timeout, proto.timeOut, f)

            print '==> buildProtocol', address, 'with TIMEOUT', timeout
            return proto

    def poetry_main():

        def poem_failed(err):
            print >>sys.stderr, 'Poem failed:', err.getErrorMessage()
            errors.append(err)
            e = err.trap(PoetryException)
            if e == PoetryException:
                print '==> PoetryException encountered: ', err.getErrorMessage()
            poem_done()

    1. Hey Cyril, I assume in timeOut you meant “self.factory.deferred.errback(failure)”? Anyway, a couple of points:

      The way you are doing timeouts may not be quite complete. Calling the errback on the deferred will tell the client that the timeout happened, but it won’t stop the poem from being downloaded. And when that finishes, the deferred will be fired a second time (causing an error, since it’s already been fired). So you also need to set the deferred attribute on the factory to None so the factory won’t fire it a second time, and close the connection as well. Does that make sense? The latest version of Twisted has added support for canceling deferred operations. I write about it in Part 19.
      When you do e = err.trap(SomeException) and the code makes it to the next line, you can be sure that e == SomeException. So that if statement is unnecessary. However, if the exception doesn’t match the type you pass in, then trap() raises the exception, causing the deferred to move to the next errback in the chain. So if you’re going to use trap in an errback, you need to make sure there is at least one more errback in the chain. If you just want to see whether a failure is wrapping an exception of a certain type, you would use failure.check(PoetryException) == PoetryException.

      1. Hi Dave,

        Sorry about the “unformatted” code above.

        No, it’s self.factory.errback(failure). I wanted to pass the failure object so that I can pass it on to the factory and noticed that was wrong when you mentioned it. I changed it accordingly.

        Thank you for those important points. I commented out the reactor.stop() line and, lo, the deferred is indeed being fired!

        I changed my exercise based on your comments and came up with the following:

        class PoetryProtocol(Protocol):
            address = None
            poem = ''
            callID = None

            def dataReceived(self, data):
                self.poem += data

            def connectionLost(self, reason):
                print '==> connectionLost()', self.address
                self.poemReceived(self.poem)

            def poemReceived(self, poem):
                print '==> poem_finished()', self.address
                if self.callID:
                    print '==> cancelling TIMEOUT...', self.address
                    self.callID.cancel()
                    self.callID = None
                self.factory.poem_finished(poem)

            def timeOut(self):
                print '==> timeOut()', self.address, self.callID
                self.callID = None # clear deferred
                self.transport.loseConnection()

                msg = '==> TIMEOUT for %s' % (self.address,)
                exc = PoetryException(msg)
                failure = Failure(exc)
                self.factory.errback(failure)

        The print statements are obviously for my “debugging”. I tested these with the reactor.stop() commented out and it seems to work.

        For this exercise, the whole twisted-intro/twisted-client-3/get-poetry-2.py is at http://pastebin.org/978492 just in case someone’s interested.

        Again, thanks for the tutorials and for taking the time to reply. :)

        1. Whoops, you’re right, I was confused about which Part you were on :)

          I think you’re on the right track, but I believe you need to prevent the callback from being executed
          if the timeout expires, you see what I mean? I tested your code and it ended up calling both the callback
          and the errback in the event of a timeout.

    1. Hey Cyril, I think you are close! It’s just that you need to prevent the callback from being fired
      if the timeout happens. When you call loseConnection() on the transport, that will turn around
      and call connectionLost() on your protocol. Right now, that causes the callback to fire. Then your
      timeout handler will call the errback. So you need one more bit of state on your protocol, you need
      to know whether or not you have fired the callback or errback yet, and avoid firing a second time
      if so. You’ll need to set that state before you close the connection in the timeout handler, to prevent
      the callback from being fired. See what I mean?

  3. Dave,

    Excellent tutorial. However, as we continue to add more of our code to the reactor loop, I am becoming increasingly concerned this wonderful walk-through is a front for drawing phallic 2D sketches ;)

    Great work!

  4. Hmm this one’s decidedly trickier. I’m having trouble making the traceback stick to the failure while making it custom. Not really a big deal in poetry servers, but it kinda bugged me. (And I used the connectionMade method… simplified things!)

  5. Thanks for your great tutorial :)

    I have a question.
    How can I share state between PoetryProtocol instances?
    It looks like each PoetryProtocol instance is generated from a different instance of PoetryClientFactory.

    I tried this:
    factory = PoetryClientFactory(callback)
    for address in addresses:
        host, port = address
        reactor.connectTCP(host, port, factory)
    reactor.run()
    However, I could not figure out how to generate a deferred object for each PoetryProtocol instance.

    Thanks in advance.(sorry for my poor English)

    1. Hello!

      Each PoetryProtocol instance will get a .factory attribute
      with a reference to the PoetryClientFactory object. So you can store shared
      state on the factory, or another object referenced by the factory.

  6. hello :)

    This is a code snippet from this article.
    ----------
    def get_poetry(host, port, callback):
        from twisted.internet import reactor
        factory = PoetryClientFactory(callback)
        reactor.connectTCP(host, port, factory)

    for address in addresses:
        host, port = address
        get_poetry(host, port, got_poem)
    ----------

    Then, the objects are generated as follows:

        factory1      factory2
           |             |
        protocol1     protocol2   ...

    But I think the objects must be generated as follows in order to share state:

               factory   <- stores shared state
              /       \
        protocol1   protocol2   ...

    Sorry if I'm wrong…

  7. I failed at drawing the figures :(

    In the first figure, factory1 has protocol1 and factory2 has protocol2.

    In the second, factory has protocol1 and protocol2.

    1. Oh, sorry, yes you are right, I forgot to look at which article
      your comment was on. But there’s still an easy way to solve
      the problem. Put the shared state on an object that is shared
      amongst all the Factories.

      1. thanks for your reply.

        I think I know what you mean.
        But is this a correct use of the Factory Pattern?

        1. For client connections it’s common to use a different factory for each one.
          For server connections you typically just have one Factory (for a particular
          listening port). You’ll see this in later Parts.

  8. Hi Dave, I am having some trouble understanding where the callback originates. What is the business logic here?
    When you initialize the factory with a callback, what does that mean? I’m really confused.

    1. I understood what the callback is in this case. It is basically the got_poem() function; we are passing it in as a parameter. But what I still don’t get is: if there is an error, how does the get_poetry() function come to know about it?

      Also why are you comparing len(poems) + len(errors) with len(addresses) ?

      1. We find out about the error from Twisted telling us via the clientConnectionFailed callback on the Factory object. Twisted calls that when an attempt to connect fails. And in that case get_poetry arranges for the errback to be called. Since all of our attempts to contact a poetry server will either retrieve a poem or result in an error, when the total number of poems and errors equals the number of servers we are done.

        1. So, when

          get_poetry(host, port, got_poem, poem_failed)
          is invoked,

          this code is run:

          factory = PoetryClientFactory(callback, errback)

          and factory is initialized with a callback object and an errback object, which are actually the got_poem() and poem_failed() functions.

          Now,
          reactor.connectTCP(host, port, factory)

          And if the factory then gets an error, clientConnectionFailed() is called, which in turn invokes self.errback(reason), that is, the poem_failed(err) function. Have I got that right?

    1. I would go through a few more tutorials first, but you already have most of what you need — the clientConnectionFailed is going to be the key to this problem, since it will tell you when you cannot make a connection. Instead of connecting to different servers, you’ll make lots of connections to the same server but different port numbers, and keep track of which ports accept a connection and which ones do not.

  9. Yeah, I thought so too, that I’d need to go through some more Twisted material. But I guess I’ll start building it up and keep adding functionality as I progress through this tutorial. Thanks a lot for your help! :)
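Several comments ask how PoetryProtocol instances can share state when each get_poetry call builds its own factory. Following the suggestion in the thread to put the shared state on one object that all the factories reference, here is a minimal Twisted-free sketch. SharedPoetryState and ToyPoetryClientFactory are hypothetical names, and the method calls at the bottom are made by hand where Twisted would normally make them.

```python
class SharedPoetryState:
    """Hypothetical object holding state that every factory shares."""

    def __init__(self, expected):
        self.expected = expected
        self.poems = []
        self.errors = []

    def done(self):
        # Every connection ends in exactly one poem or one error.
        return len(self.poems) + len(self.errors) == self.expected


class ToyPoetryClientFactory:
    """Hypothetical stand-in for PoetryClientFactory: still one per connection."""

    def __init__(self, shared):
        self.shared = shared  # all factories point at the same object

    def poem_finished(self, poem):
        self.shared.poems.append(poem)

    def clientConnectionFailed(self, connector, reason):
        self.shared.errors.append(reason)


shared = SharedPoetryState(expected=3)
factories = [ToyPoetryClientFactory(shared) for _ in range(3)]
factories[0].poem_finished("poem A")
factories[1].clientConnectionFailed(None, "connection refused")
print(shared.done())   # False
factories[2].poem_finished("poem C")
print(shared.done())   # True
print(shared.poems)    # ['poem A', 'poem C']
```

A protocol could reach the same object through self.factory.shared, so each connection keeps its own factory while the bookkeeping lives in one place.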
