Dabeaz

Dave Beazley's mondo computer blog. [ homepage | archive ]

Saturday, July 31, 2010

 

Yieldable Threads (Part 1)

Disclaimer: This whole post is one big thought experiment. It might, in fact, be a really dumb idea. Don't say that you weren't warned! - Dave.

Introduction

I'll admit that I'm an unabashed fan of Python generator functions--especially when applied to problems in data processing (e.g., setting up processing pipelines, cranking on big datasets, etc.). Generators also have a rather curious use in the world of concurrency--especially in libraries that aim to provide an alternative to threading (i.e., tasklets, greenlets, coroutines, etc.). Just in case you missed them, I've given past PyCon tutorials on both generators and coroutines.

A common theme surrounding the use of generators and concurrency is that you can define functions that seem to operate as "tasks" without using any system threads (sometimes this approach is known as microthreading or green threading). Typically this is done by playing clever tricks with I/O operations. For example, suppose you initially had the following function that served a client in a multithreaded server.

def serve_client(c):
    request = c.recv(MAXSIZE)        # Read from a socket
    response = processing(result)    # Process the request
    c.send(response)                 # Send on a socket
    c.close()

With the assistance of a coroutine or tasklet library, you might be able to get rid of threads and rewrite the function slightly, using yield statements like this (keep in mind this is a high-level overview--the actual specifics might vary):

def serve_client(c):
    request = yield c.recv(MAXSIZE)        # Read from a socket
    response = processing(result)          # Process the request
    yield c.send(response)                 # Send on a socket
    c.close()

If you've never seen anything like this before, it will probably make your head spin (see my coroutines tutorial from PyCon'09). However, the gist of the idea is that the yield statements cause the function to suspend execution at points where I/O operations might block. Underneath the covers, the I/O request operations (recv, send, etc.) are handled by a scheduler that uses nonblocking I/O and multiplexing (e.g., select(), poll(), etc.) to drive the execution of multiple generator functions at once, giving the illusion of concurrent execution. To be sure, it's a neat trick and it works great--well, so long as the processing() operation in the middle is well behaved.

Sadly, it's not generally safe to assume that the processing step will play nice. In fact, a major limitation of coroutine (and event-driven) approaches concerns the handling of processing steps that block or consume a large number of CPU cycles. This is because any kind of blocking causes the entire program and all "tasks" to grind to a halt until that operation completes. This becomes a major concern if you are going to use other programming libraries as most code has not been written to operate in such a non-blocking manner. In fact, libraries based on polling and non-blocking I/O typically take great pains to work around this limitation (for instance, consider the difficulty of performing a blocking third-party library database query in this environment).

Threads : I'm Not Dead Yet

A simple solution that almost always eliminates the problem of blocking is to program with threads. Yes, threads--everyone's favorite public enemy number one. In fact, threads work great for blocking operations. Not only that, the resulting threaded code tends to have readable and comprehensible control-flow (e.g., organized as a logical sequence of steps as opposed to being fragmented across dozens of asynchronous callbacks and event handlers). Frankly, I suspect that most Python programmers would prefer to use threads if it weren't for the simple fact that their performance on CPU-bound processing sucks (damn you GIL!). However, I digress--I've already said more than enough about that.

A Different Premise (Thought Experiment)

Generator and event-based based alternatives to threads are usually based on a premise that thread programming should be avoided. However, all of these thread alternatives are also strongly based on reimplementing the one part of thread programming that actually works reasonably well--handling of blocking I/O.

As a thought experiment, I got to wondering--why do thread alternatives fix what isn't broken about threads? If you really wanted to fix threads, wouldn't you want to address the part of thread programming that actually is broken? In particular, the poor execution of CPU-intensive work.

Yielding CPU-intensive work

Generator-based alternatives to threads use the yield statement to have I/O operations carried out elsewhere (by a scheduler sitting behind the scenes). However, what if you fully embraced threads, but applied that same idea to CPU intensive processing instead of I/O? For example, consider a threaded function that looked like this:

def serve_client(c):
    request = c.recv(MAXSIZE)                # Read from a socket
    response = yield processing, (result,)   # Request processing (somehow)
    c.send(response)                         # Send on a socket
    c.close()

In this code, yield is not used to perform non-blocking I/O. Instead, it's used to "punt" on CPU-intensive processing. For example, instead of directly calling the processing() function above, the yield statement merely spits it out to someone else. In a sense, the thread is using yield to say that it does NOT want to do that work and that it wants to suspend until someone else finishes it.

Some careful study is probably required, but just to emphasize, the above generator freely performs blocking I/O operations (something threads are good at), but kicks out problematic CPU-intensive processing using yield. It's almost the exact opposite of what you normally see with generator-based microthreading.

A Yieldable Thread Object

To run such a function as a thread, you need to have a little bit of extra runtime support. The following code defines a new thread object that knows how to utilize our new use of yield:

# ythr.py
# Author : David Beazley
# 
# A yieldable thread object that offloads CPU intensive work 
# to a user-defined apply function

from threading import Thread
from types import GeneratorType

# Compatibility function (for Python 3)
def apply(func,args=(),kwargs={}):
    return func(*args,**kwargs)

class YieldableThread(Thread):
    def __init__(self,target,args=(),kwargs={},cpu_apply=None):
        Thread.__init__(self)
        self.__target = target
        self.__args = args
        self.__kwargs = kwargs
        self.__cpu_apply = cpu_apply if cpu_apply else apply

    # Run the thread and check for generators (if any)
    def run(self):
        # Call the specified target function
        result = self.__target(*self.__args,**self.__kwargs)
        # Check if the result is a generator.  If so, run it
        if isinstance(result, GeneratorType):
            genfunc   = result    # generator function to run
            genresult = None      # last result to send into the generator
            while True:
                try:
                    # Run to the next yield and get work to do
                    work = genfunc.send(genresult)
                    # Execute the work using the user-defined apply function
                    genresult = self.__cpu_apply(*work)
                except StopIteration:
                    break

The key to this implementation is the bottom part of the run() method that checks to see if the target function produced a generator. If so, the run method manually steps through the generator (using its send() method). The yielded results are assumed to represent CPU-intensive functions that need to execute. For each of these, the work is passed to a user-supplied apply function (the __cpu_apply attribute). By default, this function is set to apply() which makes the thread run the work as if yield wasn't used at all. However, as we'll see shortly, there are many different things that can be done by supplying a different implementation.

An Example

To explore this thread implementation, we first need a CPU-intensive function to work with. Here's a trivial one just for the purpose of exploring the concept:

# A trivial CPU-bound function
def sumn(n):
    total = 0
    while n > 0:
        total += n
        n -= 1
    return total

This function just computes the sum of the first N integers in a really dumb way. Here is an example:

>>> sumn(25000)
312512500
>>> timeit('sumn(25000)','from __main__ import sumn',number=1000)
4.500338077545166
>>>

As you can see, summing the first 25000 integers 1000 times takes about 4.5 seconds (4.5 msec to do it just once). Remember that number--we'll return to it shortly.

Next, we need to mix in some I/O. Let's write a really simple multithreaded TCP server that turns the above function into a internet service. This code is just a standard threaded server that uses none of our magic (yet).

# sumserver.py
# 
# A server that computes sum of n integers

from socket import *
from threading import Thread

# CPU-bound function
def sumn(n):
    total = 0
    while n > 0:
        total += n
        n -= 1
    return total

# Function that handles clients
def serve_client(c):
    n = int(c.recv(16))
    result = sumn(n)
    c.send(str(result).encode('ascii'))
    c.close()

# Plain-old threaded server
def run_server(addr):
    s = socket(AF_INET, SOCK_STREAM)
    s.setsockopt(SOL_SOCKET, SO_REUSEADDR,1)
    s.bind(addr)
    s.listen(5)
    while True:
        c,a = s.accept()
        thr = Thread(target=serve_client,args=(c,))
        thr.daemon = True
        thr.start()
        
if __name__ == '__main__':
    run_server(("",10000))

Finally, let's write a test client program that can be used to make a bunch of requests and time the performance.

# sumclient.py
from socket import *
def run_client(addr,repetitions,n):
    while repetitions > 0:
        s = socket(AF_INET, SOCK_STREAM)
        s.connect(addr)
        s.send(str(n).encode('ascii'))
        resp = s.recv(128)
        s.close()
        repetitions -= 1

if __name__ == '__main__':
    import sys
    import time
    import threading

    ADDR = ("",10000)
    REPETITIONS = 1000
    N = 25000

    nclients = int(sys.argv[1])
    requests_per_client = REPETITIONS//nclients

    # Launch a set of client threads to make requests and time them
    thrs = [threading.Thread(target=run_client,args=(ADDR,requests_per_client,N)) for n in range(nclients)]
    start = time.time()
    for t in thrs:
        t.start()
    for t in thrs:
        t.join()
    end = time.time()
    print("%d total requests" % (nclients*requests_per_client))
    print(end-start)

This client simply initiates 1000 requests with the server, using different numbers of threads. Let's run the server and try the client with different numbers of threads.

bash-3.2$ python sumclient.py 1         # 1 client thread
1000 total requests
4.34612298012
bash-3.2$ python sumclient.py 2         # 2 client threads
1000 total requests
7.81390690804
bash-3.2$ python sumclient.py 4         # 4 client threads
1000 total requests
9.5317029953
bash-3.2$ python sumclient.py 8         # 8 client threads
1000 total requests
10.2061738968
bash-3.2$ 

Observe that with only 1 client thread, the performance of the server is comparable with the performance of timeit(). Making 1000 requests takes about 4.3 seconds (in fact, it seems to be a little faster). However, if we start increasing the concurrency the performance degrades fast. With 4 client threads, the server is already running twice as slow. This is not a surprise--we already know that Python threads have problems with CPU bound processing.

A Modified Example (Using yield)

Let's modify the server code to use our new YieldableThread object. Here is the code:

# ysumserver.py
# 
# A server that computes sum of n integers (using yieldable threads)

from socket import *
from ythr import YieldableThread

# CPU-bound function (unmodified)
def sumn(n):
    total = 0
    while n > 0:
        total += n
        n -= 1
    return total

# Function that handles clients
def serve_client(c):
    n = int(c.recv(16))
    result = yield sumn, (n,)               # Notice use of yield
    c.send(str(result).encode('ascii'))
    c.close()

# Threaded server that uses yieldable threads. Note extra cpu_apply
# argument that allows a user-defined apply() function to be passed
def run_server(addr,cpu_apply=None):
    s = socket(AF_INET, SOCK_STREAM)
    s.setsockopt(SOL_SOCKET, SO_REUSEADDR,1)
    s.bind(addr)
    s.listen(5)
    while True:
        c,a = s.accept()
        thr = YieldableThread(target=serve_client,args=(c,),cpu_apply=cpu_apply)
        thr.daemon = True
        thr.start()
        
if __name__ == '__main__':
    run_server(("",10000))

Observe that this version of the code is only slightly modified.

By default, yieldable threads should have performance comparable to normal threads. Try the client again with this new server:

bash-3.2$ python sumclient.py 1
1000 total requests
4.95635294914
bash-3.2$ python sumclient.py 2
1000 total requests
7.82525205612
bash-3.2$ python sumclient.py 4
1000 total requests
9.25957417488
bash-3.2$ python sumclient.py 8
1000 total requests
9.95880198479

Yep, the same lousy performance as before. So, where is this going?

Add Some Special Magic

Recall that yieldable threads allow the user to pass in their own custom apply() function for performing CPU-bound processing. That's where the magic enters the picture.

Let's write a new apply function and try running the server again. Try this one:

# ysumserver.py
...
# A locked version of apply that only allows one thread to run
from threading import Lock
_apply_lock = Lock()
def exclusive_apply(func,args=(),kwargs={}):
    with _apply_lock:
         return func(*args,**kwargs)

if __name__ == '__main__':
    run_server(("",10000),cpu_apply=exclusive_apply)

Let's try our client with this new server:

bash-3.2$ python sumclient.py 1
1000 total requests
4.55530810356
bash-3.2$ python sumclient.py 2
1000 total requests
5.75427007675
bash-3.2$ python sumclient.py 4
1000 total requests
5.75416207314
bash-3.2$ python sumclient.py 8
1000 total requests
5.81962108612
bash-3.2$ 

Wow! Look at the change for the threaded clients. When running with 8 threads, this new server serves requests about 1.7x faster. No code modifications were made to the server--only a different specification of the apply() function.

How is this possible you ask? Well, if you recall from my GIL talk, CPU-bound threads tend to fight with each other on certain multicore machines. By putting that lock in the apply function, threads aren't allowed to fight anymore (only one gets to run CPU-intensive work at once). Again, keep in mind that the work in this example only takes about 4.5 milliseconds--we're getting a nice speedup even though none of the threads are running in the apply function for very long.

Here is another more interesting example. Let's farm the CPU-intensive work out to a multiprocessing pool. Change the server slightly:

# ysumserver.py
...
if __name__ == '__main__':
    import multiprocessing
    pool = multiprocessing.Pool()
    run_server(("",10000),cpu_apply=pool.apply)

Now, let's try our client again.

bash-3.2$ python sumclient.py 1
1000 total requests
4.50634002686
bash-3.2$ python sumclient.py 2
1000 total requests
2.29651284218
bash-3.2$ python sumclient.py 4
1000 total requests
1.45105290413
bash-3.2$ python sumclient.py 8
1000 total requests
1.59892106056
bash-3.2$                      

Hey, look at that--the performance is actually getting better! For example, the performance with 4 client threads is more than 3 times faster than with just one thread. This is because the CPU-intensive work is now being handled on multiple cores through the use of the multiprocessing module.

Wrapping up (for now)

Since this post is already getting long, I'm going to wrap it up. However, let's conclude by revisiting a previous bit of code. In our server, we defined a client handler function like this:

def serve_client(c):
    n = int(c.recv(16))
    result = yield sumn, (n,)          
    c.send(str(result).encode('ascii'))
    c.close()

In this code, there are no dependencies on any libraries or special objects. It fact, all it does is spit out a bit of CPU-bound processing with the yield statement. Behind the scenes, the YieldableThread object is free to take this work and do whatever it wants to with it. For example, run it in a special environment, pass it to the multiprocessing module, send it out to the cloud, etc. I think that's kind of cool.

Of course, at this point, you might be asking yourself, "what can possibly go wrong?" To answer that, you'll have to wait for the next installment. However, as a preview, I'll just say that the answer is "a lot!"

Postscript

All of my tests were performed using Python 2.7 running on a 4-core Mac Pro(2 x 2.66 GHz, Dual-Core Intel Xeon) running OS X version 10.6.4.

Although I've never seen generators used quite like this before, I don't want to steal anyone's thunder--if you are aware of prior work, please send me a link so I can post it here.

Additional Postscript

If you like messing around with concurrency, distributed systems, and other neat things, then you would probably like the courses that I teach in Chicago.






<< Home

Archives

Prior Posts by Topic

08/01/2009 - 09/01/2009   09/01/2009 - 10/01/2009   10/01/2009 - 11/01/2009   11/01/2009 - 12/01/2009   12/01/2009 - 01/01/2010   01/01/2010 - 02/01/2010   02/01/2010 - 03/01/2010   04/01/2010 - 05/01/2010   05/01/2010 - 06/01/2010   07/01/2010 - 08/01/2010   08/01/2010 - 09/01/2010   09/01/2010 - 10/01/2010   12/01/2010 - 01/01/2011   01/01/2011 - 02/01/2011   02/01/2011 - 03/01/2011   03/01/2011 - 04/01/2011   04/01/2011 - 05/01/2011   05/01/2011 - 06/01/2011   08/01/2011 - 09/01/2011   09/01/2011 - 10/01/2011   12/01/2011 - 01/01/2012   01/01/2012 - 02/01/2012   02/01/2012 - 03/01/2012   03/01/2012 - 04/01/2012   07/01/2012 - 08/01/2012   01/01/2013 - 02/01/2013   03/01/2013 - 04/01/2013   06/01/2014 - 07/01/2014   09/01/2014 - 10/01/2014  

This page is powered by Blogger. Isn't yours?

Subscribe to Posts [Atom]