transaction.h
1 /***************************************************************************
2  Copyright (C) 2002-2015 Kentaro Kitagawa
3  kitagawa@phys.s.u-tokyo.ac.jp
4 
5  This program is free software; you can redistribute it and/or
6  modify it under the terms of the GNU Library General Public
7  License as published by the Free Software Foundation; either
8  version 2 of the License, or (at your option) any later version.
9 
10  You should have received a copy of the GNU Library General
11  Public License and a list of authors along with this program;
12  see the files COPYING and AUTHORS.
13  ***************************************************************************/
14 #ifndef TRANSACTION_H
15 #define TRANSACTION_H
16 
17 #include "support.h"
18 #include "threadlocal.h"
19 #include "atomic_smart_ptr.h"
20 #include <vector>
21 #include "atomic.h"
22 #include "allocator.h"
23 #include "xtime.h"
24 
25 namespace Transactional {
26 
27 //! \page stmintro Brief introduction of software transactional memory using the class Node
28 //! Tree-structure objects, consisting of Node and derived classes, work as
29 //! software transactional memory (STM) by accessing through Transaction or Snapshot class.\n
30 //! STM is a recent trend in a many-processor era for realizing scalable concurrent computing.
31 //! As opposed to pessimistic mutual exclusions (mutex, semaphore, spin lock, and so on),
32 //! STM takes an optimistic approach for writing,
33 //! and each thread does not wait for other threads. Instead, commitment of transactional writing
34 //! could sometimes fail and the operation will be restarted. The benefits of this optimistic approach are
35 //! scalability, composability, and freedom from deadlocks.\n
36 //! The class Transaction supports transactional accesses of data,
37 //! which were implemented as Node::Payload or T::Payload in derived classes T, and also handles
38 //! multiple insertion (hard-link)/removal (unlink) of objects to the tree.
39 //! Transaction / Snapshot for subtree can be operated at any node at any time by lock-free means.
40 //! Of course, transactions for separate subtrees do not disturb each other.
41 //! Since this STM is implemented based on the object model (i.e. not of address/log-based model),
42 //! accesses can be performed without huge additional costs.
43 //! Snapshot always holds consistency of the contents of Node::Payload including those for the subnodes,
44 //! and can be taken typically in O(1) time.
45 //! During a transaction, unavoidable cost is to copy-on-write Payload of the nodes referenced by
46 //! Transaction::operator[].\n
47 //! The smart pointer atomic_shared_ptr, which adds a support for lock-free atomic update on shared_ptr,
48 //! is a key material in this STM to realize snapshot reading and commitment of a transaction.\n
49 //! \n
50 //! Example 1 for snapshot reading: reading two variables in a snapshot.\n
51 //! \code { Snapshot<NodeA> shot1(node1);
52 //! double x = shot1[node1].m_x;
53 //! double y = shot1[node1].m_y;
54 //! }
55 //! \endcode\n
56 //! Example 2 for simple transactional writing: adding one atomically\n
57 //! \code node1.iterate_commit([](Transaction<NodeA> &tr1) {
58 //! tr1[node1].m_x = tr1[node1].m_x + 1;
59 //! });
60 //! \endcode\n
61 //! Example 3 for adding a child node.\n
62 //! \code node1.iterate_commit_if([](Transaction<NodeA> &tr1) {
63 //! return node1.insert(tr1, node2);
64 //! });
65 //! \endcode \n
66 //! More real examples are shown in the test codes: transaction_test.cpp,
67 //! transaction_dynamic_node_test.cpp, transaction_negotiation_test.cpp.\n
68 //! \sa Node, Snapshot, Transaction.
69 //! \sa atomic_shared_ptr.
70 
71 template <class XN>
72 class Snapshot;
73 template <class XN>
75 
76 //! \brief This is a base class of nodes which carries data sets for itself (Payload) and for subnodes.\n
77 //! See \ref stmintro for basic ideas of this STM and code examples.
78 //!
79 //! \tparam XN a class type used in the smart pointers of NodeList. \a XN must be a derived class of Node<XN> itself.
80 //! \sa Snapshot, Transaction.
81 //! \sa XNode.
82 template <class XN>
83 class DECLSPEC_KAME Node {
84 public:
85  template <class T, typename... Args>
86  static T *create(Args&&... args);
87 
88  virtual ~Node();
89 
90  using NodeNotFoundError = std::domain_error;
91 
92  //! Adds a hard link to \a var.
93  //! The subnode \a var will be stored in the list of shared_ptr<XN>, NodeList.
94  //! \param[in] online_after_insertion If true, \a var can be accessed through \a tr (not appropriate for a shared object).
95  //! \return True if succeeded.
96  //! \sa release(), releaseAll(), swap().
97  bool insert(Transaction<XN> &tr, const shared_ptr<XN> &var, bool online_after_insertion = false);
98  //! Adds a hard link to \a var.
99  //! The subnode \a var will be stored in the list of shared_ptr<XN>, NodeList.
100  //! \sa release(), releaseAll(), swap().
101  void insert(const shared_ptr<XN> &var);
102  //! Removes a hard link to \a var from the list, NodeList.
103  //! \return True if succeeded.
104  //! \sa insert(), releaseAll(), swap().
105  bool release(Transaction<XN> &tr, const shared_ptr<XN> &var);
106  //! Removes a hard link to \a var from the list, NodeList.
107  //! \sa insert(), releaseAll(), swap().
108  void release(const shared_ptr<XN> &var);
109  //! Removes all links to the subnodes.
110  //! \sa insert(), release(), swap().
111  void releaseAll();
112  //! Swaps orders in the subnode list.
113  //! \return True if succeeded.
114  //! \sa insert(), release(), releaseAll().
115  bool swap(Transaction<XN> &tr, const shared_ptr<XN> &x, const shared_ptr<XN> &y);
116  //! Swaps orders in the subnode list.
117  //! \sa insert(), release(), releaseAll().
118  void swap(const shared_ptr<XN> &x, const shared_ptr<XN> &y);
119 
120  //! Finds the parent node in \a shot.
121  XN *upperNode(Snapshot<XN> &shot);
122 
123  //! Iterates a transaction covering the node and children.
124  //! \param Closure Typical: [=](Transaction<Node1> &tr){ somecode...}
125  template <typename Closure>
126  inline Snapshot<XN> iterate_commit(Closure);
127  //! Iterates a transaction covering the node and children. Skips the iteration when the closure returns false.
128  //! \param Closure Typical: [=](Transaction<Node1> &tr){ somecode...; return ret; }
129  template <typename Closure>
130  inline Snapshot<XN> iterate_commit_if(Closure);
131  //! Iterates a transaction covering the node and children, as long as the closure returns true.
132  //! \param Closure Typical: [=](Transaction<Node1> &tr){ somecode...; return ret; }
133  template <typename Closure>
134  inline void iterate_commit_while(Closure);
135 
136  //! Data holder and accessor for the node.
137  //! Derive Node<XN>::Payload as (\a subclass)::Payload.
138  //! The instances have to be capable of copy-construction and be safe to be shared reading.
139  struct Payload : public atomic_countable {
140  Payload() noexcept : m_serial(-1), m_tr(nullptr) {}
141  virtual ~Payload() = default;
142 
143  //! Points to the corresponding node.
144  XN &node() noexcept {return *m_node;}
145  //! Points to the corresponding node.
146  const XN &node() const noexcept {return *m_node;}
147  int64_t serial() const noexcept {return this->m_serial;}
148  Transaction<XN> &tr() noexcept { return *this->m_tr;}
149 
150  virtual void catchEvent(const shared_ptr<XN>&, int) {}
151  virtual void releaseEvent(const shared_ptr<XN>&, int) {}
152  virtual void moveEvent(unsigned int /*src_idx*/, unsigned int /*dst_idx*/) {}
153  virtual void listChangeEvent() {}
154 
155  private:
156  friend class Node;
157  friend class Transaction<XN>;
158  using rettype_clone = local_shared_ptr<Payload>;
159  virtual rettype_clone clone(Transaction<XN> &tr, int64_t serial) = 0;
160 
161  XN *m_node;
162  //! Serial number of the transaction.
163  int64_t m_serial;
164  Transaction<XN> *m_tr;
165  };
166 
167  void print_() const;
168 
170  using iterator = typename NodeList::iterator;
171  using const_iterator = typename NodeList::const_iterator;
172 
173  Node(const Node &) = delete; //non-copyable.
174  Node &operator=(const Node &) = delete; //non-copyable.
175 private:
176  struct Packet;
177 
178  struct PacketList;
179  struct PacketList : public fast_vector<local_shared_ptr<Packet>, 2> {
180  shared_ptr<NodeList> m_subnodes;
181  PacketList() : fast_vector<local_shared_ptr<Packet>, 2>(), m_serial(SerialGenerator::gen()) {}
182  ~PacketList() {this->clear();} //destroys payloads prior to nodes.
183  //! Serial number of the transaction.
184  int64_t m_serial;
185  };
186 
187  template <class P>
188  struct PayloadWrapper : public P::Payload {
189  virtual typename PayloadWrapper::rettype_clone clone(Transaction<XN> &tr, int64_t serial) {
190 // auto p = allocate_local_shared<PayloadWrapper>(this->m_node->m_allocatorPayload, *this);
191  auto p = make_local_shared<PayloadWrapper>( *this);
192  p->m_tr = &tr;
193  p->m_serial = serial;
194  return p;
195  }
196  PayloadWrapper() = delete;
197  PayloadWrapper& operator=(const PayloadWrapper &x) = delete;
198  PayloadWrapper(XN &node) noexcept : P::Payload(){ this->m_node = &node;}
199  PayloadWrapper(const PayloadWrapper &x) : P::Payload(x) {}
200  private:
201  };
202 
203  struct PacketWrapper;
204  struct Linkage;
205  //! A package containing \a Payload, sub-Packets, and a list of subnodes.\n
206  //! Not-"missing" packet is always up-to-date and can be a snapshot of the subtree,
207  //! and packets possessed by the sub-instances may be out-of-date.\n
208  //! "missing" indicates that the package lacks any Packet for subnodes, or
209  //! any content may be out-of-date.\n
210  struct Packet : public atomic_countable {
211  Packet() noexcept : m_missing(false) {}
212  int size() const noexcept {return subpackets() ? subpackets()->size() : 0;}
213  local_shared_ptr<Payload> &payload() noexcept { return m_payload;}
214  const local_shared_ptr<Payload> &payload() const noexcept { return m_payload;}
215  shared_ptr<NodeList> &subnodes() noexcept { return subpackets()->m_subnodes;}
216  shared_ptr<PacketList> &subpackets() noexcept { return m_subpackets;}
217  const shared_ptr<NodeList> &subnodes() const noexcept { return subpackets()->m_subnodes;}
218  const shared_ptr<PacketList> &subpackets() const noexcept { return m_subpackets;}
219 
220  //! Points to the corresponding node.
221  Node &node() noexcept {return payload()->node();}
222  //! Points to the corresponding node.
223  const Node &node() const noexcept {return payload()->node();}
224 
225  //! \return false if the packet contains the up-to-date subpackets for all the subnodes.
226  bool missing() const noexcept { return m_missing;}
227 
228  //! For debugging.
229  void print_() const;
230  //! For debugging.
231  bool checkConsistensy(const local_shared_ptr<Packet> &rootpacket) const;
232 
233  local_shared_ptr<Payload> m_payload;
234  shared_ptr<PacketList> m_subpackets;
235  bool m_missing;
236  };
237 
239  using cnt_t = int64_t;
240  static cnt_t now() noexcept {
241  auto tm = XTime::now();
242  return (cnt_t)tm.sec() * 1000000uL + tm.usec();
243  }
244  private:
245  };
246  struct ProcessCounter {
247  using cnt_t = uint16_t;
248  ProcessCounter();
249  operator cnt_t() const noexcept {return m_var;}
250  private:
251  cnt_t m_var;
252  static atomic<cnt_t> s_count;
253  };
254  //! Holds the internal ID of the current process (thread). Takes a non-zero value.
256  //! Generates a serial number assigned to bundling and transaction.\n
257  //! During a transaction, a serial is used for determining whether Payload or PacketList should be cloned.\n
258  //! During bundle(), it is used to prevent infinite loops due to unbundle()-ing itself.
260  enum : int64_t {SERIAL_NULL = 0};
261  struct cnt_t {
262  cnt_t() {
263  m_var = (int64_t)*stl_processID * 1000000000000uLL;
264  }
265  operator int64_t() const noexcept {return m_var;}
266  //16bit process ID + 48bit counter.
267  cnt_t &operator++(int) noexcept {
268  m_var = ((m_var + 1) & 0xffffffffffffuLL)
269  | ((m_var) & 0xffff000000000000uLL);
270  return *this;
271  }
272  int64_t m_var;
273  };
274  static int64_t current() noexcept {
275  return *stl_serial;
276  }
277  static int64_t gen() noexcept {
278  auto &v = *stl_serial;
279  v++;
280  return v;
281  }
282  static XThreadLocal<cnt_t> stl_serial;
283  };
284  //! A class wrapping Packet and providing indices and links for lookup.\n
285  //! If packet() is absent, a super node should have the up-to-date Packet.\n
286  //! If hasPriority() is not set, Packet in a super node may be latest.
287  struct PacketWrapper : public atomic_countable {
288  PacketWrapper(const local_shared_ptr<Packet> &x, int64_t bundle_serial) noexcept;
289  //! creates a wrapper not containing a packet but pointing to the upper node.
290  //! \param[in] bp \a m_link of the upper node.
291  //! \param[in] reverse_index The index for this node in the list of the upper node.
292  PacketWrapper(const shared_ptr<Linkage> &bp, int reverse_index, int64_t bundle_serial) noexcept;
293  PacketWrapper(const PacketWrapper &x, int64_t bundle_serial) noexcept;
294  bool hasPriority() const noexcept { return m_ridx == (int)PACKET_STATE::PACKET_HAS_PRIORITY; }
295  const local_shared_ptr<Packet> &packet() const noexcept {return m_packet;}
296  local_shared_ptr<Packet> &packet() noexcept {return m_packet;}
297 
298  //! Points to the upper node that should have the up-to-date Packet when this lacks priority.
299  shared_ptr<Linkage> bundledBy() const noexcept {return m_bundledBy.lock();}
300  //! The index for this node in the list of the upper node.
301  int reverseIndex() const noexcept {return m_ridx;}
302  void setReverseIndex(int i) noexcept {m_ridx = i;}
303 
304  void print_() const;
305  weak_ptr<Linkage> const m_bundledBy;
306  local_shared_ptr<Packet> m_packet;
307  int m_ridx;
308  int64_t m_bundle_serial;
309  enum class PACKET_STATE : int { PACKET_HAS_PRIORITY = -1};
310 
311  PacketWrapper(const PacketWrapper &) = delete;
312  };
313  struct Linkage : public atomic_shared_ptr<PacketWrapper> {
314  Linkage() noexcept : atomic_shared_ptr<PacketWrapper>(), m_transaction_started_time(0) {}
315  ~Linkage() {this->reset(); } //Packet should be freed before memory pools.
316  atomic<typename NegotiationCounter::cnt_t> m_transaction_started_time;
317  //! Puts a wait so that the slowest thread gains a chance to finish its transaction, if needed.
318  void negotiate(typename NegotiationCounter::cnt_t &started_time, float mult_wait = 6.0f) noexcept {
319  auto transaction_started_time = m_transaction_started_time;
320  if( !transaction_started_time)
321  return; //collision has not been detected.
322  negotiate_internal(started_time, mult_wait);
323  }
324  void negotiate_internal(typename NegotiationCounter::cnt_t &started_time, float mult_wait) noexcept;
325  MemoryPool m_mempoolPayload;
326  MemoryPool m_mempoolPacket;
327  MemoryPool m_mempoolPacketList;
328  MemoryPool m_mempoolPacketWrapper;
329  NegotiationCounter m_negotiationCounter;
330  };
331 
332  friend class Snapshot<XN>;
333  friend class Transaction<XN>;
334 
335  void snapshot(Snapshot<XN> &target, bool multi_nodal, typename NegotiationCounter::cnt_t started_time) const;
336  void snapshot(Transaction<XN> &target, bool multi_nodal) const {
337  m_link->negotiate(target.m_started_time, 4.0f);
338  snapshot(static_cast<Snapshot<XN> &>(target), multi_nodal, target.m_started_time);
339  target.m_oldpacket = target.m_packet;
340  }
341  enum class SnapshotStatus {SNAPSHOT_SUCCESS = 0, SNAPSHOT_DISTURBED = 1,
342  SNAPSHOT_VOID_PACKET = 2, SNAPSHOT_NODE_MISSING = 4,
343  SNAPSHOT_COLLIDED = 8, SNAPSHOT_NODE_MISSING_AND_COLLIDED = 12};
344  struct CASInfo {
345  CASInfo() = default;
346  CASInfo(const shared_ptr<Linkage> &b, const local_shared_ptr<PacketWrapper> &o,
347  const local_shared_ptr<PacketWrapper> &n) : linkage(b), old_wrapper(o), new_wrapper(n) {}
348  shared_ptr<Linkage> linkage;
349  local_shared_ptr<PacketWrapper> old_wrapper, new_wrapper;
350  };
352  enum class SnapshotMode {SNAPSHOT_FOR_UNBUNDLE, SNAPSHOT_FOR_BUNDLE};
353  static inline SnapshotStatus snapshotSupernode(const shared_ptr<Linkage> &linkage, shared_ptr<Linkage> &linkage_super,
355  SnapshotMode mode,
356  int64_t serial = SerialGenerator::SERIAL_NULL, CASInfoList *cas_infos = nullptr);
357 
358  //! Updates a packet to \a tr.m_packet if the current packet is unchanged (== \a tr.m_oldpacket).
359  //! If this node has been bundled at the super node, unbundle() will be called.
360  //! \sa Transaction<XN>::commit().
361  bool commit(Transaction<XN> &tr);
362 // bool commit_at_super(Transaction<XN> &tr);
363 
364  enum class BundledStatus {BUNDLE_SUCCESS, BUNDLE_DISTURBED};
365  //! Bundles all the subpackets so that the whole packet can be treated atomically.
366  //! Namely this function takes a snapshot.
367  //! All the subpackets held by \a m_link at the subnodes will be
368  //! cleared and each PacketWrapper::bundledBy() will point to its upper node.
369  //! \sa snapshot().
370  BundledStatus bundle(local_shared_ptr<PacketWrapper> &target,
371  typename NegotiationCounter::cnt_t &started_time, int64_t bundle_serial, bool is_bundle_root);
372  BundledStatus bundle_subpacket(local_shared_ptr<PacketWrapper> *superwrapper, const shared_ptr<Node> &subnode,
373  local_shared_ptr<PacketWrapper> &subwrapper, local_shared_ptr<Packet> &subpacket_new,
374  typename NegotiationCounter::cnt_t &started_time, int64_t bundle_serial);
375  enum class UnbundledStatus {UNBUNDLE_W_NEW_SUBVALUE,
376  UNBUNDLE_SUBVALUE_HAS_CHANGED,
377  UNBUNDLE_COLLIDED, UNBUNDLE_DISTURBED};
378  //! Unbundles a subpacket to \a sublinkage from a snapshot.
379  //! It performs unbundling for all the super nodes.
380  //! The super nodes will lose priorities against their lower nodes.
381  //! \param[in] bundle_serial If not zero, consistency/collision will be checked.
382  //! \param[in] null_linkage The current value of \a sublinkage and should not contain \a packet().
383  //! \param[in] oldsubpacket If not zero, the packet will be compared with the packet inside the super packet.
384  //! \param[in,out] newsubwrapper If \a oldsubpacket and \a newsubwrapper are not zero, \a newsubwrapper will be a new value.
385  //! If \a oldsubpacket is zero, unloaded value of \a sublinkage will be substituted to \a newsubwrapper.
386  static UnbundledStatus unbundle(const int64_t *bundle_serial, typename NegotiationCounter::cnt_t &time_started,
387  const shared_ptr<Linkage> &sublinkage, const local_shared_ptr<PacketWrapper> &null_linkage,
388  const local_shared_ptr<Packet> *oldsubpacket = NULL,
389  local_shared_ptr<PacketWrapper> *newsubwrapper = NULL,
390  local_shared_ptr<PacketWrapper> *superwrapper = NULL);
391  //! The point where the packet is held.
392  shared_ptr<Linkage> m_link;
393  //! Allocators for memory pools in the Linkage.
395  allocator<Packet> m_allocatorPacket;
396  allocator<PacketList> m_allocatorPacketList;
397  allocator<PacketWrapper> m_allocatorPacketWrapper;
398 
399  //! finds the packet for this node in the (un)bundled \a packet.
400  //! \param[in,out] superpacket The bundled packet.
401  //! \param[in] copy_branch If true, new packets and packet lists will be copy-created for writing.
402  //! \param[in] tr_serial The serial number associated with the transaction.
403  inline local_shared_ptr<Packet> *reverseLookup(local_shared_ptr<Packet> &superpacket,
404  bool copy_branch, int64_t tr_serial, bool set_missing, XN** uppernode);
405  inline local_shared_ptr<Packet> &reverseLookup(local_shared_ptr<Packet> &superpacket,
406  bool copy_branch, int64_t tr_serial = 0, bool set_missing = false);
407  const local_shared_ptr<Packet> &reverseLookup(const local_shared_ptr<Packet> &superpacket) const;
408  inline static local_shared_ptr<Packet> *reverseLookupWithHint(shared_ptr<Linkage > &linkage,
409  local_shared_ptr<Packet> &superpacket, bool copy_branch, int64_t tr_serial, bool set_missing,
410  local_shared_ptr<Packet> *upperpacket, int *index);
411  //! finds this node and a corresponding packet in the (un)bundled \a packet.
412  inline local_shared_ptr<Packet> *forwardLookup(local_shared_ptr<Packet> &superpacket,
413  bool copy_branch, int64_t tr_serial, bool set_missing,
414  local_shared_ptr<Packet> *upperpacket, int *index) const;
415 // static void fetchSubpackets(std::deque<local_shared_ptr<PacketWrapper> > &subwrappers,
416 // const local_shared_ptr<Packet> &packet);
417  static void eraseSerials(local_shared_ptr<Packet> &packet, int64_t serial);
418 protected:
419  //! Use \a create().
420  Node();
421 private:
422  using FuncPayloadCreator = Payload *(*)(XN &);
423  static XThreadLocal<FuncPayloadCreator> stl_funcPayloadCreator;
424  void lookupFailure() const;
426  bool copy_branch, int64_t tr_serial, bool set_missing, XN **uppernode);
427 };
428 
//! Factory for node objects of type \a T.
//! Before constructing the node, a creator function that allocates a
//! PayloadWrapper<T> for a given node is stored into T::stl_funcPayloadCreator
//! (declared as XThreadLocal<FuncPayloadCreator>).
//! NOTE(review): the creator is presumably consumed by the Node constructor; that code is not visible here.
//! \tparam T Node type to be created.
//! \param[in] args Arguments perfectly forwarded to the constructor of \a T.
//! \return A raw pointer to the object allocated by \c new; nothing in this function wraps it in a smart pointer.
429 template <class XN>
430 template <class T, typename... Args>
431 T *Node<XN>::create(Args&&... args) {
 //Registers the payload creator for T prior to the construction of the node.
432  *T::stl_funcPayloadCreator = [](XN &node)->Payload *{ return new PayloadWrapper<T>(node);};
433  return new T(std::forward<Args>(args)...);
434 }
435 
436 //! \brief This class takes a snapshot for a subtree.\n
437 //! See \ref stmintro for basic ideas of this STM and code examples.
438 //! \sa Node, Transaction, SingleSnapshot, SingleTransaction.
439 template <class XN>
440 class Snapshot {
441 public:
442  Snapshot(const Snapshot&x) noexcept = default;
443  Snapshot(Snapshot&&x) noexcept = default;
444  Snapshot(Node<XN> &node, const Snapshot &x) noexcept : m_packet(node.reverseLookup(x.m_packet)), m_serial(x.m_serial) {}
445  explicit Snapshot(const Transaction<XN>&x) noexcept : m_packet(x.m_packet), m_serial(x.m_serial) {}
446  explicit Snapshot(Transaction<XN>&&x) noexcept : m_packet(std::move(x.m_packet)), m_serial(x.m_serial) {}
447  Snapshot& operator=(const Snapshot&x) noexcept = default;
448  explicit Snapshot(const Node<XN>&node, bool multi_nodal = true) {
449  node.snapshot( *this, multi_nodal, Node<XN>::NegotiationCounter::now());
450  }
451 
452  //! \return Payload instance for \a node, which should be included in the snapshot.
453  template <class T>
454  const typename T::Payload &operator[](const shared_ptr<T> &node) const noexcept {
455  return operator[](const_cast<const T&>( *node));
456  }
457  //! \return Payload instance for \a node, which should be included in the snapshot.
458  template <class T>
459  const typename T::Payload &operator[](const T &node) const noexcept {
461  const local_shared_ptr<typename Node<XN>::Payload> &payload(packet->payload());
462  return *static_cast<const typename T::Payload*>(payload.get());
463  }
464  //! # of child nodes.
465  int size() const noexcept {return m_packet->size();}
466  //! The list of child nodes.
467  const shared_ptr<const typename Node<XN>::NodeList> list() const noexcept {
468  if( !size())
469  return shared_ptr<typename Node<XN>::NodeList>();
470  return m_packet->subnodes();
471  }
472  //! # of child nodes owned by \a node.
473  int size(const shared_ptr<Node<XN> > &node) const noexcept {
474  return node->reverseLookup(m_packet)->size();
475  }
476  //! The list of child nodes owned by \a node.
477  shared_ptr<const typename Node<XN>::NodeList> list(const shared_ptr<Node<XN> > &node) const noexcept {
478  auto const &packet(node->reverseLookup(m_packet));
479  if( !packet->size() )
480  return shared_ptr<typename Node<XN>::NodeList>();
481  return packet->subnodes();
482  }
483  //! Whether \a lower is a child of this or not.
484  bool isUpperOf(const XN &lower) const noexcept {
485  const shared_ptr<const typename Node<XN>::NodeList> lx(list());
486  if( !lx)
487  return false;
488  for(auto &&x: *lx) {
489  if(x.get() == &lower)
490  return true;
491  }
492  return false;
493  }
494 
495  void print() {
496  m_packet->print_();
497  }
498 
499  //! Stores an event immediately from \a talker with \a arg.
500  template <typename T, typename tArgRef>
501  void talk(T &talker, tArgRef arg) const { talker.talk( *this, std::forward<tArgRef>(arg)); }
502 protected:
503  friend class Node<XN>;
504  //! The snapshot.
506  int64_t m_serial;
507  Snapshot() = default;
508 };
509 //! \brief Snapshot class which does not care about the contents (Payload) of subnodes.\n
510 //! See \ref stmintro for basic ideas of this STM and code examples.
//! The base Snapshot<XN> is inherited as \c protected, so only the accessors declared here are public.
//! \tparam XN Node base type used by the underlying Snapshot<XN>.
//! \tparam T Type of the target node; the accessors return \c T::Payload.
511 //! \sa Node, Snapshot, Transaction, SingleTransaction.
512 template <class XN, typename T>
513 class SingleSnapshot : protected Snapshot<XN> {
514 public:
 //! Takes a snapshot of \a node, passing \c false as \a multi_nodal to Snapshot<XN>.
515  explicit SingleSnapshot(const T &node) : Snapshot<XN>(node, false) {}
 //! Move-constructible.
516  SingleSnapshot(SingleSnapshot&&x) noexcept = default;
517 
518  //! \return a pointer to Payload for \a node, i.e. the payload of the held packet cast to \c T::Payload.
519  const typename T::Payload *operator->() const {
520  return static_cast<const typename T::Payload *>(this->m_packet->payload().get());
521  }
522  //! \return reference to Payload for \a node (dereferences the result of operator->()).
523  const typename T::Payload &operator*() const {
524  return *operator->();
525  }
526 protected:
527 };
528 
529 }
530 #include "transaction_signal.h"
531 namespace Transactional {
532 
533 //! \brief A class supporting transactional writing for a subtree.\n
534 //! See \ref stmintro for basic ideas of this STM and code examples.\n
535 //!
536 //! Transaction inherits features of Snapshot.
537 //! Do something like the following to avoid a copy-on-write due to Transaction::operator[]():
538 //! \code
539 //! const Snapshot<NodeA> &shot(transaction_A);
540 //! double x = shot[childnode].m_x; //reading a value of m_x stored in transaction_A.
541 //! \endcode
542 //! \sa Node, Snapshot, SingleSnapshot, SingleTransaction.
543 template <class XN>
544 class Transaction : public Snapshot<XN> {
545 public:
546  //! Be sure for the persistence of the \a node.
547  //! \param[in] multi_nodal If false, the snapshot and following commitment are not aware of the contents of the child nodes.
548  explicit Transaction(Node<XN>&node, bool multi_nodal = true) :
549  Snapshot<XN>(), m_oldpacket(), m_multi_nodal(multi_nodal) {
550  m_started_time = Node<XN>::NegotiationCounter::now();
551  node.snapshot( *this, multi_nodal);
552  assert( &this->m_packet->node() == &node);
553  assert( &this->m_oldpacket->node() == &node);
554  }
555  //! \param[in] x The snapshot containing the old value of \a node.
556  //! \param[in] multi_nodal If false, the snapshot and following commitment are not aware of the contents of the child nodes.
557  explicit Transaction(const Snapshot<XN> &x, bool multi_nodal = true) noexcept : Snapshot<XN>(x),
558  m_oldpacket(this->m_packet), m_multi_nodal(multi_nodal) {
559  m_started_time = Node<XN>::NegotiationCounter::now();
560  }
561  ~Transaction() {
562  //Do not leave the time stamp.
563  if(m_started_time && this->m_packet) { //not moved
564  Node<XN> &node(this->m_packet->node());
565  if(node.m_link->m_transaction_started_time == m_started_time) {
566  node.m_link->m_transaction_started_time = 0;
567  }
568  }
569  }
570  Transaction(Transaction&&x) noexcept = default;
571 
572  //! \return Copy-constructed Payload instance for \a node, which will be included in the commitment.
573  template <class T>
574  typename T::Payload &operator[](const shared_ptr<T> &node) {
575  return operator[]( *node);
576  }
577  //! \return Copy-constructed Payload instance for \a node, which will be included in the commitment.
578  template <class T>
579  typename T::Payload &operator[](T &node) {
580  assert(isMultiNodal() || ( &node == &this->m_packet->node()));
581  auto &payload(
582  node.reverseLookup(this->m_packet, true, this->m_serial)->payload());
583  if(payload->m_serial != this->m_serial) {
584  payload = payload->clone( *this, this->m_serial);
585  auto &p( *static_cast<typename T::Payload *>(payload.get()));
586  return p;
587  }
588  auto &p( *static_cast<typename T::Payload *>(payload.get()));
589  return p;
590  }
591  bool isMultiNodal() const noexcept {return m_multi_nodal;}
592 
593  //! Reserves an event, to be emitted from \a talker with \a arg after the transaction is committed.
594  template <typename T, typename ...Args>
595  void mark(T &talker, Args&&...args) {
596  if(auto m = talker.createMessage(this->m_serial, std::forward<Args>(args)...)) {
597  if( !m_messages)
598  m_messages.reset(new MessageList);
599  m_messages->push_back(m);
600  }
601  }
602  //! Cancels reserved events made toward \a x.
603  //! \return # of unmarked events.
604  int unmark(const shared_ptr<XListener> &x) {
605  int canceled = 0;
606  if(m_messages)
607  for(auto &&msg: *m_messages)
608  canceled += msg->unmark(x);
609  return canceled;
610  }
611 
612  bool isModified() const noexcept {
613  return (this->m_packet != this->m_oldpacket);
614  }
615  //! \return true if succeeded.
616  bool commit() {
617  Node<XN> &node(this->m_packet->node());
618  if( !isModified() || node.commit( *this)) {
619  finalizeCommitment(node);
620  return true;
621  }
622  return false;
623  }
624  //! Combination of commit() and operator++().
625  bool commitOrNext() {
626  if(commit())
627  return true;
628  ++( *this);
629  return false;
630  }
631  //! Prepares for a next transaction after taking a snapshot for \a supernode.
632  //! \return a snapshot for \a supernode.
634  Snapshot<XN> shot( *this); //for node persistence.
635  Node<XN> &node(this->m_packet->node());
636  this->operator++();
637  supernode.snapshot( *this, true);
638  Snapshot<XN> shot_super( *this);
639  Snapshot<XN> shot_this(node, shot_super);
640  this->Snapshot<XN>::operator=(shot_this);
641  this->m_oldpacket = this->m_packet;
642  return shot_super;
643  }
644  Transaction(const Transaction &tr) = delete; //non-copyable.
645  Transaction& operator=(const Transaction &tr) = delete; //non-copyable.
646 private:
647  friend class Node<XN>;
648 // bool commitAt(Node<XN> &supernode) {
649 // if(supernode.commit_at_super( *this)) {
650 // finalizeCommitment(this->m_packet->node());
651 // return true;
652 // }
653 // return false;
654 // }
655  //! Takes another snapshot and prepares for a next transaction.
657  Node<XN> &node(this->m_packet->node());
658  if(isMultiNodal()) {
659  auto time(node.m_link->m_transaction_started_time);
660  if( !time || (time > m_started_time))
661  node.m_link->m_transaction_started_time = m_started_time;
662  }
663  m_messages.reset();
664  this->m_packet->node().snapshot( *this, m_multi_nodal);
665  return *this;
666  }
667 
668  void finalizeCommitment(Node<XN> &node);
669 
671  const bool m_multi_nodal;
672  typename Node<XN>::NegotiationCounter::cnt_t m_started_time;
673  using MessageList = fast_vector<shared_ptr<Message_<Snapshot<XN>> >, 3>;
674  unique_ptr<MessageList> m_messages;
675 };
676 
677 //! \brief Transaction which does not care about the contents (Payload) of subnodes.\n
678 //! See \ref stmintro for basic ideas of this STM and code examples.
//! \tparam XN Node base type used by the underlying Transaction<XN>.
//! \tparam T Type of the target node; the accessors return \c T::Payload.
679 //! \sa Node, Transaction, Snapshot, SingleSnapshot.
680 template <class XN, typename T>
681 class SingleTransaction : public Transaction<XN> {
682 public:
 //! Starts a transaction on \a node, passing \c false as \a multi_nodal to Transaction<XN>.
683  explicit SingleTransaction(T &node) : Transaction<XN>(node, false) {}
684 
685  //! \return Copy-constructed Payload instance for \a node, which will be included in the commitment.
 //! Implemented through Transaction::operator[] on the node of the held packet, cast to \c T&.
 //! NOTE(review): declared noexcept although Transaction::operator[] may clone the payload; confirm that this is intended.
686  typename T::Payload &operator*() noexcept {
687  return ( *this)[static_cast<T &>(this->m_packet->node())];
688  }
689  //! \return Pointer to the Payload instance returned by operator*().
690  typename T::Payload *operator->() noexcept {
691  return &( **this);
692  }
693 protected:
694 };
695 
//! Post-processing after a commitment; called from commit() when the transaction was
//! not modified or Node::commit() succeeded.
//! Clears the time stamps, drops the old packet, and delivers the messages reserved by mark().
//! \param[in] node The node whose linkage may hold the time stamp of this transaction.
696 template <class XN>
697 void Transaction<XN>::finalizeCommitment(Node<XN> &node) {
698  //Clears the time stamp on the linkage, only if it equals the one of this transaction.
699  if(node.m_link->m_transaction_started_time == m_started_time) {
700  node.m_link->m_transaction_started_time = 0;
701  }
 //Zero makes the check of m_started_time in ~Transaction() fail, so that the destructor leaves the linkage untouched.
702  m_started_time = 0;
703 
 //The old packet is not needed any more.
704  m_oldpacket.reset();
705  //Messaging: calls talk() for every reserved message with this transaction, then discards the list.
706  if(m_messages) {
707  for(auto &&msg: *m_messages)
708  msg->talk( *this);
709  m_messages.reset();
710  }
711 }
712 
//! Repeats a transaction on this node until its commitment succeeds.
//! The \a closure is invoked with the transaction before every attempt to commit;
//! when commit() fails, \c ++tr prepares the next attempt and the closure runs again.
//! \param[in] closure Callable taking Transaction<XN>&; its return value, if any, is ignored.
//! \return A Snapshot<XN> obtained from the committed transaction.
713 template <class XN>
714 template <typename Closure>
715 inline Snapshot<XN> Node<XN>::iterate_commit(Closure closure) {
716  for(Transaction<XN> tr( *this);;++tr) {
717  closure(tr);
718  if(tr.commit())
 //The committed transaction is moved into the return value of type Snapshot<XN>.
719  return std::move(tr);
720  }
721 }
//! Repeats a transaction on this node until the \a closure returns true and the commitment succeeds.
//! When the closure returns false, commit() is not called for that round; the \c continue
//! statement still executes \c ++tr of the for-statement, and the closure is invoked again.
//! \param[in] closure Callable taking Transaction<XN>&, whose result is tested with operator!.
//! \return A Snapshot<XN> obtained from the committed transaction.
722 template <class XN>
723 template <typename Closure>
724 inline Snapshot<XN> Node<XN>::iterate_commit_if(Closure closure) {
725  //std::is_integral<std::result_of<Closure>>::type
726  for(Transaction<XN> tr( *this);;++tr) {
727  if( !closure(tr))
728  continue; //skipping the commitment of this round; ++tr is still performed.
729  if(tr.commit())
 //The committed transaction is moved into the return value of type Snapshot<XN>.
730  return std::move(tr);
731  }
732 }
//! Repeats a transaction on this node as long as the \a closure returns true and the commitment fails.
//! Returns without committing as soon as the closure returns false, or right after a successful commit().
//! \param[in] closure Callable taking Transaction<XN>&, whose result is tested with operator!.
733 template <class XN>
734 template <typename Closure>
735 inline void Node<XN>::iterate_commit_while(Closure closure) {
736  for(Transaction<XN> tr( *this);;++tr) {
 //The closure has requested to stop; nothing is committed in this round.
737  if( !closure(tr))
738  return;
 //Done when committed; otherwise ++tr prepares the next round.
739  if(tr.commit())
740  return;
741  }
742 }
743 
744 template <class XN>
747  bool copy_branch, int64_t tr_serial, bool set_missing, XN **uppernode) {
748  local_shared_ptr<Packet> *foundpacket;
749  if( &superpacket->node() == this)
750  foundpacket = &superpacket;
751  else
752  foundpacket = lookupFromChild(superpacket,
753  copy_branch, tr_serial, set_missing, uppernode);
754 
755  if(copy_branch && (( *foundpacket)->payload()->m_serial != tr_serial)) {
756  foundpacket->reset(new Packet( **foundpacket));
757 // *foundpacket = allocate_local_shared<Packet>(
758 // (*foundpacket)->node().m_allocatorPacket, **foundpacket);
759  }
760  return foundpacket;
761 }
762 
763 template <class XN>
766  bool copy_branch, int64_t tr_serial, bool set_missing) {
767  local_shared_ptr<Packet> *foundpacket = reverseLookup(superpacket, copy_branch, tr_serial, set_missing, 0);
768  if( !foundpacket)
769  lookupFailure();
770  return *foundpacket;
771 }
772 
773 template <class XN>
775 Node<XN>::reverseLookup(const local_shared_ptr<Packet> &superpacket) const {
776  local_shared_ptr<Packet> *foundpacket = const_cast<Node*>(this)->reverseLookup(
777  const_cast<local_shared_ptr<Packet> &>(superpacket), false, 0, false, 0);
778  if( !foundpacket)
779  lookupFailure();
780  return *foundpacket;
781 }
782 
783 
784 } //namespace Transactional
785 
786 #endif /*TRANSACTION_H*/

Generated for KAME4 by  doxygen 1.8.3