WireCellToolkit
Wire Cell Simulation, Signal Process and Reconstruction Toolkit for Liquid Argon Detectors
mpmc_blocking_q.h
Go to the documentation of this file.
1 #pragma once
2 
3 //
4 // Copyright(c) 2018 Gabi Melman.
5 // Distributed under the MIT License (http://opensource.org/licenses/MIT)
6 //
7 
8 // multi producer-multi consumer blocking queue.
9 // enqueue(..) - will block until room found to put the new message.
10 // enqueue_nowait(..) - will enqueue immediately, overrunning the oldest
11 // message if no room is left in the queue.
12 // dequeue_for(..) - will block until the queue is not empty or the timeout has
13 // passed.
14 
16 
17 #include <condition_variable>
18 #include <mutex>
19 
20 namespace spdlog {
21 namespace details {
22 
23 template<typename T>
25 {
26 public:
27  using item_type = T;
28  explicit mpmc_blocking_queue(size_t max_items) // max_items is passed straight to q_'s constructor (presumably its capacity; q_'s type is not visible here)
29  : q_(max_items)
30  {
31  }
32 
33 #ifndef __MINGW32__
34  // Enqueue item by moving it into the queue; blocks the caller while the queue reports full.
35  void enqueue(T &&item)
36  {
37  {
38  std::unique_lock<std::mutex> lock(queue_mutex_);
39  pop_cv_.wait(lock, [this] { return !this->q_.full(); }); // sleep until q_.full() is false
40  q_.push_back(std::move(item));
41  } // inner scope ends here so the lock is released before notifying
42  push_cv_.notify_one(); // wake one thread waiting on push_cv_ (see dequeue_for)
43  }
44 
45  // Enqueue immediately, never waiting for room. If no room is left, the oldest message in the queue is overrun.
46  void enqueue_nowait(T &&item)
47  {
48  {
49  std::unique_lock<std::mutex> lock(queue_mutex_);
50  q_.push_back(std::move(item)); // no wait on pop_cv_ here, unlike enqueue()
51  } // inner scope ends here so the lock is released before notifying
52  push_cv_.notify_one(); // wake one thread waiting on push_cv_ (see dequeue_for)
53  }
54 
55  // Try to dequeue an item into popped_item. If the queue is empty, wait up to wait_duration for one to arrive.
56  // Returns true if an item was dequeued, false if the wait ended with the queue still empty.
57  bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
58  {
59  {
60  std::unique_lock<std::mutex> lock(queue_mutex_);
61  if (!push_cv_.wait_for(lock, wait_duration, [this] { return !this->q_.empty(); })) // false: timed out with the queue still empty
62  {
63  return false;
64  }
65  q_.pop_front(popped_item);
66  } // inner scope ends here so the lock is released before notifying
67  pop_cv_.notify_one(); // wake one thread waiting on pop_cv_ (see enqueue)
68  return true;
69  }
70 
71 #else
72  // apparently mingw deadlocks if the mutex is released before cv.notify_one(),
73  // so release the mutex at the very end of each function.
74 
75  // Enqueue item by moving it into the queue; blocks the caller while the queue reports full. MinGW variant.
76  void enqueue(T &&item)
77  {
78  std::unique_lock<std::mutex> lock(queue_mutex_);
79  pop_cv_.wait(lock, [this] { return !this->q_.full(); }); // sleep until q_.full() is false
80  q_.push_back(std::move(item));
81  push_cv_.notify_one(); // notify while still holding the lock; it is released when `lock` goes out of scope
82  }
83 
84  // Enqueue immediately, never waiting for room. If no room is left, the oldest message in the queue is overrun. MinGW variant.
85  void enqueue_nowait(T &&item)
86  {
87  std::unique_lock<std::mutex> lock(queue_mutex_);
88  q_.push_back(std::move(item)); // no wait on pop_cv_ here, unlike enqueue()
89  push_cv_.notify_one(); // notify while still holding the lock; it is released when `lock` goes out of scope
90  }
91 
92  // Try to dequeue an item into popped_item. If the queue is empty, wait up to wait_duration for one to arrive.
93  // Returns true if an item was dequeued, false if the wait ended with the queue still empty. MinGW variant.
94  bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
95  {
96  std::unique_lock<std::mutex> lock(queue_mutex_);
97  if (!push_cv_.wait_for(lock, wait_duration, [this] { return !this->q_.empty(); })) // false: timed out with the queue still empty
98  {
99  return false;
100  }
101  q_.pop_front(popped_item);
102  pop_cv_.notify_one(); // notify while still holding the lock; it is released when `lock` goes out of scope
103  return true;
104  }
105 
106 #endif
107 
109  { // NOTE(review): the declaration line for this body is absent from this listing; confirm the signature against the real header
110  std::unique_lock<std::mutex> lock(queue_mutex_);
111  return q_.overrun_counter(); // value is read from q_ while queue_mutex_ is held
112  }
113 
114 private:
115  std::mutex queue_mutex_;
116  std::condition_variable push_cv_;
117  std::condition_variable pop_cv_;
119 };
120 } // namespace details
121 } // namespace spdlog
bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
Definition: async.h:27