transaction_signal.h
1 /***************************************************************************
2  Copyright (C) 2002-2015 Kentaro Kitagawa
3  kitagawa@phys.s.u-tokyo.ac.jp
4 
5  This program is free software; you can redistribute it and/or
6  modify it under the terms of the GNU Library General Public
7  License as published by the Free Software Foundation; either
8  version 2 of the License, or (at your option) any later version.
9 
10  You should have received a copy of the GNU Library General
11  Public License and a list of authors along with this program;
12  see the files COPYING and AUTHORS.
13  ***************************************************************************/
14 #ifndef TRANSACTION_SIGNAL_H
15 #define TRANSACTION_SIGNAL_H
16 
17 #include "transaction.h"
18 #include <deque>
19 #include <tuple>
20 #include "xsignal.h"
21 
22 namespace Transactional {
23 
24 template <int N>
25 struct CallByTuple {
26  template <class Func, class R, typename TPL, typename... Args>
27  CallByTuple(Func f, R &r, TPL& t, Args&...args) {
28  CallByTuple<N - 1>(f, r, t, std::get<N - 1>(t), args...);
29  }
30 };
31 template <>
32 struct CallByTuple<0> {
33  template <class Func, class R, typename TPL, typename... Args>
34  CallByTuple(Func f, R &r, TPL&, Args&...args) {
35  (r.*f)(std::forward<Args>(args)...);
36  }
37 };
38 
39 template <typename...Args>
40 struct Event {
41  explicit Event(std::tuple<Args...>&& tpl) noexcept : tuple(std::move(tpl)) {}
42  Event(const Event&) = default;
43  Event(Event&&) = default;
44  Event &operator=(const Event&) = delete;
45 private:
46  std::tuple<Args...> tuple;
47 public:
48  template <class Func, class T>
49  void operator()(Func f, T &t) const {
50  CallByTuple<sizeof...(Args)>(f, t, tuple);
51  }
52 };
53 
54 template <class Event>
55 class ListenerBase : public XListener {
56 protected:
57  explicit ListenerBase(XListener::FLAGS flags) : XListener(flags), event() {}
58 public:
59  virtual void operator() (const Event&) const = 0;
60 protected:
61  template <class SS, typename...Args>
62  friend class Talker;
64 };
65 
66 template<class Event, class R, class Func>
67 struct ListenerRef : public ListenerBase<Event> {
68  ListenerRef(R &obj, Func f, XListener::FLAGS flags) noexcept :
69  ListenerBase<Event>(flags), m_func(f), m_obj(obj) { }
70  virtual void operator() (const Event& e) const override {
71  e(m_func, m_obj);
72  }
73 private:
74  Func m_func;
75  R &m_obj;
76 };
77 template<class Event, class R, class Func>
78 struct ListenerWeak : public ListenerBase<Event> {
79  ListenerWeak(const shared_ptr<R> &obj, Func f, XListener::FLAGS flags) noexcept :
80  ListenerBase<Event>(flags), m_func(f), m_obj(obj) { }
81  virtual void operator() (const Event& e) const override {
82  if(auto p = m_obj.lock() ) {
83  e(m_func, *p);
84  }
85  }
86 private:
87  Func m_func;
88  const weak_ptr<R> m_obj;
89 };
90 
91 template <class SS>
92 struct Message_ {
93  virtual ~Message_() = default;
94  virtual void talk(const SS &shot) = 0;
95  virtual int unmark(const shared_ptr<XListener> &x) = 0;
96 };
97 
98 //! M/M Listener and Talker model
99 //! \sa XListener, XSignalStore
100 //! \p tArg: value which will be derivered
101 template <class SS, typename...Args>
102 class Talker {
103 public:
104  virtual ~Talker() = default;
105 
106  template <class R, class T, typename...ArgRefs>
107  shared_ptr<XListener> connect(R& obj, void(T::*func)(ArgRefs...), int flags = 0);
108  template <class R, class T, typename...ArgRefs>
109  shared_ptr<XListener> connectWeakly(const shared_ptr<R> &obj,
110  void (T::*func)(ArgRefs...), int flags = 0);
111 
112  void connect(const shared_ptr<XListener> &x);
113  void disconnect(const shared_ptr<XListener> &);
114 
115  //! Requests a talk to connected listeners.
116  //! If a listener is not mainthread model, the listener will be called later.
117  //! \param arg passing argument to all listeners
118  //! If listener avoids duplication, lock won't be passed to listener.
119  struct Message;
120  template <typename...ArgRefs>
121  shared_ptr<Message> createMessage(int64_t tr_serial, ArgRefs&&... arg) const;
122  template <typename...ArgRefs>
123  void talk(const SS &shot, ArgRefs&&...args) const {
124  Message m(m_listeners, std::forward<ArgRefs>(args)...);
125  m.talk(shot);
126  }
127 
128  bool empty() const noexcept {return !m_listeners;}
129 private:
130  using Event_ = Event<SS, Args...>;
132  typedef std::vector<weak_ptr<Listener_> > ListenerList;
134  shared_ptr<ListenerList> m_listeners;
135 
136  void connect(const shared_ptr<Listener_> &);
137 
138  struct EventWrapper : public XTransaction_ {
139  EventWrapper(const shared_ptr<Listener_> &l) noexcept :
140  XTransaction_(), listener(l) {}
141  virtual ~EventWrapper() = default;
142  const shared_ptr<Listener_> listener;
143  virtual bool talkBuffered() = 0;
144  };
146  EventWrapperAllowDup(const shared_ptr<Listener_> &l, const Event_ &e) noexcept :
147  EventWrapper(l), event(e) {}
148  Event_ event;
149  virtual bool talkBuffered() override {
150  ( *this->listener)(std::move(event));
151  return false;
152  }
153  };
155  EventWrapperAvoidDup(const shared_ptr<Listener_> &l) : EventWrapper(l) {}
156  virtual bool talkBuffered() override {
157  bool skip = false;
158  if(this->listener->delay_ms()) {
159  long elapsed_ms = XTime::now().diff_msec(this->registered_time);
160  skip = ((long)this->listener->delay_ms() > elapsed_ms);
161  }
162  if( !skip) {
164  e.swap(this->listener->event);
165  assert(e.get());
166  ( *this->listener)( std::move(*e));
167  }
168  return skip;
169  }
170  };
171 public:
172  struct Message : public Message_<SS> {
173  template <class...ArgRefs>
174  Message(const shared_ptr<ListenerList> &l, ArgRefs&&...as) noexcept :
175  Message_<SS>(), listeners(l), args(std::forward<Args>(as)...) {}
176  shared_ptr<ListenerList> listeners;
177  std::tuple<Args...> args;
178  shared_ptr<UnmarkedListenerList> listeners_unmarked;
179  virtual void talk(const SS &shot) override;
180  virtual int unmark(const shared_ptr<XListener> &x) override {
181  if( !listeners)
182  return 0;
183  int canceled = 0;
184  for(auto &&y: *listeners) {
185  if(auto listener = y.lock()) {
186  if(listener == x) {
187  if( !listeners_unmarked)
188  listeners_unmarked.reset(new UnmarkedListenerList);
189  listeners_unmarked->push_back(x);
190  ++canceled;
191  }
192  }
193  }
194  return canceled;
195  }
196  };
197 };
198 
199 template <class SS, typename...Args>
200 class TalkerSingleton : public Talker<SS, Args...> {
201 public:
202  TalkerSingleton() : Talker<SS, Args...>(), m_transaction_serial(0) {}
203  TalkerSingleton(const TalkerSingleton &x) : Talker<SS, Args...>(x), m_transaction_serial(0) {}
204  template <typename...ArgRefs>
205  shared_ptr<typename TalkerSingleton::Message> createMessage(int64_t tr_serial, ArgRefs&&...args) const {
206  if(m_transaction_serial == tr_serial) {
207  if(auto m = m_marked.lock()) {
208  m->args = std::make_tuple(std::forward<ArgRefs>(args)...);
209  return nullptr;
210  }
211  }
212  auto m = Talker<SS, Args...>::createMessage(tr_serial, std::forward<ArgRefs>(args)...);
213  m_transaction_serial = tr_serial;
214  m_marked = m;
215  return m;
216  }
217 private:
218  mutable weak_ptr<typename Talker<SS, Args...>::Message> m_marked;
219  mutable int64_t m_transaction_serial;
220 };
221 
222 template <class SS, typename...Args>
223 template <typename...ArgRefs>
224 shared_ptr<typename Talker<SS, Args...>::Message> Talker<SS, Args...>::createMessage(int64_t, ArgRefs&&...args) const {
225  if( !m_listeners)
226  return nullptr;
227  return std::make_shared<Message>(m_listeners, std::forward<ArgRefs>(args)...);
228 }
229 
230 template <class SS, typename...Args>
231 template <class R, class T, typename...ArgRefs>
232 shared_ptr<XListener>
233 Talker<SS, Args...>::connect(R &obj, void(T::*func)(ArgRefs...), int flags) {
234  shared_ptr<Listener_> listener =
235  std::make_shared<ListenerRef<Talker<SS, Args...>::Event_, T, decltype(func)>>(
236  static_cast<T&>(obj), func, (XListener::FLAGS)flags);
237  connect(listener);
238  return listener;
239 }
240 
241 template <class SS, typename...Args>
242 template <class R, class T, typename...ArgRefs>
243 shared_ptr<XListener>
244 Talker<SS, Args...>::connectWeakly(const shared_ptr<R> &obj,
245  void(T::*func)(ArgRefs...), int flags) {
246  shared_ptr<Listener_> listener =
247  std::make_shared<ListenerWeak<Talker<SS, Args...>::Event_, T, decltype(func)>>(
248  static_pointer_cast<T>(obj), func, (XListener::FLAGS)flags);
249  connect(listener);
250  return listener;
251 }
252 template <class SS, typename...Args>
253 void
254 Talker<SS, Args...>::connect(const shared_ptr<XListener> &lx) {
255  auto listener = dynamic_pointer_cast<Listener_>(lx);
256  connect(listener);
257 }
258 template <class SS, typename...Args>
259 void
260 Talker<SS, Args...>::connect(const shared_ptr<Listener_> &lx) {
261  auto new_list = m_listeners ? std::make_shared<ListenerList>( *m_listeners) : std::make_shared<ListenerList>();
262  // clean-up dead listeners.
263  for(auto it = new_list->begin(); it != new_list->end();) {
264  if( !it->lock())
265  it = new_list->erase(it);
266  else
267  ++it;
268  }
269  new_list->push_back(lx);
270  new_list->shrink_to_fit();
271  m_listeners = new_list;
272 }
273 template <class SS, typename...Args>
274 void
275 Talker<SS, Args...>::disconnect(const shared_ptr<XListener> &lx) {
276  auto new_list = m_listeners ? std::make_shared<ListenerList>( *m_listeners) : std::make_shared<ListenerList>();
277  for(auto it = new_list->begin(); it != new_list->end();) {
278  if(auto listener = it->lock()) {
279  // clean dead listeners and matching one.
280  if( !listener || (lx == listener)) {
281  it = new_list->erase(it);
282  continue;
283  }
284  }
285  ++it;
286  }
287  if(new_list->empty())
288  new_list.reset();
289  else
290  new_list->shrink_to_fit();
291  m_listeners = new_list;
292 }
293 
294 template <class SS, typename...Args>
295 void
296 Talker<SS, Args...>::Message::talk(const SS &shot) {
297  if( !listeners) return;
298  Event_ event(std::tuple_cat(std::tie(shot), std::move(args)));
299  //Writing deferred events to event pool.
300  for(auto &&x: *listeners) {
301  if(auto listener = x.lock()) {
302  if(listeners_unmarked &&
303  (std::find(listeners_unmarked->begin(), listeners_unmarked->end(), listener) != listeners_unmarked->end()))
304  continue;
305  if(listener->flags() & XListener::FLAG_MAIN_THREAD_CALL) {
306  if(listener->flags() & XListener::FLAG_AVOID_DUP) {
307  atomic_unique_ptr<Event_> newevent(new Event_(event) );
308  newevent.swap(listener->event);
309  if( !newevent.get())
310  registerTransactionList(new EventWrapperAvoidDup(listener));
311  }
312  else {
313  if(isMainThread()) {
314  try {
315  ( *listener)(event);
316  }
317  catch (XKameError &e) {
318  e.print();
319  }
320  }
321  else {
322  registerTransactionList(new EventWrapperAllowDup(listener, event));
323  }
324  }
325  }
326  }
327  }
328  //Immediate events.
329  for(auto &&x: *listeners) {
330  if(auto listener = x.lock()) {
331  if(listeners_unmarked &&
332  (std::find(listeners_unmarked->begin(), listeners_unmarked->end(), listener) != listeners_unmarked->end()))
333  continue;
334  if( !(listener->flags() & XListener::FLAG_MAIN_THREAD_CALL)) {
335  try {
336  ( *listener)(event);
337  }
338  catch (XKameError &e) {
339  e.print();
340  }
341  }
342  }
343  }
344 }
345 
346 } //namespace Transactional
347 
348 #endif /*TRANSACTION_SIGNAL_H*/

Generated for KAME4 by  doxygen 1.8.3