simple thread
channel.hpp
1 //SPDX-License-Identifier: LicenseRef-Apache-License-2.0
2 //Author: Blayne Dennis
3 
4 #ifndef __SIMPLE_THREADING_CHANNEL__
5 #define __SIMPLE_THREADING_CHANNEL__
6 
7 #include <memory>
8 #include <mutex>
9 #include <condition_variable>
10 #include <deque>
11 #include <utility>
12 #include <thread>
13 #include <future>
14 #include <chrono>
15 #include <type_traits>
16 
17 #include "utility.hpp"
18 #include "context.hpp"
19 #include "message.hpp"
20 
21 namespace st { // simple thread
22 
34 enum state {
35  closed = 0,
36  failure,
37  success
38 };
39 
40 namespace detail {
41 namespace channel {
42 
43 // private class used to implement `recv()` blocking behavior
44 struct blocker {
45  // stack condition data (not allocated!)
46  struct data {
47  inline void wait(std::unique_lock<std::mutex>& lk) {
48  do {
49  cv.wait(lk);
50  } while(!flag);
51  }
52 
53  inline void notify() {
54  flag = true;
55  cv.notify_one();
56  }
57 
58  bool flag = false;
59  std::condition_variable cv;
60  };
61 
62  blocker() = delete;
63  blocker(data* d) : m_data(d) { }
64  ~blocker(){ m_data->notify(); }
65 
66  // a pointer to stack condition data
67  data* m_data;
68 };
69 
70 struct context {
71  context() : m_closed(false) { }
72 
73  virtual ~context(){
74  close(true);
75  }
76 
77  inline bool closed() const {
78  std::lock_guard<std::mutex> lk(m_mtx);
79  return m_closed;
80  }
81 
82  inline void close(bool soft=true) {
83  std::unique_lock<std::mutex> lk(m_mtx);
84  if(!m_closed) {
85  m_closed = true;
86 
87  if(!soft) {
88  m_msg_q.clear();
89  }
90 
91  // unblock all receivers if no messages
92  if(m_msg_q.empty()) {
93  m_blockers.clear();
94  }
95 
96  // unblock excess receivers if messages are available
97  while(m_blockers.size() > m_msg_q.size()) {
98  m_blockers.pop_front();
99  }
100  }
101  }
102 
103  inline std::size_t queued() const {
104  std::lock_guard<std::mutex> lk(m_mtx);
105  return m_msg_q.size();
106  }
107 
108  inline std::size_t blocked_receivers() const {
109  std::lock_guard<std::mutex> lk(m_mtx);
110  return m_blockers.size();
111  }
112 
113  inline bool send(st::message msg) {
114  std::unique_lock<std::mutex> lk(m_mtx);
115 
116  if(m_closed || !msg) {
117  return false;
118  } else {
119  m_msg_q.push_back(std::move(msg));
120 
121  while(m_msg_q.size() && m_blockers.size()) {
122  m_blockers.pop_front();
123  }
124  return true;
125  }
126  }
127 
128  inline state recv(st::message& msg, bool block) {
129  msg.reset();
130 
131  std::unique_lock<std::mutex> lk(m_mtx);
132 
133  do {
134  if(!m_msg_q.empty()) {
135  msg = std::move(m_msg_q.front());
136  m_msg_q.pop_front();
137  return st::state::success;
138  } else if(m_closed) {
139  return st::state::closed;
140  } else if(block) {
141  // block until message is available or channel termination
142  blocker::data d; // stack condition and flag
143  // ~blocker() will notify condition
144  m_blockers.push_back(std::shared_ptr<blocker>(new blocker(&d)));
145  d.wait(lk);
146  }
147  } while(block);
148 
149  // failed try_recv()
150  return st::state::failure;
151  }
152 
153  bool m_closed;
154  mutable std::mutex m_mtx;
155  std::deque<st::message> m_msg_q;
156  std::deque<std::shared_ptr<blocker>> m_blockers;
157 };
158 
159 }
160 }
161 
170 struct channel : public st::shared_context<channel,detail::channel::context> {
171  inline virtual ~channel() { }
172 
177  static inline channel make() {
178  channel ch;
179  ch.ctx(std::make_shared<detail::channel::context>());
180  return ch;
181  }
182 
186  inline bool closed() const {
187  return ctx() ? ctx()->closed() : true;
188  }
189 
194  inline void close(bool soft=true) {
195  if(ctx()) {
196  ctx()->close(soft);
197  }
198  }
199 
203  inline std::size_t blocked_receivers() const {
204  return ctx() ? ctx()->blocked_receivers() : 0;
205  }
206 
226  inline bool recv(st::message& msg) {
227  return ctx() ? ctx()->recv(msg, true) == state::success : false;
228  }
229 
239  inline state try_recv(st::message& msg) {
240  return ctx() ? ctx()->recv(msg, false) : state::closed;
241  }
242 
246  inline std::size_t queued() const {
247  return ctx() ? ctx()->queued() : 0;
248  }
249 
261  template <typename... As>
262  bool send(As&&... as) {
263  return ctx() ? ctx()->send(st::message::make(std::forward<As>(as)...)) : false;
264  }
265 
266  //--------------------------------------------------------------------------
267  // Iteration
268 
277  class iterator :
278  public st::shared_context<iterator, detail::channel::context>,
279  public std::input_iterator_tag
280  {
281  inline void increment() {
282  bool ret = ctx() ? ctx()->recv(msg, true) == state::success : false;
283 
284  if(!ret) {
285  ctx().reset();
286  }
287  }
288 
289  st::message msg;
290 
291  public:
292  iterator() { }
293 
294  iterator(std::shared_ptr<detail::channel::context> rhs) {
295  ctx(rhs);
296  }
297 
298  inline virtual ~iterator() { }
299 
300  inline iterator& operator=(const iterator& rhs) {
301  ctx() = rhs.ctx();
302  msg = rhs.msg;
303  return *this;
304  }
305 
306  inline st::message& operator*() {
307  return msg;
308  }
309 
310  inline st::message* operator->() {
311  return &msg;
312  }
313 
314  inline iterator& operator++() {
315  increment();
316  return *this;
317  }
318 
319  inline iterator operator++(int) {
320  increment();
321  return *this;
322  }
323  };
324 
328  inline iterator begin() const {
329  iterator it(ctx());
330  ++it; // get an initial value
331  return it;
332  }
333 
340  inline iterator end() const {
341  return iterator();
342  }
343 
344  //--------------------------------------------------------------------------
345  // Asynchronous Execution
346 
368  template <typename F, typename... As>
369  bool async(std::size_t resp_id, F&& f, As&&... as) {
370  using isv = typename std::is_void<detail::function_return_type<F,As...>>::type;
371  return async_impl(
372  isv(),
373  resp_id,
374  std::forward<F>(f),
375  std::forward<As>(as)...);
376  }
377 
390  template< class Rep, class Period, typename P>
391  bool timer(std::size_t resp_id, const std::chrono::duration<Rep, Period>& timeout, P&& payload) {
392  return async(
393  resp_id,
394  [=]() mutable -> decltype(std::forward<P>(payload)) {
395  std::this_thread::sleep_for(timeout);
396  return std::forward<P>(payload);
397  });
398  }
399 
411  template< class Rep, class Period>
412  bool timer(std::size_t resp_id, const std::chrono::duration<Rep, Period>& timeout) {
413  return async(
414  resp_id,
415  [=]() mutable -> void {
416  std::this_thread::sleep_for(timeout);
417  });
418  }
419 
420 private:
421  template <typename F, typename... As>
422  bool async_impl(std::true_type, std::size_t resp_id, F&& f, As&&... as) {
423  if(ctx()) {
424  auto self = ctx();
425 
426  // launch a thread and schedule the call
427  std::async([=]() mutable {
428  f(std::forward<As>(as)...);
429  // capture a copy of the shared send context
430  self->send(st::message::make(resp_id));
431  });
432 
433  return true;
434  } else {
435  return false;
436  }
437  }
438 
439  template <typename F, typename... As>
440  bool async_impl(std::false_type, std::size_t resp_id, F&& f, As&&... as) {
441  if(ctx()) {
442  auto self = ctx();
443 
444  // launch a thread and schedule the call
445  std::async([=]() mutable {
446  auto result = f(std::forward<As>(as)...);
447  // capture a copy of the shared send context
448  self->send(st::message::make(resp_id, result));
449  });
450 
451  return true;
452  } else {
453  return false;
454  }
455  }
456 };
457 
458 }
459 
460 #endif
implementation of an input iterator for st::channel
Definition: channel.hpp:280
Interthread message passing queue.
Definition: channel.hpp:170
iterator end() const
Definition: channel.hpp:340
iterator begin() const
Definition: channel.hpp:328
bool async(std::size_t resp_id, F &&f, As &&... as)
asynchronously execute user Callable on a system thread
Definition: channel.hpp:369
bool timer(std::size_t resp_id, const std::chrono::duration< Rep, Period > &timeout)
start a timer
Definition: channel.hpp:412
state try_recv(st::message &msg)
do a non-blocking message receive over the channel
Definition: channel.hpp:239
bool timer(std::size_t resp_id, const std::chrono::duration< Rep, Period > &timeout, P &&payload)
start a timer
Definition: channel.hpp:391
void close(bool soft=true)
st::channel is set to the closed state
Definition: channel.hpp:194
bool recv(st::message &msg)
receive a message over the channel
Definition: channel.hpp:226
std::size_t blocked_receivers() const
Definition: channel.hpp:203
std::size_t queued() const
Definition: channel.hpp:246
static channel make()
Construct an allocated channel.
Definition: channel.hpp:177
bool send(As &&... as)
send an st::message with given parameters into the internal message queue
Definition: channel.hpp:262
bool closed() const
Definition: channel.hpp:186
interthread type erased message container
Definition: message.hpp:36
static message make()
construct a message
Definition: message.hpp:90
CRTP-templated interface to provide shared context api.
Definition: context.hpp:43
std::shared_ptr< detail::channel::context > & ctx() const
Definition: context.hpp:50