GRPC C++  1.26.0
mpmcqueue.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H
20 #define GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H
21 
23 
27 
28 namespace grpc_core {
29 
31 
32 // Abstract base class of a Multiple-Producer-Multiple-Consumer(MPMC) queue
33 // interface
34 class MPMCQueueInterface {
35  public:
36  virtual ~MPMCQueueInterface() {}
37 
38  // Puts elem into queue immediately at the end of queue.
39  // This might block on a full queue, depending on the implementation.
40  virtual void Put(void* elem) = 0;
41 
42  // Removes the oldest element from the queue and returns it.
43  // This might block on an empty queue, depending on the implementation.
44  // The optional argument is used for collecting stats.
45  virtual void* Get(gpr_timespec* wait_time = nullptr) = 0;
46 
47  // Returns the number of elements currently in the queue.
48  virtual int count() const = 0;
49 };
50 
52  public:
53  // Creates a new MPMC Queue. The queue created will have infinite length.
55 
56  // Releases all resources held by the queue. The queue must be empty, and no
57  // thread may be waiting on a condition variable.
59 
60  // Puts elem into queue immediately at the end of queue. Since the queue has
61  // infinite length, this routine will never block and should never fail.
62  void Put(void* elem);
63 
64  // Removes the oldest element from the queue and returns it.
65  // This routine will cause the thread to block if queue is currently empty.
66  // Argument wait_time should be passed in when the trace flag is turned on
67  // (for the purpose of collecting stats info).
68  void* Get(gpr_timespec* wait_time = nullptr);
69 
70  // Returns number of elements in queue currently.
71  // Elements may be added or removed concurrently, so the count might change
72  // quickly.
73  int count() const { return count_.Load(MemoryOrder::RELAXED); }
74 
75  struct Node {
76  Node* next; // Linking
77  Node* prev;
78  void* content; // Points to actual element
79  gpr_timespec insert_time; // Time for stats
80 
81  Node() {
82  next = prev = nullptr;
83  content = nullptr;
84  }
85  };
86 
87  // For test purpose only. Returns number of nodes allocated in queue.
88  // Any allocated node will be alive until the destruction of the queue.
89  int num_nodes() const { return num_nodes_; }
90 
91  // For test purpose only. Returns the initial number of nodes in queue.
92  int init_num_nodes() const { return kQueueInitNumNodes; }
93 
94  private:
95  // For Internal Use Only.
96  // Removes the oldest element from the queue and returns it. This routine
97  // will NOT check whether queue is empty, and it will NOT acquire mutex.
98  // Caller MUST check that queue is not empty and must acquire mutex before
99  // calling.
100  void* PopFront();
101 
102  // Stats of queue. This will only be collected when debug trace mode is on.
103  // All printed stats info will have time measurement in microsecond.
104  struct Stats {
105  uint64_t num_started; // Number of elements that have been added to the queue
106  uint64_t num_completed; // Number of elements that have been removed from
107  // the queue
108  gpr_timespec total_queue_time; // Total waiting time that all the
109  // removed elements have spent in the queue
110  gpr_timespec max_queue_time; // Max waiting time among all removed
111  // elements
112  gpr_timespec busy_queue_time; // Accumulated amount of time that the queue
113  // was not empty
114 
115  Stats() { // Counters start at 0; durations start as zero GPR_TIMESPAN values
116  num_started = 0;
117  num_completed = 0;
118  total_queue_time = gpr_time_0(GPR_TIMESPAN);
119  max_queue_time = gpr_time_0(GPR_TIMESPAN);
120  busy_queue_time = gpr_time_0(GPR_TIMESPAN);
121  }
122  };
123 
124  // Node for waiting thread queue. Stands for one waiting thread, should have
125  // exactly one thread waiting on its CondVar.
126  // Using a doubly linked list for waiting thread queue to wake up waiting
127  // threads in LIFO order to reduce cache misses.
128  struct Waiter {
129  CondVar cv; // Condition variable that this waiter's thread waits on
130  Waiter* next; // Next waiter in the doubly linked waiting thread queue
131  Waiter* prev; // Previous waiter in the doubly linked waiting thread queue
132  };
133 
134  // Pushes waiter to the front of the queue; requires the caller to hold the mutex
135  void PushWaiter(Waiter* waiter);
136 
137  // Removes waiter from the queue; requires the caller to hold the mutex
138  void RemoveWaiter(Waiter* waiter);
139 
140  // Returns pointer to the waiter that should be woken up next, which should
141  // be the last added waiter.
142  Waiter* TopWaiter();
143 
144  Mutex mu_; // Protecting lock
145  Waiter waiters_; // Head of waiting thread queue
146 
147  // Initial size for delete list
148  static const int kDeleteListInitSize = 1024;
149  // Initial number of nodes allocated
150  static const int kQueueInitNumNodes = 1024;
151 
152  Node** delete_list_ = nullptr; // Keeps track of all allocated array entries
153  // for deleting on destruction
154  size_t delete_list_count_ = 0; // Number of entries in list
155  size_t delete_list_size_ = 0; // Size of the list. List will be expanded to
156  // double size on full
157 
158  Node* queue_head_ = nullptr; // Head of the queue, remove position
159  Node* queue_tail_ = nullptr; // End of queue, insert position
160  Atomic<int> count_{0}; // Number of elements in queue
161  int num_nodes_ = 0; // Number of nodes allocated
162 
163  Stats stats_; // Stats info
164  gpr_timespec busy_time; // Start time of busy queue
165 
166  // Internal Helper.
167  // Allocates an array of nodes of size "num", links all nodes together except
168  // the first node's prev and last node's next. They should be set by caller
169  // manually afterward.
170  Node* AllocateNodes(int num);
171 };
172 
173 } // namespace grpc_core
174 
175 #endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H */
T Load(MemoryOrder order) const
Definition: atomic.h:44
Definition: mpmcqueue.h:51
void * Get(gpr_timespec *wait_time=nullptr)
int num_nodes() const
Definition: mpmcqueue.h:89
int count() const
Definition: mpmcqueue.h:73
int init_num_nodes() const
Definition: mpmcqueue.h:92
Definition: mpmcqueue.h:34
virtual int count() const =0
virtual ~MPMCQueueInterface()
Definition: mpmcqueue.h:36
virtual void * Get(gpr_timespec *wait_time=nullptr)=0
virtual void Put(void *elem)=0
@ GPR_TIMESPAN
Unmeasurable clock type: no base, created by taking the difference between two times.
Definition: gpr_types.h:42
GPRAPI gpr_timespec gpr_time_0(gpr_clock_type type)
Time constants.
Internal thread interface.
Definition: backoff.h:26
TraceFlag DebugOnlyTraceFlag
Definition: trace.h:115
DebugOnlyTraceFlag grpc_thread_pool_trace
Analogous to struct timespec.
Definition: gpr_types.h:47
Definition: mpmcqueue.h:75
Node * prev
Definition: mpmcqueue.h:77
Node * next
Definition: mpmcqueue.h:76
Node()
Definition: mpmcqueue.h:81
gpr_timespec insert_time
Definition: mpmcqueue.h:79
void * content
Definition: mpmcqueue.h:78