Coverage for src / time_agnostic_library / ocdm_converter.py: 100%
247 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-03-21 11:54 +0000
1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import gzip
6import re
7from collections import defaultdict
8from collections.abc import Callable
9from concurrent.futures import ThreadPoolExecutor
10from datetime import datetime
11from pathlib import Path
# RDF namespace IRIs used when serializing the provenance quads.
PROV_NS = "http://www.w3.org/ns/prov#"  # W3C PROV-O (specializationOf, generatedAtTime, ...)
OCO_NS = "https://w3id.org/oc/ontology/"  # OpenCitations ontology (hasUpdateQuery)
DCTERMS_NS = "http://purl.org/dc/terms/"  # Dublin Core terms (description)
XSD_NS = "http://www.w3.org/2001/XMLSchema#"  # XSD datatypes (dateTime)
18# Regex to parse an N-Triples line into (subject, predicate, object) in a single
19# C-level pass. Falls back to the character-by-character parser on mismatch.
20#
21# N-Triples line format: <subject> <predicate> <object> .
22#
23# Group 1 - subject: URI <http://...> or blank node _:id
24# Group 2 - predicate: always a URI <http://...>
25# Group 3 - object, one of:
26# - URI: <http://...>
27# - literal: "text" optionally followed by @lang or ^^<datatype>
28# - blank node: _:id
29_NT_RE = re.compile(
30 r"(<[^>]+>|_:\S+)\s+" # group 1: subject (URI or blank node)
31 r"(<[^>]+>)\s+" # group 2: predicate (URI)
32 r"(<[^>]+>" # group 3 option a: URI object
33 r'|"(?:[^"\\]|\\.)*"' # group 3 option b: quoted literal (handles escapes)
34 r"(?:@[a-zA-Z-]+|\^\^<[^>]+>)?" # optional language tag or datatype
35 r"|_:\S+)" # group 3 option c: blank node object
36 r"\s*\.\s*$" # trailing dot and whitespace
37)
40def parse_ntriples_line(
41 line: str,
42 object_normalizer: Callable[[str], str] | None = None,
43) -> tuple[str, str, str] | None:
44 line = line.strip()
45 if not line or line.startswith("#"):
46 return None
47 m = _NT_RE.match(line)
48 if m:
49 obj = m.group(3)
50 if object_normalizer:
51 obj = object_normalizer(obj)
52 return (m.group(1), m.group(2), obj)
53 if line.endswith(" ."):
54 line = line[:-2]
55 elif line.endswith("."):
56 line = line[:-1]
57 line = line.strip()
58 parts = []
59 i = 0
60 while i < len(line) and len(parts) < 3:
61 if line[i] == "<":
62 end = line.index(">", i)
63 parts.append(line[i:end + 1])
64 i = end + 1
65 elif line[i] == '"':
66 j = i + 1
67 while j < len(line):
68 if line[j] == "\\" and j + 1 < len(line):
69 j += 2
70 continue
71 if line[j] == '"':
72 break
73 j += 1
74 end_quote = j
75 rest_start = end_quote + 1
76 if rest_start < len(line) and line[rest_start:rest_start + 2] == "^^":
77 dt_start = rest_start + 2
78 if dt_start < len(line) and line[dt_start] == "<":
79 dt_end = line.index(">", dt_start)
80 parts.append(line[i:dt_end + 1])
81 i = dt_end + 1
82 else:
83 space = line.find(" ", dt_start)
84 if space == -1:
85 parts.append(line[i:])
86 i = len(line)
87 else:
88 parts.append(line[i:space])
89 i = space
90 elif rest_start < len(line) and line[rest_start] == "@":
91 space = line.find(" ", rest_start)
92 if space == -1:
93 parts.append(line[i:])
94 i = len(line)
95 else:
96 parts.append(line[i:space])
97 i = space
98 else:
99 parts.append(line[i:end_quote + 1])
100 i = end_quote + 1
101 elif line[i] == "_":
102 space = line.find(" ", i)
103 if space == -1:
104 parts.append(line[i:])
105 i = len(line)
106 else:
107 parts.append(line[i:space])
108 i = space
109 elif line[i] == " " or line[i] == "\t":
110 i += 1
111 else:
112 space = line.find(" ", i)
113 if space == -1:
114 parts.append(line[i:])
115 i = len(line)
116 else:
117 parts.append(line[i:space])
118 i = space
119 if len(parts) == 3:
120 obj = parts[2]
121 if object_normalizer:
122 obj = object_normalizer(obj)
123 return (parts[0], parts[1], obj)
124 return None
def extract_subject_uri(s_term: str) -> str:
    """Strip the angle brackets from a URI term; leave other terms unchanged."""
    is_uri_term = s_term.startswith("<") and s_term.endswith(">")
    return s_term[1:-1] if is_uri_term else s_term
133def _open_ntriples(filepath: Path):
134 if filepath.suffix == ".gz":
135 return gzip.open(filepath, "rt", encoding="utf-8", errors="replace")
136 return open(filepath, "r", encoding="utf-8", errors="replace")
def read_ntriples_file(
    filepath: Path,
    object_normalizer: Callable[[str], str] | None = None,
) -> list[tuple[str, str, str]]:
    """Read an N-Triples file (optionally gzipped) into a list of term triples.

    Lines that cannot be parsed are silently skipped.
    """
    with _open_ntriples(filepath) as stream:
        candidates = (parse_ntriples_line(raw, object_normalizer) for raw in stream)
        return [triple for triple in candidates if triple]
def group_triples_by_subject(
    triples: list[tuple[str, str, str]],
) -> dict[str, set[tuple[str, str]]]:
    """Index triples by subject URI, mapping each to its (predicate, object) pairs."""
    grouped: dict[str, set[tuple[str, str]]] = defaultdict(set)
    for subject, predicate, obj in triples:
        # Strip the angle brackets from URI subjects (blank nodes pass through).
        if subject.startswith("<") and subject.endswith(">"):
            subject = subject[1:-1]
        grouped[subject].add((predicate, obj))
    return grouped
def _read_and_group(
    filepath: Path,
    object_normalizer: Callable[[str], str] | None = None,
) -> dict[str, set[tuple[str, str]]]:
    """Read an N-Triples file and group its triples by bare subject URI.

    Hot-path equivalent of ``read_ntriples_file`` + ``group_triples_by_subject``
    in a single pass: the regex is applied directly to each raw line (its
    ``\\s*\\.\\s*$`` tail tolerates the trailing newline), and only lines the
    regex rejects fall back to ``parse_ntriples_line``. The normalizer check
    is hoisted out of the loop, which is why the two branches duplicate the
    loop body.

    :param filepath: path to a plain or gzipped N-Triples file.
    :param object_normalizer: optional callable applied to each object term.
    :return: mapping of subject URI -> set of (predicate, object) pairs.
    """
    by_subject: dict[str, set[tuple[str, str]]] = defaultdict(set)
    # Bind the bound method once so the loop does a LOAD_FAST per line.
    match = _NT_RE.match
    with _open_ntriples(filepath) as f:
        if object_normalizer:
            for line in f:
                m = match(line)
                if m:
                    s, p, obj = m.groups()
                    obj = object_normalizer(obj)
                    # The regex guarantees a closing ">", so checking the
                    # first character is enough to detect a URI subject.
                    uri = s[1:-1] if s[0] == "<" else s
                    by_subject[uri].add((p, obj))
                else:
                    # Slow path for lines the regex rejects; unparseable
                    # lines (parsed is None) are skipped.
                    parsed = parse_ntriples_line(line, object_normalizer)
                    if parsed:
                        s, p, o = parsed
                        uri = s[1:-1] if s[0] == "<" and s[-1] == ">" else s
                        by_subject[uri].add((p, o))
        else:
            # Same loop without the per-line normalizer call.
            for line in f:
                m = match(line)
                if m:
                    s, p, obj = m.groups()
                    uri = s[1:-1] if s[0] == "<" else s
                    by_subject[uri].add((p, obj))
                else:
                    parsed = parse_ntriples_line(line, object_normalizer)
                    if parsed:
                        s, p, o = parsed
                        uri = s[1:-1] if s[0] == "<" and s[-1] == ">" else s
                        by_subject[uri].add((p, o))
    return by_subject
199def _format_timestamp(dt: datetime) -> str:
200 return dt.strftime("%Y-%m-%dT%H:%M:%S+00:00")
203def _build_update_query(
204 entity_uri: str,
205 data_graph_uri: str,
206 deleted_po: set[tuple[str, str]],
207 added_po: set[tuple[str, str]],
208) -> str:
209 parts = []
210 if deleted_po:
211 triples = " ".join(f"<{entity_uri}> {p} {o} ." for p, o in deleted_po)
212 parts.append(f"DELETE DATA {{ GRAPH <{data_graph_uri}> {{ {triples} }} }}")
213 if added_po:
214 triples = " ".join(f"<{entity_uri}> {p} {o} ." for p, o in added_po)
215 parts.append(f"INSERT DATA {{ GRAPH <{data_graph_uri}> {{ {triples} }} }}")
216 return "; ".join(parts)
219def _escape_sparql_for_nquads(query: str) -> str:
220 escaped = query.replace("\\", "\\\\")
221 escaped = escaped.replace('"', '\\"')
222 escaped = escaped.replace("\n", "\\n")
223 escaped = escaped.replace("\r", "\\r")
224 escaped = escaped.replace("\t", "\\t")
225 return escaped
class OCDMConverter:
    """Convert versioned RDF dumps into an OCDM-style dataset + provenance pair.

    Two input layouts are supported:
      - independent copies (IC): one full N-Triples snapshot per version
        (``convert_from_ic``);
      - changeset-based (CB): an initial snapshot plus per-version
        added/deleted N-Triples files (``convert_from_cb``).

    Both paths collect, per entity, the (predicate, object) pairs deleted and
    added at each version, then serialize the latest state and a PROV-O
    snapshot chain as N-Quads via ``_write_ocdm_output``.
    """

    def __init__(
        self,
        data_graph_uri: str,
        agent_uri: str,
        object_normalizer: Callable[[str], str] | None = None,
    ):
        # Named graph holding the dataset triples in the output quads.
        self.data_graph_uri = data_graph_uri
        # Agent recorded as prov:wasAttributedTo on every snapshot.
        self.agent_uri = agent_uri
        # Optional callable applied to every parsed object term.
        self.object_normalizer = object_normalizer

    def convert_from_ic(
        self,
        ic_files: list[Path],
        timestamps: list[datetime],
        dataset_output: Path,
        provenance_output: Path,
    ) -> None:
        """Convert a sequence of full-snapshot (IC) files, one per version.

        ``ic_files`` and ``timestamps`` are parallel, version-ordered lists;
        ``ic_files`` must be non-empty (``ic_files[0]`` is read eagerly).
        Changes are computed as set differences between consecutive versions.
        """
        all_entities: set[str] = set()
        # entity URI -> list of (version index, deleted pairs, added pairs)
        entity_changes: dict[str, list[tuple[int, set[tuple[str, str]], set[tuple[str, str]]]]] = defaultdict(list)
        prev_by_subject: dict[str, set[tuple[str, str]]] = {}
        latest_by_subject: dict[str, set[tuple[str, str]]] = {}

        # Prefetch pipeline: a single background thread reads and parses the
        # next IC file while the main thread diffs the current pair.
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(_read_and_group, ic_files[0], self.object_normalizer)

            for version_idx in range(len(ic_files)):
                # Block until the prefetched file is parsed.
                cur_by_subject = future.result()

                # Kick off the next read before doing any diff work.
                if version_idx + 1 < len(ic_files):
                    future = executor.submit(
                        _read_and_group, ic_files[version_idx + 1], self.object_normalizer
                    )

                all_entities.update(cur_by_subject.keys())

                # Diff against the previous version, covering entities that
                # appear in either version (created, modified, or deleted).
                if version_idx > 0:
                    for entity_uri in prev_by_subject.keys() | cur_by_subject.keys():
                        prev_po = prev_by_subject.get(entity_uri, set())
                        cur_po = cur_by_subject.get(entity_uri, set())
                        deleted_po = prev_po - cur_po
                        added_po = cur_po - prev_po
                        if deleted_po or added_po:
                            entity_changes[entity_uri].append((version_idx, deleted_po, added_po))

                prev_by_subject = cur_by_subject
                if version_idx == len(ic_files) - 1:
                    latest_by_subject = cur_by_subject

        self._write_ocdm_output(
            all_entities, entity_changes, latest_by_subject,
            timestamps, dataset_output, provenance_output,
        )

    def convert_from_cb(
        self,
        initial_snapshot: Path,
        changesets: list[tuple[Path, Path]],
        timestamps: list[datetime],
        dataset_output: Path,
        provenance_output: Path,
    ) -> None:
        """Convert an initial snapshot plus per-version changesets (CB layout).

        ``changesets`` holds one ``(added_file, deleted_file)`` pair per
        version after the first; ``timestamps`` covers the initial snapshot
        plus every changeset. The current state is updated in place: deletions
        are applied before additions for each entity.
        """
        all_entities: set[str] = set()
        # entity URI -> list of (version index, deleted pairs, added pairs)
        entity_changes: dict[str, list[tuple[int, set[tuple[str, str]], set[tuple[str, str]]]]] = defaultdict(list)

        # defaultdict(set) so `-=`/`|=` below work for entities not yet seen.
        current_state: dict[str, set[tuple[str, str]]] = defaultdict(
            set, _read_and_group(initial_snapshot, self.object_normalizer)
        )
        all_entities.update(current_state.keys())

        # Read the added and deleted files of each changeset in parallel.
        with ThreadPoolExecutor(max_workers=2) as executor:
            for changeset_idx, (added_file, deleted_file) in enumerate(changesets):
                # Version 0 is the initial snapshot itself.
                version_idx = changeset_idx + 1

                fut_del = executor.submit(_read_and_group, deleted_file, self.object_normalizer)
                fut_add = executor.submit(_read_and_group, added_file, self.object_normalizer)
                deleted_by_subject = fut_del.result()
                added_by_subject = fut_add.result()

                changed_entities = deleted_by_subject.keys() | added_by_subject.keys()
                all_entities.update(changed_entities)

                for entity_uri in changed_entities:
                    deleted_po = deleted_by_subject.get(entity_uri, set())
                    added_po = added_by_subject.get(entity_uri, set())

                    # Apply deletions first, then additions.
                    current_state[entity_uri] -= deleted_po
                    current_state[entity_uri] |= added_po

                    # Drop fully-deleted entities so the dataset file and the
                    # invalidation logic treat them as gone.
                    if not current_state[entity_uri]:
                        del current_state[entity_uri]

                    if deleted_po or added_po:
                        entity_changes[entity_uri].append((version_idx, deleted_po, added_po))

        self._write_ocdm_output(
            all_entities, entity_changes, current_state,
            timestamps, dataset_output, provenance_output,
        )

    def _write_ocdm_output(
        self,
        all_entities: set[str],
        entity_changes: dict[str, list[tuple[int, set[tuple[str, str]], set[tuple[str, str]]]]],
        latest_by_subject: dict[str, set[tuple[str, str]]],
        timestamps: list[datetime],
        dataset_output: Path,
        provenance_output: Path,
    ) -> None:
        """Serialize the latest dataset state and the provenance graph as N-Quads.

        The dataset file holds the final state of every surviving entity in
        ``self.data_graph_uri``; the provenance file holds, per entity, a
        chain of snapshots ``<entity>/prov/se/N`` in graph ``<entity>/prov/``.
        """
        dataset_output.parent.mkdir(parents=True, exist_ok=True)
        provenance_output.parent.mkdir(parents=True, exist_ok=True)
        # Sorted iteration keeps both output files deterministic.
        sorted_entities = sorted(all_entities)

        # Dataset file: one quad per (predicate, object) of the latest state.
        lines = []
        for entity_uri in sorted_entities:
            po_set = latest_by_subject.get(entity_uri, set())
            for p, o in sorted(po_set):
                lines.append(f"<{entity_uri}> {p} {o} <{self.data_graph_uri}> .\n")
        with open(dataset_output, "w", encoding="utf-8") as f:
            f.writelines(lines)

        # Provenance file: creation snapshot se/1 plus one snapshot per change.
        lines = []
        for entity_uri in sorted_entities:
            prov_graph = f"<{entity_uri}/prov/>"
            changes = entity_changes.get(entity_uri, [])

            # se/1 records the entity's creation, always stamped with
            # timestamps[0]. NOTE(review): entities that first appear in a
            # later version also get se/1 at timestamps[0] — confirm this
            # creation-time semantics is intended.
            se1_uri = f"<{entity_uri}/prov/se/1>"
            t0 = _format_timestamp(timestamps[0])

            lines.append(f'{se1_uri} <{PROV_NS}specializationOf> <{entity_uri}> {prov_graph} .\n')
            lines.append(f'{se1_uri} <{PROV_NS}generatedAtTime> "{t0}"^^<{XSD_NS}dateTime> {prov_graph} .\n')
            lines.append(f'{se1_uri} <{PROV_NS}wasAttributedTo> <{self.agent_uri}> {prov_graph} .\n')
            lines.append(f'{se1_uri} <{DCTERMS_NS}description> "The entity has been created." {prov_graph} .\n')

            # One snapshot per recorded change, numbered from se/2 onward.
            for change_idx, (version_idx, deleted_po, added_po) in enumerate(changes):
                se_num = change_idx + 2
                se_uri = f"<{entity_uri}/prov/se/{se_num}>"
                timestamp = _format_timestamp(timestamps[version_idx])

                lines.append(f'{se_uri} <{PROV_NS}specializationOf> <{entity_uri}> {prov_graph} .\n')
                lines.append(f'{se_uri} <{PROV_NS}generatedAtTime> "{timestamp}"^^<{XSD_NS}dateTime> {prov_graph} .\n')
                lines.append(f'{se_uri} <{PROV_NS}wasAttributedTo> <{self.agent_uri}> {prov_graph} .\n')

                # The SPARQL update that reverses/applies this change, embedded
                # as an escaped literal (oco:hasUpdateQuery).
                update_query = _build_update_query(entity_uri, self.data_graph_uri, deleted_po, added_po)
                escaped_query = _escape_sparql_for_nquads(update_query)
                lines.append(f'{se_uri} <{OCO_NS}hasUpdateQuery> "{escaped_query}" {prov_graph} .\n')
                lines.append(f'{se_uri} <{DCTERMS_NS}description> "The entity has been modified." {prov_graph} .\n')

                # Chain each snapshot to its predecessor.
                prev_se_uri = f"<{entity_uri}/prov/se/{se_num - 1}>"
                lines.append(f'{se_uri} <{PROV_NS}wasDerivedFrom> {prev_se_uri} {prov_graph} .\n')

                # An entity absent from the latest state was deleted: mark its
                # final snapshot as invalidated at that change's timestamp.
                if change_idx == len(changes) - 1 and entity_uri not in latest_by_subject:
                    lines.append(f'{se_uri} <{PROV_NS}invalidatedAtTime> "{timestamp}"^^<{XSD_NS}dateTime> {prov_graph} .\n')

        with open(provenance_output, "w", encoding="utf-8") as f:
            f.writelines(lines)