Common threads: POSIX threads explained, Part 3
Improve efficiency with condition variables

Daniel Robbins
President/CEO, Gentoo Technologies, Inc.
September 2000

Contents:
 Condition variables
 pthread_cond_wait() quiz
 Initialization and cleanup
 Waiting
 Signalling and broadcasting
 The work crew
 The queue
 The data_control code
 Debug time
 The work crew code
 Code walkthrough
 Cleanup special
 Creating work
 threadfunc()
 join_threads()
 Wrapping it up
 Resources
 About the author

In this article, the last of a three-part series on POSIX threads, Daniel takes a good look at how to use condition variables. Condition variables are POSIX thread structures that allow you to "wake up" threads when certain conditions are met. You can think of them as a thread-safe form of signalling. Daniel wraps up the article by using all that you've learned so far to implement a multi-threaded work crew application.

Condition variables explained
I ended my previous article by describing a particular dilemma: how does a thread deal with a situation where it is waiting for a specific condition to become true? It could repeatedly lock and unlock a mutex, each time checking a shared data structure for a certain value, but this kind of busy polling wastes time and CPU resources. A much better approach is to use the pthread_cond_wait() call to wait for a particular condition to become true.

It's important to understand what pthread_cond_wait() does -- it's the heart of the POSIX threads signalling system, and also the hardest part to understand.

First, let's consider a scenario where a thread has locked a mutex, in order to take a look at a linked list, and the list happens to be empty. This particular thread can't do anything -- it's designed to remove a node from the list, and there are no nodes available. So, this is what it does:

While still holding the mutex lock, our thread will call pthread_cond_wait(&mycond,&mymutex). The pthread_cond_wait() call is rather complex, so we'll step through each of its operations one at a time.

The first thing pthread_cond_wait() does is simultaneously unlock the mutex mymutex (so that other threads can modify the linked list) and wait on the condition mycond (so that pthread_cond_wait() will wake up when it is "signalled" by another thread). Now that the mutex is unlocked, other threads can access and modify the linked list, possibly adding items.

At this point, the pthread_cond_wait() call has not yet returned. Unlocking the mutex happens immediately, but waiting on the condition mycond is normally a blocking operation, meaning that our thread will go to sleep, consuming no CPU cycles until it is woken up. This is exactly what we want to happen. Our thread is sleeping, waiting for a particular condition to become true, without performing any kind of busy polling that would waste CPU time. From our thread's perspective, it's simply waiting for the pthread_cond_wait() call to return.

Now, to continue the explanation, let's say that another thread (call it "thread 2") locks mymutex and adds an item to our linked list. Immediately after unlocking the mutex, thread 2 calls the function pthread_cond_broadcast(&mycond). By doing so, thread 2 will cause all threads waiting on the mycond condition variable to immediately wake up. This means that our first thread (which is in the middle of a pthread_cond_wait() call) will now wake up.

Now, let's take a look at what happens to our first thread. After thread 2 called pthread_cond_broadcast(&mycond), you might think that thread 1's pthread_cond_wait() will immediately return. Not so! Instead, pthread_cond_wait() will perform one last operation: relock mymutex. Once pthread_cond_wait() has the lock, it will then return and allow thread 1 to continue execution. At that point, thread 1 can immediately check the list for any interesting changes.

Stop and review!
That was pretty complicated, so let's review. Our first thread first called:


    pthread_mutex_lock(&mymutex);

Then, it inspected the list. Nothing interesting was found, so it called:


    pthread_cond_wait(&mycond, &mymutex);

The pthread_cond_wait() call then performed a number of operations before returning:

       
    pthread_mutex_unlock(&mymutex);

It unlocked mymutex and then went to sleep, waiting for mycond to receive a POSIX thread "signal". Once it received such a "signal" (in quotes because we're not talking about traditional UNIX signals, but rather a signal from a pthread_cond_signal() or pthread_cond_broadcast() call), it woke up. But pthread_cond_wait() didn't immediately return -- it had one more thing to do: relock the mutex:

     
    pthread_mutex_lock(&mymutex);

pthread_cond_wait() knows that we are looking for changes "behind" mymutex, so it goes ahead and acquires the lock for us, and then returns.

pthread_cond_wait() quiz
Now that we've reviewed the pthread_cond_wait() call, you should feel comfortable with how it works. You should be able to recite all the operations that pthread_cond_wait() performs, in order. Give it a try. Once you understand pthread_cond_wait(), the rest is easy, so keep rereading the above section until you have committed it to memory. OK, now that you have, can you tell me what state the mutex must be in before the call to pthread_cond_wait()? And what state is the mutex after the call to pthread_cond_wait() returns? The answer to both questions is "locked". Now that you have fully absorbed the pthread_cond_wait() call, it's time to move on to easier things -- initialization and the actual signalling and broadcasting process. Then, we're going to get very familiar with a bunch of C code that comprises our multithreaded work queue.

Initialization and cleanup
The condition variable is an actual data structure that needs to be initialized. Here's how to do it. First, define or allocate a condition variable, as follows:


    pthread_cond_t mycond;

Then, initialize it by calling:


    pthread_cond_init(&mycond,NULL);

Voila, initialization complete! Now, before deallocating or discarding a condition variable, you'll need to destroy it, as follows:


    pthread_cond_destroy(&mycond);

That was easy. Now let's take a look at the pthread_cond_wait() call.

Waiting
Once you have an initialized mutex and condition variable, you can wait for a condition as follows:


    pthread_cond_wait(&mycond, &mymutex);

Note that your code should logically group mycond and mymutex. There should be only one mutex for a particular condition, and the condition variable should signify a particular type of condition change "inside" the mutex data. You can have a bunch of condition variables for a particular mutex (cond_empty, cond_full, cond_cleanup, for example), but you must have only one mutex per condition variable.
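
To make the pairing concrete, here's a minimal sketch of the canonical wait loop (list_is_empty() and remove_node_from_list() are placeholder names, not functions from this article):


    pthread_mutex_lock(&mymutex);
    /* always re-check the condition after waking up -- wakeups can be spurious */
    while (list_is_empty())
      pthread_cond_wait(&mycond, &mymutex);
    /* mymutex is locked again here, and the condition is known to hold */
    remove_node_from_list();
    pthread_mutex_unlock(&mymutex);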

Signalling and broadcasting
Signalling and broadcasting are a snap to grasp. When a thread makes a change to some shared data, and it wants to wake up all waiting threads, you'll want to use the pthread_cond_broadcast call, as follows:


    pthread_cond_broadcast(&mycond);

There are certain instances when the active thread will only need to wake up the first sleeping thread. Imagine a situation where you just added one work job to a queue. You only need to wake up one worker thread (waking up any more would be uncivilized!):


    pthread_cond_signal(&mycond);

This will wake up a single thread. It would have been nice if the POSIX thread standard allowed for an integer to be specified, allowing you to wake up a certain number of sleeping threads. But, unfortunately, I wasn't invited to the meeting.
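
To sketch the producer side (add_job_to_queue() is a placeholder name, not a function from this article), adding a single job and waking one worker might look like this:


    pthread_mutex_lock(&mymutex);
    add_job_to_queue();            /* modify the shared data */
    pthread_mutex_unlock(&mymutex);
    pthread_cond_signal(&mycond);  /* wake (at least) one sleeping waiter */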

The work crew
I'm going to show you how to create a multithreaded work crew. In this arrangement, we create a bunch of worker threads, each of which checks wq (the "work queue") for any work that needs to be completed. If work is available, the thread removes a node from the queue, performs this particular bundle of work, and then waits for new work to arrive.

In the meantime, our main thread is in charge of creating these worker threads, adding work to the queue, and then collecting all the worker threads when it's time to quit. You're about to be inundated with lots of C code, so prepare yourself!

The queue
I needed a queue for two reasons. First, I needed a queue to hold the work jobs. Second, I needed a data structure that could be used to keep track of terminated threads. Remember how, in the last few articles (see Resources at the end of this article), I mentioned that you need to pthread_join() with a specific TID? Using a "cleanup queue" (called "cq") solves the problem of not being able to wait for any terminated thread (we'll cover this in more detail later). Here's my standard queue code. Save this code to files called queue.h and queue.c:

queue.h
/* queue.h
** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc.
** Author: Daniel Robbins
** Date: 16 Jun 2000
*/

typedef struct node {
  struct node *next;
} node;

typedef struct queue {
  node *head, *tail; 
} queue;

void queue_init(queue *myroot);
void queue_put(queue *myroot, node *mynode);
node *queue_get(queue *myroot);

queue.c
/* queue.c
** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc.
** Author: Daniel Robbins
** Date: 16 Jun 2000
**
** This set of queue functions was originally thread-aware.  I
** redesigned the code to make this set of queue routines
** thread-ignorant (just a generic, boring yet very fast set of queue
** routines).  Why the change?  Because it makes more sense to have
** the thread support as an optional add-on.  Consider a situation
** where you want to add 5 nodes to the queue.  With the
** thread-enabled version, each call to queue_put() would
** automatically lock and unlock the queue mutex 5 times -- that's a
** lot of unnecessary overhead.  However, by moving the thread stuff
** out of the queue routines, the caller can lock the mutex once at
** the beginning, then insert 5 items, and then unlock at the end.
** Moving the lock/unlock code out of the queue functions allows for
** optimizations that aren't possible otherwise.  It also makes this
** code useful for non-threaded applications.
**
** We can easily thread-enable this data structure by using the
** data_control type defined in control.c and control.h.  */

#include <stdio.h>
#include "queue.h"

void queue_init(queue *myroot) {
  myroot->head=NULL;
  myroot->tail=NULL;
}

void queue_put(queue *myroot,node *mynode) {
  mynode->next=NULL;
  if (myroot->tail!=NULL)
    myroot->tail->next=mynode;
  myroot->tail=mynode;
  if (myroot->head==NULL)
    myroot->head=mynode;
}

node *queue_get(queue *myroot) {
  //get from root
  node *mynode;
  mynode=myroot->head;
  if (myroot->head!=NULL)
    myroot->head=myroot->head->next;
  return mynode;
}
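
If you'd like to try the queue routines on their own, here's a small hypothetical test program (not part of the article's sources) that exercises queue_init(), queue_put(), and queue_get() without any threads. It relies on the same trick the work crew code uses later: the embedded next pointer is the first member, so the struct can be cast to a generic node.

queue_test.c (hypothetical)
/* queue_test.c -- single-threaded exercise of the queue routines */

#include <stdio.h>
#include <stdlib.h>
#include "queue.h"

typedef struct int_node {
  struct node *next;   /* must be the first member */
  int value;
} int_node;

int main(void) {
  queue q;
  int_node *n;
  int x;

  queue_init(&q);
  /* enqueue three numbered nodes */
  for (x=0; x<3; x++) {
    n=malloc(sizeof(int_node));
    if (!n)
      return 1;
    n->value=x;
    queue_put(&q,(node *) n);
  }
  /* dequeue and print them in FIFO order */
  while ((n=(int_node *) queue_get(&q))!=NULL) {
    printf("got %d\n",n->value);
    free(n);
  }
  return 0;
}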

The data_control code
Instead of writing thread-safe queue routines, I created a "data wrapper" or "control" structure that can be used to thread-enable any kind of data structure. Let's take a look at control.h:

control.h
#include <pthread.h>

typedef struct data_control {
  pthread_mutex_t mutex;
  pthread_cond_t cond;
  int active;
} data_control;

Now that you've seen the data_control struct definition, here's a visual representation:

[Figure: A data_control structure in use]

The lock in the image symbolizes the mutex, which is used to allow exclusive access to the data structure. The yellow star symbolizes the condition variable, which can be used to sleep until the data structure in question changes. And the on/off switch represents the "active" int, which is used to tell threads whether this data is active or not. In my code, I use the active int as a flag to tell the work queue when to shut down. Here's control.c:

control.c
/* control.c
** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc.
** Author: Daniel Robbins
** Date: 16 Jun 2000
**
** These routines provide an easy way to make any type of
** data-structure thread-aware.  Simply associate a data_control
** structure with the data structure (by creating a new struct, for
** example).  Then, simply lock and unlock the mutex, or
** wait/signal/broadcast on the condition variable in the data_control
** structure as needed.
**
** data_control structs contain an int called "active".  This int is
** intended to be used for a specific kind of multithreaded design,
** where each thread checks the state of "active" every time it locks
** the mutex.  If active is 0, the thread knows that instead of doing
** its normal routine, it should stop itself.  If active is 1, it
** should continue as normal.  So, by setting active to 0, a
** controlling thread can easily inform a thread work crew to shut
** down instead of processing new jobs.  Use the control_activate()
** and control_deactivate() functions, which will also broadcast on
** the data_control struct's condition variable, so that all threads
** stuck in pthread_cond_wait() will wake up, have an opportunity to
** notice the change, and then terminate.
*/

#include "control.h"

int control_init(data_control *mycontrol) {
  int mystatus;
  if (pthread_mutex_init(&(mycontrol->mutex),NULL))
    return 1;
  if (pthread_cond_init(&(mycontrol->cond),NULL))
    return 1;
  mycontrol->active=0;
  return 0;
}

int control_destroy(data_control *mycontrol) {
  int mystatus;
  if (pthread_cond_destroy(&(mycontrol->cond)))
    return 1;
  if (pthread_mutex_destroy(&(mycontrol->mutex)))
    return 1;
  mycontrol->active=0;
  return 0;
}

int control_activate(data_control *mycontrol) {
  int mystatus;
  if (pthread_mutex_lock(&(mycontrol->mutex)))
    return 0;
  mycontrol->active=1;
  pthread_mutex_unlock(&(mycontrol->mutex));
  pthread_cond_broadcast(&(mycontrol->cond));
  return 1;
}

int control_deactivate(data_control *mycontrol) {
  int mystatus;
  if (pthread_mutex_lock(&(mycontrol->mutex)))
    return 0;
  mycontrol->active=0;
  pthread_mutex_unlock(&(mycontrol->mutex));
  pthread_cond_broadcast(&(mycontrol->cond));
  return 1;
}
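
As a sketch of how a data_control can thread-enable the queue routines (this helper is hypothetical and not part of the article's sources), a thread-safe "put" might look like this:


    /* hypothetical helper: add a node under the data_control's mutex,
       then wake one thread that may be waiting for new data */
    void safe_queue_put(data_control *ctl, queue *q, node *n) {
      pthread_mutex_lock(&(ctl->mutex));
      queue_put(q,n);
      pthread_mutex_unlock(&(ctl->mutex));
      pthread_cond_signal(&(ctl->cond));
    }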

Debug time
One more miscellaneous file before we get to the biggie. Here's dbug.h:

dbug.h
#define dabort() \
 {  printf("Aborting at line %d in source file %s\n",__LINE__,__FILE__); abort(); }

We use this code to handle unrecoverable errors in our work crew code.

The work crew code
Speaking of the work crew code, here it is:

workcrew.c
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "control.h"
#include "queue.h"
#include "dbug.h"

/* the work_queue holds tasks for the various threads to complete. */

struct work_queue {
  data_control control;
  queue work;
} wq;


/* I added a job number to the work node.  Normally, the work node
   would contain additional data that needed to be processed. */

typedef struct work_node {
  struct node *next;
  int jobnum;
} wnode;

/* the cleanup queue holds stopped threads.  Before a thread
   terminates, it adds itself to this list.  Since the main thread is
   waiting for changes in this list, it will then wake up and clean up
   the newly terminated thread. */

struct cleanup_queue {
  data_control control;
  queue cleanup;
} cq;

/* I added a thread number (for debugging/instructional purposes) and
   a thread id to the cleanup node.  The cleanup node gets passed to
   the new thread on startup, and just before the thread stops, it
   attaches the cleanup node to the cleanup queue.  The main thread
   monitors the cleanup queue and is the one that performs the
   necessary cleanup. */

typedef struct cleanup_node {
  struct node *next;
  int threadnum;
  pthread_t tid;
} cnode;

void *threadfunc(void *myarg) {

  wnode *mywork;
  cnode *mynode;

  mynode=(cnode *) myarg;

  pthread_mutex_lock(&wq.control.mutex);

  while (wq.control.active) {
    while (wq.work.head==NULL && wq.control.active) {
      pthread_cond_wait(&wq.control.cond, &wq.control.mutex);
    }
    if (!wq.control.active) 
      break;
    //we got something!
    mywork=(wnode *) queue_get(&wq.work);
    pthread_mutex_unlock(&wq.control.mutex);
    //perform processing...
    printf("Thread number %d processing job %d\n",mynode->threadnum,mywork->jobnum);
    free(mywork);
    pthread_mutex_lock(&wq.control.mutex);
  }

  pthread_mutex_unlock(&wq.control.mutex);

  pthread_mutex_lock(&cq.control.mutex);
  queue_put(&cq.cleanup,(node *) mynode);
  pthread_mutex_unlock(&cq.control.mutex);
  pthread_cond_signal(&cq.control.cond);
  printf("thread %d shutting down...\n",mynode->threadnum);
  return NULL;
  
}

#define NUM_WORKERS 4

int numthreads;

void join_threads(void) {
  cnode *curnode;

  printf("joining threads...\n");

  while (numthreads) {
    pthread_mutex_lock(&cq.control.mutex);

    /* below, we sleep until there really is a new cleanup node.  This
       takes care of any false wakeups... even if we break out of
       pthread_cond_wait(), we don't make any assumptions that the
       condition we were waiting for is true.  */

    while (cq.cleanup.head==NULL) {
      pthread_cond_wait(&cq.control.cond,&cq.control.mutex);
    }

    /* at this point, we hold the mutex and there is an item in the
       list that we need to process.  First, we remove the node from
       the queue.  Then, we call pthread_join() on the tid stored in
       the node.  When pthread_join() returns, we have cleaned up
       after a thread.  Only then do we free() the node, decrement the
       number of additional threads we need to wait for and repeat the
       entire process, if necessary */

      curnode = (cnode *) queue_get(&cq.cleanup);
      pthread_mutex_unlock(&cq.control.mutex);
      pthread_join(curnode->tid,NULL);
      printf("joined with thread %d\n",curnode->threadnum);
      free(curnode);
      numthreads--;
  }
}


int create_threads(void) {
  int x;
  cnode *curnode;

  for (x=0; x<NUM_WORKERS; x++) {
    curnode=malloc(sizeof(cnode));
    if (!curnode)
      return 1;
    curnode->threadnum=x;
    if (pthread_create(&curnode->tid, NULL, threadfunc, (void *) curnode))
      return 1;
    printf("created thread %d\n",x);
    numthreads++;
  }
  return 0;
}

void initialize_structs(void) {
  numthreads=0;
  if (control_init(&wq.control))
    dabort();
  queue_init(&wq.work);
  if (control_init(&cq.control)) {
    control_destroy(&wq.control);
    dabort();
  }
  queue_init(&cq.cleanup);
  control_activate(&wq.control);
}

void cleanup_structs(void) {
  control_destroy(&cq.control);
  control_destroy(&wq.control);
}


int main(void) {

  int x;
  wnode *mywork;

  initialize_structs();

  /* CREATION */
  
  if (create_threads()) {
    printf("Error starting threads... cleaning up.\n");
    join_threads();
    dabort();
  }

  pthread_mutex_lock(&wq.control.mutex);
  for (x=0; x<16000; x++) {
    mywork=malloc(sizeof(wnode));
    if (!mywork) {
      printf("ouch! can't malloc!\n");
      break;
    }
    mywork->jobnum=x;
    queue_put(&wq.work,(node *) mywork);
  }
  pthread_mutex_unlock(&wq.control.mutex);
  pthread_cond_broadcast(&wq.control.cond);

  printf("sleeping...\n");
  sleep(2);
  printf("deactivating work queue...\n");
  control_deactivate(&wq.control);
  /* CLEANUP  */

  join_threads();
  cleanup_structs();

  return 0;
}
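
Assuming the file names used above and a Linux system with the LinuxThreads library, the example can probably be built with something like:


    gcc -o workcrew workcrew.c queue.c control.c -lpthread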

Code walkthrough
Now it's time for a quick code walkthrough. The first structure defined is the global work queue, "wq", which contains a data_control and a queue header. The data_control structure will be used to arbitrate access to the entire queue, including the nodes in the queue. Our next job is to define the actual work nodes. To keep the code lean enough to fit in this article, all they contain is a job number.

Next, we create the cleanup queue. The comments explain how this works. OK, now let's skip the threadfunc(), join_threads(), create_threads() and initialize_structs() calls, and jump down to main(). The first thing we do is initialize our structures -- this includes initializing our data_controls and queues, as well as activating our work queue.

Cleanup special
Now it's time to initialize our threads. If you look at our create_threads() call, everything will look pretty normal -- except for one thing. Notice that we are allocating a cleanup node, and initializing its threadnum and TID components. We also pass a cleanup node to each new worker thread as an initial argument. Why do we do this?

Because when a worker thread exits, it'll attach its cleanup node to the cleanup queue and terminate. Then, our main thread will detect this addition to the cleanup queue (by use of a condition variable) and dequeue the node. Because the TID (thread id) is stored in the cleanup node, our main thread will know exactly which thread terminated. Then, our main thread will call pthread_join() on that TID, joining with the appropriate worker thread. If we didn't perform such bookkeeping, our main thread would need to join with the worker threads in some fixed order, probably the order in which they were created. Because the threads may not terminate in that order, our main thread could be waiting to join with one thread while it could have been joining with ten others. Can you see how this design decision can really speed up our shutdown code, especially if we were to use hundreds of worker threads?

Creating work
Now that we've started our worker threads (and they're off performing their threadfunc(), which we'll get to in a bit), our main thread begins inserting items into the work queue. First, it locks wq's control mutex, and then allocates 16000 work packets, inserting them into the queue one-by-one. After this is done, pthread_cond_broadcast() is called, so that any sleeping threads are woken up and able to do the work. Then, our main thread sleeps for two seconds, and then deactivates the work queue, telling our worker threads to terminate. Then, our main thread calls the join_threads() function to clean up all the worker threads.

threadfunc()
Time to look at threadfunc(), the code that each worker thread executes. When a worker thread starts, it immediately locks the work queue mutex, gets one work node (if available) and processes it. If no work is available, pthread_cond_wait() is called. You'll notice that it's called in a very tight while() loop, and this is very important. When you wake up from a pthread_cond_wait() call, you should never assume that your condition is definitely true -- it will probably be true, but it may not. The while loop will cause pthread_cond_wait() to be called again if it so happens that the thread was mistakenly woken up and the list is empty.

If there's a work node, we simply print out its job number and free it. Real code would do something more substantial. At the bottom of the while() loop, we lock the mutex again so that we can check the active variable, as well as check for new work nodes, at the top of the loop. If you follow the code through, you'll find that if wq.control.active is 0, the while loop will terminate and the cleanup code at the end of threadfunc() will begin.

The worker thread's part of the cleanup code is pretty interesting. First, it unlocks the work_queue, since pthread_cond_wait() returns with the mutex locked. Then, it gets a lock on the cleanup queue, adds our cleanup node (containing our TID, which the main thread will use for its pthread_join() call), and then it unlocks the cleanup queue. After that, it signals any cq waiters (pthread_cond_signal(&cq.control.cond)) so that the main thread will know that there's a new node to process. We don't use pthread_cond_broadcast() because it's not necessary -- only one thread (the main thread) is waiting for new entries in the cleanup queue. Our worker thread prints a shutdown message, and then terminates, waiting to be pthread_joined() by the main thread when it calls join_threads().

join_threads()
If you want to see a simple example of how condition variables should be used, take a look at the join_threads() function. While we still have worker threads in existence, join_threads() loops, waiting for new cleanup nodes in our cleanup queue. If there is a new node, we dequeue the node, unlock the cleanup queue (so that other cleanup nodes can be added by our worker threads), join with our new thread (using the TID stored in the cleanup node), free the cleanup node, decrement the number of threads "out there", and continue.

Wrapping it up
We've reached the end of the "POSIX threads explained" series, and I hope that you're now ready to begin adding multithreaded code to your own applications. For more information, please see the Resources section, which also contains a tarball of all the sources used in this article. I'll see you next series!

Resources

  • A tarball of the sources used in this article is available.
  • Your friendly Linux pthread man pages ("man -k pthread") are an excellent resource.
  • For a thorough treatment of POSIX threads, I recommend this book: Programming with POSIX Threads, by David R. Butenhof (Addison-Wesley, 1997). This is arguably the best POSIX threads book available.
  • POSIX threads are also covered in this book: UNIX Network Programming - Networking APIs: Sockets and XTI, by W. Richard Stevens (Prentice Hall, 1997). This is a classic book, but it doesn't cover threads in as much detail as does Programming with POSIX Threads, above.
  • Refer to Daniel's previous articles in this POSIX threads series (Parts 1 and 2) on developerWorks.
  • See documentation on Linux threads, by Sean Walton, KB7rfa.
  • Take a POSIX threads tutorial by Mark Hays at the University of Arizona.
  • In An Introduction to Pthreads-Tcl, see changes to Tcl that enable it to be used with POSIX threads.
  • Take another tutorial, Getting Started with POSIX Threads, by Tom Wagner and Don Towsley of the Computer Science Department at the University of Massachusetts, Amherst.
  • FSU PThreads is a C library that implements POSIX threads for SunOS 4.1.x, Solaris 2.x, SCO UNIX, FreeBSD, Linux, and DOS.
  • Refer to the home page for POSIX and DCE threads for Linux.
  • See The LinuxThreads library.
  • Proolix is a simple POSIX-compliant operating system for i8086+ under permanent development.

About the author
Daniel Robbins lives in Albuquerque, New Mexico. He is the President/CEO of Gentoo Technologies, Inc., the Chief Architect of the Gentoo Project and a contributing author to several books published by MacMillan: Caldera OpenLinux Unleashed, SuSE Linux Unleashed, and Samba Unleashed. Daniel has been involved with computers in some fashion since the second grade, when he was first exposed to the Logo programming language as well as a potentially dangerous dose of Pac-Man. This probably explains why he has since served as a Lead Graphic Artist at SONY Electronic Publishing/Psygnosis. Daniel enjoys spending time with his wife Mary, and his new baby daughter Hadassah. You can contact Daniel at drobbins@gentoo.org.


 