Coverage for oc_meta / lib / merge_registry.py: 97%
133 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# SPDX-License-Identifier: ISC
5from __future__ import annotations
class EntityStore:
    """
    Unified entity storage with merge tracking and bidirectional identifier index.

    Combines entity data storage, merge tracking (Union-Find), and identifier
    lookups in both directions (entity→ids and id→entities).
    """

    def __init__(self) -> None:
        # Union-Find parent pointers; a root points to itself.
        self._parent: dict[str, str] = {}
        # Canonical ID → every entity ID that was merged into it.
        self._merged: dict[str, set[str]] = {}
        # Entity ID → MetaID, as recorded by assign_meta().
        self._wannabe_to_meta: dict[str, str] = {}
        # Forward index: entity key → identifier literals.
        self._entity_ids: dict[str, set[str]] = {}
        # Entity key → title.
        self._entity_titles: dict[str, str] = {}
        # Reverse index: identifier literal → entity keys.
        self._id_to_entities: dict[str, set[str]] = {}
        # Identifier literal → MetaID of that identifier.
        self._id_to_metaid: dict[str, str] = {}

    def _tree_root(self, entity_id: str) -> str:
        """Follow parent pointers from entity_id (which must be in the tree) to its root."""
        root = entity_id
        while self._parent[root] != root:
            root = self._parent[root]
        return root

    def find(self, entity_id: str) -> str:
        """
        Find the canonical (root) ID for an entity with path compression.

        If the entity has a MetaID assigned, returns that MetaID.
        Otherwise returns the root of its Union-Find tree (or the MetaID
        assigned to that root). Unknown entities are their own canonical ID.

        A recorded MetaID is returned as-is, without resolving it further.
        """
        if entity_id in self._wannabe_to_meta:
            return self._wannabe_to_meta[entity_id]

        if entity_id not in self._parent:
            return entity_id

        root = self._tree_root(entity_id)

        # Path compression: point every node on the walked path at the root.
        node = entity_id
        while node != root:
            parent = self._parent[node]
            self._parent[node] = root
            node = parent

        return self._wannabe_to_meta.get(root, root)

    def merge(self, target: str, source: str) -> None:
        """
        Register that source entity was merged into target entity.

        Both arguments are resolved with find() first; nothing happens when
        they already share a canonical ID. The merge history of both roots is
        preserved, including history recorded by assign_meta() for a MetaID
        that has not yet taken part in a merge.
        """
        target_root = self.find(target)
        source_root = self.find(source)

        if target_root == source_root:
            return

        self._parent.setdefault(target_root, target_root)
        self._parent[source_root] = target_root

        # setdefault/pop instead of resetting to an empty set: a root that is
        # new to the Union-Find tree may already own merge history.
        absorbed = self._merged.setdefault(target_root, set())
        absorbed.add(source_root)
        absorbed.update(self._merged.pop(source_root, ()))

    def get_merged(self, canonical: str) -> set[str]:
        """
        Get all entity IDs that were merged into the canonical entity.

        Returns a copy; an empty set if no merges occurred.
        """
        root = self.find(canonical)
        if "wannabe" in root and root in self._wannabe_to_meta:
            root = self._wannabe_to_meta[root]
        return self._merged.get(root, set()).copy()

    def assign_meta(self, wannabe: str, meta: str) -> None:
        """
        Assign a final MetaID to a wannabe entity.

        Copies all identifiers and title from wannabe to meta and registers the mapping.
        After this call, find(wannabe) and find() of any entity merged
        into wannabe will return meta. Entities already recorded as merged
        into meta are kept, and meta is never listed as merged into itself.
        """
        # Find the root of the Union-Find tree. If wannabe_0 was merged into wannabe_1
        # which was merged into wannabe_2, the root is wannabe_2. If the wannabe was
        # never merged, the root is itself.
        root = self._tree_root(wannabe) if wannabe in self._parent else wannabe

        # Register mapping for both: the root and the wannabe passed as argument.
        # This way find() works starting from either.
        self._wannabe_to_meta[root] = meta
        self._wannabe_to_meta[wannabe] = meta

        # Collect what this assignment contributes: the root's merged set plus
        # the root itself, or just the wannabe if the root has no merged set.
        if root in self._merged:
            absorbed = self._merged.pop(root)
            absorbed.add(root)
        else:
            absorbed = {wannabe}
        # An entity is never "merged into itself" (wannabe == meta means the
        # entity already existed in the triplestore).
        absorbed.discard(meta)
        # Fold into whatever meta already owns instead of overwriting it.
        merged_into_meta = self._merged.setdefault(meta, set())
        merged_into_meta.update(absorbed)

        # Propagate mapping to all entities that were merged. This way calling
        # find("br/wannabe_0") after wannabe_0 → wannabe_1 → wannabe_2 → meta
        # still returns meta.
        for merged_id in merged_into_meta:
            self._wannabe_to_meta[merged_id] = meta

        # Copy identifiers from wannabe to meta
        ids = self._entity_ids.get(wannabe, set())
        self._entity_ids.setdefault(meta, set()).update(ids)

        # Update reverse index: before "doi:10.1234/abc" → {"br/wannabe_0"},
        # after "doi:10.1234/abc" → {"br/0601"}
        for id_literal in ids:
            holders = self._id_to_entities.get(id_literal)
            if holders is not None:
                holders.discard(wannabe)
                holders.add(meta)

        # Copy title from wannabe to meta, but only if meta doesn't already have one.
        # If meta is a preexisting entity with a title from the triplestore, don't overwrite.
        if meta not in self._entity_titles and wannabe in self._entity_titles:
            self._entity_titles[meta] = self._entity_titles[wannabe]

    def add_entity(self, entity_key: str, title: str = "") -> None:
        """Create new entity with empty ids set and optional title."""
        self._entity_ids[entity_key] = set()
        self._entity_titles[entity_key] = title

    def add_id(self, entity_key: str, identifier: str) -> None:
        """Add identifier to entity, updating both forward and reverse indexes."""
        self._entity_ids.setdefault(entity_key, set()).add(identifier)
        self._id_to_entities.setdefault(identifier, set()).add(entity_key)

    def get_ids(self, entity_key: str) -> set[str]:
        """
        Get all identifiers for entity.

        Returns the store's own set (not a copy) when the entity exists,
        and a fresh empty set otherwise.
        """
        return self._entity_ids.get(entity_key, set())

    def get_title(self, entity_key: str) -> str:
        """Get title for entity, or an empty string if none is stored."""
        return self._entity_titles.get(entity_key, "")

    def set_title(self, entity_key: str, title: str) -> None:
        """Set title for entity."""
        self._entity_titles[entity_key] = title

    def has_entity(self, entity_key: str) -> bool:
        """Check if entity exists in store."""
        return entity_key in self._entity_ids

    def remove_entity(self, entity_key: str) -> None:
        """Remove entity from store, cleaning both forward and reverse indexes."""
        identifiers = self._entity_ids.pop(entity_key, set())
        self._entity_titles.pop(entity_key, None)
        for identifier in identifiers:
            holders = self._id_to_entities.get(identifier)
            if holders is None:
                continue
            holders.discard(entity_key)
            # Drop identifiers that no longer belong to any entity.
            if not holders:
                del self._id_to_entities[identifier]

    def merge_entities(self, target: str, source: str) -> None:
        """Merge source entity into target, combining ids and keeping target's title."""
        self.merge(target, source)
        source_ids = self._entity_ids.pop(source, set())
        source_title = self._entity_titles.pop(source, "")
        self._entity_ids.setdefault(target, set()).update(source_ids)
        for identifier in source_ids:
            holders = self._id_to_entities.get(identifier)
            if holders is not None:
                holders.discard(source)
                holders.add(target)
        # The source title is only used when the target has no (non-empty) title.
        if not self._entity_titles.get(target) and source_title:
            self._entity_titles[target] = source_title

    def entities(self) -> dict[str, dict[str, set[str] | str]]:
        """
        Get all entities as dict for backward compatibility.

        Returns: {entity_key: {"ids": set, "title": str}}; each ids set is a copy.
        """
        return {
            key: {"ids": ids.copy(), "title": self._entity_titles.get(key, "")}
            for key, ids in self._entity_ids.items()
        }

    def find_entities(self, id_literal: str) -> set[str]:
        """
        Find all entities that have an identifier.

        Returns a copy; an empty set if the identifier is not registered.
        """
        return self._id_to_entities.get(id_literal, set()).copy()

    def find_entity(self, id_literal: str) -> str | None:
        """
        Find the first entity that has an identifier.

        "First" is set iteration order, so it is arbitrary when several
        entities share the identifier.
        Returns None if the identifier is not registered.
        """
        return next(iter(self._id_to_entities.get(id_literal, ())), None)

    def update_id_entity(self, old_entity: str, new_entity: str) -> None:
        """
        Update all identifiers from old_entity to point to new_entity.

        Only the reverse index (id→entities) is rewritten.
        Used when merging entities externally.
        """
        for holders in self._id_to_entities.values():
            if old_entity in holders:
                holders.discard(old_entity)
                holders.add(new_entity)

    def set_id_metaid(self, id_literal: str, metaid: str) -> None:
        """Store the MetaID for an identifier literal."""
        self._id_to_metaid[id_literal] = metaid

    def get_id_metaid(self, id_literal: str) -> str | None:
        """Get the MetaID for an identifier literal, or None if none is stored."""
        return self._id_to_metaid.get(id_literal)

    def get_id_metaids(self) -> dict[str, str]:
        """Get a copy of all identifier → MetaID mappings."""
        return self._id_to_metaid.copy()

    def __iter__(self):
        """Iterate over entity keys."""
        return iter(self._entity_ids)

    def __len__(self) -> int:
        """Return number of entities."""
        return len(self._entity_ids)

    def __contains__(self, entity_key: str) -> bool:
        """Check if entity exists."""
        return entity_key in self._entity_ids