Producer/Consumer in Python

Like Hello World in imperative programming, the Producer/Consumer problem is the kind of program everybody has written at least once in a lifetime.

It’s one of the toy examples in concurrent programming demonstrating the basics of synchronization.

Basically there’s a shared buffer, a producer entity and a consumer one. The producer generates some data and puts it in the buffer (which can have one or multiple slots), then the consumer gets the data from the same buffer and does something with it.

Now we’re going to see a couple (and a half) of implementations in Python. There are surely tons of ways to solve this little problem, so I’ll show you a threaded implementation with locking around data structures which are not thread safe, a slight modification using Queue objects, and an alternative implementation using just simple generators.

Threads and Locks

I’m not going to repeat here why threads are mostly bad for concurrency, debugging and so on because there’s plenty of documentation on the net about that.

The core of the concept is: if you want to share your data structures among threads, be prepared to lock operations (and eventually run into locking problems) unless those structures are thread safe per se.

Locking basically means identifying a critical section and wrapping it in a sort of synchronization primitive: mutexes, semaphores, primitive locks and others.
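To make the idea of a critical section concrete, here is a minimal sketch in Python 3 (the post's own snippets are Python 2). The names `counter`, `counter_lock`, `add_many` and `run_workers` are made up for this illustration and don't come from the post's source files.

```python
import threading

counter = 0                      # shared state touched by every worker
counter_lock = threading.Lock()  # guards the read-modify-write below


def add_many(times):
    """Increment the shared counter `times` times, one locked step at a time."""
    global counter
    for _ in range(times):
        with counter_lock:       # the critical section starts here
            counter += 1         # read, add and write back as one unit


def run_workers(n_threads=4, times=10000):
    """Start the workers, wait for all of them and return the final count."""
    global counter
    counter = 0
    workers = [threading.Thread(target=add_many, args=(times,))
               for _ in range(n_threads)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    return counter
```

With the lock in place, `run_workers(4, 10000)` always returns 40000. Without it, two threads could read the same old value and one of the increments could be lost.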

In our example the shared data structure is the buffer, so that’s where we’re going to put the locks.

I modeled the Buffer class on the Queue class for the sake of consistency. Let’s see:

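[code lang="python"]
from __future__ import with_statement  # only needed on Python 2.5
from threading import Lock


class Buffer(object):
    def __init__(self):
        self._list = list()
        self._lock = Lock()

    def get(self):
        # note: pop() takes the newest item (LIFO);
        # pop(0) would give the FIFO order of a Queue
        with self._lock:
            return self._list.pop()

    def put(self, val):
        with self._lock:
            self._list.append(val)

    def qsize(self):
        return len(self._list)

[/code]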

The class has a private list() instance to hold the values and uses a Lock object to wrap the critical sections. I also use the with statement introduced in Python 2.5 (where it has to be enabled with from __future__ import with_statement), which automatically releases the lock when the critical section ends, even if an exception is raised inside it.
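As a rough illustration of what the with statement buys us, the sketch below (Python 3, with hypothetical helper names `put_with`, `put_manual` and `pop_or_none`) puts the manual acquire/release form next to the with form. The two put functions behave the same; the last function shows that the lock is released even when the body raises.

```python
import threading

lock = threading.Lock()
items = []


def put_with(value):
    """Append under the lock; the with block releases it on exit."""
    with lock:
        items.append(value)


def put_manual(value):
    """The same thing spelled out with acquire() and release()."""
    lock.acquire()
    try:
        items.append(value)
    finally:
        lock.release()           # runs even if append() raised


def pop_or_none():
    """pop() raises IndexError on an empty list, yet the lock is still freed."""
    try:
        with lock:
            return items.pop()
    except IndexError:
        return None
```

Forgetting the try/finally in the manual form is the classic way to leave a lock held forever, which is why the with form is usually preferable.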

The Producer and Consumer are Thread objects:

[code lang="python"]
class Producer(Thread):
    def __init__(self, buffer, name, limit=10):
        # …
[/code]

As you can see, we have to pass the buffer object when we instantiate the class, like this:

[code lang="python"]
def main():
    buffer = Buffer()  # we create a single buffer shared among both threads
    prod = Producer(buffer, "Producer")
    cons = Consumer(buffer, "Consumer")
    prod.start()
    cons.start()
[/code]

This way the threads will not leave the data structure in an inconsistent state. This is a proof of concept obviously. You can find the whole source code here: prodcons-threaded-locks.py
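For readers who want something runnable without downloading the file, here is one possible end-to-end sketch in Python 3. It is not the content of prodcons-threaded-locks.py: the method bodies, the `DONE` sentinel, the `received` list and the polling loop are all assumptions made for this illustration, and the buffer uses a deque so that items come out in FIFO order.

```python
import threading
import time
from collections import deque

DONE = object()  # hypothetical sentinel telling the consumer to stop


class Buffer:
    """A lock-protected FIFO buffer with a Queue-like interface."""

    def __init__(self):
        self._items = deque()
        self._lock = threading.Lock()

    def put(self, val):
        with self._lock:
            self._items.append(val)

    def get(self):
        # Raises IndexError when the buffer is empty.
        with self._lock:
            return self._items.popleft()


class Producer(threading.Thread):
    def __init__(self, buffer, name, limit=10):
        super().__init__(name=name)
        self.buffer = buffer
        self.limit = limit

    def run(self):
        for i in range(self.limit):
            self.buffer.put(i)
        self.buffer.put(DONE)        # signal that nothing more is coming


class Consumer(threading.Thread):
    def __init__(self, buffer, name):
        super().__init__(name=name)
        self.buffer = buffer
        self.received = []

    def run(self):
        while True:
            try:
                item = self.buffer.get()
            except IndexError:
                time.sleep(0.001)    # buffer empty: back off, then poll again
                continue
            if item is DONE:
                break
            self.received.append(item)


def main(limit=10):
    buffer = Buffer()                # a single buffer shared by both threads
    prod = Producer(buffer, "Producer", limit)
    cons = Consumer(buffer, "Consumer")
    prod.start()
    cons.start()
    prod.join()
    cons.join()
    return cons.received
```

The polling loop in the consumer is the price of a buffer whose get() doesn't block. Avoiding it takes a condition variable, or a ready-made blocking queue.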

Queue

As I said, this is a proof of concept, because in simple situations like this one we don’t need to code a data structure from scratch in Python. The most pythonic way to handle data in a thread safe manner is to make use of Queue objects.

These objects implement a thread safe FIFO queue, exactly what we need for the example.

I won’t show the code because it’s trivial; you can find it here: prodcons-threaded.py
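For reference, a Queue-based version might look roughly like the sketch below. It is written for Python 3, where the module is called `queue` (it was `Queue` in Python 2), and it is not the content of prodcons-threaded.py: the function names, the `SENTINEL` value and the doubling step are placeholders invented for this example.

```python
import queue
import threading

SENTINEL = None  # hypothetical end-of-stream marker


def producer(q, limit):
    for i in range(limit):
        q.put(i)              # blocks while the bounded queue is full
    q.put(SENTINEL)


def consumer(q, out):
    while True:
        item = q.get()        # blocks until an item is available
        if item is SENTINEL:
            break
        out.append(item * 2)  # stand-in for "doing something" with the data


def run(limit=5):
    q = queue.Queue(maxsize=2)   # a small bound makes the producer wait
    out = []
    threads = [
        threading.Thread(target=producer, args=(q, limit)),
        threading.Thread(target=consumer, args=(q, out)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return out
```

Since put() and get() do their own locking and blocking, there is no explicit Lock and no polling loop anywhere in this version.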

Generators

Since we don’t really like threads that much, let’s indulge our toy example a bit and rewrite it without them.

One built-in way to do that is to take advantage of generators, which let us create coroutines (aka resumable functions).

The main problem is the loss of a scheduler. With threads we don’t have to worry about scheduling the tasks because the preemptive system scheduler takes care of everything for us. With generators we don’t have that luxury, and since we love code reuse we’re going to use a very cool (and simple) example made by Maciej Obarski.

The Scheduler class in our case simply iterates over the task list and calls the next() method of each generator object to resume its computation and move it forward:

[code lang="python"]
from collections import deque


class Scheduler(object):
    def __init__(self):
        self.tasks = deque()

    def add_task(self, task):
        self.tasks.append(task)

    def main(self):
        tasks = self.tasks
        while True:
            try:
                # resume the first task, then move it to the back
                tasks[0].generator.next()
                tasks.rotate(-1)
            except StopIteration:
                # the task has finished: drop it
                del tasks[0]
            except IndexError:
                # no tasks left
                break

[/code]

No black magic indeed.
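Here is a small Python 3 variant of the same round-robin idea that can be run on its own. It is a sketch under a few assumptions: tasks are plain generator objects rather than objects with a `.generator` attribute, the built-in `next()` replaces the Python 2 `.next()` method, and `countdown` is a toy task invented for the demonstration.

```python
from collections import deque


class Scheduler:
    """Round-robin scheduler: resume each generator in turn until all finish."""

    def __init__(self):
        self.tasks = deque()

    def add_task(self, gen):
        self.tasks.append(gen)

    def main(self):
        while self.tasks:
            try:
                next(self.tasks[0])      # run the first task up to its next yield
                self.tasks.rotate(-1)    # then send it to the back of the line
            except StopIteration:
                self.tasks.popleft()     # the task is done: remove it


def countdown(name, n, log):
    """Toy task: record one step, then hand control back to the scheduler."""
    while n > 0:
        log.append((name, n))
        n -= 1
        yield


log = []
sched = Scheduler()
sched.add_task(countdown("a", 2, log))
sched.add_task(countdown("b", 3, log))
sched.main()
# log == [("a", 2), ("b", 3), ("a", 1), ("b", 2), ("b", 1)]
```

The two tasks alternate until "a" runs out, after which "b" gets every turn.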

The Producer and Consumer are pretty similar to the threaded ones, except they don’t inherit from Thread and they have to hand control back to the caller explicitly (this is called cooperative, as opposed to preemptive, multitasking). How do they do that? With yield.

This is a portion of the generator-based Consumer class:

[code lang="python"]
def run(self):
    """Here we simulate two 'resume' points in the function:

    First we take the value and hand over the control to the scheduler,
    then we process the value we got. We can do so because this function
    is a coroutine and it stores its local variables across the calls."""
    while self.buffer.qsize():
        data = self.buffer.get()
        time.sleep(0.5)
        print "%s has got %s from the buffer." % (self.name, data)
        yield True
        time.sleep(0.5)
        print "%s is doing something with %s" % (self.name, data)
        yield True

[/code]

The black magic here is deep inside how the compiler treats functions containing yield, turning them into a sort of coroutine.
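To watch local variables survive across yields, here is a tiny Python 3 sketch where the generators are stepped by hand instead of by a scheduler. The `producer` and `consumer` functions and the `log` list are made up for this illustration and are much simpler than the classes in prodcons-generator.py.

```python
from collections import deque


def producer(buffer, limit, log):
    for i in range(limit):
        buffer.append(i)
        log.append("produced %d" % i)
        yield                      # hand control back to the caller


def consumer(buffer, log):
    while buffer:
        data = buffer.popleft()
        log.append("got %d" % data)
        yield                      # first resume point: `data` is kept alive
        log.append("processed %d" % data)
        yield                      # second resume point


buffer = deque()
log = []
prod = producer(buffer, 2, log)
cons = consumer(buffer, log)

next(prod)   # produced 0
next(cons)   # got 0
next(prod)   # produced 1
next(cons)   # processed 0 (the value taken two steps ago)
next(cons)   # got 1
next(cons)   # processed 1
```

The fourth call is the interesting one: the consumer was suspended while the producer ran, yet it still remembers that its `data` is 0.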

As before you can read the full source code here: prodcons-generator.py.

Conclusions

I think if you want to get serious about concurrency in Python without threads you have to learn external tools like py.greenlet, Stackless Python, asyncore (I’m kidding), Twisted, pyevent and more. Another interesting project which makes an extensive use of generators is BBC’s Kamaelia.

Have fun.

    John Doe said,

    April 26, 2007 @ 8:32 pm

    I don’t see any need for the locks as list operations are atomic?!

    Regards John

    Lawrence said,

    April 26, 2007 @ 11:14 pm

    @John: it’s just a proof of concept

    Klaus said,

    April 27, 2007 @ 7:18 am

    Have a look at SimPy (Simulation in Python); http://simpy.sourceforge.net . SimPy uses generators for co-routines and comes with a variety of process synchronization constructs (e.g. semaphores, event signalling, waiting for a general condition). It is easy to implement the general “n Producers/m Consumers” problem correctly in SimPy.

    Regards,

    Klaus

    Lawrence said,

    April 27, 2007 @ 3:56 pm

    Cool, I’ll give it a try Klaus, Thanks!
