transaction_impl.h
1 /***************************************************************************
2  Copyright (C) 2002-2015 Kentaro Kitagawa
3  kitagawa@phys.s.u-tokyo.ac.jp
4 
5  This program is free software; you can redistribute it and/or
6  modify it under the terms of the GNU Library General Public
7  License as published by the Free Software Foundation; either
8  version 2 of the License, or (at your option) any later version.
9 
10  You should have received a copy of the GNU Library General
11  Public License and a list of authors along with this program;
12  see the files COPYING and AUTHORS.
13 ***************************************************************************/
14 #include "transaction.h"
15 #include <vector>
16 
// Strict-checking hooks. When TRANSACTIONAL_STRICT_assert is defined,
// STRICT_assert(expr) forwards to assert(expr) and STRICT_TEST(expr) emits expr verbatim;
// otherwise both macros expand to nothing, so the checks cost nothing.
17 #ifdef TRANSACTIONAL_STRICT_assert
// Drops any earlier definition of STRICT_assert before redefining it.
18  #undef STRICT_assert
19  #define STRICT_assert(expr) assert(expr)
20  #define STRICT_TEST(expr) expr
21 #else
22  #define STRICT_assert(expr)
23  #define STRICT_TEST(expr)
24 #endif
25 
26 namespace Transactional {
27 
// Exists only in strict builds (STRICT_TEST expands to nothing otherwise); initialized to -2.
28 STRICT_TEST(static atomic<int64_t> s_serial_abandoned = -2);
29 

// Out-of-class definitions of the static data members of Node<XN> and its nested classes.
// Thread-local slot holding the payload-creator callback.
30 template <class XN>
31 XThreadLocal<typename Node<XN>::FuncPayloadCreator> Node<XN>::stl_funcPayloadCreator;
32 
// Thread-local counter used by SerialGenerator.
33 template <class XN>
34 XThreadLocal<typename Node<XN>::SerialGenerator::cnt_t> Node<XN>::SerialGenerator::stl_serial;
// Shared atomic counter from which ProcessCounter instances draw their IDs; starts at 0.
35 template <class XN>
36 atomic<typename Node<XN>::ProcessCounter::cnt_t> Node<XN>::ProcessCounter::s_count = 0;
// Thread-local ProcessCounter instance.
37 template <class XN>
38 XThreadLocal<typename Node<XN>::ProcessCounter> Node<XN>::stl_processID;
39 template <class XN>
40 Node<XN>::ProcessCounter::ProcessCounter() {
41  for(;;) {
42  cnt_t oldv = s_count;
43  cnt_t newv = oldv + (cnt_t)1u;
44  if( !newv) ++newv;
45  if(s_count.compare_set_strong(oldv, newv)) {
46  //avoids zero.
47  m_var = newv;
48  break;
49  }
50  }
51 }
52 template <class XN>
53 void
55  printf("Packet: ");
56  printf("%s@%p, ", typeid(*this).name(), &node());
57  printf("BP@%p, ", node().m_link.get());
58  if(missing())
59  printf("missing, ");
60  if(size()) {
61  printf("%d subnodes : [ \n", (int)size());
62  for(int i = 0; i < size(); i++) {
63  if(subpackets()->at(i)) {
64  subpackets()->at(i)->print_();
65  printf("; ");
66  }
67  else {
68  printf("%s@%p, w/o packet, ", typeid(*this).name(), subnodes()->at(i).get());
69  }
70  }
71  printf("]\n");
72  }
73  printf(";");
74 }
75 
76 template <class XN>
77 bool
79  try {
80  if(size()) {
81  if( !(payload()->m_serial - subpackets()->m_serial < 0x7fffffffffffffffLL))
82  throw __LINE__;
83  }
84  for(int i = 0; i < size(); i++) {
85  if( !subpackets()->at(i)) {
86  if( !rootpacket->missing()) {
87  if( !subnodes()->at(i)->reverseLookup(
88  const_cast<local_shared_ptr<Packet>&>(rootpacket), false, 0, false, 0))
89  throw __LINE__;
90  }
91  }
92  else {
93  if(subpackets()->at(i)->size())
94  if( !(subpackets()->m_serial - subpackets()->at(i)->subpackets()->m_serial < 0x7fffffffffffffffLL))
95  throw __LINE__;
96  if(subpackets()->at(i)->missing() && (rootpacket.get() != this)) {
97  if( !missing())
98  throw __LINE__;
99  }
100  if( !subpackets()->at(i)->checkConsistensy(
101  subpackets()->at(i)->missing() ? rootpacket : subpackets()->at(i)))
102  return false;
103  }
104  }
105  }
106  catch (int &line) {
107  fprintf(stderr, "Line %d, losing consistensy on node %p:\n", line, &node());
108  rootpacket->print_();
109  throw *this;
110  }
111  return true;
112 }
113 
114 template <class XN>
115 Node<XN>::PacketWrapper::PacketWrapper(const local_shared_ptr<Packet> &x, int64_t bundle_serial) noexcept :
116  m_bundledBy(), m_packet(x), m_ridx((int)PACKET_STATE::PACKET_HAS_PRIORITY), m_bundle_serial(bundle_serial) {
117 }
118 template <class XN>
119 Node<XN>::PacketWrapper::PacketWrapper(const shared_ptr<Linkage > &bp, int reverse_index,
120  int64_t bundle_serial) noexcept :
121  m_bundledBy(bp), m_packet(), m_ridx(), m_bundle_serial(bundle_serial) {
122  setReverseIndex(reverse_index);
123 }
124 template <class XN>
125 Node<XN>::PacketWrapper::PacketWrapper(const PacketWrapper &x, int64_t bundle_serial) noexcept :
126  m_bundledBy(x.m_bundledBy), m_packet(x.m_packet),
127  m_ridx(x.m_ridx), m_bundle_serial(bundle_serial) {}
128 
129 template <class XN>
130 void
132  printf("PacketWrapper: ");
133  if( !hasPriority()) {
134  printf("referred to BP@%p, ", bundledBy().get());
135  }
136  printf("serial:%lld, ", (long long)m_bundle_serial);
137  if(packet()) {
138  packet()->print_();
139  }
140  else {
141  printf("absent, ");
142  }
143  printf("\n");
144 }
145 
146 template <class XN>
147 void
148 Node<XN>::Linkage::negotiate_internal(typename NegotiationCounter::cnt_t &started_time, float mult_wait) noexcept {
149  for(int ms = 0;;) {
150  auto transaction_started_time = m_transaction_started_time;
151  if( !transaction_started_time)
152  break; //collision has not been detected.
153  auto dt = started_time - transaction_started_time;
154  if(dt <= 0)
155  break; //This thread is the oldest.
156  auto dt2 = Node<XN>::NegotiationCounter::now() - transaction_started_time;
157 
158  if(mult_wait * dt < dt2)
159  break;
160 
161 // static XThreadLocal<unsigned int> stl_seed;
162 // if((double)rand_r( &*stl_seed) / RAND_MAX > 20 * dt / dt2) {
163 // break; //performs anyway.
164 // }
165  ms = std::max((int)(dt2 / 10000), ms + 1);
166  if(ms > 200) {
167  fprintf(stderr, "Nested transaction?, ");
168  fprintf(stderr, "Negotiating, %f sec. requested, limited to 200ms.", ms*1e-3);
169  fprintf(stderr, "for BP@%p\n", this);
170  ms = 200;
171  }
172  msecsleep(ms);
173  }
174 }
175 
176 template <class XN>
178  m_link(std::make_shared<Linkage>()),
179  m_allocatorPayload(&m_link->m_mempoolPayload),
180  m_allocatorPacket(&m_link->m_mempoolPacket),
181  m_allocatorPacketList(&m_link->m_mempoolPacketList),
182  m_allocatorPacketWrapper(&m_link->m_mempoolPacketWrapper) {
183  local_shared_ptr<Packet> packet(new Packet());
184  m_link->reset(new PacketWrapper(packet, SerialGenerator::gen()));
185  //Use create() for this hack.
186  packet->m_payload.reset(( *stl_funcPayloadCreator)(static_cast<XN&>( *this)));
187  *stl_funcPayloadCreator = nullptr;
188 }
189 template <class XN>
190 Node<XN>::~Node() {
191  releaseAll();
192 }
193 template <class XN>
194 void
195 Node<XN>::print_() const {
196  local_shared_ptr<PacketWrapper> packet( *m_link);
197 // printf("Node:%p, ", this);
198 // printf("BP:%p, ", m_link.get());
199 // printf(" packet: ");
200  packet->print_();
201 }
202 
203 template <class XN>
204 void
205 Node<XN>::insert(const shared_ptr<XN> &var) {
206  iterate_commit_if([this, var](Transaction<XN> &tr)->bool {
207  return insert(tr, var);
208  });
209 }
210 template <class XN>
211 bool
212 Node<XN>::insert(Transaction<XN> &tr, const shared_ptr<XN> &var, bool online_after_insertion) {
213  local_shared_ptr<Packet> packet = reverseLookup(tr.m_packet, true, tr.m_serial, true);
214  packet->subpackets() = packet->size() ? std::make_shared<PacketList>( *packet->subpackets()) : std::make_shared<PacketList>();
215  packet->subpackets()->m_serial = tr.m_serial;
216  packet->m_missing = true;
217  packet->subnodes() = packet->size() ? std::make_shared<NodeList>( *packet->subnodes()) : std::make_shared<NodeList>();
218 // if( !packet->subpackets()->size()) {
219 // packet->subpackets()->reserve(4);
220 // packet->subnodes()->reserve(4);
221 // }
222  packet->subpackets()->resize(packet->size() + 1);
223  assert(std::find(packet->subnodes()->begin(), packet->subnodes()->end(), var) == packet->subnodes()->end());
224  packet->subnodes()->resize(packet->subpackets()->size());
225  packet->subnodes()->back() = var;
226 
227  if(online_after_insertion) {
228  bool has_failed = false;
229  //Tags serial.
230  local_shared_ptr<Packet> newpacket(tr.m_packet);
231  tr.m_packet.reset(new Packet( *tr.m_oldpacket));
232  if( !tr.m_packet->node().commit(tr)) {
233  printf("*\n");
234  has_failed = true;
235  }
236  tr.m_oldpacket = tr.m_packet;
237  tr.m_packet = newpacket;
238  for(;;) {
239  local_shared_ptr<Packet> subpacket_new;
241  subwrapper = *var->m_link;
242  BundledStatus status = bundle_subpacket(0, var, subwrapper, subpacket_new,
243  tr.m_started_time, tr.m_serial);
244  if(status != BundledStatus::BUNDLE_SUCCESS) {
245  continue;
246  }
247  if( !subpacket_new)
248  //Inserted twice inside the package.
249  break;
250 
251  //Marks for writing at subnode.
252  tr.m_packet.reset(new Packet( *tr.m_oldpacket));
253  if( !tr.m_packet->node().commit(tr)) {
254  printf("&\n");
255  has_failed = true;
256  }
257  tr.m_oldpacket = tr.m_packet;
258  tr.m_packet = newpacket;
259 
261  new PacketWrapper(m_link, packet->size() - 1, tr.m_serial));
262  newwrapper->packet() = subpacket_new;
263  packet->subpackets()->back() = subpacket_new;
264  if(has_failed)
265  return false;
266  if( !var->m_link->compareAndSet(subwrapper, newwrapper)) {
267  tr.m_oldpacket.reset(new Packet( *tr.m_oldpacket)); //Following commitment should fail.
268  return false;
269  }
270  break;
271  }
272  }
273  tr[ *this].catchEvent(var, packet->size() - 1);
274  tr[ *this].listChangeEvent();
275  STRICT_assert(tr.m_packet->checkConsistensy(tr.m_packet));
276  return true;
277 // printf("i");
278 }
279 template <class XN>
280 void
281 Node<XN>::release(const shared_ptr<XN> &var) {
282  iterate_commit_if([this, var](Transaction<XN> &tr)->bool {
283  return release(tr, var);
284  });
285 }
286 
287 template <class XN>
288 void
289 Node<XN>::eraseSerials(local_shared_ptr<Packet> &packet, int64_t serial) {
290  if(packet->size() && packet->subpackets()->m_serial == serial)
291  packet->subpackets()->m_serial = SerialGenerator::SERIAL_NULL;
292  if(packet->payload()->m_serial == serial)
293  packet->payload()->m_serial = SerialGenerator::SERIAL_NULL;
294 
295  for(;;) {
296  local_shared_ptr<PacketWrapper> wrapper( *packet->node().m_link);
297  if(wrapper->m_bundle_serial != serial)
298  break;
299  local_shared_ptr<PacketWrapper> newwrapper(new PacketWrapper( *wrapper, SerialGenerator::SERIAL_NULL));
300  if(packet->node().m_link->compareAndSet(wrapper, newwrapper))
301  break;
302  }
303  for(int i = 0; i < packet->size(); ++i) {
304  local_shared_ptr<Packet> &subpacket(( *packet->subpackets())[i]);
305  if(subpacket)
306  eraseSerials(subpacket, serial);
307  }
308 }
309 
310 template <class XN>
311 void
312 Node<XN>::lookupFailure() const {
313  fprintf(stderr, "Node not found during a lookup.\n");
314  throw NodeNotFoundError("Lookup failure.");
315 }
316 
317 template <class XN>
318 bool
319 Node<XN>::release(Transaction<XN> &tr, const shared_ptr<XN> &var) {
320  local_shared_ptr<Packet> packet = reverseLookup(tr.m_packet, true, tr.m_serial, true);
321  assert(packet->size());
322  packet->subpackets().reset(new PacketList( *packet->subpackets()));
323  packet->subpackets()->m_serial = tr.m_serial;
324  packet->subnodes().reset(new NodeList( *packet->subnodes()));
325  unsigned int idx = 0;
326  int old_idx = -1;
327  local_shared_ptr<PacketWrapper> nullsubwrapper, newsubwrapper;
328  auto nit = packet->subnodes()->begin();
329  for(auto pit = packet->subpackets()->begin(); pit != packet->subpackets()->end();) {
330  assert(nit != packet->subnodes()->end());
331  if(nit->get() == &*var) {
332  if( *pit) {
333  nullsubwrapper = *var->m_link;
334  if(nullsubwrapper->hasPriority()) {
335  if(nullsubwrapper->packet() != *pit) {
336  tr.m_oldpacket.reset(new Packet( *tr.m_oldpacket)); //Following commitment should fail.
337  return false;
338  }
339  }
340  else {
341  shared_ptr<Linkage> bp(nullsubwrapper->bundledBy());
342  if((bp && (bp != m_link)) ||
343  ( !bp && (nullsubwrapper->packet() != *pit))) {
344  tr.m_oldpacket.reset(new Packet( *tr.m_oldpacket)); //Following commitment should fail.
345  return false;
346  }
347  }
348  newsubwrapper.reset(new PacketWrapper(m_link, idx, SerialGenerator::SERIAL_NULL));
349  newsubwrapper->packet() = *pit;
350  }
351  pit = packet->subpackets()->erase(pit);
352  nit = packet->subnodes()->erase(nit);
353  old_idx = idx;
354  }
355  else {
356  ++nit;
357  ++pit;
358  ++idx;
359  }
360  }
361  if(old_idx < 0)
362  lookupFailure();
363 
364  if( !packet->subpackets()->size()) {
365  packet->subpackets().reset();
366  packet->m_missing = false;
367  }
368  else {
369 // if(packet->subpackets()->capacity() - packet->subpackets()->size() > 8) {
370 // packet->subpackets()->shrink_to_fit();
371 // packet->subnodes()->shrink_to_fit();
372 // }
373  }
374  if(tr.m_packet->size()) {
375  tr.m_packet->m_missing = true;
376  }
377 
378  tr[ *this].releaseEvent(var, old_idx);
379  tr[ *this].listChangeEvent();
380 
381  if( !newsubwrapper) {
382  //Packet of the released node is held by the other point inside the tr.m_packet.
383  return true;
384  }
385 
386  eraseSerials(packet, tr.m_serial);
387 
388  local_shared_ptr<Packet> newpacket(tr.m_packet);
389  tr.m_packet = tr.m_oldpacket;
390  tr.m_packet.reset(new Packet( *tr.m_packet));
391  if( !tr.m_packet->node().commit(tr)) {
392  tr.m_oldpacket.reset(new Packet( *tr.m_oldpacket)); //Following commitment should fail.
393  tr.m_packet = newpacket;
394  return false;
395  }
396  tr.m_oldpacket = tr.m_packet;
397  tr.m_packet = newpacket;
398 
399  //Unload the packet of the released node.
400  if( !var->m_link->compareAndSet(nullsubwrapper, newsubwrapper)) {
401  tr.m_oldpacket.reset(new Packet( *tr.m_oldpacket)); //Following commitment should fail.
402  return false;
403  }
404 // printf("r");
405  STRICT_assert(tr.m_packet->checkConsistensy(tr.m_packet));
406  return true;
407 }
408 template <class XN>
409 void
411  iterate_commit_if([this](Transaction<XN> &tr)->bool {
412  while(tr.size()) {
413  if( !release(tr, tr.list()->back())) {
414  return false;
415  }
416  }
417  return true;
418  });
419 }
420 template <class XN>
421 void
422 Node<XN>::swap(const shared_ptr<XN> &x, const shared_ptr<XN> &y) {
423  iterate_commit_if([this, x, y](Transaction<XN> &tr)->bool {
424  return swap(tr, x, y);
425  });
426 }
427 template <class XN>
428 bool
429 Node<XN>::swap(Transaction<XN> &tr, const shared_ptr<XN> &x, const shared_ptr<XN> &y) {
430  local_shared_ptr<Packet> packet = reverseLookup(tr.m_packet, true, tr.m_serial, true);
431  packet->subpackets().reset(packet->size() ? (new PacketList( *packet->subpackets())) : (new PacketList));
432  packet->subpackets()->m_serial = tr.m_serial;
433  packet->m_missing = true;
434  packet->subnodes().reset(packet->size() ? (new NodeList( *packet->subnodes())) : (new NodeList));
435  unsigned int idx = 0;
436  int x_idx = -1, y_idx = -1;
437  for(auto nit = packet->subnodes()->begin(); nit != packet->subnodes()->end(); ++nit) {
438  if( *nit == x)
439  x_idx = idx;
440  if( *nit == y)
441  y_idx = idx;
442  ++idx;
443  }
444  if((x_idx < 0) || (y_idx < 0))
445  lookupFailure();
446  local_shared_ptr<Packet> px = packet->subpackets()->at(x_idx);
447  local_shared_ptr<Packet> py = packet->subpackets()->at(y_idx);
448  packet->subpackets()->at(x_idx) = py;
449  packet->subpackets()->at(y_idx) = px;
450  packet->subnodes()->at(x_idx) = y;
451  packet->subnodes()->at(y_idx) = x;
452  tr[ *this].moveEvent(x_idx, y_idx);
453  tr[ *this].listChangeEvent();
454  STRICT_assert(tr.m_packet->checkConsistensy(tr.m_packet));
455  return true;
456 }
457 
458 template <class XN>
460 Node<XN>::reverseLookupWithHint(shared_ptr<Linkage> &linkage,
461  local_shared_ptr<Packet> &superpacket, bool copy_branch, int64_t tr_serial, bool set_missing,
462  local_shared_ptr<Packet> *upperpacket, int *index) {
463  if( !superpacket->size())
464  return nullptr;
465  local_shared_ptr<PacketWrapper> wrapper( *linkage);
466  if(wrapper->hasPriority())
467  return nullptr;
468  shared_ptr<Linkage> linkage_upper(wrapper->bundledBy());
469  if( !linkage_upper)
470  return nullptr;
471  local_shared_ptr<Packet> *foundpacket;
472  if(linkage_upper == superpacket->node().m_link)
473  foundpacket = &superpacket;
474  else {
475  foundpacket = reverseLookupWithHint(linkage_upper,
476  superpacket, copy_branch, tr_serial, set_missing, nullptr, nullptr);
477  if( !foundpacket)
478  return nullptr;
479  }
480  int ridx = wrapper->reverseIndex();
481  if( !( *foundpacket)->size() || (ridx >= ( *foundpacket)->size()))
482  return nullptr;
483  if(copy_branch) {
484  if(( *foundpacket)->subpackets()->m_serial != tr_serial) {
485  *foundpacket = allocate_local_shared<Packet>(( *foundpacket)->node().m_allocatorPacket, **foundpacket);
486 // foundpacket->reset(new Packet( **foundpacket));
487  ( *foundpacket)->subpackets() = std::allocate_shared<PacketList>(
488  ( *foundpacket)->node().m_allocatorPacketList, *( *foundpacket)->subpackets());
489 // ( *foundpacket)->subpackets().reset(new PacketList( *( *foundpacket)->subpackets()));
490  ( *foundpacket)->m_missing = ( *foundpacket)->m_missing || set_missing;
491  ( *foundpacket)->subpackets()->m_serial = tr_serial;
492  }
493  }
494  local_shared_ptr<Packet> &p(( *foundpacket)->subpackets()->at(ridx));
495  if( !p || (p->node().m_link != linkage)) {
496  return nullptr;
497  }
498  if(upperpacket) {
499  *upperpacket = *foundpacket;
500  *index = ridx;
501  }
502  return &p;
503 }
504 
505 template <class XN>
508  bool copy_branch, int64_t tr_serial, bool set_missing,
509  local_shared_ptr<Packet> *upperpacket, int *index) const {
510  assert(superpacket);
511  if( !superpacket->subpackets())
512  return nullptr;
513  if(copy_branch) {
514  if(superpacket->subpackets()->m_serial != tr_serial) {
515  superpacket = allocate_local_shared<Packet>(superpacket->node().m_allocatorPacket, *superpacket);
516 // superpacket.reset(new Packet( *superpacket));
517  superpacket->subpackets()= std::allocate_shared<PacketList>(
518  superpacket->node().m_allocatorPacketList, *superpacket->subpackets());
519 // superpacket->subpackets().reset(new PacketList( *superpacket->subpackets()));
520  superpacket->subpackets()->m_serial = tr_serial;
521  superpacket->m_missing = superpacket->m_missing || set_missing;
522  }
523  }
524  for(unsigned int i = 0; i < superpacket->subnodes()->size(); i++) {
525  if(( *superpacket->subnodes())[i].get() == this) {
526  local_shared_ptr<Packet> &subpacket(( *superpacket->subpackets())[i]);
527  if(subpacket) {
528  *upperpacket = superpacket;
529  *index = i;
530  return &subpacket;
531  }
532  }
533  }
534  for(unsigned int i = 0; i < superpacket->subnodes()->size(); i++) {
535  local_shared_ptr<Packet> &subpacket(( *superpacket->subpackets())[i]);
536  if(subpacket) {
538  forwardLookup(subpacket, copy_branch, tr_serial, set_missing, upperpacket, index)) {
539  return p;
540  }
541  }
542  }
543  return nullptr;
544 }
545 
546 template <class XN>
547 XN *
549  XN *uppernode = 0;
550  reverseLookup(shot.m_packet, false, 0, false, &uppernode);
551  return uppernode;
552 }
553 
554 template <class XN>
557  bool copy_branch, int64_t tr_serial, bool set_missing, XN **uppernode) {
558  local_shared_ptr<Packet> *foundpacket;
559  local_shared_ptr<Packet> upperpacket;
560  int index;
561  foundpacket = reverseLookupWithHint(m_link, superpacket,
562  copy_branch, tr_serial, set_missing, &upperpacket, &index);
563  if(foundpacket) {
564 // printf("$");
565  }
566  else {
567 // printf("!");
568  foundpacket = forwardLookup(superpacket, copy_branch, tr_serial, set_missing,
569  &upperpacket, &index);
570  if( !foundpacket)
571  return 0;
572  }
573  if(uppernode)
574  *uppernode = static_cast<XN*>(&upperpacket->node());
575  assert( &( *foundpacket)->node() == this);
576 
577  return foundpacket;
578 }
579 
580 template <class XN>
581 inline typename Node<XN>::SnapshotStatus
582 Node<XN>::snapshotSupernode(const shared_ptr<Linkage > &linkage, shared_ptr<Linkage> &linkage_super,
584  SnapshotMode mode, int64_t serial, CASInfoList *cas_infos) {
585  local_shared_ptr<PacketWrapper> oldwrapper(shot);
586  assert( !shot->hasPriority());
587  shared_ptr<Linkage > linkage_upper(shot->bundledBy());
588  linkage_super = linkage_upper;
589  if( !linkage_upper) {
590  if( *linkage == oldwrapper)
591  //Supernode has been destroyed.
592  return SnapshotStatus::SNAPSHOT_NODE_MISSING;
593  return SnapshotStatus::SNAPSHOT_DISTURBED;
594  }
595  int reverse_index = shot->reverseIndex();
596 
597  shot = *linkage_upper;
598  local_shared_ptr<PacketWrapper> shot_upper(shot);
599  SnapshotStatus status = SnapshotStatus::SNAPSHOT_NODE_MISSING;
600  local_shared_ptr<Packet> *upperpacket;
601  if( !shot_upper->hasPriority()) {
602  status = snapshotSupernode(linkage_upper, linkage_super, shot, &upperpacket,
603  mode, serial, cas_infos);
604  }
605  switch(status) {
606  case SnapshotStatus::SNAPSHOT_DISTURBED:
607  default:
608  return status;
609  case SnapshotStatus::SNAPSHOT_VOID_PACKET:
610  case SnapshotStatus::SNAPSHOT_NODE_MISSING:
611  shot = shot_upper;
612  upperpacket = &shot->packet();
613  status = SnapshotStatus::SNAPSHOT_SUCCESS;
614  break;
615  case SnapshotStatus::SNAPSHOT_NODE_MISSING_AND_COLLIDED:
616  shot = shot_upper;
617  upperpacket = &shot->packet();
618  status = SnapshotStatus::SNAPSHOT_COLLIDED;
619  break;
620  case SnapshotStatus::SNAPSHOT_COLLIDED:
621  case SnapshotStatus::SNAPSHOT_SUCCESS:
622  break;
623  }
624  //Checking if it is up-to-date.
625  if( *linkage != oldwrapper)
626  return SnapshotStatus::SNAPSHOT_DISTURBED;
627 
628  assert( *upperpacket);
629  int size = ( *upperpacket)->size();
630  int i = reverse_index;
631  for(int cnt = 0;; ++cnt) {
632  if(cnt >= size) {
633  if(status == SnapshotStatus::SNAPSHOT_COLLIDED)
634  return SnapshotStatus::SNAPSHOT_NODE_MISSING;
635  status = SnapshotStatus::SNAPSHOT_NODE_MISSING;
636  break;
637  }
638  if(i >= size)
639  i = 0;
640  if(( *( *upperpacket)->subnodes())[i]->m_link == linkage) {
641  //Requested node is found.
642  *subpacket = &( *( *upperpacket)->subpackets())[i];
643  reverse_index = i;
644  if( !**subpacket) {
645  if(mode == SnapshotMode::SNAPSHOT_FOR_UNBUNDLE) {
646  cas_infos->clear();
647  }
648 // printf("V\n");
649  assert(( *upperpacket)->missing());
650  return SnapshotStatus::SNAPSHOT_VOID_PACKET;
651  }
652  break;
653  }
654  //The index might be modified by swap().
655  ++i;
656  }
657 
658  assert( !shot_upper->packet() || (shot_upper->packet()->node().m_link == linkage_upper));
659  assert(( *upperpacket)->node().m_link == linkage_upper);
660  if(mode == SnapshotMode::SNAPSHOT_FOR_UNBUNDLE) {
661  if(status == SnapshotStatus::SNAPSHOT_COLLIDED) {
662  return SnapshotStatus::SNAPSHOT_COLLIDED;
663  }
664  if((serial != SerialGenerator::SERIAL_NULL) && (shot_upper->m_bundle_serial == serial)) {
665  //The node has been already bundled in the same snapshot.
666  if(status == SnapshotStatus::SNAPSHOT_NODE_MISSING)
667  return SnapshotStatus::SNAPSHOT_NODE_MISSING;
668  return SnapshotStatus::SNAPSHOT_COLLIDED;
669  }
670  local_shared_ptr<Packet> *p(upperpacket);
672  if(shot == shot_upper) {
673  newwrapper = allocate_local_shared<PacketWrapper>
674  (shot_upper->packet()->node().m_allocatorPacketWrapper,
675  *shot_upper, shot_upper->m_bundle_serial);
676 // newwrapper.reset(
677 // new PacketWrapper( *shot_upper, shot_upper->m_bundle_serial));
678  }
679  else {
680  assert(cas_infos->size());
681 // if(shot->packet()->missing()) {
682  newwrapper = allocate_local_shared<PacketWrapper>
683  ( (*p)->node().m_allocatorPacketWrapper,
684  *p, shot->m_bundle_serial);
685 // newwrapper.reset(
686 // new PacketWrapper( *p, shot->m_bundle_serial));
687 // }
688  }
689  if(newwrapper) {
690  cas_infos->emplace_back(linkage_upper, shot_upper, newwrapper);
691  p = &newwrapper->packet();
692  }
693  if(size) {
694  *p = allocate_local_shared<Packet>(
695  (*p)->node().m_allocatorPacket, **p);
696 // p->reset(new Packet( **p));
697 // ( *p)->subpackets().reset(new PacketList( *( *p)->subpackets()));
698  ( *p)->m_missing = true;
699 // if(status == SNAPSHOT_SUCCESS)
700 // *subpacket = &( *p)->subpackets()->at(reverse_index);
701  }
702  if((status == SnapshotStatus::SNAPSHOT_NODE_MISSING) && (serial != SerialGenerator::SERIAL_NULL) &&
703  (( !oldwrapper->hasPriority()) && (oldwrapper->m_bundle_serial == serial))) {
704  printf("!");
705  return SnapshotStatus::SNAPSHOT_NODE_MISSING_AND_COLLIDED;
706  }
707  }
708  return status;
709 }
710 
711 template <class XN>
712 void
713 Node<XN>::snapshot(Snapshot<XN> &snapshot, bool multi_nodal, typename NegotiationCounter::cnt_t started_time) const {
715  for(;;) {
716  snapshot.m_serial = SerialGenerator::gen();
717  target = *m_link;
718  if(target->hasPriority()) {
719  if( !multi_nodal)
720  break;
721  if( !target->packet()->missing()) {
722  STRICT_assert(target->packet()->checkConsistensy(target->packet()));
723  break;
724  }
725  }
726  else {
727  // Taking a snapshot inside the super packet.
728  shared_ptr<Linkage> linkage_super; //keeps memory pools in the Linkage alive.
729  local_shared_ptr<PacketWrapper> superwrapper(target);
730  local_shared_ptr<Packet> *foundpacket;
731  SnapshotStatus status = snapshotSupernode(m_link, linkage_super,
732  superwrapper, &foundpacket, SnapshotMode::SNAPSHOT_FOR_BUNDLE);
733  switch(status) {
734  case SnapshotStatus::SNAPSHOT_SUCCESS: {
735  if( !( *foundpacket)->missing() || !multi_nodal) {
736  snapshot.m_packet = *foundpacket;
737  STRICT_assert(snapshot.m_packet->checkConsistensy(snapshot.m_packet));
738  return;
739  }
740  // The packet is imperfect, and then re-bundling the subpackets.
741  UnbundledStatus status = unbundle(nullptr, started_time, m_link, target);
742  switch(status) {
743  case UnbundledStatus::UNBUNDLE_W_NEW_SUBVALUE:
744  case UnbundledStatus::UNBUNDLE_COLLIDED:
745  case UnbundledStatus::UNBUNDLE_SUBVALUE_HAS_CHANGED:
746  default:
747  break;
748  }
749  continue;
750  }
751  case SnapshotStatus::SNAPSHOT_DISTURBED:
752  default:
753  continue;
754  case SnapshotStatus::SNAPSHOT_NODE_MISSING:
755  case SnapshotStatus::SNAPSHOT_VOID_PACKET:
756  //The packet has been released.
757  if( !target->packet()->missing() || !multi_nodal) {
758  snapshot.m_packet = target->packet();
759  return;
760  }
761  break;
762  }
763  }
764  BundledStatus status = const_cast<Node *>(this)->bundle(
765  target, started_time, snapshot.m_serial, true);
766  switch (status) {
767  case BundledStatus::BUNDLE_SUCCESS:
768  assert( !target->packet()->missing());
769  STRICT_assert(target->packet()->checkConsistensy(target->packet()));
770  break;
771  default:
772  continue;
773  }
774  }
775  snapshot.m_packet = target->packet();
776 }
777 
778 template <class XN>
779 typename Node<XN>::BundledStatus
780 Node<XN>::bundle_subpacket(local_shared_ptr<PacketWrapper> *superwrapper,
781  const shared_ptr<Node> &subnode,
782  local_shared_ptr<PacketWrapper> &subwrapper, local_shared_ptr<Packet> &subpacket_new,
783  typename NegotiationCounter::cnt_t &started_time, int64_t bundle_serial) {
784 
785  if( !subwrapper->hasPriority()) {
786  shared_ptr<Linkage > linkage(subwrapper->bundledBy());
787  bool need_for_unbundle = false;
788  bool detect_collision = false;
789  if(linkage == m_link) {
790  if(subpacket_new) {
791  if(subpacket_new->missing()) {
792  need_for_unbundle = true;
793  }
794  else
795  return BundledStatus::BUNDLE_SUCCESS;
796  }
797  else {
798  if(subwrapper->packet()) {
799  //Re-inserted.
800 // need_for_unbundle = true;
801  }
802  else
803  return BundledStatus::BUNDLE_DISTURBED;
804  }
805  }
806  else {
807  need_for_unbundle = true;
808  detect_collision = true;
809  }
810  if(need_for_unbundle) {
811  local_shared_ptr<PacketWrapper> subwrapper_new;
812  UnbundledStatus status = unbundle(detect_collision ? &bundle_serial : nullptr, started_time,
813  subnode->m_link, subwrapper, nullptr, &subwrapper_new, superwrapper);
814  switch(status) {
815  case UnbundledStatus::UNBUNDLE_W_NEW_SUBVALUE:
816  subwrapper = subwrapper_new;
817  break;
818  case UnbundledStatus::UNBUNDLE_COLLIDED:
819  //The subpacket has already been included in the snapshot.
820  subpacket_new.reset();
821  return BundledStatus::BUNDLE_SUCCESS;
822  case UnbundledStatus::UNBUNDLE_SUBVALUE_HAS_CHANGED:
823  default:
824  return BundledStatus::BUNDLE_DISTURBED;
825  }
826  }
827  }
828  if(subwrapper->packet()->missing()) {
829  assert(subwrapper->packet()->size());
830  BundledStatus status = subnode->bundle(subwrapper, started_time, bundle_serial, false);
831  switch(status) {
832  case BundledStatus::BUNDLE_SUCCESS:
833  break;
834  case BundledStatus::BUNDLE_DISTURBED:
835  default:
836  return BundledStatus::BUNDLE_DISTURBED;
837  }
838  }
839  subpacket_new = subwrapper->packet();
840  return BundledStatus::BUNDLE_SUCCESS;
841 }
842 
843 template <class XN>
844 typename Node<XN>::BundledStatus
846  typename NegotiationCounter::cnt_t &started_time, int64_t bundle_serial, bool is_bundle_root) {
847 
848  assert(oldsuperwrapper->packet());
849  assert(oldsuperwrapper->packet()->size());
850  assert(oldsuperwrapper->packet()->missing());
851 
852  Node &supernode(oldsuperwrapper->packet()->node());
853 
854  if( !oldsuperwrapper->hasPriority() ||
855  (oldsuperwrapper->m_bundle_serial != bundle_serial)) {
856  //Tags serial.
857 // local_shared_ptr<PacketWrapper> superwrapper(
858 // new PacketWrapper(oldsuperwrapper->packet(), bundle_serial));
859  local_shared_ptr<PacketWrapper> superwrapper =
860  allocate_local_shared<PacketWrapper>(
861  oldsuperwrapper->packet()->node().m_allocatorPacketWrapper,
862  oldsuperwrapper->packet(), bundle_serial);
863  if( !supernode.m_link->compareAndSet(oldsuperwrapper, superwrapper)) {
864  return BundledStatus::BUNDLE_DISTURBED;
865  }
866  oldsuperwrapper = std::move(superwrapper);
867  }
868 
869  fast_vector<local_shared_ptr<PacketWrapper> > subwrappers_org(oldsuperwrapper->packet()->subpackets()->size());
870 
871  for(;;) {
872  local_shared_ptr<PacketWrapper> superwrapper =
873  allocate_local_shared<PacketWrapper>(
874  oldsuperwrapper->packet()->node().m_allocatorPacketWrapper,
875  *oldsuperwrapper, bundle_serial);
876 // local_shared_ptr<PacketWrapper> superwrapper(
877 // new PacketWrapper( *oldsuperwrapper, bundle_serial));
878  local_shared_ptr<Packet> &newpacket(
879  reverseLookup(superwrapper->packet(), true, SerialGenerator::gen()));
880  assert(newpacket->size());
881  assert(newpacket->missing());
882 
883  STRICT_assert(s_serial_abandoned != newpacket->subpackets()->m_serial);
884 
885  //copying all sub-packets from nodes to the new packet.
886  newpacket->subpackets() = std::allocate_shared<PacketList>(
887  newpacket->node().m_allocatorPacketList, *newpacket->subpackets());
888 // newpacket->subpackets().reset(new PacketList( *newpacket->subpackets()));
889  shared_ptr<PacketList> &subpackets(newpacket->subpackets());
890  shared_ptr<NodeList> &subnodes(newpacket->subnodes());
891 
892  bool missing = false;
893  for(unsigned int i = 0; i < subpackets->size(); ++i) {
894  shared_ptr<Node> child(( *subnodes)[i]);
895  local_shared_ptr<Packet> &subpacket_new(( *subpackets)[i]);
896  for(;;) {
898  subwrapper = *child->m_link;
899  if(subwrapper == subwrappers_org[i])
900  break;
901  BundledStatus status = bundle_subpacket( &oldsuperwrapper,
902  child, subwrapper, subpacket_new, started_time, bundle_serial);
903  switch(status) {
904  case BundledStatus::BUNDLE_SUCCESS:
905  break;
906  case BundledStatus::BUNDLE_DISTURBED:
907  default:
908  if(oldsuperwrapper == *supernode.m_link)
909  continue;
910  return status;
911  }
912  subwrappers_org[i] = subwrapper;
913  if(subpacket_new) {
914  if(subpacket_new->missing()) {
915  missing = true;
916  }
917  assert(&subpacket_new->node() == child.get());
918  }
919  else
920  missing = true;
921  break;
922  }
923  }
924  if(is_bundle_root) {
925  assert( &supernode == this);
926  missing = false;
927  }
928  newpacket->m_missing = true;
929 
930  supernode.m_link->negotiate(started_time, 4.0f);
931  //First checkpoint.
932  if( !supernode.m_link->compareAndSet(oldsuperwrapper, superwrapper)) {
933 // superwrapper = *supernode.m_link;
934 // if(superwrapper->m_bundle_serial != bundle_serial)
935  return BundledStatus::BUNDLE_DISTURBED;
936 // oldsuperwrapper = superwrapper;
937 // continue;
938  }
939  oldsuperwrapper = std::move(superwrapper);
940 
941  //clearing all packets on sub-nodes if not modified.
942  bool changed_during_bundling = false;
943  for(unsigned int i = 0; i < subnodes->size(); i++) {
944  shared_ptr<Node> child(( *subnodes)[i]);
945  local_shared_ptr<PacketWrapper> null_linkage;
946  if(( *subpackets)[i])
947  null_linkage = allocate_local_shared<PacketWrapper>(
948  child->m_allocatorPacketWrapper, m_link, i, bundle_serial);
949 // .reset(new PacketWrapper(m_link, i, bundle_serial));
950  else
951  null_linkage = allocate_local_shared<PacketWrapper>(
952  child->m_allocatorPacketWrapper, *subwrappers_org[i], bundle_serial);
953 // .reset(new PacketWrapper( *subwrappers_org[i], bundle_serial));
954 
955  assert( !null_linkage->hasPriority());
956  //Second checkpoint, the written bundle is valid or not.
957  if( !child->m_link->compareAndSet(subwrappers_org[i], null_linkage)) {
958  if(local_shared_ptr<PacketWrapper>( *child->m_link)->m_bundle_serial != bundle_serial)
959  return BundledStatus::BUNDLE_DISTURBED;
960  if(oldsuperwrapper != *supernode.m_link)
961  return BundledStatus::BUNDLE_DISTURBED;
962  changed_during_bundling = true;
963  break;
964  }
965  }
966  if(changed_during_bundling)
967  continue;
968 
969  superwrapper = allocate_local_shared<PacketWrapper>(
970  supernode.m_allocatorPacketWrapper, *oldsuperwrapper, bundle_serial);
971 
972 // superwrapper.reset(new PacketWrapper( *oldsuperwrapper, bundle_serial));
973  if( !missing) {
974  local_shared_ptr<Packet> &newpacket(
975  reverseLookup(superwrapper->packet(), true, SerialGenerator::gen()));
976  newpacket->m_missing = false;
977  STRICT_assert(newpacket->checkConsistensy(newpacket));
978  }
979 
980  if( !supernode.m_link->compareAndSet(oldsuperwrapper, superwrapper))
981  return BundledStatus::BUNDLE_DISTURBED;
982  oldsuperwrapper = std::move(superwrapper);
983 
984  break;
985  }
986  return BundledStatus::BUNDLE_SUCCESS;
987 }
988 
989 //template <class XN>
990 //void
991 //Node<XN>::fetchSubpackets(std::deque<local_shared_ptr<PacketWrapper> > &subwrappers,
992 // const local_shared_ptr<Packet> &packet) {
993 // for(int i = 0; i < packet->size(); ++i) {
994 // const local_shared_ptr<Packet> &subpacket(( *packet->subpackets())[i]);
995 // subwrappers.push_back( *( *packet->subnodes())[i]->m_link);
996 // if(subpacket)
997 // fetchSubpackets(subwrappers, subpacket);
998 // }
999 //}
1000 //template <class XN>
1001 //bool
1002 //Node<XN>::commit_at_super(Transaction<XN> &tr) {
1003 // Node &node(tr.m_packet->node());
1004 // for(Transaction<XN> tr_super( *this);; ++tr_super) {
1005 // local_shared_ptr<Packet> *packet
1006 // = node.reverseLookup(tr_super.m_packet, false, Packet::SERIAL_NULL, false, 0);
1007 // if( !packet)
1008 // return false; //Released.
1009 // if( *packet != tr.m_oldpacket) {
1010 // if( !tr.isMultiNodal() && (( *packet)->payload() == tr.m_oldpacket->payload())) {
1011 // //Single-node mode, the payload in the snapshot is unchanged.
1012 // tr.m_packet->subpackets() = ( *packet)->subpackets();
1013 // tr.m_packet->m_missing = ( *packet)->missing();
1014 // }
1015 // else {
1016 // return false;
1017 // }
1018 // }
1019 // node.reverseLookup(tr_super.m_packet, true, tr_super.m_serial, tr.m_packet->missing())
1020 // = tr.m_packet;
1021 // if(tr_super.commit()) {
1022 // tr.m_packet = tr_super.m_packet;
1023 // return true;
1024 // }
1025 // }
1026 //}
1027 template <class XN>
1028 bool
1030  assert(tr.m_oldpacket != tr.m_packet);
1031  assert(tr.isMultiNodal() || tr.m_packet->subpackets() == tr.m_oldpacket->subpackets());
1032  assert(this == &tr.m_packet->node());
1033 
1034  local_shared_ptr<PacketWrapper> newwrapper = allocate_local_shared<PacketWrapper>(
1035  m_allocatorPacketWrapper, tr.m_packet, tr.m_serial);
1036 // local_shared_ptr<PacketWrapper> newwrapper(new PacketWrapper(tr.m_packet, tr.m_serial));
1037  for(int retry = 0;; ++retry) {
1038  local_shared_ptr<PacketWrapper> wrapper( *m_link);
1039  if(wrapper->hasPriority()) {
1040  //Committing directly to the node.
1041  if(wrapper->packet() != tr.m_oldpacket) {
1042  if( !tr.isMultiNodal() && (wrapper->packet()->payload() == tr.m_oldpacket->payload())) {
1043  //Single-node mode, the payload in the snapshot is unchanged.
1044  tr.m_packet->subpackets() = wrapper->packet()->subpackets();
1045  tr.m_packet->m_missing = wrapper->packet()->missing();
1046  }
1047  else {
1048  STRICT_TEST(s_serial_abandoned = tr.m_serial);
1049 // fprintf(stderr, "F");
1050  return false;
1051  }
1052  }
1053 // STRICT_TEST(std::deque<local_shared_ptr<PacketWrapper> > subwrappers);
1054 // STRICT_TEST(fetchSubpackets(subwrappers, wrapper->packet()));
1055  STRICT_assert(tr.m_packet->checkConsistensy(tr.m_packet));
1056 
1057  m_link->negotiate(tr.m_started_time, 4.0f);
1058  if(m_link->compareAndSet(wrapper, newwrapper)) {
1059 // STRICT_TEST(if(wrapper->isBundled())
1060 // for(typename std::deque<local_shared_ptr<PacketWrapper> >::const_iterator
1061 // it = subwrappers.begin(); it != subwrappers.end(); ++it)
1062 // assert( !( *it)->hasPriority()));
1063  return true;
1064  }
1065  continue;
1066  }
1067 
1068 // if(retry == 0)
1069 // m_link->negotiate(tr.m_started_time, 4.0f);
1070  //Unbundling this node from the super packet.
1071  UnbundledStatus status = unbundle(nullptr, tr.m_started_time, m_link, wrapper,
1072  tr.isMultiNodal() ? &tr.m_oldpacket : nullptr, tr.isMultiNodal() ? &newwrapper : nullptr);
1073  switch(status) {
1074  case UnbundledStatus::UNBUNDLE_W_NEW_SUBVALUE:
1075  if(tr.isMultiNodal())
1076  return true;
1077  continue;
1078  case UnbundledStatus::UNBUNDLE_SUBVALUE_HAS_CHANGED: {
1079  STRICT_TEST(s_serial_abandoned = tr.m_serial);
1080 // fprintf(stderr, "F");
1081  return false;
1082  }
1083  case UnbundledStatus::UNBUNDLE_DISTURBED:
1084  default:
1085  continue;
1086  }
1087  }
1088 }
1089 
1090 template <class XN>
1091 typename Node<XN>::UnbundledStatus
1092 Node<XN>::unbundle(const int64_t *bundle_serial, typename NegotiationCounter::cnt_t &time_started,
1093  const shared_ptr<Linkage> &sublinkage, const local_shared_ptr<PacketWrapper> &null_linkage,
1094  const local_shared_ptr<Packet> *oldsubpacket, local_shared_ptr<PacketWrapper> *newsubwrapper_returned,
1095  local_shared_ptr<PacketWrapper> *oldsuperwrapper) {
1096 
1097  assert( !null_linkage->hasPriority());
1098 
1099 // Taking a snapshot inside the super packet.
1100  shared_ptr<Linkage> linkage_super;
1101  local_shared_ptr<PacketWrapper> superwrapper(null_linkage);
1102  local_shared_ptr<Packet> *newsubpacket;
1103  CASInfoList cas_infos;
1104  SnapshotStatus status = snapshotSupernode(sublinkage, linkage_super, superwrapper, &newsubpacket,
1105  SnapshotMode::SNAPSHOT_FOR_UNBUNDLE,
1106  bundle_serial ? *bundle_serial : SerialGenerator::SERIAL_NULL, &cas_infos);
1107  switch(status) {
1108  case SnapshotStatus::SNAPSHOT_SUCCESS:
1109  break;
1110  case SnapshotStatus::SNAPSHOT_DISTURBED:
1111  return UnbundledStatus::UNBUNDLE_DISTURBED;
1112  case SnapshotStatus::SNAPSHOT_VOID_PACKET:
1113  case SnapshotStatus::SNAPSHOT_NODE_MISSING:
1114  newsubpacket = const_cast<local_shared_ptr<Packet> *>( &null_linkage->packet());
1115  assert(newsubpacket);
1116  break;
1117  case SnapshotStatus::SNAPSHOT_NODE_MISSING_AND_COLLIDED:
1118  newsubpacket = const_cast<local_shared_ptr<Packet> *>( &null_linkage->packet());
1119  assert(newsubpacket);
1120  status = SnapshotStatus::SNAPSHOT_COLLIDED;
1121  break;
1122  case SnapshotStatus::SNAPSHOT_COLLIDED:
1123  break;
1124  }
1125 
1126  if(oldsubpacket && ( *newsubpacket != *oldsubpacket))
1127  return UnbundledStatus::UNBUNDLE_SUBVALUE_HAS_CHANGED;
1128 
1129  for(auto it = cas_infos.begin(); it != cas_infos.end(); ++it) {
1130 // it->linkage->negotiate(time_started, 2.0f);
1131  if( !it->linkage->compareAndSet(it->old_wrapper, it->new_wrapper))
1132  return UnbundledStatus::UNBUNDLE_DISTURBED;
1133  if(oldsuperwrapper) {
1134  if( ( *oldsuperwrapper)->packet()->node().m_link == it->linkage) {
1135  if( *oldsuperwrapper != it->old_wrapper)
1136  return UnbundledStatus::UNBUNDLE_DISTURBED;
1137 // printf("1\n");
1138  *oldsuperwrapper = it->new_wrapper;
1139  }
1140  }
1141  }
1142  if(status == SnapshotStatus::SNAPSHOT_COLLIDED)
1143  return UnbundledStatus::UNBUNDLE_COLLIDED;
1144 
1145  local_shared_ptr<PacketWrapper> newsubwrapper;
1146  if(oldsubpacket)
1147  newsubwrapper = *newsubwrapper_returned;
1148  else
1149  newsubwrapper = allocate_local_shared<PacketWrapper>(
1150  ( *newsubpacket)->node().m_allocatorPacketWrapper,
1151  *newsubpacket, SerialGenerator::SERIAL_NULL);
1152 // newsubwrapper.reset(new PacketWrapper( *newsubpacket, Packet::SERIAL_NULL));
1153 
1154  if( !sublinkage->compareAndSet(null_linkage, newsubwrapper))
1155  return UnbundledStatus::UNBUNDLE_SUBVALUE_HAS_CHANGED;
1156 
1157  if(newsubwrapper_returned)
1158  *newsubwrapper_returned = std::move(newsubwrapper);
1159 
1160 // if(oldsuperwrapper) {
1161 // if( &( *oldsuperwrapper)->packet()->node().m_link != &cas_infos.front().linkage)
1162 // return UNBUNDLE_DISTURBED;
1163 // if( *oldsuperwrapper != cas_infos.front().old_wrapper)
1164 // return UNBUNDLE_DISTURBED;
1165 // printf("1\n");
1166 // *oldsuperwrapper = cas_infos.front().new_wrapper;
1167 // }
1168 
1169  return UnbundledStatus::UNBUNDLE_W_NEW_SUBVALUE;
1170 }
1171 
1172 } //namespace Transactional
1173 

Generated for KAME4 by  doxygen 1.8.3