Coverage for src / time_agnostic_library / ocdm_converter.py: 100%

247 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-03-21 11:54 +0000

1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import gzip 

6import re 

7from collections import defaultdict 

8from collections.abc import Callable 

9from concurrent.futures import ThreadPoolExecutor 

10from datetime import datetime 

11from pathlib import Path 

12 

# Namespace IRIs used when emitting provenance quads.
PROV_NS = "http://www.w3.org/ns/prov#"  # W3C PROV-O: specializationOf, generatedAtTime, ...
OCO_NS = "https://w3id.org/oc/ontology/"  # OpenCitations ontology: hasUpdateQuery
DCTERMS_NS = "http://purl.org/dc/terms/"  # Dublin Core: description
XSD_NS = "http://www.w3.org/2001/XMLSchema#"  # XSD datatypes: dateTime

17 

18# Regex to parse an N-Triples line into (subject, predicate, object) in a single 

19# C-level pass. Falls back to the character-by-character parser on mismatch. 

20# 

21# N-Triples line format: <subject> <predicate> <object> . 

22# 

23# Group 1 - subject: URI <http://...> or blank node _:id 

24# Group 2 - predicate: always a URI <http://...> 

25# Group 3 - object, one of: 

26# - URI: <http://...> 

27# - literal: "text" optionally followed by @lang or ^^<datatype> 

28# - blank node: _:id 

29_NT_RE = re.compile( 

30 r"(<[^>]+>|_:\S+)\s+" # group 1: subject (URI or blank node) 

31 r"(<[^>]+>)\s+" # group 2: predicate (URI) 

32 r"(<[^>]+>" # group 3 option a: URI object 

33 r'|"(?:[^"\\]|\\.)*"' # group 3 option b: quoted literal (handles escapes) 

34 r"(?:@[a-zA-Z-]+|\^\^<[^>]+>)?" # optional language tag or datatype 

35 r"|_:\S+)" # group 3 option c: blank node object 

36 r"\s*\.\s*$" # trailing dot and whitespace 

37) 

38 

39 

40def parse_ntriples_line( 

41 line: str, 

42 object_normalizer: Callable[[str], str] | None = None, 

43) -> tuple[str, str, str] | None: 

44 line = line.strip() 

45 if not line or line.startswith("#"): 

46 return None 

47 m = _NT_RE.match(line) 

48 if m: 

49 obj = m.group(3) 

50 if object_normalizer: 

51 obj = object_normalizer(obj) 

52 return (m.group(1), m.group(2), obj) 

53 if line.endswith(" ."): 

54 line = line[:-2] 

55 elif line.endswith("."): 

56 line = line[:-1] 

57 line = line.strip() 

58 parts = [] 

59 i = 0 

60 while i < len(line) and len(parts) < 3: 

61 if line[i] == "<": 

62 end = line.index(">", i) 

63 parts.append(line[i:end + 1]) 

64 i = end + 1 

65 elif line[i] == '"': 

66 j = i + 1 

67 while j < len(line): 

68 if line[j] == "\\" and j + 1 < len(line): 

69 j += 2 

70 continue 

71 if line[j] == '"': 

72 break 

73 j += 1 

74 end_quote = j 

75 rest_start = end_quote + 1 

76 if rest_start < len(line) and line[rest_start:rest_start + 2] == "^^": 

77 dt_start = rest_start + 2 

78 if dt_start < len(line) and line[dt_start] == "<": 

79 dt_end = line.index(">", dt_start) 

80 parts.append(line[i:dt_end + 1]) 

81 i = dt_end + 1 

82 else: 

83 space = line.find(" ", dt_start) 

84 if space == -1: 

85 parts.append(line[i:]) 

86 i = len(line) 

87 else: 

88 parts.append(line[i:space]) 

89 i = space 

90 elif rest_start < len(line) and line[rest_start] == "@": 

91 space = line.find(" ", rest_start) 

92 if space == -1: 

93 parts.append(line[i:]) 

94 i = len(line) 

95 else: 

96 parts.append(line[i:space]) 

97 i = space 

98 else: 

99 parts.append(line[i:end_quote + 1]) 

100 i = end_quote + 1 

101 elif line[i] == "_": 

102 space = line.find(" ", i) 

103 if space == -1: 

104 parts.append(line[i:]) 

105 i = len(line) 

106 else: 

107 parts.append(line[i:space]) 

108 i = space 

109 elif line[i] == " " or line[i] == "\t": 

110 i += 1 

111 else: 

112 space = line.find(" ", i) 

113 if space == -1: 

114 parts.append(line[i:]) 

115 i = len(line) 

116 else: 

117 parts.append(line[i:space]) 

118 i = space 

119 if len(parts) == 3: 

120 obj = parts[2] 

121 if object_normalizer: 

122 obj = object_normalizer(obj) 

123 return (parts[0], parts[1], obj) 

124 return None 

125 

126 

def extract_subject_uri(s_term: str) -> str:
    """Strip the angle brackets from a URI term; pass other terms through."""
    is_uri = s_term.startswith("<") and s_term.endswith(">")
    return s_term[1:-1] if is_uri else s_term

131 

132 

133def _open_ntriples(filepath: Path): 

134 if filepath.suffix == ".gz": 

135 return gzip.open(filepath, "rt", encoding="utf-8", errors="replace") 

136 return open(filepath, "r", encoding="utf-8", errors="replace") 

137 

138 

def read_ntriples_file(
    filepath: Path,
    object_normalizer: Callable[[str], str] | None = None,
) -> list[tuple[str, str, str]]:
    """Parse every triple in *filepath*, skipping blanks, comments and bad lines."""
    with _open_ntriples(filepath) as stream:
        return [
            triple
            for raw_line in stream
            if (triple := parse_ntriples_line(raw_line, object_normalizer)) is not None
        ]

150 

151 

def group_triples_by_subject(
    triples: list[tuple[str, str, str]],
) -> dict[str, set[tuple[str, str]]]:
    """Index triples as ``{subject_uri: {(predicate, object), ...}}``."""
    grouped: dict[str, set[tuple[str, str]]] = defaultdict(set)
    for subj, pred, obj in triples:
        # Inline of extract_subject_uri: strip <> from URI subjects, leave
        # blank nodes untouched.
        key = subj[1:-1] if subj.startswith("<") and subj.endswith(">") else subj
        grouped[key].add((pred, obj))
    return grouped

160 

161 

def _read_and_group(
    filepath: Path,
    object_normalizer: Callable[[str], str] | None = None,
) -> dict[str, set[tuple[str, str]]]:
    """Read an N-Triples file and group its (predicate, object) pairs by subject.

    Single-pass equivalent of group_triples_by_subject(read_ntriples_file(...)):
    the regex fast path is tried inline per line and only mismatching lines
    fall back to parse_ntriples_line.  The object_normalizer check is hoisted
    out of the per-line loop, which is why the two branches below are
    near-identical copies.
    """
    by_subject: dict[str, set[tuple[str, str]]] = defaultdict(set)
    match = _NT_RE.match  # hoist the attribute lookup out of the hot loop
    with _open_ntriples(filepath) as f:
        if object_normalizer:
            for line in f:
                m = match(line)
                if m:
                    s, p, obj = m.groups()
                    obj = object_normalizer(obj)
                    # The regex guarantees URIs end with ">", so checking only
                    # the first character is sufficient here.
                    uri = s[1:-1] if s[0] == "<" else s
                    by_subject[uri].add((p, obj))
                else:
                    # Slow path: comments, blanks, or unusually formatted
                    # lines are handled (or rejected) by the full parser.
                    parsed = parse_ntriples_line(line, object_normalizer)
                    if parsed:
                        s, p, o = parsed
                        uri = s[1:-1] if s[0] == "<" and s[-1] == ">" else s
                        by_subject[uri].add((p, o))
        else:
            # Same loop as above without the per-line normalizer call.
            for line in f:
                m = match(line)
                if m:
                    s, p, obj = m.groups()
                    uri = s[1:-1] if s[0] == "<" else s
                    by_subject[uri].add((p, obj))
                else:
                    parsed = parse_ntriples_line(line, object_normalizer)
                    if parsed:
                        s, p, o = parsed
                        uri = s[1:-1] if s[0] == "<" and s[-1] == ">" else s
                        by_subject[uri].add((p, o))
    return by_subject

197 

198 

199def _format_timestamp(dt: datetime) -> str: 

200 return dt.strftime("%Y-%m-%dT%H:%M:%S+00:00") 

201 

202 

203def _build_update_query( 

204 entity_uri: str, 

205 data_graph_uri: str, 

206 deleted_po: set[tuple[str, str]], 

207 added_po: set[tuple[str, str]], 

208) -> str: 

209 parts = [] 

210 if deleted_po: 

211 triples = " ".join(f"<{entity_uri}> {p} {o} ." for p, o in deleted_po) 

212 parts.append(f"DELETE DATA {{ GRAPH <{data_graph_uri}> {{ {triples} }} }}") 

213 if added_po: 

214 triples = " ".join(f"<{entity_uri}> {p} {o} ." for p, o in added_po) 

215 parts.append(f"INSERT DATA {{ GRAPH <{data_graph_uri}> {{ {triples} }} }}") 

216 return "; ".join(parts) 

217 

218 

219def _escape_sparql_for_nquads(query: str) -> str: 

220 escaped = query.replace("\\", "\\\\") 

221 escaped = escaped.replace('"', '\\"') 

222 escaped = escaped.replace("\n", "\\n") 

223 escaped = escaped.replace("\r", "\\r") 

224 escaped = escaped.replace("\t", "\\t") 

225 return escaped 

226 

227 

228class OCDMConverter: 

229 def __init__( 

230 self, 

231 data_graph_uri: str, 

232 agent_uri: str, 

233 object_normalizer: Callable[[str], str] | None = None, 

234 ): 

235 self.data_graph_uri = data_graph_uri 

236 self.agent_uri = agent_uri 

237 self.object_normalizer = object_normalizer 

238 

239 def convert_from_ic( 

240 self, 

241 ic_files: list[Path], 

242 timestamps: list[datetime], 

243 dataset_output: Path, 

244 provenance_output: Path, 

245 ) -> None: 

246 all_entities: set[str] = set() 

247 entity_changes: dict[str, list[tuple[int, set[tuple[str, str]], set[tuple[str, str]]]]] = defaultdict(list) 

248 prev_by_subject: dict[str, set[tuple[str, str]]] = {} 

249 latest_by_subject: dict[str, set[tuple[str, str]]] = {} 

250 

251 # Prefetch pipeline: a single background thread reads and parses the 

252 # next IC file while the main thread diffs the current pair. 

253 with ThreadPoolExecutor(max_workers=1) as executor: 

254 future = executor.submit(_read_and_group, ic_files[0], self.object_normalizer) 

255 

256 for version_idx in range(len(ic_files)): 

257 cur_by_subject = future.result() 

258 

259 if version_idx + 1 < len(ic_files): 

260 future = executor.submit( 

261 _read_and_group, ic_files[version_idx + 1], self.object_normalizer 

262 ) 

263 

264 all_entities.update(cur_by_subject.keys()) 

265 

266 if version_idx > 0: 

267 for entity_uri in prev_by_subject.keys() | cur_by_subject.keys(): 

268 prev_po = prev_by_subject.get(entity_uri, set()) 

269 cur_po = cur_by_subject.get(entity_uri, set()) 

270 deleted_po = prev_po - cur_po 

271 added_po = cur_po - prev_po 

272 if deleted_po or added_po: 

273 entity_changes[entity_uri].append((version_idx, deleted_po, added_po)) 

274 

275 prev_by_subject = cur_by_subject 

276 if version_idx == len(ic_files) - 1: 

277 latest_by_subject = cur_by_subject 

278 

279 self._write_ocdm_output( 

280 all_entities, entity_changes, latest_by_subject, 

281 timestamps, dataset_output, provenance_output, 

282 ) 

283 

284 def convert_from_cb( 

285 self, 

286 initial_snapshot: Path, 

287 changesets: list[tuple[Path, Path]], 

288 timestamps: list[datetime], 

289 dataset_output: Path, 

290 provenance_output: Path, 

291 ) -> None: 

292 all_entities: set[str] = set() 

293 entity_changes: dict[str, list[tuple[int, set[tuple[str, str]], set[tuple[str, str]]]]] = defaultdict(list) 

294 

295 current_state: dict[str, set[tuple[str, str]]] = defaultdict( 

296 set, _read_and_group(initial_snapshot, self.object_normalizer) 

297 ) 

298 all_entities.update(current_state.keys()) 

299 

300 # Read the added and deleted files of each changeset in parallel. 

301 with ThreadPoolExecutor(max_workers=2) as executor: 

302 for changeset_idx, (added_file, deleted_file) in enumerate(changesets): 

303 version_idx = changeset_idx + 1 

304 

305 fut_del = executor.submit(_read_and_group, deleted_file, self.object_normalizer) 

306 fut_add = executor.submit(_read_and_group, added_file, self.object_normalizer) 

307 deleted_by_subject = fut_del.result() 

308 added_by_subject = fut_add.result() 

309 

310 changed_entities = deleted_by_subject.keys() | added_by_subject.keys() 

311 all_entities.update(changed_entities) 

312 

313 for entity_uri in changed_entities: 

314 deleted_po = deleted_by_subject.get(entity_uri, set()) 

315 added_po = added_by_subject.get(entity_uri, set()) 

316 

317 current_state[entity_uri] -= deleted_po 

318 current_state[entity_uri] |= added_po 

319 

320 if not current_state[entity_uri]: 

321 del current_state[entity_uri] 

322 

323 if deleted_po or added_po: 

324 entity_changes[entity_uri].append((version_idx, deleted_po, added_po)) 

325 

326 self._write_ocdm_output( 

327 all_entities, entity_changes, current_state, 

328 timestamps, dataset_output, provenance_output, 

329 ) 

330 

331 def _write_ocdm_output( 

332 self, 

333 all_entities: set[str], 

334 entity_changes: dict[str, list[tuple[int, set[tuple[str, str]], set[tuple[str, str]]]]], 

335 latest_by_subject: dict[str, set[tuple[str, str]]], 

336 timestamps: list[datetime], 

337 dataset_output: Path, 

338 provenance_output: Path, 

339 ) -> None: 

340 dataset_output.parent.mkdir(parents=True, exist_ok=True) 

341 provenance_output.parent.mkdir(parents=True, exist_ok=True) 

342 sorted_entities = sorted(all_entities) 

343 

344 lines = [] 

345 for entity_uri in sorted_entities: 

346 po_set = latest_by_subject.get(entity_uri, set()) 

347 for p, o in sorted(po_set): 

348 lines.append(f"<{entity_uri}> {p} {o} <{self.data_graph_uri}> .\n") 

349 with open(dataset_output, "w", encoding="utf-8") as f: 

350 f.writelines(lines) 

351 

352 lines = [] 

353 for entity_uri in sorted_entities: 

354 prov_graph = f"<{entity_uri}/prov/>" 

355 changes = entity_changes.get(entity_uri, []) 

356 

357 se1_uri = f"<{entity_uri}/prov/se/1>" 

358 t0 = _format_timestamp(timestamps[0]) 

359 

360 lines.append(f'{se1_uri} <{PROV_NS}specializationOf> <{entity_uri}> {prov_graph} .\n') 

361 lines.append(f'{se1_uri} <{PROV_NS}generatedAtTime> "{t0}"^^<{XSD_NS}dateTime> {prov_graph} .\n') 

362 lines.append(f'{se1_uri} <{PROV_NS}wasAttributedTo> <{self.agent_uri}> {prov_graph} .\n') 

363 lines.append(f'{se1_uri} <{DCTERMS_NS}description> "The entity has been created." {prov_graph} .\n') 

364 

365 for change_idx, (version_idx, deleted_po, added_po) in enumerate(changes): 

366 se_num = change_idx + 2 

367 se_uri = f"<{entity_uri}/prov/se/{se_num}>" 

368 timestamp = _format_timestamp(timestamps[version_idx]) 

369 

370 lines.append(f'{se_uri} <{PROV_NS}specializationOf> <{entity_uri}> {prov_graph} .\n') 

371 lines.append(f'{se_uri} <{PROV_NS}generatedAtTime> "{timestamp}"^^<{XSD_NS}dateTime> {prov_graph} .\n') 

372 lines.append(f'{se_uri} <{PROV_NS}wasAttributedTo> <{self.agent_uri}> {prov_graph} .\n') 

373 

374 update_query = _build_update_query(entity_uri, self.data_graph_uri, deleted_po, added_po) 

375 escaped_query = _escape_sparql_for_nquads(update_query) 

376 lines.append(f'{se_uri} <{OCO_NS}hasUpdateQuery> "{escaped_query}" {prov_graph} .\n') 

377 lines.append(f'{se_uri} <{DCTERMS_NS}description> "The entity has been modified." {prov_graph} .\n') 

378 

379 prev_se_uri = f"<{entity_uri}/prov/se/{se_num - 1}>" 

380 lines.append(f'{se_uri} <{PROV_NS}wasDerivedFrom> {prev_se_uri} {prov_graph} .\n') 

381 

382 if change_idx == len(changes) - 1 and entity_uri not in latest_by_subject: 

383 lines.append(f'{se_uri} <{PROV_NS}invalidatedAtTime> "{timestamp}"^^<{XSD_NS}dateTime> {prov_graph} .\n') 

384 

385 with open(provenance_output, "w", encoding="utf-8") as f: 

386 f.writelines(lines)