# Coverage for triplelite/_graph.py: 92% (342 statements)
#
# Report generated by coverage.py v7.13.5, created at 2026-04-15 16:42 +0000

# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# SPDX-License-Identifier: ISC

from __future__ import annotations

from collections.abc import Iterable
from typing import Iterator

from triplelite._rdflib_bridge import to_rdflib as _to_rdflib
from triplelite._types import RDFTerm, Triple, _InternalPOS, _InternalSPO

# Shared, read-only fallbacks for ``dict.get`` so that a miss yields an empty
# iterable without allocating a new container. They must never be mutated.
_EMPTY_DICT: dict = {}
_EMPTY_SET: set = set()


class TripleLite:
    """In-memory triple store backed by integer-interned nested dicts.

    Subjects and predicates are interned through one string table
    (``_str_to_id`` / ``_id_to_str``) and objects through a separate term
    table (``_term_to_id`` / ``_id_to_term``). Triples are stored in ``_spo``
    as ``{subject_id: {predicate_id: {object_id, ...}}}``. When a reverse
    index is requested, ``_pos`` additionally maps
    ``{predicate_id: {object_id: {subject_id, ...}}}``.

    Removing triples never releases interned ids: the interning tables only
    grow for the lifetime of the instance.
    """

    __slots__ = (
        "_spo",
        "_pos",
        "_indexed_predicates",
        "_len",
        "identifier",
        "_str_to_id",
        "_id_to_str",
        "_term_to_id",
        "_id_to_term",
    )

    def __init__(
        self,
        identifier: str | None = None,
        reverse_index_predicates: frozenset[str] | None = None,
    ) -> None:
        """Create an empty graph.

        Args:
            identifier: Stored unchanged on the instance as ``identifier``.
            reverse_index_predicates: ``None`` disables the reverse index.
                An empty frozenset indexes every predicate; a non-empty one
                indexes only the listed predicates.
        """
        self._spo: _InternalSPO = {}
        self._len: int = 0
        self.identifier: str | None = identifier
        # The interning tables must exist before _intern_str is called below.
        self._str_to_id: dict[str, int] = {}
        self._id_to_str: list[str] = []
        self._term_to_id: dict[RDFTerm, int] = {}
        self._id_to_term: list[RDFTerm] = []
        if reverse_index_predicates is not None:
            self._indexed_predicates: frozenset[int] | None = frozenset(
                self._intern_str(p) for p in reverse_index_predicates
            )
            self._pos: _InternalPOS | None = {}
        else:
            self._indexed_predicates = None
            self._pos = None

    def _intern_str(self, s: str) -> int:
        """Return the id of subject/predicate string ``s``, allocating one if new."""
        sid = self._str_to_id.get(s)
        if sid is not None:
            return sid
        # Ids are positions in _id_to_str, so the next id is the current length.
        sid = len(self._id_to_str)
        self._str_to_id[s] = sid
        self._id_to_str.append(s)
        return sid

    def _intern_term(self, t: RDFTerm) -> int:
        """Return the id of object term ``t``, allocating one if new."""
        tid = self._term_to_id.get(t)
        if tid is not None:
            return tid
        # Ids are positions in _id_to_term, so the next id is the current length.
        tid = len(self._id_to_term)
        self._term_to_id[t] = tid
        self._id_to_term.append(t)
        return tid

    def add(self, triple: tuple[str, str, RDFTerm]) -> None:
        """Add one ``(subject, predicate, object)`` triple; duplicates are ignored."""
        subject, predicate, obj = triple
        sid = self._intern_str(subject)
        pid = self._intern_str(predicate)
        oid = self._intern_term(obj)
        objects = self._spo.setdefault(sid, {}).setdefault(pid, set())
        if oid in objects:
            return
        objects.add(oid)
        self._len += 1
        pos = self._pos
        if pos is None:
            return
        indexed = self._indexed_predicates
        # An empty filter means "index every predicate".
        if not indexed or pid in indexed:
            pos.setdefault(pid, {}).setdefault(oid, set()).add(sid)

    def add_many(self, triples: Iterable[tuple[str, str, RDFTerm]]) -> None:
        """Add every triple from ``triples``; duplicates are ignored.

        Equivalent to calling :meth:`add` for each item, with attribute
        lookups hoisted out of the loop. If iteration raises part-way
        through, the triples added so far are kept and ``len()`` reflects
        them.
        """
        spo_setdefault = self._spo.setdefault
        pos = self._pos
        indexed = self._indexed_predicates
        intern_str = self._intern_str
        intern_term = self._intern_term
        count = self._len
        try:
            if pos is None:
                for subject, predicate, obj in triples:
                    sid = intern_str(subject)
                    pid = intern_str(predicate)
                    oid = intern_term(obj)
                    objects = spo_setdefault(sid, {}).setdefault(pid, set())
                    if oid not in objects:
                        objects.add(oid)
                        count += 1
            else:
                pos_setdefault = pos.setdefault
                # An empty filter means "index every predicate".
                index_all = not indexed
                for subject, predicate, obj in triples:
                    sid = intern_str(subject)
                    pid = intern_str(predicate)
                    oid = intern_term(obj)
                    objects = spo_setdefault(sid, {}).setdefault(pid, set())
                    if oid in objects:
                        continue
                    objects.add(oid)
                    count += 1
                    if index_all or pid in indexed:
                        pos_setdefault(pid, {}).setdefault(oid, set()).add(sid)
        finally:
            # Write the counter back even when the iterable (or a malformed
            # item) raises, so the length never drifts from the stored data.
            self._len = count

    def _remove_triple(self, sid: int, pid: int, oid: int) -> None:
        """Remove one triple given by ids, pruning containers that become empty."""
        predicates = self._spo.get(sid)
        if predicates is None:
            return
        objects = predicates.get(pid)
        if objects is None or oid not in objects:
            return
        objects.discard(oid)
        self._len -= 1
        if not objects:
            del predicates[pid]
            if not predicates:
                del self._spo[sid]
        pos = self._pos
        if pos is None:
            return
        # The predicate may be absent from the reverse index when only some
        # predicates are indexed, hence the defensive lookups.
        obj_to_subjects = pos.get(pid)
        if obj_to_subjects is None:
            return
        subjects = obj_to_subjects.get(oid)
        if subjects is None:
            return
        subjects.discard(sid)
        if not subjects:
            del obj_to_subjects[oid]
            if not obj_to_subjects:
                del pos[pid]

    def remove(self, triple: tuple[str | None, str | None, RDFTerm | None]) -> None:
        """Remove every triple matching ``triple``; ``None`` is a wildcard.

        ``(None, None, None)`` clears all triples (and the reverse index, if
        any) while leaving the interning tables untouched. A pattern that
        mentions a string or term never seen by this graph matches nothing.
        """
        subject, predicate, obj = triple
        if subject is None and predicate is None and obj is None:
            self._spo.clear()
            self._len = 0
            if self._pos is not None:
                self._pos.clear()
            return

        sid = self._str_to_id.get(subject) if subject is not None else None
        pid = self._str_to_id.get(predicate) if predicate is not None else None
        oid = self._term_to_id.get(obj) if obj is not None else None
        if subject is not None and sid is None:
            return
        if predicate is not None and pid is None:
            return
        if obj is not None and oid is None:
            return

        if sid is not None and pid is not None and oid is not None:
            self._remove_triple(sid, pid, oid)
            return

        # Collect matches first: _remove_triple mutates the dicts being walked.
        to_remove: list[tuple[int, int, int]] = []
        if sid is not None:
            predicates = self._spo.get(sid)
            if predicates is None:
                return
            subject_rows = ((sid, predicates),)
        else:
            subject_rows = self._spo.items()
        for s, predicates in subject_rows:
            if pid is not None:
                objects = predicates.get(pid)
                if objects is None:
                    continue
                predicate_rows = ((pid, objects),)
            else:
                predicate_rows = predicates.items()
            for p, objects in predicate_rows:
                if oid is None:
                    to_remove.extend((s, p, o) for o in objects)
                elif oid in objects:
                    to_remove.append((s, p, oid))
        for s, p, o in to_remove:
            self._remove_triple(s, p, o)

    def triples(self, pattern: tuple[str | None, str | None, RDFTerm | None]) -> Iterator[Triple]:
        """Yield every stored triple matching ``pattern``; ``None`` is a wildcard.

        A pattern that mentions a string or term never seen by this graph
        yields nothing.
        """
        subject, predicate, obj = pattern
        id_to_str = self._id_to_str
        id_to_term = self._id_to_term
        sid = self._str_to_id.get(subject) if subject is not None else None
        pid = self._str_to_id.get(predicate) if predicate is not None else None
        oid = self._term_to_id.get(obj) if obj is not None else None
        if subject is not None and sid is None:
            return
        if predicate is not None and pid is None:
            return
        if obj is not None and oid is None:
            return

        if sid is not None:
            predicates = self._spo.get(sid)
            if predicates is None:
                return
            subject_rows = ((sid, predicates),)
        else:
            subject_rows = self._spo.items()
        for s, predicates in subject_rows:
            s_str = id_to_str[s]
            if pid is not None:
                objects = predicates.get(pid)
                if objects is None:
                    continue
                predicate_rows = ((pid, objects),)
            else:
                predicate_rows = predicates.items()
            for p, objects in predicate_rows:
                p_str = id_to_str[p]
                if oid is None:
                    for o in objects:
                        yield s_str, p_str, id_to_term[o]
                elif oid in objects:
                    # Set membership instead of scanning every object.
                    yield s_str, p_str, id_to_term[oid]

    def objects(self, subject: str | None = None, predicate: str | None = None) -> Iterator[RDFTerm]:
        """Yield the objects of triples matching ``subject`` and/or ``predicate``.

        ``None`` is a wildcard. Results are not de-duplicated: a term is
        yielded once per matching triple.
        """
        id_to_term = self._id_to_term
        if subject is None:
            sid = None
        else:
            sid = self._str_to_id.get(subject)
            if sid is None:
                return
        if predicate is None:
            pid = None
        else:
            pid = self._str_to_id.get(predicate)
            if pid is None:
                return

        if sid is not None:
            predicates = self._spo.get(sid, _EMPTY_DICT)
            if pid is not None:
                for oid in predicates.get(pid, _EMPTY_SET):
                    yield id_to_term[oid]
            else:
                for objects in predicates.values():
                    for oid in objects:
                        yield id_to_term[oid]
            return

        if pid is not None:
            for predicates in self._spo.values():
                for oid in predicates.get(pid, _EMPTY_SET):
                    yield id_to_term[oid]
        else:
            for predicates in self._spo.values():
                for objects in predicates.values():
                    for oid in objects:
                        yield id_to_term[oid]

    def predicate_objects(self, subject: str | None = None) -> Iterator[tuple[str, RDFTerm]]:
        """Yield ``(predicate, object)`` pairs for ``subject``, or for all subjects."""
        id_to_str = self._id_to_str
        id_to_term = self._id_to_term
        if subject is not None:
            sid = self._str_to_id.get(subject)
            if sid is None:
                return
            rows = (self._spo.get(sid, _EMPTY_DICT),)
        else:
            rows = self._spo.values()
        for predicates in rows:
            for pid, objects in predicates.items():
                p_str = id_to_str[pid]
                for oid in objects:
                    yield p_str, id_to_term[oid]

    def subjects(self, predicate: str | None = None, object: RDFTerm | None = None) -> Iterator[str]:
        """Yield subjects of triples matching ``predicate`` and/or ``object``.

        ``None`` is a wildcard. The reverse index is used only when it can
        answer the query completely, i.e. when it covers every predicate or
        the requested predicate is one of the indexed ones; otherwise the
        forward index is scanned so that triples with non-indexed predicates
        are not missed.

        When served from the reverse index with only one of the two
        arguments bound, a subject may be yielded more than once (once per
        matching object or predicate).
        """
        id_to_str = self._id_to_str
        if predicate is None:
            pid = None
        else:
            pid = self._str_to_id.get(predicate)
            if pid is None:
                return
        if object is None:
            oid = None
        else:
            oid = self._term_to_id.get(object)
            if oid is None:
                return

        pos = self._pos
        indexed = self._indexed_predicates
        # An empty filter means every predicate is indexed; a non-empty one
        # makes the index authoritative only for the predicates it lists.
        index_is_complete = pos is not None and (
            not indexed or (pid is not None and pid in indexed)
        )

        if index_is_complete:
            if pid is not None:
                obj_to_subjects = pos.get(pid, _EMPTY_DICT)
                if oid is not None:
                    for sid in obj_to_subjects.get(oid, _EMPTY_SET):
                        yield id_to_str[sid]
                else:
                    for subject_set in obj_to_subjects.values():
                        for sid in subject_set:
                            yield id_to_str[sid]
                return
            if oid is not None:
                for obj_to_subjects in pos.values():
                    for sid in obj_to_subjects.get(oid, _EMPTY_SET):
                        yield id_to_str[sid]
                return
            seen: set[int] = set()
            for obj_to_subjects in pos.values():
                for subject_set in obj_to_subjects.values():
                    for sid in subject_set:
                        if sid not in seen:
                            seen.add(sid)
                            yield id_to_str[sid]
            return

        # Forward-index scan: no reverse index, or it cannot answer the query.
        if pid is None and oid is None:
            for sid in self._spo:
                yield id_to_str[sid]
            return
        for sid, predicates in self._spo.items():
            if pid is not None:
                objects = predicates.get(pid)
                if objects is not None and (oid is None or oid in objects):
                    yield id_to_str[sid]
            else:
                for objects in predicates.values():
                    if oid in objects:
                        yield id_to_str[sid]
                        break

    def subgraph(self, subject: str) -> TripleLite | None:
        """Return a new graph holding only ``subject``'s triples, or ``None``.

        ``None`` is returned when the subject has no triples in this graph.
        The new graph is built with default constructor arguments (no
        identifier, no reverse index).
        """
        sid = self._str_to_id.get(subject)
        if sid is None:
            return None
        predicates = self._spo.get(sid)
        if predicates is None:
            return None
        id_to_str = self._id_to_str
        id_to_term = self._id_to_term
        graph = TripleLite()
        graph.add_many(
            (subject, id_to_str[pid], id_to_term[oid])
            for pid, objects in predicates.items()
            for oid in objects
        )
        return graph

    def __contains__(self, triple: tuple[str, str, RDFTerm]) -> bool:
        """Return whether the exact ``(subject, predicate, object)`` triple is stored."""
        subject, predicate, obj = triple
        sid = self._str_to_id.get(subject)
        if sid is None:
            return False
        pid = self._str_to_id.get(predicate)
        if pid is None:
            return False
        oid = self._term_to_id.get(obj)
        if oid is None:
            return False
        predicates = self._spo.get(sid)
        if predicates is None:
            return False
        objects = predicates.get(pid)
        if objects is None:
            return False
        return oid in objects

    def __iter__(self) -> Iterator[Triple]:
        """Yield every stored triple as ``(subject, predicate, object)``."""
        id_to_str = self._id_to_str
        id_to_term = self._id_to_term
        for sid, predicates in self._spo.items():
            s_str = id_to_str[sid]
            for pid, objects in predicates.items():
                p_str = id_to_str[pid]
                for oid in objects:
                    yield s_str, p_str, id_to_term[oid]

    def __len__(self) -> int:
        """Return the number of distinct triples stored."""
        return self._len

    def to_rdflib(self):
        """Return the result of ``triplelite._rdflib_bridge.to_rdflib`` for this graph.

        NOTE(review): presumably an rdflib graph; confirm against the bridge module.
        """
        return _to_rdflib(self)