14 #include "transaction.h"
17 #ifdef TRANSACTIONAL_STRICT_assert
19 #define STRICT_assert(expr) assert(expr)
20 #define STRICT_TEST(expr) expr
22 #define STRICT_assert(expr)
23 #define STRICT_TEST(expr)
26 namespace Transactional {
40 Node<XN>::ProcessCounter::ProcessCounter() {
43 cnt_t newv = oldv + (cnt_t)1u;
45 if(s_count.compare_set_strong(oldv, newv)) {
56 printf(
"%s@%p, ",
typeid(*this).name(), &node());
57 printf(
"BP@%p, ", node().
m_link.get());
61 printf(
"%d subnodes : [ \n", (
int)size());
62 for(
int i = 0; i < size(); i++) {
63 if(subpackets()->at(i)) {
64 subpackets()->at(i)->print_();
68 printf(
"%s@%p, w/o packet, ",
typeid(*this).name(), subnodes()->at(i).get());
81 if( !(payload()->m_serial - subpackets()->m_serial < 0x7fffffffffffffffLL))
84 for(
int i = 0; i < size(); i++) {
85 if( !subpackets()->at(i)) {
86 if( !rootpacket->missing()) {
87 if( !subnodes()->at(i)->reverseLookup(
93 if(subpackets()->at(i)->size())
94 if( !(subpackets()->m_serial - subpackets()->at(i)->subpackets()->m_serial < 0x7fffffffffffffffLL))
96 if(subpackets()->at(i)->missing() && (rootpacket.get() !=
this)) {
100 if( !subpackets()->at(i)->checkConsistensy(
101 subpackets()->at(i)->missing() ? rootpacket : subpackets()->at(i)))
107 fprintf(stderr,
"Line %d, losing consistensy on node %p:\n", line, &node());
108 rootpacket->print_();
116 m_bundledBy(), m_packet(x), m_ridx((
int)PACKET_STATE::PACKET_HAS_PRIORITY), m_bundle_serial(bundle_serial) {
120 int64_t bundle_serial) noexcept :
121 m_bundledBy(bp), m_packet(), m_ridx(), m_bundle_serial(bundle_serial) {
122 setReverseIndex(reverse_index);
126 m_bundledBy(x.m_bundledBy), m_packet(x.m_packet),
127 m_ridx(x.m_ridx), m_bundle_serial(bundle_serial) {}
132 printf(
"PacketWrapper: ");
133 if( !hasPriority()) {
134 printf(
"referred to BP@%p, ", bundledBy().
get());
136 printf(
"serial:%lld, ", (
long long)m_bundle_serial);
148 Node<XN>::Linkage::negotiate_internal(
typename NegotiationCounter::cnt_t &started_time,
float mult_wait) noexcept {
150 auto transaction_started_time = m_transaction_started_time;
151 if( !transaction_started_time)
153 auto dt = started_time - transaction_started_time;
156 auto dt2 = Node<XN>::NegotiationCounter::now() - transaction_started_time;
158 if(mult_wait * dt < dt2)
165 ms = std::max((
int)(dt2 / 10000), ms + 1);
167 fprintf(stderr,
"Nested transaction?, ");
168 fprintf(stderr,
"Negotiating, %f sec. requested, limited to 200ms.", ms*1e-3);
169 fprintf(stderr,
"for BP@%p\n",
this);
180 m_allocatorPacket(&
m_link->m_mempoolPacket),
181 m_allocatorPacketList(&
m_link->m_mempoolPacketList),
182 m_allocatorPacketWrapper(&
m_link->m_mempoolPacketWrapper) {
186 packet->m_payload.
reset(( *stl_funcPayloadCreator)(static_cast<XN&>( *
this)));
187 *stl_funcPayloadCreator =
nullptr;
195 Node<XN>::print_()
const {
207 return insert(tr, var);
214 packet->subpackets() = packet->size() ? std::make_shared<PacketList>( *packet->subpackets()) : std::make_shared<PacketList>();
215 packet->subpackets()->m_serial = tr.m_serial;
216 packet->m_missing =
true;
217 packet->subnodes() = packet->size() ? std::make_shared<NodeList>( *packet->subnodes()) : std::make_shared<NodeList>();
222 packet->subpackets()->resize(packet->size() + 1);
223 assert(std::find(packet->subnodes()->begin(), packet->subnodes()->end(), var) == packet->subnodes()->end());
224 packet->subnodes()->resize(packet->subpackets()->size());
225 packet->subnodes()->back() = var;
227 if(online_after_insertion) {
228 bool has_failed =
false;
232 if( !tr.
m_packet->node().commit(tr)) {
241 subwrapper = *var->m_link;
242 BundledStatus status = bundle_subpacket(0, var, subwrapper, subpacket_new,
243 tr.m_started_time, tr.m_serial);
244 if(status != BundledStatus::BUNDLE_SUCCESS) {
253 if( !tr.
m_packet->node().commit(tr)) {
262 newwrapper->packet() = subpacket_new;
263 packet->subpackets()->back() = subpacket_new;
266 if( !var->m_link->compareAndSet(subwrapper, newwrapper)) {
267 tr.m_oldpacket.reset(
new Packet( *tr.m_oldpacket));
273 tr[ *
this].catchEvent(var, packet->size() - 1);
274 tr[ *
this].listChangeEvent();
283 return release(tr, var);
290 if(packet->size() && packet->subpackets()->m_serial == serial)
291 packet->subpackets()->m_serial = SerialGenerator::SERIAL_NULL;
292 if(packet->payload()->m_serial == serial)
293 packet->payload()->m_serial = SerialGenerator::SERIAL_NULL;
297 if(wrapper->m_bundle_serial != serial)
300 if(packet->node().m_link->compareAndSet(wrapper, newwrapper))
303 for(
int i = 0; i < packet->size(); ++i) {
306 eraseSerials(subpacket, serial);
312 Node<XN>::lookupFailure()
const {
313 fprintf(stderr,
"Node not found during a lookup.\n");
314 throw NodeNotFoundError(
"Lookup failure.");
321 assert(packet->size());
323 packet->subpackets()->m_serial = tr.m_serial;
324 packet->subnodes().
reset(
new NodeList( *packet->subnodes()));
325 unsigned int idx = 0;
328 auto nit = packet->subnodes()->begin();
329 for(
auto pit = packet->subpackets()->begin(); pit != packet->subpackets()->end();) {
330 assert(nit != packet->subnodes()->end());
331 if(nit->get() == &*var) {
333 nullsubwrapper = *var->m_link;
334 if(nullsubwrapper->hasPriority()) {
335 if(nullsubwrapper->packet() != *pit) {
336 tr.m_oldpacket.reset(
new Packet( *tr.m_oldpacket));
341 shared_ptr<Linkage> bp(nullsubwrapper->bundledBy());
342 if((bp && (bp != m_link)) ||
343 ( !bp && (nullsubwrapper->packet() != *pit))) {
344 tr.m_oldpacket.reset(
new Packet( *tr.m_oldpacket));
349 newsubwrapper->packet() = *pit;
351 pit = packet->subpackets()->erase(pit);
352 nit = packet->subnodes()->erase(nit);
364 if( !packet->subpackets()->size()) {
365 packet->subpackets().
reset();
366 packet->m_missing =
false;
378 tr[ *
this].releaseEvent(var, old_idx);
379 tr[ *
this].listChangeEvent();
381 if( !newsubwrapper) {
386 eraseSerials(packet, tr.m_serial);
391 if( !tr.
m_packet->node().commit(tr)) {
392 tr.m_oldpacket.reset(
new Packet( *tr.m_oldpacket));
400 if( !var->m_link->compareAndSet(nullsubwrapper, newsubwrapper)) {
401 tr.m_oldpacket.reset(
new Packet( *tr.m_oldpacket));
413 if( !release(tr, tr.
list()->back())) {
424 return swap(tr, x, y);
432 packet->subpackets()->m_serial = tr.m_serial;
433 packet->m_missing =
true;
435 unsigned int idx = 0;
436 int x_idx = -1, y_idx = -1;
437 for(
auto nit = packet->subnodes()->begin(); nit != packet->subnodes()->end(); ++nit) {
444 if((x_idx < 0) || (y_idx < 0))
448 packet->subpackets()->at(x_idx) = py;
449 packet->subpackets()->at(y_idx) = px;
450 packet->subnodes()->at(x_idx) = y;
451 packet->subnodes()->at(y_idx) = x;
452 tr[ *
this].moveEvent(x_idx, y_idx);
453 tr[ *
this].listChangeEvent();
463 if( !superpacket->size())
466 if(wrapper->hasPriority())
468 shared_ptr<Linkage> linkage_upper(wrapper->bundledBy());
472 if(linkage_upper == superpacket->node().m_link)
473 foundpacket = &superpacket;
475 foundpacket = reverseLookupWithHint(linkage_upper,
476 superpacket, copy_branch, tr_serial, set_missing,
nullptr,
nullptr);
480 int ridx = wrapper->reverseIndex();
481 if( !( *foundpacket)->size() || (ridx >= ( *foundpacket)->size()))
484 if(( *foundpacket)->subpackets()->m_serial != tr_serial) {
485 *foundpacket = allocate_local_shared<Packet>(( *foundpacket)->node().m_allocatorPacket, **foundpacket);
487 ( *foundpacket)->subpackets() = std::allocate_shared<PacketList>(
488 ( *foundpacket)->node().m_allocatorPacketList, *( *foundpacket)->subpackets());
490 ( *foundpacket)->m_missing = ( *foundpacket)->m_missing || set_missing;
491 ( *foundpacket)->subpackets()->m_serial = tr_serial;
495 if( !p || (p->node().m_link != linkage)) {
499 *upperpacket = *foundpacket;
508 bool copy_branch, int64_t tr_serial,
bool set_missing,
511 if( !superpacket->subpackets())
514 if(superpacket->subpackets()->m_serial != tr_serial) {
515 superpacket = allocate_local_shared<Packet>(superpacket->node().m_allocatorPacket, *superpacket);
517 superpacket->subpackets()= std::allocate_shared<PacketList>(
518 superpacket->node().m_allocatorPacketList, *superpacket->subpackets());
520 superpacket->subpackets()->m_serial = tr_serial;
521 superpacket->m_missing = superpacket->m_missing || set_missing;
524 for(
unsigned int i = 0; i < superpacket->subnodes()->size(); i++) {
525 if(( *superpacket->subnodes())[i].
get() ==
this) {
528 *upperpacket = superpacket;
534 for(
unsigned int i = 0; i < superpacket->subnodes()->size(); i++) {
538 forwardLookup(subpacket, copy_branch, tr_serial, set_missing, upperpacket, index)) {
550 reverseLookup(shot.
m_packet,
false, 0,
false, &uppernode);
557 bool copy_branch, int64_t tr_serial,
bool set_missing, XN **uppernode) {
561 foundpacket = reverseLookupWithHint(m_link, superpacket,
562 copy_branch, tr_serial, set_missing, &upperpacket, &index);
568 foundpacket = forwardLookup(superpacket, copy_branch, tr_serial, set_missing,
569 &upperpacket, &index);
574 *uppernode =
static_cast<XN*
>(&upperpacket->node());
575 assert( &( *foundpacket)->node() ==
this);
581 inline typename Node<XN>::SnapshotStatus
582 Node<XN>::snapshotSupernode(
const shared_ptr<Linkage > &linkage, shared_ptr<Linkage> &linkage_super,
584 SnapshotMode mode, int64_t serial, CASInfoList *cas_infos) {
586 assert( !shot->hasPriority());
587 shared_ptr<Linkage > linkage_upper(shot->bundledBy());
588 linkage_super = linkage_upper;
589 if( !linkage_upper) {
590 if( *linkage == oldwrapper)
592 return SnapshotStatus::SNAPSHOT_NODE_MISSING;
593 return SnapshotStatus::SNAPSHOT_DISTURBED;
595 int reverse_index = shot->reverseIndex();
597 shot = *linkage_upper;
599 SnapshotStatus status = SnapshotStatus::SNAPSHOT_NODE_MISSING;
601 if( !shot_upper->hasPriority()) {
602 status = snapshotSupernode(linkage_upper, linkage_super, shot, &upperpacket,
603 mode, serial, cas_infos);
606 case SnapshotStatus::SNAPSHOT_DISTURBED:
609 case SnapshotStatus::SNAPSHOT_VOID_PACKET:
610 case SnapshotStatus::SNAPSHOT_NODE_MISSING:
612 upperpacket = &shot->packet();
613 status = SnapshotStatus::SNAPSHOT_SUCCESS;
615 case SnapshotStatus::SNAPSHOT_NODE_MISSING_AND_COLLIDED:
617 upperpacket = &shot->packet();
618 status = SnapshotStatus::SNAPSHOT_COLLIDED;
620 case SnapshotStatus::SNAPSHOT_COLLIDED:
621 case SnapshotStatus::SNAPSHOT_SUCCESS:
625 if( *linkage != oldwrapper)
626 return SnapshotStatus::SNAPSHOT_DISTURBED;
628 assert( *upperpacket);
629 int size = ( *upperpacket)->size();
630 int i = reverse_index;
631 for(
int cnt = 0;; ++cnt) {
633 if(status == SnapshotStatus::SNAPSHOT_COLLIDED)
634 return SnapshotStatus::SNAPSHOT_NODE_MISSING;
635 status = SnapshotStatus::SNAPSHOT_NODE_MISSING;
640 if(( *( *upperpacket)->subnodes())[i]->m_link == linkage) {
642 *subpacket = &( *( *upperpacket)->subpackets())[i];
645 if(mode == SnapshotMode::SNAPSHOT_FOR_UNBUNDLE) {
649 assert(( *upperpacket)->missing());
650 return SnapshotStatus::SNAPSHOT_VOID_PACKET;
658 assert( !shot_upper->packet() || (shot_upper->packet()->node().m_link == linkage_upper));
659 assert(( *upperpacket)->node().m_link == linkage_upper);
660 if(mode == SnapshotMode::SNAPSHOT_FOR_UNBUNDLE) {
661 if(status == SnapshotStatus::SNAPSHOT_COLLIDED) {
662 return SnapshotStatus::SNAPSHOT_COLLIDED;
664 if((serial != SerialGenerator::SERIAL_NULL) && (shot_upper->m_bundle_serial == serial)) {
666 if(status == SnapshotStatus::SNAPSHOT_NODE_MISSING)
667 return SnapshotStatus::SNAPSHOT_NODE_MISSING;
668 return SnapshotStatus::SNAPSHOT_COLLIDED;
672 if(shot == shot_upper) {
673 newwrapper = allocate_local_shared<PacketWrapper>
674 (shot_upper->packet()->node().m_allocatorPacketWrapper,
675 *shot_upper, shot_upper->m_bundle_serial);
680 assert(cas_infos->size());
682 newwrapper = allocate_local_shared<PacketWrapper>
683 ( (*p)->node().m_allocatorPacketWrapper,
684 *p, shot->m_bundle_serial);
690 cas_infos->emplace_back(linkage_upper, shot_upper, newwrapper);
691 p = &newwrapper->packet();
694 *p = allocate_local_shared<Packet>(
695 (*p)->node().m_allocatorPacket, **p);
698 ( *p)->m_missing =
true;
702 if((status == SnapshotStatus::SNAPSHOT_NODE_MISSING) && (serial != SerialGenerator::SERIAL_NULL) &&
703 (( !oldwrapper->hasPriority()) && (oldwrapper->m_bundle_serial == serial))) {
705 return SnapshotStatus::SNAPSHOT_NODE_MISSING_AND_COLLIDED;
713 Node<XN>::snapshot(
Snapshot<XN> &snapshot,
bool multi_nodal,
typename NegotiationCounter::cnt_t started_time)
const {
716 snapshot.m_serial = SerialGenerator::gen();
718 if(target->hasPriority()) {
721 if( !target->packet()->missing()) {
722 STRICT_assert(target->packet()->checkConsistensy(target->packet()));
728 shared_ptr<Linkage> linkage_super;
731 SnapshotStatus status = snapshotSupernode(m_link, linkage_super,
732 superwrapper, &foundpacket, SnapshotMode::SNAPSHOT_FOR_BUNDLE);
734 case SnapshotStatus::SNAPSHOT_SUCCESS: {
735 if( !( *foundpacket)->missing() || !multi_nodal) {
736 snapshot.m_packet = *foundpacket;
737 STRICT_assert(snapshot.m_packet->checkConsistensy(snapshot.m_packet));
741 UnbundledStatus status = unbundle(
nullptr, started_time, m_link, target);
743 case UnbundledStatus::UNBUNDLE_W_NEW_SUBVALUE:
744 case UnbundledStatus::UNBUNDLE_COLLIDED:
745 case UnbundledStatus::UNBUNDLE_SUBVALUE_HAS_CHANGED:
751 case SnapshotStatus::SNAPSHOT_DISTURBED:
754 case SnapshotStatus::SNAPSHOT_NODE_MISSING:
755 case SnapshotStatus::SNAPSHOT_VOID_PACKET:
757 if( !target->packet()->missing() || !multi_nodal) {
758 snapshot.m_packet = target->packet();
764 BundledStatus status =
const_cast<Node *
>(
this)->bundle(
765 target, started_time, snapshot.m_serial,
true);
767 case BundledStatus::BUNDLE_SUCCESS:
768 assert( !target->packet()->missing());
769 STRICT_assert(target->packet()->checkConsistensy(target->packet()));
775 snapshot.m_packet = target->packet();
779 typename Node<XN>::BundledStatus
781 const shared_ptr<Node> &subnode,
783 typename NegotiationCounter::cnt_t &started_time, int64_t bundle_serial) {
785 if( !subwrapper->hasPriority()) {
786 shared_ptr<Linkage > linkage(subwrapper->bundledBy());
787 bool need_for_unbundle =
false;
788 bool detect_collision =
false;
789 if(linkage == m_link) {
791 if(subpacket_new->missing()) {
792 need_for_unbundle =
true;
795 return BundledStatus::BUNDLE_SUCCESS;
798 if(subwrapper->packet()) {
803 return BundledStatus::BUNDLE_DISTURBED;
807 need_for_unbundle =
true;
808 detect_collision =
true;
810 if(need_for_unbundle) {
812 UnbundledStatus status = unbundle(detect_collision ? &bundle_serial :
nullptr, started_time,
813 subnode->m_link, subwrapper,
nullptr, &subwrapper_new, superwrapper);
815 case UnbundledStatus::UNBUNDLE_W_NEW_SUBVALUE:
816 subwrapper = subwrapper_new;
818 case UnbundledStatus::UNBUNDLE_COLLIDED:
820 subpacket_new.
reset();
821 return BundledStatus::BUNDLE_SUCCESS;
822 case UnbundledStatus::UNBUNDLE_SUBVALUE_HAS_CHANGED:
824 return BundledStatus::BUNDLE_DISTURBED;
828 if(subwrapper->packet()->missing()) {
829 assert(subwrapper->packet()->size());
830 BundledStatus status = subnode->bundle(subwrapper, started_time, bundle_serial,
false);
832 case BundledStatus::BUNDLE_SUCCESS:
834 case BundledStatus::BUNDLE_DISTURBED:
836 return BundledStatus::BUNDLE_DISTURBED;
839 subpacket_new = subwrapper->packet();
840 return BundledStatus::BUNDLE_SUCCESS;
844 typename Node<XN>::BundledStatus
846 typename NegotiationCounter::cnt_t &started_time, int64_t bundle_serial,
bool is_bundle_root) {
848 assert(oldsuperwrapper->packet());
849 assert(oldsuperwrapper->packet()->size());
850 assert(oldsuperwrapper->packet()->missing());
852 Node &supernode(oldsuperwrapper->packet()->node());
854 if( !oldsuperwrapper->hasPriority() ||
855 (oldsuperwrapper->m_bundle_serial != bundle_serial)) {
860 allocate_local_shared<PacketWrapper>(
861 oldsuperwrapper->packet()->node().m_allocatorPacketWrapper,
862 oldsuperwrapper->packet(), bundle_serial);
863 if( !supernode.m_link->compareAndSet(oldsuperwrapper, superwrapper)) {
864 return BundledStatus::BUNDLE_DISTURBED;
866 oldsuperwrapper = std::move(superwrapper);
873 allocate_local_shared<PacketWrapper>(
874 oldsuperwrapper->packet()->node().m_allocatorPacketWrapper,
875 *oldsuperwrapper, bundle_serial);
879 reverseLookup(superwrapper->packet(),
true, SerialGenerator::gen()));
880 assert(newpacket->size());
881 assert(newpacket->missing());
883 STRICT_assert(s_serial_abandoned != newpacket->subpackets()->m_serial);
886 newpacket->subpackets() = std::allocate_shared<PacketList>(
887 newpacket->node().m_allocatorPacketList, *newpacket->subpackets());
889 shared_ptr<PacketList> &subpackets(newpacket->subpackets());
890 shared_ptr<NodeList> &subnodes(newpacket->subnodes());
892 bool missing =
false;
893 for(
unsigned int i = 0; i < subpackets->size(); ++i) {
894 shared_ptr<Node> child(( *subnodes)[i]);
898 subwrapper = *child->m_link;
899 if(subwrapper == subwrappers_org[i])
901 BundledStatus status = bundle_subpacket( &oldsuperwrapper,
902 child, subwrapper, subpacket_new, started_time, bundle_serial);
904 case BundledStatus::BUNDLE_SUCCESS:
906 case BundledStatus::BUNDLE_DISTURBED:
908 if(oldsuperwrapper == *supernode.m_link)
912 subwrappers_org[i] = subwrapper;
914 if(subpacket_new->missing()) {
917 assert(&subpacket_new->node() == child.get());
925 assert( &supernode ==
this);
928 newpacket->m_missing =
true;
930 supernode.m_link->negotiate(started_time, 4.0f);
932 if( !supernode.m_link->compareAndSet(oldsuperwrapper, superwrapper)) {
935 return BundledStatus::BUNDLE_DISTURBED;
939 oldsuperwrapper = std::move(superwrapper);
942 bool changed_during_bundling =
false;
943 for(
unsigned int i = 0; i < subnodes->size(); i++) {
944 shared_ptr<Node> child(( *subnodes)[i]);
946 if(( *subpackets)[i])
947 null_linkage = allocate_local_shared<PacketWrapper>(
948 child->m_allocatorPacketWrapper, m_link, i, bundle_serial);
951 null_linkage = allocate_local_shared<PacketWrapper>(
952 child->m_allocatorPacketWrapper, *subwrappers_org[i], bundle_serial);
955 assert( !null_linkage->hasPriority());
957 if( !child->m_link->compareAndSet(subwrappers_org[i], null_linkage)) {
959 return BundledStatus::BUNDLE_DISTURBED;
960 if(oldsuperwrapper != *supernode.m_link)
961 return BundledStatus::BUNDLE_DISTURBED;
962 changed_during_bundling =
true;
966 if(changed_during_bundling)
969 superwrapper = allocate_local_shared<PacketWrapper>(
970 supernode.m_allocatorPacketWrapper, *oldsuperwrapper, bundle_serial);
975 reverseLookup(superwrapper->packet(),
true, SerialGenerator::gen()));
976 newpacket->m_missing =
false;
977 STRICT_assert(newpacket->checkConsistensy(newpacket));
980 if( !supernode.m_link->compareAndSet(oldsuperwrapper, superwrapper))
981 return BundledStatus::BUNDLE_DISTURBED;
982 oldsuperwrapper = std::move(superwrapper);
986 return BundledStatus::BUNDLE_SUCCESS;
1030 assert(tr.m_oldpacket != tr.
m_packet);
1031 assert(tr.isMultiNodal() || tr.
m_packet->subpackets() == tr.m_oldpacket->subpackets());
1032 assert(
this == &tr.
m_packet->node());
1035 m_allocatorPacketWrapper, tr.
m_packet, tr.m_serial);
1037 for(
int retry = 0;; ++retry) {
1039 if(wrapper->hasPriority()) {
1041 if(wrapper->packet() != tr.m_oldpacket) {
1042 if( !tr.isMultiNodal() && (wrapper->packet()->payload() == tr.m_oldpacket->payload())) {
1044 tr.
m_packet->subpackets() = wrapper->packet()->subpackets();
1045 tr.
m_packet->m_missing = wrapper->packet()->missing();
1048 STRICT_TEST(s_serial_abandoned = tr.m_serial);
1057 m_link->negotiate(tr.m_started_time, 4.0f);
1058 if(m_link->compareAndSet(wrapper, newwrapper)) {
1071 UnbundledStatus status = unbundle(
nullptr, tr.m_started_time, m_link, wrapper,
1072 tr.isMultiNodal() ? &tr.m_oldpacket :
nullptr, tr.isMultiNodal() ? &newwrapper :
nullptr);
1074 case UnbundledStatus::UNBUNDLE_W_NEW_SUBVALUE:
1075 if(tr.isMultiNodal())
1078 case UnbundledStatus::UNBUNDLE_SUBVALUE_HAS_CHANGED: {
1079 STRICT_TEST(s_serial_abandoned = tr.m_serial);
1083 case UnbundledStatus::UNBUNDLE_DISTURBED:
1091 typename Node<XN>::UnbundledStatus
1097 assert( !null_linkage->hasPriority());
1100 shared_ptr<Linkage> linkage_super;
1104 SnapshotStatus status = snapshotSupernode(sublinkage, linkage_super, superwrapper, &newsubpacket,
1105 SnapshotMode::SNAPSHOT_FOR_UNBUNDLE,
1106 bundle_serial ? *bundle_serial : SerialGenerator::SERIAL_NULL, &cas_infos);
1108 case SnapshotStatus::SNAPSHOT_SUCCESS:
1110 case SnapshotStatus::SNAPSHOT_DISTURBED:
1111 return UnbundledStatus::UNBUNDLE_DISTURBED;
1112 case SnapshotStatus::SNAPSHOT_VOID_PACKET:
1113 case SnapshotStatus::SNAPSHOT_NODE_MISSING:
1115 assert(newsubpacket);
1117 case SnapshotStatus::SNAPSHOT_NODE_MISSING_AND_COLLIDED:
1119 assert(newsubpacket);
1120 status = SnapshotStatus::SNAPSHOT_COLLIDED;
1122 case SnapshotStatus::SNAPSHOT_COLLIDED:
1126 if(oldsubpacket && ( *newsubpacket != *oldsubpacket))
1127 return UnbundledStatus::UNBUNDLE_SUBVALUE_HAS_CHANGED;
1129 for(
auto it = cas_infos.begin(); it != cas_infos.end(); ++it) {
1131 if( !it->linkage->compareAndSet(it->old_wrapper, it->new_wrapper))
1132 return UnbundledStatus::UNBUNDLE_DISTURBED;
1133 if(oldsuperwrapper) {
1134 if( ( *oldsuperwrapper)->packet()->node().m_link == it->linkage) {
1135 if( *oldsuperwrapper != it->old_wrapper)
1136 return UnbundledStatus::UNBUNDLE_DISTURBED;
1138 *oldsuperwrapper = it->new_wrapper;
1142 if(status == SnapshotStatus::SNAPSHOT_COLLIDED)
1143 return UnbundledStatus::UNBUNDLE_COLLIDED;
1147 newsubwrapper = *newsubwrapper_returned;
1149 newsubwrapper = allocate_local_shared<PacketWrapper>(
1150 ( *newsubpacket)->node().m_allocatorPacketWrapper,
1151 *newsubpacket, SerialGenerator::SERIAL_NULL);
1154 if( !sublinkage->compareAndSet(null_linkage, newsubwrapper))
1155 return UnbundledStatus::UNBUNDLE_SUBVALUE_HAS_CHANGED;
1157 if(newsubwrapper_returned)
1158 *newsubwrapper_returned = std::move(newsubwrapper);
1169 return UnbundledStatus::UNBUNDLE_W_NEW_SUBVALUE;