Threads have long been used in programming to overlap operations that don't depend on each other. Nowadays, with multi-core processors, they are essential for any application that needs good performance. They also introduce some problems, such as several threads accessing shared variables at the same time, or the need to synchronize tasks in the code. I will try to cover how to deal with these topics in C++11, analyzing the executions with Helgrind, a Valgrind tool.

 

Creating both producer and consumer threads

With C++11 this task is pretty straightforward. We just need to declare the threads using the std::thread type and, much as in pthreads, pass the function we want to execute in the new thread, followed by its arguments:

#include <thread>
#include <iostream>
int buffer = 0;  // shared by both threads without any protection (see "Race conditions")
void produce(const int threadId)
{
    std::cout << "Producer started on thread: " << threadId << std::endl;
    while(1)
    {
        // do stuff
        ++buffer;
    }
}
void consume(const int threadId)
{
    std::cout << "Consumer started on thread: " << threadId << std::endl;
    while(1)
    {
        // do stuff
        --buffer;
    }
}
int main()
{
    std::cout << "buffer address: " << &buffer << std::endl;
    std::thread producer(produce, 0);
    std::thread consumer(consume, 1);
    producer.join();
    consumer.join();
    return 0;
}

 

Constructing a std::thread starts the new thread immediately, so creation and execution are asynchronous, whereas join() blocks the calling thread (the main thread in this case) until the new thread finishes. In this example the loops never end, so main waits forever and you have to interrupt the program. If you don't join (or detach) your threads, main keeps running and, when the still-joinable std::thread objects are destroyed, their destructors call std::terminate(), which aborts the program immediately.
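One common way to make sure a thread is always joined is to tie the join() call to an object's lifetime. The sketch below assumes a small hypothetical helper class, ThreadGuard, that is not part of C++11 (C++20 later added std::jthread, which joins in its destructor). The function sumInParallel is also just an illustration: each thread writes only its own variable, so there is no shared data to protect.

```cpp
#include <thread>
#include <utility>

// Hypothetical RAII helper (not part of C++11): joins the wrapped thread in
// its destructor, so forgetting join() can no longer trigger std::terminate().
class ThreadGuard {
public:
    explicit ThreadGuard(std::thread t) : t_(std::move(t)) {}
    ~ThreadGuard() {
        if (t_.joinable()) {
            t_.join();
        }
    }
    ThreadGuard(const ThreadGuard&) = delete;
    ThreadGuard& operator=(const ThreadGuard&) = delete;
private:
    std::thread t_;
};

// Runs two short, independent tasks on separate threads and returns their
// combined result once both have finished.
int sumInParallel() {
    int a = 0;
    int b = 0;
    {
        ThreadGuard first(std::thread([&a] { for (int i = 1; i <= 100; ++i) a += i; }));
        ThreadGuard second(std::thread([&b] { for (int i = 1; i <= 10; ++i) b += i; }));
    } // both threads are joined here, before a and b are read
    return a + b;  // 5050 + 55
}
```

Because join() happens in the destructor, leaving the scope early (for example through an exception) still waits for the thread instead of terminating the program.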

This can be compiled with:

$ g++ producer-consumer.cpp  -std=c++11 -pthread

 

Race conditions

Do you think the previous example works fine? Sorry, it does not. Run it under Valgrind's Helgrind tool and it will complain about data races:

$ valgrind --tool=helgrind ./a.out 
==3917== Helgrind, a thread error detector
==3917== Copyright (C) 2007-2013, and GNU GPL'd, by OpenWorks LLP et al.
==3917== Using Valgrind-3.10.1 and LibVEX; rerun with -h for copyright info
==3917== Command: ./a.out
==3917==
buffer address: 0x804d190
==3917== Thread #3 was created
==3917== Thread #2 was created
==3917== Possible data race during read of size 4 at 0x804D190 by thread #3
==3917== This conflicts with a previous write of size 4 by thread #2
...

As you may notice, both threads write to the buffer variable, and each increment or decrement is really three operations: read, add/subtract and store. If one thread stores a new value between the other thread's read and store, one of the updates is lost and the value becomes inconsistent.
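To make the lost update concrete, the sketch below replays one unlucky interleaving deterministically on a single thread. The "register" variables are purely illustrative stand-ins for the values each thread holds between its read and its store; real threads would not use named variables like these.

```cpp
// Replays, step by step and on a single thread, one possible interleaving
// of "++buffer" (producer) and "--buffer" (consumer). Each operation is
// split into the read / add-or-subtract / store steps described above.
int replayLostUpdate() {
    int buffer = 0;

    int producerReg = buffer;       // producer: read  (sees 0)
    int consumerReg = buffer;       // consumer: read  (also sees 0)
    producerReg = producerReg + 1;  // producer: add
    consumerReg = consumerReg - 1;  // consumer: subtract
    buffer = producerReg;           // producer: store (buffer == 1)
    buffer = consumerReg;           // consumer: store (buffer == -1, the increment is lost)

    return buffer;  // one ++ and one -- should give 0, but this returns -1
}
```

With real threads this interleaving happens only occasionally, which is why such bugs are hard to reproduce and why a tool like Helgrind is useful.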

 

Avoiding race conditions

Race conditions can be avoided easily with mutexes, which lock a resource so that only one thread at a time can work with it, making it thread-safe. C++11 provides the std::mutex type to lock resources:

#include <thread>
#include <mutex>
#include <iostream>
int buffer = 0;
std::mutex mutex;
void produce(const int threadId)
{
    std::cout << "Producer started on thread: " << threadId << std::endl;
    while(1)
    {
        mutex.lock();
        ++buffer;
        mutex.unlock();
    }
}
void consume(const int threadId)
{
    std::cout << "Consumer started on thread: " << threadId << std::endl;
    while(1)
    {
        mutex.lock();
        --buffer;
        mutex.unlock();
    }
}
int main()
{
    std::cout << "buffer address: " << &buffer << std::endl;
    std::thread producer(produce, 0);
    std::thread consumer(consume, 1);
    producer.join();
    consumer.join();
    return 0;
}
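Pairing lock() and unlock() by hand works, but if the code between them throws or returns early, the mutex stays locked. The sketch below is a finite variant of the example above (the function name runBalanced and the iteration count are illustrative assumptions) that uses std::lock_guard, which locks in its constructor and unlocks in its destructor. Since every ++ and -- runs under the mutex, the final value should always be 0.

```cpp
#include <mutex>
#include <thread>

// Finite variant of the producer/consumer example: each thread performs a
// fixed number of operations so the program terminates and can be checked.
int runBalanced(int iterations) {
    int buffer = 0;
    std::mutex bufferMutex;

    std::thread producer([&] {
        for (int i = 0; i < iterations; ++i) {
            std::lock_guard<std::mutex> guard(bufferMutex);  // unlocked at end of scope
            ++buffer;
        }
    });
    std::thread consumer([&] {
        for (int i = 0; i < iterations; ++i) {
            std::lock_guard<std::mutex> guard(bufferMutex);
            --buffer;
        }
    });

    producer.join();
    consumer.join();
    return buffer;  // 0: no increment or decrement was lost
}
```

Running a variant like this under Helgrind should no longer report the data race on buffer.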

 

Synchronizing threads

What happens if you need to set limits on your buffer? In that case you need to synchronize the threads using condition variables. A condition variable lets a thread wait until some condition becomes true. In C++11 this is done with the std::condition_variable type, through its wait() and notify_one() member functions: wait() atomically releases the mutex and blocks the current thread, reacquiring the mutex before it returns, while notify_one() wakes up one waiting thread so it can take the mutex again. Because wait() may also return spuriously, without any notification, the condition should be re-checked in a loop after waking up.

#include <thread>
#include <mutex>
#include <iostream>
#include <condition_variable>
int buffer = 0;
const int MAX = 10;
std::mutex mutex;
std::condition_variable full, empty;
void produce(const int threadId)
{
    //std::cout << "Producer started on thread: " << threadId << std::endl;
    while(1)
    {
        std::unique_lock<std::mutex> lock(mutex);
        while (buffer == MAX) {  // re-check after every wakeup (wakeups may be spurious)
            //std::cout << "Producer waiting" << std::endl;
            full.wait(lock);
        }
        ++buffer;
        //std::cout << buffer << std::endl;
        empty.notify_one();
        lock.unlock();
    }
}
void consume(const int threadId)
{
    //std::cout << "Consumer started on thread: " << threadId << std::endl;
    while(1)
    {
        std::unique_lock<std::mutex> lock(mutex);
        while (buffer == 0) {  // re-check after every wakeup (wakeups may be spurious)
            //std::cout << "Consumer waiting for elements" << std::endl;
            empty.wait(lock);
        }
        --buffer;
        //std::cout << buffer << std::endl;
        full.notify_one();
        lock.unlock();
    }
}
int main()
{
    std::cout << "buffer address: " << &buffer << std::endl;
    std::thread producer(produce, 0);
    std::thread consumer(consume, 1);
    producer.join();
    consumer.join();
    return 0;
}
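std::condition_variable also has a wait() overload that takes a predicate and loops internally until the predicate is true, which removes the need for an explicit while. The sketch below is a finite version of the bounded buffer using that overload; the function name runBoundedBuffer, its parameters and the names notFull/notEmpty are illustrative assumptions. It returns the largest buffer size observed so the limit can be checked.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: one producer and one consumer each handle `items`
// elements through a buffer capped at `capacity`. Returns the largest
// buffer size observed, which should never exceed `capacity`.
int runBoundedBuffer(int items, int capacity) {
    int buffer = 0;
    int maxSeen = 0;
    std::mutex m;
    std::condition_variable notFull, notEmpty;

    std::thread producer([&] {
        for (int i = 0; i < items; ++i) {
            std::unique_lock<std::mutex> lock(m);
            // Equivalent to: while (!(buffer < capacity)) notFull.wait(lock);
            notFull.wait(lock, [&] { return buffer < capacity; });
            ++buffer;
            if (buffer > maxSeen) maxSeen = buffer;
            notEmpty.notify_one();
        }
    });
    std::thread consumer([&] {
        for (int i = 0; i < items; ++i) {
            std::unique_lock<std::mutex> lock(m);
            notEmpty.wait(lock, [&] { return buffer > 0; });
            --buffer;
            notFull.notify_one();
        }
    });

    producer.join();
    consumer.join();
    return maxSeen;
}
```

For example, runBoundedBuffer(10000, 10) should return a value between 1 and 10, and with a capacity of 1 it should return exactly 1.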

 

Using pthreads

On Linux, C++11 threads are typically implemented on top of POSIX threads (pthreads), but using the pthread functions directly is more tricky. As you can see, the structure is quite similar to the one above, except that we need to initialize and destroy the mutex and condition variables explicitly.

#include <iostream>
#include <pthread.h>
#include <stdio.h>
int buffer = 0;
const int MAX = 10;
pthread_mutex_t mutex;
pthread_cond_t full, empty;
void* produce(void* threadId)
{
    //std::cout << "Producer started on thread: " << (long)threadId << std::endl;
    while(1)
    {
        pthread_mutex_lock(&mutex);
        while (buffer == MAX) {  // re-check after every wakeup (wakeups may be spurious)
            //std::cout << "Producer waiting" << std::endl;
            pthread_cond_wait(&full, &mutex);
        }
        ++buffer;
        //std::cout << buffer << std::endl;
        pthread_cond_signal(&empty);
        pthread_mutex_unlock(&mutex);
    }
    return NULL;  // never reached, but a function returning void* must return a value
}
void* consume(void* threadId)
{
    //std::cout << "Consumer started on thread: " << (long)threadId << std::endl;
    while(1)
    {
        pthread_mutex_lock(&mutex);
        while (buffer == 0) {  // re-check after every wakeup (wakeups may be spurious)
            //std::cout << "Consumer waiting for elements" << std::endl;
            pthread_cond_wait(&empty, &mutex);
        }
        --buffer;
        //std::cout << buffer << std::endl;
        pthread_cond_signal(&full);
        pthread_mutex_unlock(&mutex);
    }
    return NULL;  // never reached, but a function returning void* must return a value
}
int main()
{
    std::cout << "buffer address: " << &buffer << std::endl;
    pthread_t producer;
    pthread_t consumer;
    // Initializing mutex
    // (alternatively, initialize at the declaration: pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;)
    pthread_mutex_init(&mutex, NULL);
    // Initializing cond variables
    // (alternatively, at the declaration: pthread_cond_t full = PTHREAD_COND_INITIALIZER;)
    pthread_cond_init(&full, NULL);
    pthread_cond_init(&empty, NULL);
    // Creating threads
    pthread_create(&producer, NULL, produce, (void*)0);
    pthread_create(&consumer, NULL, consume, (void*)1);
    // Wait until threads finish
    pthread_join(producer, NULL);
    pthread_join(consumer, NULL);
    // Destroy environment
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&empty);
    pthread_cond_destroy(&full);
    return 0;
}
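As the comments in main mention, statically allocated pthread mutexes and condition variables can instead be initialized with the PTHREAD_MUTEX_INITIALIZER and PTHREAD_COND_INITIALIZER macros, which removes the init calls. The sketch below combines that with a finite, checkable version of the bounded buffer; the names itemCount, CAPACITY, ITEMS, producerTask, consumerTask and runPthreadBoundedBuffer are illustrative assumptions. Error codes returned by the pthread functions are ignored for brevity.

```cpp
#include <pthread.h>

// Statically initialized synchronization objects: no pthread_mutex_init()
// or pthread_cond_init() calls are needed for these.
static pthread_mutex_t countMutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t countNotFull = PTHREAD_COND_INITIALIZER;
static pthread_cond_t countNotEmpty = PTHREAD_COND_INITIALIZER;
static int itemCount = 0;
static const int CAPACITY = 10;
static const int ITEMS = 10000;

static void* producerTask(void*) {
    for (int i = 0; i < ITEMS; ++i) {
        pthread_mutex_lock(&countMutex);
        while (itemCount == CAPACITY)  // loop: wakeups may be spurious
            pthread_cond_wait(&countNotFull, &countMutex);
        ++itemCount;
        pthread_cond_signal(&countNotEmpty);
        pthread_mutex_unlock(&countMutex);
    }
    return nullptr;
}

static void* consumerTask(void*) {
    for (int i = 0; i < ITEMS; ++i) {
        pthread_mutex_lock(&countMutex);
        while (itemCount == 0)
            pthread_cond_wait(&countNotEmpty, &countMutex);
        --itemCount;
        pthread_cond_signal(&countNotFull);
        pthread_mutex_unlock(&countMutex);
    }
    return nullptr;
}

// Runs both threads to completion and returns the final count (expected 0,
// since the consumer removes exactly as many items as the producer adds).
int runPthreadBoundedBuffer() {
    pthread_t producer, consumer;
    pthread_create(&producer, nullptr, producerTask, nullptr);
    pthread_create(&consumer, nullptr, consumerTask, nullptr);
    pthread_join(producer, nullptr);
    pthread_join(consumer, nullptr);
    return itemCount;
}
```

Like the earlier examples, this needs the -pthread flag when compiling with g++.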

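In short, what the C++11 thread library mainly adds is shorter function names and objects (std::thread, std::mutex, std::condition_variable, std::unique_lock) that initialize and clean up after themselves, removing the need to initialize and destroy them manually.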