Coverage for triplelite / _graph.py: 92%
342 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 16:42 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 16:42 +0000
1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5from __future__ import annotations
7from collections.abc import Iterable
8from typing import Iterator
10from triplelite._rdflib_bridge import to_rdflib as _to_rdflib
11from triplelite._types import RDFTerm, Triple, _InternalPOS, _InternalSPO
# Shared empty containers, used as read-only fallbacks for chained ``.get()``
# lookups so that a miss needs no fresh allocation. Never mutate them.
_EMPTY_DICT: dict = {}
_EMPTY_SET: set = set()
class TripleLite:
    """In-memory triple store built on integer-interned nested dictionaries.

    Subjects and predicates share one string-interning table; objects use a
    separate term-interning table. Triples are stored in ``_spo`` as
    ``subject id -> predicate id -> set of object ids``. When
    ``reverse_index_predicates`` is supplied, a reverse index ``_pos``
    (``predicate id -> object id -> set of subject ids``) is maintained too.
    """

    __slots__ = (
        "_spo",
        "_pos",
        "_indexed_predicates",
        "_len",
        "identifier",
        "_str_to_id",
        "_id_to_str",
        "_term_to_id",
        "_id_to_term",
    )

    def __init__(
        self,
        identifier: str | None = None,
        reverse_index_predicates: frozenset[str] | None = None,
    ) -> None:
        """Create an empty graph.

        Args:
            identifier: Optional name stored on the graph as ``identifier``.
            reverse_index_predicates: ``None`` disables the reverse index.
                An empty collection indexes every predicate; a non-empty
                one restricts the reverse index to the listed predicates.
        """
        self._spo: _InternalSPO = {}
        self._len: int = 0
        self.identifier: str | None = identifier
        self._str_to_id: dict[str, int] = {}
        self._id_to_str: list[str] = []
        self._term_to_id: dict[RDFTerm, int] = {}
        self._id_to_term: list[RDFTerm] = []
        if reverse_index_predicates is not None:
            # The interning tables must exist before predicates are interned.
            self._indexed_predicates: frozenset[int] | None = frozenset(
                self._intern_str(p) for p in reverse_index_predicates
            )
            self._pos: _InternalPOS | None = {}
        else:
            self._indexed_predicates = None
            self._pos = None

    def _intern_str(self, s: str) -> int:
        """Return the id of subject/predicate string *s*, allocating one if new."""
        sid = self._str_to_id.get(s)
        if sid is None:
            sid = len(self._id_to_str)
            self._str_to_id[s] = sid
            self._id_to_str.append(s)
        return sid

    def _intern_term(self, t: RDFTerm) -> int:
        """Return the id of object term *t*, allocating one if new."""
        tid = self._term_to_id.get(t)
        if tid is None:
            tid = len(self._id_to_term)
            self._term_to_id[t] = tid
            self._id_to_term.append(t)
        return tid

    def _lookup_ids(
        self,
        subject: str | None,
        predicate: str | None,
        obj: RDFTerm | None,
    ) -> tuple[int | None, int | None, int | None] | None:
        """Translate a pattern into ids without interning anything.

        ``None`` components are wildcards and stay ``None``. Returns ``None``
        (instead of a tuple) when a bound component was never interned, in
        which case no stored triple can match the pattern.
        """
        sid = pid = oid = None
        if subject is not None:
            sid = self._str_to_id.get(subject)
            if sid is None:
                return None
        if predicate is not None:
            pid = self._str_to_id.get(predicate)
            if pid is None:
                return None
        if obj is not None:
            oid = self._term_to_id.get(obj)
            if oid is None:
                return None
        return sid, pid, oid

    def _matching_ids(
        self, sid: int | None, pid: int | None, oid: int | None
    ) -> Iterator[tuple[int, int, int]]:
        """Yield the id triples in ``_spo`` matching a pattern (``None`` = wildcard)."""
        spo = self._spo
        if sid is not None:
            predicates = spo.get(sid)
            if predicates is None:
                return
            subject_items = ((sid, predicates),)
        else:
            subject_items = spo.items()
        for s, predicates in subject_items:
            if pid is not None:
                objects = predicates.get(pid)
                if objects is None:
                    continue
                predicate_items = ((pid, objects),)
            else:
                predicate_items = predicates.items()
            for p, objects in predicate_items:
                if oid is None:
                    for o in objects:
                        yield s, p, o
                elif oid in objects:
                    yield s, p, oid

    def add(self, triple: tuple[str, str, RDFTerm]) -> None:
        """Insert one ``(subject, predicate, object)`` triple; duplicates are ignored."""
        subject, predicate, obj = triple
        sid = self._intern_str(subject)
        pid = self._intern_str(predicate)
        oid = self._intern_term(obj)
        objects = self._spo.setdefault(sid, {}).setdefault(pid, set())
        if oid in objects:
            return
        objects.add(oid)
        self._len += 1
        pos = self._pos
        if pos is not None:
            indexed = self._indexed_predicates
            # An empty predicate set means "index every predicate".
            if not indexed or pid in indexed:
                pos.setdefault(pid, {}).setdefault(oid, set()).add(sid)

    def add_many(self, triples: Iterable[tuple[str, str, RDFTerm]]) -> None:
        """Insert every triple from *triples*; duplicates are ignored.

        If iteration raises part-way through (for instance on a malformed
        triple), the triples inserted so far stay in the graph and are still
        counted by ``len()``; the exception then propagates.
        """
        spo_setdefault = self._spo.setdefault
        pos = self._pos
        indexed = self._indexed_predicates
        intern_str = self._intern_str
        intern_term = self._intern_term
        count = self._len
        try:
            if pos is not None:
                pos_setdefault = pos.setdefault
                index_all = not indexed
                for subject, predicate, obj in triples:
                    sid = intern_str(subject)
                    pid = intern_str(predicate)
                    oid = intern_term(obj)
                    objects = spo_setdefault(sid, {}).setdefault(pid, set())
                    if oid not in objects:
                        objects.add(oid)
                        count += 1
                        if index_all or pid in indexed:
                            pos_setdefault(pid, {}).setdefault(oid, set()).add(sid)
            else:
                for subject, predicate, obj in triples:
                    sid = intern_str(subject)
                    pid = intern_str(predicate)
                    oid = intern_term(obj)
                    objects = spo_setdefault(sid, {}).setdefault(pid, set())
                    if oid not in objects:
                        objects.add(oid)
                        count += 1
        finally:
            # Commit the running count even on failure so the length never
            # drifts away from what is actually stored.
            self._len = count

    def _remove_triple(self, sid: int, pid: int, oid: int) -> None:
        """Delete one id triple, pruning containers that become empty."""
        predicates = self._spo.get(sid)
        if predicates is None:
            return
        objects = predicates.get(pid)
        if objects is None or oid not in objects:
            return
        objects.remove(oid)
        self._len -= 1
        if not objects:
            del predicates[pid]
            if not predicates:
                del self._spo[sid]
        pos = self._pos
        if pos is None:
            return
        obj_to_subjects = pos.get(pid)
        if obj_to_subjects is None:
            return
        subjects = obj_to_subjects.get(oid)
        if subjects is None:
            return
        subjects.discard(sid)
        if not subjects:
            del obj_to_subjects[oid]
            if not obj_to_subjects:
                del pos[pid]

    def remove(self, triple: tuple[str | None, str | None, RDFTerm | None]) -> None:
        """Remove every triple matching *triple*; ``None`` components are wildcards.

        An all-``None`` pattern empties the graph. The interning tables are
        kept, so previously assigned ids remain valid.
        """
        subject, predicate, obj = triple
        if subject is None and predicate is None and obj is None:
            self._spo.clear()
            self._len = 0
            if self._pos is not None:
                self._pos.clear()
            return
        ids = self._lookup_ids(subject, predicate, obj)
        if ids is None:
            return
        sid, pid, oid = ids
        if sid is not None and pid is not None and oid is not None:
            self._remove_triple(sid, pid, oid)
            return
        # Materialise the matches first: removal mutates the dicts being walked.
        for s, p, o in list(self._matching_ids(sid, pid, oid)):
            self._remove_triple(s, p, o)

    def triples(self, pattern: tuple[str | None, str | None, RDFTerm | None]) -> Iterator[Triple]:
        """Yield every stored triple matching *pattern*; ``None`` components are wildcards."""
        subject, predicate, obj = pattern
        ids = self._lookup_ids(subject, predicate, obj)
        if ids is None:
            return
        sid, pid, oid = ids
        id_to_str = self._id_to_str
        id_to_term = self._id_to_term
        spo = self._spo
        if sid is not None:
            predicates = spo.get(sid)
            if predicates is None:
                return
            subject_items = ((sid, predicates),)
        else:
            subject_items = spo.items()
        # Same walk as _matching_ids, kept inline so the subject and
        # predicate strings are resolved once per group, not once per triple.
        for s, predicates in subject_items:
            s_str = id_to_str[s]
            if pid is not None:
                objects = predicates.get(pid)
                if objects is None:
                    continue
                predicate_items = ((pid, objects),)
            else:
                predicate_items = predicates.items()
            for p, objects in predicate_items:
                p_str = id_to_str[p]
                if oid is None:
                    for o in objects:
                        yield s_str, p_str, id_to_term[o]
                elif oid in objects:
                    yield s_str, p_str, id_to_term[oid]

    def objects(self, subject: str | None = None, predicate: str | None = None) -> Iterator[RDFTerm]:
        """Yield the objects of triples matching *subject* and/or *predicate*.

        ``None`` is a wildcard. An object is yielded once per matching
        triple, so it may repeat across subjects or predicates.
        """
        ids = self._lookup_ids(subject, predicate, None)
        if ids is None:
            return
        sid, pid, _ = ids
        id_to_term = self._id_to_term
        if sid is not None:
            predicates = self._spo.get(sid)
            if predicates is None:
                return
            predicate_maps = (predicates,)
        else:
            predicate_maps = self._spo.values()
        for predicates in predicate_maps:
            if pid is not None:
                object_ids = predicates.get(pid)
                if object_ids is None:
                    continue
                for oid in object_ids:
                    yield id_to_term[oid]
            else:
                for object_ids in predicates.values():
                    for oid in object_ids:
                        yield id_to_term[oid]

    def predicate_objects(self, subject: str | None = None) -> Iterator[tuple[str, RDFTerm]]:
        """Yield ``(predicate, object)`` pairs for *subject*, or for all subjects if ``None``."""
        id_to_str = self._id_to_str
        id_to_term = self._id_to_term
        if subject is not None:
            sid = self._str_to_id.get(subject)
            if sid is None:
                return
            predicates = self._spo.get(sid)
            if predicates is None:
                return
            predicate_maps = (predicates,)
        else:
            predicate_maps = self._spo.values()
        for predicates in predicate_maps:
            for pid, object_ids in predicates.items():
                p_str = id_to_str[pid]
                for oid in object_ids:
                    yield p_str, id_to_term[oid]

    def subjects(self, predicate: str | None = None, object: RDFTerm | None = None) -> Iterator[str]:
        """Yield each subject having a triple matching *predicate* and/or *object*.

        ``None`` is a wildcard. Every matching subject is yielded exactly
        once, whether or not a reverse index is configured. The reverse
        index is consulted only when it covers the query; otherwise the
        forward index is scanned, so a partial index never hides results.

        The parameter name ``object`` shadows the builtin; it is kept because
        callers may pass it by keyword.
        """
        id_to_str = self._id_to_str
        if predicate is None and object is None:
            # Every key of the forward index is a subject with >= 1 triple.
            for sid in self._spo:
                yield id_to_str[sid]
            return
        ids = self._lookup_ids(None, predicate, object)
        if ids is None:
            return
        _, pid, oid = ids
        pos = self._pos
        indexed = self._indexed_predicates
        # The reverse index can answer only if it holds every predicate the
        # query may touch: all predicates are indexed (empty set), or the
        # bound predicate is one of the indexed ones.
        if pos is not None and (not indexed or (pid is not None and pid in indexed)):
            if pid is not None:
                by_object = pos.get(pid)
                if by_object is None:
                    return
                if oid is not None:
                    for sid in by_object.get(oid, ()):
                        yield id_to_str[sid]
                    return
                subject_sets = by_object.values()
            else:
                subject_sets = (
                    by_object[oid] for by_object in pos.values() if oid in by_object
                )
            # A subject may sit in several sets (several objects for one
            # predicate, or several predicates for one object).
            seen: set[int] = set()
            for subject_set in subject_sets:
                for sid in subject_set:
                    if sid not in seen:
                        seen.add(sid)
                        yield id_to_str[sid]
            return
        for sid, predicates in self._spo.items():
            if pid is not None:
                object_ids = predicates.get(pid)
                if object_ids is not None and (oid is None or oid in object_ids):
                    yield id_to_str[sid]
            elif any(oid in object_ids for object_ids in predicates.values()):
                yield id_to_str[sid]

    def subgraph(self, subject: str) -> TripleLite | None:
        """Return a new graph holding only *subject*'s triples, or ``None`` if it has none.

        The returned graph is created with default settings (no identifier,
        no reverse index) and has its own interning tables.
        """
        sid = self._str_to_id.get(subject)
        if sid is None:
            return None
        predicates = self._spo.get(sid)
        if predicates is None:
            return None
        id_to_str = self._id_to_str
        id_to_term = self._id_to_term
        graph = TripleLite()
        graph.add_many(
            (subject, id_to_str[pid], id_to_term[oid])
            for pid, object_ids in predicates.items()
            for oid in object_ids
        )
        return graph

    def __contains__(self, triple: tuple[str, str, RDFTerm]) -> bool:
        """Return ``True`` if the fully specified *triple* is stored."""
        subject, predicate, obj = triple
        sid = self._str_to_id.get(subject)
        if sid is None:
            return False
        pid = self._str_to_id.get(predicate)
        if pid is None:
            return False
        oid = self._term_to_id.get(obj)
        if oid is None:
            return False
        predicates = self._spo.get(sid)
        if predicates is None:
            return False
        object_ids = predicates.get(pid)
        return object_ids is not None and oid in object_ids

    def __iter__(self) -> Iterator[Triple]:
        """Yield every stored triple as ``(subject, predicate, object)``."""
        id_to_str = self._id_to_str
        id_to_term = self._id_to_term
        for sid, predicates in self._spo.items():
            s_str = id_to_str[sid]
            for pid, object_ids in predicates.items():
                p_str = id_to_str[pid]
                for oid in object_ids:
                    yield s_str, p_str, id_to_term[oid]

    def __len__(self) -> int:
        """Return the number of stored triples."""
        return self._len

    def to_rdflib(self):
        """Convert this graph through the ``triplelite._rdflib_bridge.to_rdflib`` helper."""
        return _to_rdflib(self)