Coverage for oc_meta / lib / merge_registry.py: 97%

133 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-21 09:24 +0000

1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from __future__ import annotations 

6 

7 

class EntityStore:
    """
    Unified entity storage with merge tracking and bidirectional identifier index.

    Combines entity data storage, merge tracking (Union-Find), and identifier
    lookups in both directions (entity→ids and id→entities).
    """

    def __init__(self) -> None:
        # Union-Find parent pointers; a root entity points to itself.
        self._parent: dict[str, str] = {}
        # Canonical entity -> every entity that was merged into it.
        self._merged: dict[str, set[str]] = {}
        # Entity -> final MetaID, filled in by assign_meta().
        self._wannabe_to_meta: dict[str, str] = {}
        # Forward index: entity -> identifier literals.
        self._entity_ids: dict[str, set[str]] = {}
        # Entity -> title.
        self._entity_titles: dict[str, str] = {}
        # Reverse index: identifier literal -> entities carrying it.
        self._id_to_entities: dict[str, set[str]] = {}
        # Identifier literal -> MetaID of the identifier itself.
        self._id_to_metaid: dict[str, str] = {}

    def _tree_root(self, entity_id: str) -> str:
        """
        Follow parent pointers from entity_id up to the root of its tree.

        Does not compress paths. The caller must ensure that entity_id is
        already registered in the Union-Find structure.
        """
        root = entity_id
        while self._parent[root] != root:
            root = self._parent[root]
        return root

    def find(self, entity_id: str) -> str:
        """
        Find the canonical (root) ID for an entity with path compression.

        If the entity has a MetaID assigned, returns that MetaID.
        Otherwise returns the root of its Union-Find tree. An entity that
        was never involved in a merge is its own canonical ID.
        """
        if entity_id in self._wannabe_to_meta:
            return self._wannabe_to_meta[entity_id]

        if entity_id not in self._parent:
            return entity_id

        root = self._tree_root(entity_id)

        # Path compression: re-point every node on the walked path to the root.
        current = entity_id
        while current != root:
            next_parent = self._parent[current]
            self._parent[current] = root
            current = next_parent

        # The root itself may have been given a MetaID.
        return self._wannabe_to_meta.get(root, root)

    def merge(self, target: str, source: str) -> None:
        """
        Register that source entity was merged into target entity.

        After this call, find(source) will return find(target).
        Does nothing when both already share the same canonical ID.
        """
        target_root = self.find(target)
        source_root = self.find(source)

        if target_root == source_root:
            return

        for root in (target_root, source_root):
            if root not in self._parent:
                self._parent[root] = root
                # setdefault, not assignment: a MetaID created by assign_meta()
                # already owns a merged set without being in _parent, and that
                # history must survive its first participation in a merge.
                self._merged.setdefault(root, set())

        self._parent[source_root] = target_root

        # The target inherits the source and everything the source had absorbed.
        self._merged[target_root].add(source_root)
        self._merged[target_root].update(self._merged[source_root])
        del self._merged[source_root]

    def get_merged(self, canonical: str) -> set[str]:
        """
        Get all entity IDs that were merged into the canonical entity.

        Returns an empty set if no merges occurred. The returned set is a
        copy, so mutating it does not affect the store.
        """
        root = self.find(canonical)
        if "wannabe" in root and root in self._wannabe_to_meta:
            root = self._wannabe_to_meta[root]
        return self._merged.get(root, set()).copy()

    def assign_meta(self, wannabe: str, meta: str) -> None:
        """
        Assign a final MetaID to a wannabe entity.

        Copies all identifiers and title from wannabe to meta and registers the mapping.
        After this call, find(wannabe) and find() of any entity merged
        into wannabe will return meta.
        """
        # Find the root of the Union-Find tree. If wannabe_0 was merged into wannabe_1
        # which was merged into wannabe_2, the root is wannabe_2. If the wannabe was
        # never merged, the root is itself.
        root = self._tree_root(wannabe) if wannabe in self._parent else wannabe

        # Register mapping for both: the root and the wannabe passed as argument.
        # This way find() works starting from either.
        self._wannabe_to_meta[root] = meta
        self._wannabe_to_meta[wannabe] = meta

        # Transfer merge structure from root to meta. If root had merged entities,
        # move that set under key meta and add the root itself to the set.
        # Otherwise create a new set containing just the wannabe (or empty if
        # wannabe == meta, meaning the entity already existed in the triplestore).
        if root in self._merged:
            self._merged[meta] = self._merged.pop(root)
            self._merged[meta].add(root)
        else:
            self._merged[meta] = {wannabe} if wannabe != meta else set()

        # Propagate mapping to all entities that were merged. This way calling
        # find("br/wannabe_0") after wannabe_0 → wannabe_1 → wannabe_2 → meta
        # still returns meta.
        for merged_id in self._merged[meta]:
            self._wannabe_to_meta[merged_id] = meta

        # Copy identifiers from wannabe to meta (the wannabe keeps its own set).
        ids = self._entity_ids.get(wannabe, set())
        self._entity_ids.setdefault(meta, set()).update(ids)

        # Update reverse index: before "doi:10.1234/abc" → {"br/wannabe_0"},
        # after "doi:10.1234/abc" → {"br/0601"}
        for id_literal in ids:
            if id_literal in self._id_to_entities:
                self._id_to_entities[id_literal].discard(wannabe)
                self._id_to_entities[id_literal].add(meta)

        # Copy title from wannabe to meta, but only if meta doesn't already have one.
        # If meta is a preexisting entity with a title from the triplestore, don't overwrite.
        if meta not in self._entity_titles and wannabe in self._entity_titles:
            self._entity_titles[meta] = self._entity_titles[wannabe]

    def add_entity(self, entity_key: str, title: str = "") -> None:
        """
        Create new entity with empty ids set and optional title.

        Calling this for an existing key resets its ids set and title.
        """
        self._entity_ids[entity_key] = set()
        self._entity_titles[entity_key] = title

    def add_id(self, entity_key: str, identifier: str) -> None:
        """
        Add identifier to entity, updating both forward and reverse indexes.

        The entity is created in the forward index if it is not there yet.
        """
        self._entity_ids.setdefault(entity_key, set()).add(identifier)
        self._id_to_entities.setdefault(identifier, set()).add(entity_key)

    def get_ids(self, entity_key: str) -> set[str]:
        """
        Get all identifiers for entity.

        For a known entity this is the store's own set, not a copy; an
        unknown entity yields a fresh empty set.
        """
        return self._entity_ids.get(entity_key, set())

    def get_title(self, entity_key: str) -> str:
        """Get title for entity, or an empty string if none is stored."""
        return self._entity_titles.get(entity_key, "")

    def set_title(self, entity_key: str, title: str) -> None:
        """Set title for entity."""
        self._entity_titles[entity_key] = title

    def has_entity(self, entity_key: str) -> bool:
        """Check if entity exists in store."""
        return entity_key in self._entity_ids

    def remove_entity(self, entity_key: str) -> None:
        """
        Remove entity from store, cleaning both forward and reverse indexes.

        Unknown entities are ignored. Reverse-index entries left without
        any entity are deleted.
        """
        identifiers = self._entity_ids.pop(entity_key, set())
        self._entity_titles.pop(entity_key, None)
        for identifier in identifiers:
            if identifier in self._id_to_entities:
                self._id_to_entities[identifier].discard(entity_key)
                if not self._id_to_entities[identifier]:
                    del self._id_to_entities[identifier]

    def merge_entities(self, target: str, source: str) -> None:
        """
        Merge source entity into target, combining ids and keeping target's title.

        The source is removed from the forward index and the title table.
        Its title is adopted only when the target has no (or an empty) title.
        """
        self.merge(target, source)
        source_ids = self._entity_ids.pop(source, set())
        source_title = self._entity_titles.pop(source, "")
        self._entity_ids.setdefault(target, set()).update(source_ids)
        # Re-point the reverse index of every moved identifier at the target.
        for identifier in source_ids:
            if identifier in self._id_to_entities:
                self._id_to_entities[identifier].discard(source)
                self._id_to_entities[identifier].add(target)
        if not self._entity_titles.get(target) and source_title:
            self._entity_titles[target] = source_title

    def entities(self) -> dict[str, dict[str, set[str] | str]]:
        """
        Get all entities as dict for backward compatibility.

        Returns: {entity_key: {"ids": set, "title": str}}
        Each "ids" set is a copy of the stored one.
        """
        return {
            key: {"ids": ids.copy(), "title": self._entity_titles.get(key, "")}
            for key, ids in self._entity_ids.items()
        }

    def find_entities(self, id_literal: str) -> set[str]:
        """
        Find all entities that have an identifier.

        Returns an empty set if the identifier is not registered.
        The returned set is a copy.
        """
        return self._id_to_entities.get(id_literal, set()).copy()

    def find_entity(self, id_literal: str) -> str | None:
        """
        Find the first entity that has an identifier.

        "First" is whatever the underlying set yields first, so the choice
        is arbitrary when several entities share the identifier.
        Returns None if the identifier is not registered.
        """
        return next(iter(self._id_to_entities.get(id_literal, ())), None)

    def update_id_entity(self, old_entity: str, new_entity: str) -> None:
        """
        Update all identifiers from old_entity to point to new_entity.

        Used when merging entities externally. Only the reverse index
        (id→entities) is rewritten.
        """
        for entities in self._id_to_entities.values():
            if old_entity in entities:
                entities.discard(old_entity)
                entities.add(new_entity)

    def set_id_metaid(self, id_literal: str, metaid: str) -> None:
        """Store the MetaID for an identifier literal."""
        self._id_to_metaid[id_literal] = metaid

    def get_id_metaid(self, id_literal: str) -> str | None:
        """Get the MetaID for an identifier literal, or None if not stored."""
        return self._id_to_metaid.get(id_literal)

    def get_id_metaids(self) -> dict[str, str]:
        """Get a copy of all identifier → MetaID mappings."""
        return self._id_to_metaid.copy()

    def __iter__(self):
        """Iterate over entity keys."""
        return iter(self._entity_ids)

    def __len__(self) -> int:
        """Return number of entities."""
        return len(self._entity_ids)

    def __contains__(self, entity_key: str) -> bool:
        """Check if entity exists."""
        return entity_key in self._entity_ids

257 

258 def __contains__(self, entity_key: str) -> bool: 

259 """Check if entity exists.""" 

260 return entity_key in self._entity_ids