Source code for localuf.decoders.buf
from localuf.constants import Growth
from localuf.decoders.uf import UF, _Cluster
from localuf.type_aliases import Node
[docs]
class BUF(UF):
"""The graph used by Bucket UF.
Extends ``decoders.UF``.
Attributes:
* ``mvl`` an integer of the minimum vision length of
all active clusters at the start of a growth round.
Methods:
* ``_update_mvl``.
* ``_demote_cluster``.
Overriden attributes:
* ``buckets`` replaces ``active_clusters`` and maps each integer
to a set of all active clusters whose vision length is that integer.
It is important that its keys is in ascending order
as it is used by ``_update_mvl()``.
Overriden methods:
* ``reset``.
* ``load``.
* ``validate_syndrome``.
* ``_merge``.
* ``_update_self_after_union``.
"""
[docs]
def reset(self):
super().reset()
self.buckets: dict[int, set[_Cluster]] = {
vision_length: set() for vision_length
in range(1, self.CODE.N_EDGES+1)
}
self.mvl = None
[docs]
def load(self, syndrome):
super().load(syndrome)
for cluster in self.active_clusters:
self.buckets[len(cluster.vision)].add(cluster)
del self.active_clusters
self._update_mvl()
def _update_mvl(self):
"""Update ``mvl`` attribute."""
changed = False
for vision_length, bucket in self.buckets.items():
if bucket:
self.mvl = vision_length
changed = True
break
if not changed:
self.mvl = None
# SYNDROME VALIDATION (need tests)
[docs]
def validate(
self,
syndrome: set[Node],
log_history=False
):
"""Grow clusters (always shortest vision first) until they are all valid."""
self.load(syndrome)
if log_history: self.init_history()
while self.mvl is not None:
self._growth_round(
log_history,
clusters_to_grow=self.buckets[self.mvl]
)
self._update_mvl()
[docs]
def static_merge(self, cu, cv):
if cu == cv:
self._demote_cluster(cu)
else:
self._union(cu, cv)
[docs]
def dynamic_merge(self, cu, cv, e):
if cu == cv:
self.growth[e] = Growth.BURNT
self._demote_cluster(cu)
elif (cu.boundary and cv.boundary):
self.growth[e] = Growth.BURNT
self._demote_cluster(cu)
self._demote_cluster(cv)
else:
self._union(cu, cv)
def _demote_cluster(self, cluster: _Cluster):
"""If cluster active, move down a bucket as an edge has been removed from its vision. Else do nothing."""
vision_length = len(cluster.vision)
try:
self.buckets[vision_length+1].remove(cluster)
self.buckets[vision_length].add(cluster)
except KeyError: pass
def _update_self_after_union(self, larger, smaller, old_larger_vision_length):
"""Update attributes clusters and buckets after union of larger w/ smaller."""
# DELETE SMALLER CLUSTER
del self.clusters[smaller.root]
# discard instead of remove as smaller may be a node not in syndrome
self.buckets[len(smaller.vision)+1].discard(smaller) # +1 as we already removed (u, v) from all visions
# UPDATE buckets DICTIONARY
# discard instead of remove as larger could have had boundary before union, hence was not active
self.buckets[old_larger_vision_length].discard(larger)
if larger.odd and not larger.boundary:
self.buckets[len(larger.vision)].add(larger)