Synchronization strategy for multithreaded programs

A Preliminary Study on the Use of C++11 Threads

C++11 introduces std::thread for creating and managing threads; it is declared in the header <thread>:

#include <thread>
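
A minimal sketch of launching and joining a thread (the Worker function and its argument are only illustrative):

#include <iostream>
#include <thread>

void Worker(int id)
{
  std::cout << "worker " << id << " running" << std::endl;
}

int main()
{
  // Start a new thread running Worker(1), then wait for it to finish
  std::thread t(Worker, 1);
  t.join();
  return 0;
}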

Shared data that is only ever read poses no risk of a race condition. Shared data that is both read and written, however, must be protected when accessed from multiple threads; the two main strategies for this are lock-based and lock-free synchronization.
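
As an illustration of the lock-free approach, a shared counter can be protected without a mutex by making it a std::atomic. This is a minimal sketch; the counter name, thread count and iteration count are arbitrary:

#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

std::atomic<int> counter{0};

void Increment()
{
  for (int i = 0; i < 1000; ++i)
  {
    // Atomic read-modify-write; no lock is needed
    counter.fetch_add(1, std::memory_order_relaxed);
  }
}

int main()
{
  std::vector<std::thread> threads;
  for (int i = 0; i < 4; ++i)
  {
    threads.emplace_back(Increment);
  }
  for (auto& t : threads)
  {
    t.join();
  }
  std::cout << counter.load() << std::endl;  // always prints 4000
  return 0;
}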

A mutex (mutual exclusion lock) is the most common way to protect shared data among threads: it guarantees that only one thread accesses the data at any given time, which makes access to the protected region serial. Consequently, the smaller the region a lock protects (the critical section), the higher the degree of concurrency the program retains.
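
A minimal sketch of keeping the critical section small: the expensive computation runs outside the lock, and only the access to the shared container is serialized (ExpensiveWork and shared_results are illustrative names, not part of the original example):

#include <mutex>
#include <vector>

std::mutex mut;
std::vector<int> shared_results;

int ExpensiveWork(int input)
{
  return input * input;  // stand-in for a long-running computation
}

void Produce(int input)
{
  // Do the heavy work without holding the lock
  int result = ExpensiveWork(input);

  // Keep the locked region as small as possible:
  // only the push_back is serialized between threads
  std::lock_guard<std::mutex> lk(mut);
  shared_results.push_back(result);
}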

Waiting for an event or condition with condition variables

C++11 provides two condition variable types, std::condition_variable and std::condition_variable_any, both of which must be used together with a mutex to synchronize operations. The former works only with std::mutex (through std::unique_lock<std::mutex>), while the latter works with any mutex-like lock and is therefore more general, but it may cost extra space, performance, or operating system resources, so std::condition_variable should be preferred when it suffices. Both are defined in the header <condition_variable>.
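
A brief sketch of std::condition_variable_any, shown here waiting on a std::recursive_mutex purely to illustrate that it accepts lock types other than std::unique_lock<std::mutex> (the queue and its element type are illustrative):

#include <condition_variable>
#include <mutex>
#include <queue>

std::recursive_mutex rmut;
std::condition_variable_any cond_any;
std::queue<int> ready_items;

void WaitForItem()
{
  std::unique_lock<std::recursive_mutex> lk(rmut);
  // condition_variable_any works with any lock meeting BasicLockable,
  // not just std::unique_lock<std::mutex>
  cond_any.wait(lk, [](){ return !ready_items.empty(); });
  ready_items.pop();
}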

The producer-consumer model is widely used in concurrent programming because it helps decouple the parts of a system. A queue is the typical container for transferring data between producer and consumer threads, and its first-in, first-out behavior preserves the ordering that applications usually require.

A face-analysis component, for example, uses a queue to transfer data: the producer thread calls InputData to push data onto the queue and then calls notify_one to wake the consumer thread, which analyzes the face data. The pseudocode is as follows:

std::mutex mut;
std::condition_variable cond;
std::queue<data_chunk> data_queue;

// Queuing data to be analyzed
int InputData(const data_chunk& data)
{
  if (invalid_data)
  {
    LOG_ERROR("invalid param!");
    return ERROR_CODE;
  }
  // Push the data onto the queue under the lock,
  // then notify the waiting consumer thread
  std::lock_guard<std::mutex> lk(mut);
  data_queue.push(data);
  cond.notify_one();
  return SUCCESS_CODE;
}

// InputData is called in the producer thread to queue the data to be analyzed.
// The consumer thread function Process fetches data from the queue for analysis.
// When the condition has not yet occurred, the consumer thread sleeps, blocked inside wait.
void Process()
{
  while (not_exit_expression)
  {
    // Use unique_lock instead of lock_guard: wait must be able to unlock and relock the mutex
    std::unique_lock<std::mutex> lk(mut);

    // wait returns when the condition is satisfied; otherwise the thread blocks.
    //
    // If the lambda returns false (the condition is not yet satisfied), wait releases the lock held
    // in lk and blocks the thread, so the producer thread can acquire the lock and push data onto
    // the queue.
    //
    // When notify_one signals the condition variable, the consumer wakes up, reacquires the lock
    // and checks the condition again; if the lambda now returns true, wait returns while still
    // holding the lock, and execution continues.
    cond.wait(lk, [](){ return !data_queue.empty(); });

    data_chunk data = data_queue.front();
    data_queue.pop();

    // The wait holds the lock when it returns, so it needs to be unlocked here.
    lk.unlock();

    // Processing data
    process_the_data;
  }
}

What happens if the blocking logic in the Process loop (the wait call) is replaced with non-blocking logic, i.e. the loop simply continues when the queue is empty? The Process thread then busy-waits: it repeatedly acquires the mutex and re-checks an empty queue, wasting CPU time and contending with the producer for the lock, whereas wait puts the thread to sleep until it is notified.
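
A sketch of that polling variant, written in the same pseudocode style as Process above (the sleep interval is arbitrary):

// Non-blocking variant of Process: it polls the queue instead of waiting on the condition variable
void ProcessPolling()
{
  while (not_exit_expression)
  {
    std::unique_lock<std::mutex> lk(mut);
    if (data_queue.empty())
    {
      lk.unlock();
      // Without wait, the thread must either spin (wasting CPU and contending for the mutex)
      // or sleep for a fixed interval (adding latency); here it sleeps briefly
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      continue;
    }
    data_chunk data = data_queue.front();
    data_queue.pop();
    lk.unlock();

    // Processing data
    process_the_data;
  }
}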

In the code above, the consumer thread uses std::unique_lock rather than std::lock_guard because the former is more flexible: std::lock_guard provides no lock/unlock member functions, while wait needs to release and reacquire the mutex, and the code unlocks early before processing the data. Furthermore, since queues are so widely used for passing data between threads, re-wrapping a non-thread-safe queue, a mutex and a condition variable in every scenario-specific class leads to repeated, inefficient and error-prone code. It is therefore worth implementing a reusable thread-safe queue.

Thread-safe queue adapters

The previous section introduced how a queue, a condition variable and a mutex are used together. They can now be encapsulated as a thread-safe queue adapter, CTSQueue.

Thread-safe queue requirements analysis:

  • Preserves the first-in, first-out behavior of a queue;
  • Supports pushing at the back and popping from the front;
  • Provides both blocking and non-blocking versions of the pop operation;
  • Is generic (templated on the element type);
  • Uses C++11 containers and synchronization primitives, although other primitives (e.g. POSIX) could be used instead.

The complete code is as follows:

#include <queue>              // for queue
#include <memory>             // for std::shared_ptr
#include <mutex>              // for std::mutex
#include <condition_variable> // for std::condition_variable

// Thread-safe queues
template<typename T>
class CTSQueue
{
public:
  CTSQueue() = default;
  ~CTSQueue()
  { }

  // Copy operations are disabled
  CTSQueue(const CTSQueue&) = delete;
  CTSQueue& operator=(const CTSQueue&) = delete;

  // Push operation
  void Push(T data)
  {
    std::lock_guard<std::mutex> lk(mut_);
    data_queue_.push(data);
    // Wake one thread blocked in WaitPop, if any
    cond_.notify_one();
  }

  // Pop operation - non-blocking version
  bool TryPop(T& data)
  {
    std::lock_guard<std::mutex> lk(mut_);
    // If the queue is empty, return false immediately;
    // otherwise pop the front element (copy version)
    if (data_queue_.empty())
    {
      return false;
    }
    data = data_queue_.front();
    data_queue_.pop();
    return true;
  }
  // Non-blocking version that returns an empty shared_ptr if the queue is empty
  std::shared_ptr<T> TryPop()
  {
    std::lock_guard<std::mutex> lk(mut_);
    if (data_queue_.empty())
    {
      return std::shared_ptr<T>();
    }
    // Pop the front element and return it through a shared_ptr
    std::shared_ptr<T> res(std::make_shared<T>(data_queue_.front()));
    data_queue_.pop();
    return res;
  }

  // Pop operation - blocking version
  void WaitPop(T& data)
  {
    // wait must be able to unlock and relock the mutex,
    // so unique_lock is required here rather than lock_guard
    std::unique_lock<std::mutex> lk(mut_);
    // Block the calling thread until the queue is non-empty,
    // then pop the front element (copy version)
    cond_.wait(lk, [this](){ return !data_queue_.empty(); });
    data = data_queue_.front();
    data_queue_.pop();
  }
  std::shared_ptr<T> WaitPop()
  {
    std::unique_lock<std::mutex> lk(mut_);
    cond_.wait(lk, [this](){ return !data_queue_.empty(); });
    // Pop the front element and return it through a shared_ptr
    std::shared_ptr<T> res(std::make_shared<T>(data_queue_.front()));
    data_queue_.pop();
    return res;
  }

  // Query whether the queue is empty
  bool Empty() const
  {
    std::lock_guard<std::mutex> lk(mut_);
    return data_queue_.empty();
  }

private:
  mutable std::mutex      mut_;
  std::queue<T>           data_queue_;
  std::condition_variable cond_;
};
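
A minimal usage sketch of CTSQueue with one producer thread and one consumer thread (the element type and the number of items are arbitrary):

#include <iostream>
#include <thread>

int main()
{
  CTSQueue<int> queue;

  // Producer: push five integers onto the queue
  std::thread producer([&queue]() {
    for (int i = 0; i < 5; ++i)
    {
      queue.Push(i);
    }
  });

  // Consumer: block until each element is available, then print it
  std::thread consumer([&queue]() {
    for (int i = 0; i < 5; ++i)
    {
      int value = 0;
      queue.WaitPop(value);
      std::cout << "consumed " << value << std::endl;
    }
  });

  producer.join();
  consumer.join();
  return 0;
}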

Reference:
[1] Anthony Williams, C++ Concurrency in Action, https://www.bogotobogo.com/cplusplus/files/CplusplusConcurrencyInAction_PracticalMultithreading.pdf
