4 #ifndef __SIMPLE_THREADING_CHANNEL__
5 #define __SIMPLE_THREADING_CHANNEL__
9 #include <condition_variable>
15 #include <type_traits>
17 #include "utility.hpp"
18 #include "context.hpp"
19 #include "message.hpp"
47 inline void wait(std::unique_lock<std::mutex>& lk) {
53 inline void notify() {
59 std::condition_variable cv;
63 blocker(data* d) : m_data(d) { }
64 ~blocker(){ m_data->notify(); }
71 context() : m_closed(false) { }
77 inline bool closed()
const {
78 std::lock_guard<std::mutex> lk(m_mtx);
82 inline void close(
bool soft=
true) {
83 std::unique_lock<std::mutex> lk(m_mtx);
97 while(m_blockers.size() > m_msg_q.size()) {
98 m_blockers.pop_front();
103 inline std::size_t queued()
const {
104 std::lock_guard<std::mutex> lk(m_mtx);
105 return m_msg_q.size();
108 inline std::size_t blocked_receivers()
const {
109 std::lock_guard<std::mutex> lk(m_mtx);
110 return m_blockers.size();
114 std::unique_lock<std::mutex> lk(m_mtx);
116 if(m_closed || !msg) {
119 m_msg_q.push_back(std::move(msg));
121 while(m_msg_q.size() && m_blockers.size()) {
122 m_blockers.pop_front();
131 std::unique_lock<std::mutex> lk(m_mtx);
134 if(!m_msg_q.empty()) {
135 msg = std::move(m_msg_q.front());
137 return st::state::success;
138 }
else if(m_closed) {
139 return st::state::closed;
144 m_blockers.push_back(std::shared_ptr<blocker>(
new blocker(&d)));
150 return st::state::failure;
154 mutable std::mutex m_mtx;
155 std::deque<st::message> m_msg_q;
156 std::deque<std::shared_ptr<blocker>> m_blockers;
179 ch.ctx(std::make_shared<detail::channel::context>());
187 return ctx() ?
ctx()->closed() :
true;
204 return ctx() ?
ctx()->blocked_receivers() : 0;
227 return ctx() ?
ctx()->recv(msg,
true) == state::success :
false;
240 return ctx() ?
ctx()->recv(msg,
false) : state::closed;
247 return ctx() ?
ctx()->queued() : 0;
261 template <
typename... As>
279 public std::input_iterator_tag
281 inline void increment() {
282 bool ret =
ctx() ?
ctx()->recv(msg,
true) == state::success :
false;
294 iterator(std::shared_ptr<detail::channel::context> rhs) {
368 template <
typename F,
typename... As>
369 bool async(std::size_t resp_id, F&& f, As&&... as) {
370 using isv =
typename std::is_void<detail::function_return_type<F,As...>>::type;
375 std::forward<As>(as)...);
390 template<
class Rep,
class Period,
typename P>
391 bool timer(std::size_t resp_id,
const std::chrono::duration<Rep, Period>& timeout, P&& payload) {
394 [=]()
mutable -> decltype(std::forward<P>(payload)) {
395 std::this_thread::sleep_for(timeout);
396 return std::forward<P>(payload);
411 template<
class Rep,
class Period>
412 bool timer(std::size_t resp_id,
const std::chrono::duration<Rep, Period>& timeout) {
415 [=]()
mutable ->
void {
416 std::this_thread::sleep_for(timeout);
421 template <
typename F,
typename... As>
422 bool async_impl(std::true_type, std::size_t resp_id, F&& f, As&&... as) {
427 std::async([=]()
mutable {
428 f(std::forward<As>(as)...);
439 template <
typename F,
typename... As>
440 bool async_impl(std::false_type, std::size_t resp_id, F&& f, As&&... as) {
445 std::async([=]()
mutable {
446 auto result = f(std::forward<As>(as)...);
Implementation of an input iterator for st::channel.
Definition: channel.hpp:280
Interthread message-passing queue.
Definition: channel.hpp:170
iterator end() const
Definition: channel.hpp:340
iterator begin() const
Definition: channel.hpp:328
bool async(std::size_t resp_id, F &&f, As &&... as)
Asynchronously execute a user Callable on a system thread.
Definition: channel.hpp:369
bool timer(std::size_t resp_id, const std::chrono::duration< Rep, Period > &timeout)
Start a timer.
Definition: channel.hpp:412
state try_recv(st::message &msg)
Perform a non-blocking message receive over the channel.
Definition: channel.hpp:239
bool timer(std::size_t resp_id, const std::chrono::duration< Rep, Period > &timeout, P &&payload)
Start a timer with the given payload.
Definition: channel.hpp:391
void close(bool soft=true)
Set the st::channel to the closed state.
Definition: channel.hpp:194
bool recv(st::message &msg)
Receive a message over the channel.
Definition: channel.hpp:226
std::size_t blocked_receivers() const
Definition: channel.hpp:203
std::size_t queued() const
Definition: channel.hpp:246
static channel make()
Construct an allocated channel.
Definition: channel.hpp:177
bool send(As &&... as)
Send an st::message with the given parameters into the internal message queue.
Definition: channel.hpp:262
bool closed() const
Definition: channel.hpp:186
Interthread type-erased message container.
Definition: message.hpp:36
static message make()
Construct a message.
Definition: message.hpp:90
CRTP-templated interface that provides a shared context API.
Definition: context.hpp:43
std::shared_ptr< detail::channel::context > & ctx() const
Definition: context.hpp:50