Like Hello World in imperative programming, the Producer/Consumer problem is the kind of program everybody has written at least once.
It’s one of the toy examples in concurrent programming demonstrating the basics of synchronization.
Basically there’s a shared buffer, a producer entity and a consumer one. The producer generates some data and puts it in the buffer (which can have one or multiple slots), then the consumer gets the data from the same buffer and does something with it.
Now we’re going to see two and a half implementations in Python. I bet there are tons of ways to solve this little problem; I’ll show you a threaded implementation with locks around a data structure that is not thread safe, a slight variation using Queue objects, and an alternative implementation using just plain generators.
Threads and Locks
I’m not going to repeat here why threads are mostly bad for concurrency, debugging and so on because there’s plenty of documentation on the net about that.
The core of the concept is this: if you want to share your data structures among threads, be prepared to lock the operations on them (and possibly run into locking problems such as deadlocks) unless those structures are thread safe per se.
Locking basically means identifying a critical section and guarding it with a synchronization primitive: mutexes, semaphores, primitive locks and others.
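Here is a minimal sketch of what that means in practice (my own illustration, not part of the original sources): several threads increment a shared counter, the read-modify-write counter += 1 is the critical section, and a threading.Lock guards it. The names increment_many and run are hypothetical.

[code lang="python"]
import threading

counter = 0
counter_lock = threading.Lock()

def increment_many(times):
    """Hypothetical worker: bump the shared counter `times` times."""
    global counter
    for _ in range(times):
        # Critical section: the read-modify-write must not interleave
        # with another thread's, so it runs while holding the lock.
        with counter_lock:
            counter += 1

def run(num_threads=4, times=10000):
    threads = [threading.Thread(target=increment_many, args=(times,))
               for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait for every worker before reading the result
    return counter

print(run())  # 40000
[/code]

Without the lock the final count is not guaranteed, because two threads can read the same old value before either of them writes it back.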
In our example the shared data structure is the buffer, so that’s where we’re going to put the locks.
For consistency, I modeled the Buffer class on the interface of the Queue class (put(), get() and qsize()). Let’s see:
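[code lang="python"]
from threading import Lock

class Buffer(object):
    def __init__(self):
        self._list = list()
        self._lock = Lock()

    def get(self):
        # Critical section. Raises IndexError if the buffer is empty.
        with self._lock:
            return self._list.pop(0)  # oldest item first, like Queue

    def put(self, val):
        # Critical section.
        with self._lock:
            self._list.append(val)

    def qsize(self):
        return len(self._list)
[/code]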
The class keeps the values in a private list and uses a Lock object to guard the critical sections. I also use the with statement introduced in Python 2.5 (where it must be enabled with from __future__ import with_statement; it is always available from 2.6 on), which automatically releases the lock when the critical section ends, even if an exception is raised.
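To show what the with statement saves us, here is a small sketch comparing it with the explicit acquire()/release() pattern it replaces. The names get_with and get_explicit are hypothetical and exist only for the comparison; the try/finally is what guarantees the release when pop() raises on an empty list.

[code lang="python"]
from threading import Lock

_lock = Lock()
_items = [1, 2]

def get_with():
    # The with statement acquires the lock on entry and releases it on
    # exit, whether the block returns normally or raises.
    with _lock:
        return _items.pop(0)

def get_explicit():
    # Hand-written equivalent of get_with().
    _lock.acquire()
    try:
        return _items.pop(0)
    finally:
        _lock.release()  # runs even if pop() raises IndexError

first = get_with()       # 1
second = get_explicit()  # 2

# The list is now empty, so pop() raises, yet both versions free the lock.
for getter in (get_with, get_explicit):
    try:
        getter()
    except IndexError:
        pass
print(_lock.locked())  # False
[/code]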
The Producer and Consumer are Thread objects:
[code lang="python"]
class Producer(Thread):
    def __init__(self, buffer, name, limit=10):
        # …
[/code]
As you can see, we have to pass the buffer object when we instantiate the class, like this:
[code lang="python"]
def main():
    buffer = Buffer()  # we create a single buffer shared among both threads
    prod = Producer(buffer, "Producer")
    cons = Consumer(buffer, "Consumer")
    prod.start()
    cons.start()
[/code]
This way the threads cannot leave the shared buffer in an inconsistent state. Obviously this is just a proof of concept. You can find the whole source code here: prodcons-threaded-locks.py
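The Producer and Consumer bodies are elided above, so the following is only a sketch of how they might look, not the original code. I’m assuming the producer puts limit integers into the buffer and that a (hypothetical) limit parameter also tells the consumer how many items to read; the consumer polls with a short time.sleep() when the buffer is empty, and the consumed list exists only so the result can be inspected.

[code lang="python"]
import time
from threading import Lock, Thread

class Buffer(object):
    """Lock-protected list with a Queue-like interface, like the Buffer
    above; here get() takes the oldest item (FIFO)."""
    def __init__(self):
        self._list = list()
        self._lock = Lock()

    def get(self):
        with self._lock:
            return self._list.pop(0)

    def put(self, val):
        with self._lock:
            self._list.append(val)

    def qsize(self):
        return len(self._list)

class Producer(Thread):
    # Hypothetical body: put `limit` integers into the shared buffer.
    def __init__(self, buffer, name, limit=10):
        Thread.__init__(self, name=name)
        self.buffer = buffer
        self.limit = limit

    def run(self):
        for i in range(self.limit):
            self.buffer.put(i)

class Consumer(Thread):
    # Hypothetical body: take `limit` items, polling while the buffer is empty.
    def __init__(self, buffer, name, limit=10):
        Thread.__init__(self, name=name)
        self.buffer = buffer
        self.limit = limit
        self.consumed = []

    def run(self):
        while len(self.consumed) < self.limit:
            if self.buffer.qsize():
                # Safe: this is the only consumer, so the item can't vanish.
                self.consumed.append(self.buffer.get())
            else:
                time.sleep(0.01)  # nothing to read yet, back off briefly

def main():
    buffer = Buffer()  # a single buffer shared by both threads
    prod = Producer(buffer, "Producer")
    cons = Consumer(buffer, "Consumer")
    prod.start()
    cons.start()
    prod.join()
    cons.join()
    return cons.consumed

print(main())  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[/code]

Polling is the weak spot of a hand-rolled buffer: the consumer has no way to sleep until data arrives, which is one more reason to reach for Queue.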
Queue
As I said, this is a proof of concept, because in simple situations like this one we don’t need to write a data structure from scratch in Python. The most Pythonic way to share data between threads safely is to use Queue objects (from the Queue module, renamed queue in Python 3).
These objects implement a thread-safe FIFO queue, exactly what we need for the example, and their get() method blocks by default until an item is available.
I won’t show the code because it’s trivial; you can find it here: prodcons-threaded.py
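For reference, here is a minimal sketch of the same idea on top of the standard library queue module (Queue in Python 2); it is not the original prodcons-threaded.py. Because Queue.get() blocks, the consumer needs neither explicit locks nor polling. Using None as an end-of-stream sentinel is my own assumption.

[code lang="python"]
import threading

try:
    import queue           # Python 3
except ImportError:
    import Queue as queue  # Python 2

SENTINEL = None  # hypothetical end-of-stream marker

def producer(q, limit=10):
    for i in range(limit):
        q.put(i)           # thread safe, no explicit lock needed
    q.put(SENTINEL)        # tell the consumer we are done

def consumer(q, results):
    while True:
        item = q.get()     # blocks until an item is available
        if item is SENTINEL:
            break
        results.append(item)

def main():
    q = queue.Queue()
    results = []
    threads = [threading.Thread(target=producer, args=(q,)),
               threading.Thread(target=consumer, args=(q, results))]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results

print(main())  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[/code]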
Generators
Since we don’t like threads very much, let’s indulge a bit in our toy example and rewrite it without them.
One built-in way to do that is to take advantage of generators, which let us write coroutines (aka resumable functions).
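A tiny illustration of what “resumable” means (the countdown function is hypothetical): each next() call runs the generator up to its next yield, and its local variables survive between calls.

[code lang="python"]
def countdown(n):
    # `n` is a local variable that survives across resumptions.
    while n > 0:
        yield n        # suspend here, handing n back to the caller
        n -= 1         # resume here on the next call to next()

gen = countdown(3)
print(next(gen))   # 3
print(next(gen))   # 2
print(next(gen))   # 1
# A fourth next(gen) would raise StopIteration: the function has returned.
[/code]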
The main problem is that we lose the scheduler. With threads we don’t have to worry about scheduling the tasks, because the operating system’s preemptive scheduler takes care of it for us. With generators we don’t have that luxury, and since we love code reuse we’re going to use a very cool (and simple) scheduler example by Maciej Obarski.
In our case the Scheduler class simply cycles through the task list, calling next() on each task’s generator to resume it and move its computation forward, then rotating that task to the back of the queue:
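[code lang="python"]
from collections import deque

class Scheduler(object):
    def __init__(self):
        self.tasks = deque()

    def add_task(self, task):
        self.tasks.append(task)

    def main(self):
        tasks = self.tasks
        while True:
            try:
                # Resume the task at the front, then move it to the back.
                next(tasks[0].generator)
                tasks.rotate(-1)
            except StopIteration:
                # That generator is exhausted: drop its task.
                del tasks[0]
            except IndexError:
                # tasks[0] failed because no tasks are left.
                break
[/code]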
No black magic indeed.
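The scheduler only requires each task to expose a generator attribute. As a usage sketch, the hypothetical Task class below satisfies that, and running two tasks shows the round-robin interleaving produced by rotate(-1). The Scheduler is repeated so the snippet runs on its own.

[code lang="python"]
from collections import deque

class Scheduler(object):
    def __init__(self):
        self.tasks = deque()

    def add_task(self, task):
        self.tasks.append(task)

    def main(self):
        tasks = self.tasks
        while True:
            try:
                next(tasks[0].generator)
                tasks.rotate(-1)
            except StopIteration:
                del tasks[0]
            except IndexError:
                break

log = []

class Task(object):
    """Hypothetical task: logs `steps` entries, yielding after each one."""
    def __init__(self, name, steps):
        self.name = name
        self.generator = self.run(steps)

    def run(self, steps):
        for i in range(steps):
            log.append("%s%d" % (self.name, i))
            yield True

sched = Scheduler()
sched.add_task(Task("a", 3))
sched.add_task(Task("b", 2))
sched.main()
print(log)  # ['a0', 'b0', 'a1', 'b1', 'a2']
[/code]

The shorter task simply drops out when its generator raises StopIteration, and the loop ends once the deque is empty.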
The Producer and Consumer are pretty similar to the threaded ones, except that they don’t inherit from Thread and they have to explicitly hand control back to the caller (this is called cooperative, as opposed to preemptive, multitasking). How do they do that? With yield.
This is a portion of the generator-based Consumer class:
[code lang="python"]
def run(self):
    """Here we simulate two 'resume' points in the function:
    First we take the value and hand over the control to the scheduler,
    then we process the value we got. We can do so because this function
    is a coroutine and it stores its local variables across the calls."""
    while self.buffer.qsize():
        data = self.buffer.get()
        time.sleep(0.5)
        print("%s has got %s from the buffer." % (self.name, data))
        yield True  # first resume point: hand control to the scheduler
        time.sleep(0.5)
        print("%s is doing something with %s" % (self.name, data))
        yield True  # second resume point
[/code]
The black magic here lies deep inside how the compiler treats functions containing yield, turning them into generators, a sort of coroutine.
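A quick way to see this at work (greet and calls are hypothetical names): inspect.isgeneratorfunction() reports the flag the compiler sets because of yield, and calling such a function runs none of its body; it just returns a generator object.

[code lang="python"]
import inspect
import types

calls = []

def greet():
    calls.append("started")   # runs only on the first next()
    yield "hello"

print(inspect.isgeneratorfunction(greet))    # True
gen = greet()                                # no body code has run yet
print(calls)                                 # []
print(isinstance(gen, types.GeneratorType))  # True
print(next(gen))                             # hello
print(calls)                                 # ['started']
[/code]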
As before you can read the full source code here: prodcons-generator.py.
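To put all the pieces together, here is a self-contained sketch combining the Scheduler with generator-based tasks. It is not the contents of prodcons-generator.py: the Producer body is hypothetical, the time.sleep() calls are dropped, and the buffer is a plain list without a lock, since only one generator runs at any moment. Note that the producer must be added first: like the Consumer above, the consumer’s loop ends as soon as it finds the buffer empty.

[code lang="python"]
from collections import deque

class Scheduler(object):
    """Round-robin scheduler, as above."""
    def __init__(self):
        self.tasks = deque()

    def add_task(self, task):
        self.tasks.append(task)

    def main(self):
        tasks = self.tasks
        while True:
            try:
                next(tasks[0].generator)
                tasks.rotate(-1)
            except StopIteration:
                del tasks[0]
            except IndexError:
                break

class Buffer(object):
    """Plain list with a Queue-like interface. No lock is needed because
    the tasks are never preempted: each runs until it yields."""
    def __init__(self):
        self._list = []

    def get(self):
        return self._list.pop(0)

    def put(self, val):
        self._list.append(val)

    def qsize(self):
        return len(self._list)

class Producer(object):
    """Hypothetical producer: puts `limit` integers, yielding after each."""
    def __init__(self, buffer, name, limit=3):
        self.buffer = buffer
        self.name = name
        self.limit = limit
        self.generator = self.run()

    def run(self):
        for i in range(self.limit):
            self.buffer.put(i)
            print("%s put %s in the buffer." % (self.name, i))
            yield True

class Consumer(object):
    """Same two resume points as the Consumer above, minus time.sleep()."""
    def __init__(self, buffer, name):
        self.buffer = buffer
        self.name = name
        self.consumed = []  # hypothetical: records what was consumed
        self.generator = self.run()

    def run(self):
        while self.buffer.qsize():
            data = self.buffer.get()
            self.consumed.append(data)
            print("%s has got %s from the buffer." % (self.name, data))
            yield True
            print("%s is doing something with %s" % (self.name, data))
            yield True

def main():
    buffer = Buffer()
    prod = Producer(buffer, "Producer")
    cons = Consumer(buffer, "Consumer")
    scheduler = Scheduler()
    scheduler.add_task(prod)  # first, so the buffer is non-empty
    scheduler.add_task(cons)  # when the consumer checks it
    scheduler.main()
    return cons.consumed

if __name__ == "__main__":
    main()
[/code]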
Conclusions
I think that if you want to get serious about concurrency in Python without threads, you have to learn tools like py.greenlet, Stackless Python, asyncore (I’m kidding), Twisted, pyevent and more. Another interesting project that makes extensive use of generators is BBC’s Kamaelia.
Have fun.

