Coverage for oc_meta / run / migration / rdf_from_export.py: 0%
332 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
7import argparse
8import gzip
9import logging
10import multiprocessing
11import os
12import re
13import time
14import uuid
15from functools import lru_cache
16from zipfile import ZIP_DEFLATED, ZipFile
18import orjson
19from rdflib import Dataset, URIRef
20from rdflib.exceptions import ParserError
21from rich_argparse import RichHelpFormatter
22from tqdm import tqdm
# Variable used in several functions
# Capture groups of entity_regex: (1) base IRI, (2) two-letter short name,
# (3) optional supplier prefix of the form 0..0, (4) counter (a positive
# integer, or a "n-n" range produced by merged identifiers).
entity_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))$"
# prov_regex extends entity_regex with: (5) provenance short name (e.g. "se"),
# (6) provenance counter.
prov_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))/prov/([a-z][a-z])/([1-9][0-9]*)$"
28@lru_cache(maxsize=1024)
29def _get_match_cached(regex: str, group: int, string: str) -> str:
30 match = re.match(regex, string)
31 if match is not None:
32 return match.group(group)
33 else:
34 return ""
def get_base_iri(res: URIRef) -> str:
    """Return the base IRI portion of ``res`` (everything before the short name)."""
    string_iri: str = str(res)
    pattern = prov_regex if "/prov/" in string_iri else entity_regex
    return _get_match_cached(pattern, 1, string_iri)
def get_short_name(res: URIRef) -> str:
    """Return the two-letter short name of ``res`` (provenance-aware)."""
    string_iri: str = str(res)
    if "/prov/" in string_iri:
        # Group 5 is the provenance entity's own short name (e.g. "se")
        return _get_match_cached(prov_regex, 5, string_iri)
    return _get_match_cached(entity_regex, 2, string_iri)
def get_prov_subject_short_name(prov_res: URIRef) -> str:
    """Return the short name of the entity that a provenance IRI describes."""
    string_iri: str = str(prov_res)
    if "/prov/" not in string_iri:
        return ""  # non-provenance entities do not have a prov_subject
    return _get_match_cached(prov_regex, 2, string_iri)
def get_prefix(res: URIRef) -> str:
    """Return the supplier prefix of ``res``, or "" for provenance IRIs."""
    string_iri: str = str(res)
    if "/prov/" in string_iri:
        return ""  # provenance entities cannot have a supplier prefix
    return _get_match_cached(entity_regex, 3, string_iri)
def get_prov_subject_prefix(prov_res: URIRef) -> str:
    """Return the supplier prefix of the entity a provenance IRI describes."""
    string_iri: str = str(prov_res)
    if "/prov/" not in string_iri:
        return ""  # non-provenance entities do not have a prov_subject
    return _get_match_cached(prov_regex, 3, string_iri)
def get_count(res: URIRef) -> str:
    """Return the counter component of ``res`` as a string."""
    string_iri: str = str(res)
    if "/prov/" in string_iri:
        # Group 6 is the provenance entity's own counter
        return _get_match_cached(prov_regex, 6, string_iri)
    return _get_match_cached(entity_regex, 4, string_iri)
def get_prov_subject_count(prov_res: URIRef) -> str:
    """Return the counter of the entity that a provenance IRI describes."""
    string_iri: str = str(prov_res)
    if "/prov/" not in string_iri:
        return ""  # non-provenance entities do not have a prov_subject
    return _get_match_cached(prov_regex, 4, string_iri)
def get_resource_number(res: URIRef) -> int:
    """Return the numeric counter embedded in ``res``, or -1 on failure.

    -1 is returned both when the IRI does not match the expected shapes and
    when the extracted counter is not a plain integer (e.g. an "n-n" range).
    """
    string_iri: str = str(res)
    pattern = prov_regex if "/prov/" in string_iri else entity_regex
    group = 4  # in both patterns group 4 holds the subject's counter
    match = _get_match_cached(pattern, group, string_iri)
    if not match:
        logging.warning(f"Could not extract resource number from URI: {string_iri}")
        return -1  # or some other default value
    try:
        return int(match)
    except ValueError:
        logging.error(f"Invalid resource number in URI: {string_iri}, extracted: {match}")
        return -1  # or some other default value
def find_local_line_id(res: URIRef, n_file_item: int = 1) -> int:
    """Return the 1-based position of ``res`` within its data file.

    The file boundary is the smallest multiple of ``n_file_item`` that is
    greater than or equal to the resource number; the local id is the offset
    from the previous boundary.
    """
    cur_number: int = get_resource_number(res)

    upper_bound: int = 0
    while cur_number > upper_bound:
        upper_bound += n_file_item

    # Offset from the start of the file this resource belongs to
    return cur_number - (upper_bound - n_file_item)
def _apply_default_dir(sub_folder: str, default_dir: str) -> str:
    """Fall back to ``default_dir`` — and then to "_" — when no prefix exists."""
    if sub_folder == "":
        sub_folder = default_dir
    if sub_folder == "":
        sub_folder = "_"  # enforce default value
    return sub_folder


def find_paths(res: URIRef, base_dir: str, base_iri: str, default_dir: str, dir_split: int,
               n_file_item: int, is_json: bool = True):
    """
    This function is responsible for looking for the correct JSON file that contains the data related to the
    resource identified by the variable 'string_iri'. This search takes into account the organisation in
    directories and files, as well as the particular supplier prefix for bibliographic entities, if specified.
    In case no supplier prefix is specified, the 'default_dir' (usually set to "_") is used instead.

    Returns a ``(dir_path, file_path)`` pair, or ``(None, None)`` when the
    resource number cannot be parsed from the IRI.
    """
    string_iri: str = str(res)
    cur_number: int = get_resource_number(res)
    if cur_number == -1:
        logging.error(f"Could not process URI: {string_iri}")
        return None, None  # caller must handle the missing paths

    # Find the correct file number where to save the resource: the smallest
    # multiple of n_file_item that is >= cur_number.
    cur_file_split: int = 0
    while cur_number > cur_file_split:
        cur_file_split += n_file_item

    if dir_split and not string_iri.startswith(base_iri + "prov/"):
        # The data have been split in multiple directories and it is not something
        # related with the provenance data of the whole corpus (e.g. provenance agents).
        # Find the correct directory number where to save the file.
        cur_split: int = 0
        while cur_number > cur_split:
            cur_split += dir_split

        if "/prov/" in string_iri:  # provenance file of a bibliographic entity
            sub_folder = _apply_default_dir(get_prov_subject_prefix(res), default_dir)
            cur_dir_path = (base_dir + get_prov_subject_short_name(res) + os.sep + sub_folder
                            + os.sep + str(cur_split) + os.sep + str(cur_file_split) + os.sep + "prov")
            cur_file_path = cur_dir_path + os.sep + get_short_name(res) + ('.json' if is_json else '.nq')
        else:  # regular bibliographic entity
            sub_folder = _apply_default_dir(get_prefix(res), default_dir)
            cur_dir_path = base_dir + get_short_name(res) + os.sep + sub_folder + os.sep + str(cur_split)
            cur_file_path = cur_dir_path + os.sep + str(cur_file_split) + ('.json' if is_json else '.nt')
    elif dir_split == 0:
        # No directory split is needed
        if "/prov/" in string_iri:  # provenance file of a bibliographic entity
            sub_folder = _apply_default_dir(get_prov_subject_prefix(res), default_dir)
            cur_dir_path = (base_dir + get_prov_subject_short_name(res) + os.sep + sub_folder
                            + os.sep + str(cur_file_split) + os.sep + "prov")
            cur_file_path = cur_dir_path + os.sep + get_short_name(res) + ('.json' if is_json else '.nq')
        else:  # regular bibliographic entity
            sub_folder = _apply_default_dir(get_prefix(res), default_dir)
            cur_dir_path = base_dir + get_short_name(res) + os.sep + sub_folder
            cur_file_path = cur_dir_path + os.sep + str(cur_file_split) + ('.json' if is_json else '.nt')
    else:
        # The data is about a provenance agent, e.g. <base_iri>/prov/...
        cur_dir_path = base_dir + get_short_name(res)
        cur_file_path = cur_dir_path + os.sep + get_prefix(res) + get_count(res) + ('.json' if is_json else '.nq')

    return cur_dir_path, cur_file_path
def store(triples, graph_identifier, stored_g: Dataset) -> Dataset:
    """Add every (s, p, o) from ``triples`` to ``stored_g`` under ``graph_identifier``."""
    for item in triples:
        quad = (item[0], item[1], item[2], graph_identifier)
        stored_g.add(quad)
    return stored_g
def store_in_file(cur_g: Dataset, cur_file_path: str, zip_output: bool) -> None:
    """Serialize ``cur_g`` as JSON-LD to ``cur_file_path``.

    When ``zip_output`` is true, the JSON document is written inside a zip
    archive at ``cur_file_path`` under the corresponding ``.json`` name.
    """
    dir_path = os.path.dirname(cur_file_path)
    # Guard against a bare filename (dirname == ""), and rely on exist_ok
    # instead of a racy exists() check.
    if dir_path:
        os.makedirs(dir_path, exist_ok=True)

    # Round-trip through orjson to get a compact JSON document
    cur_json_ld = orjson.loads(cur_g.serialize(format="json-ld"))

    if zip_output:
        with ZipFile(cur_file_path, mode="w", compression=ZIP_DEFLATED, allowZip64=True) as zip_file:
            json_str = orjson.dumps(cur_json_ld).decode('utf-8')
            zip_file.writestr(os.path.basename(cur_file_path.replace('.zip', '.json')), json_str)
    else:
        with open(cur_file_path, 'wb') as f:
            f.write(orjson.dumps(cur_json_ld))
def load_graph(file_path: str, cur_format: str = 'json-ld'):
    """Load an RDF Dataset from ``file_path`` (plain file or zip archive).

    JSON-LD input is pre-parsed with orjson and fed to rdflib one resource at
    a time; any other format is handed to ``Dataset.parse`` directly.
    """
    loaded_graph = Dataset(default_union=True)

    def _parse_stream(f) -> None:
        # Shared parsing logic for both the zipped and the plain branch.
        if cur_format == "json-ld":
            json_ld_file = orjson.loads(f.read())
            if isinstance(json_ld_file, dict):
                json_ld_file = [json_ld_file]
            for json_ld_resource in json_ld_file:
                loaded_graph.parse(data=orjson.dumps(json_ld_resource).decode('utf-8'), format=cur_format)
        else:
            loaded_graph.parse(file=f, format=cur_format)  # type: ignore[arg-type]

    if file_path.endswith('.zip'):
        with ZipFile(file=file_path, mode="r", compression=ZIP_DEFLATED, allowZip64=True) as archive:
            for zf_name in archive.namelist():
                with archive.open(zf_name) as f:
                    _parse_stream(f)
    else:
        with open(file_path, 'rb') as f:
            _parse_stream(f)

    return loaded_graph
def process_graph(context, graph_identifier, output_root, base_iri, file_limit, item_limit, zip_output):
    """Distribute the triples of ``context`` into per-entity output files.

    Each triple is routed to the file that ``find_paths`` computes for its
    subject. A per-call unique suffix is appended to every file name so that
    concurrent worker processes never write to the same file; the suffixed
    files are merged later by ``merge_files_in_directory``.

    Returns the number of triples processed.
    """
    modifications_by_file = {}
    triples = 0
    unique_id = generate_unique_id()

    for triple in context:
        # NOTE: was `triples += len(triple)`, which counted 3 per triple
        # since rdflib yields (s, p, o) tuples.
        triples += 1
        entity_uri = triple[0]
        _, cur_file_path = find_paths(entity_uri, output_root, base_iri, '_', file_limit, item_limit, True)
        if cur_file_path is None:
            logging.warning(f"Skipping triple due to invalid URI: {entity_uri}")
            continue

        # Extract the base file name (the number) and append the unique ID
        base_name = os.path.splitext(os.path.basename(cur_file_path))[0]
        new_file_name = f"{base_name}_{unique_id}"
        cur_file_path = os.path.join(os.path.dirname(cur_file_path), new_file_name + ('.zip' if zip_output else '.json'))

        entry = modifications_by_file.setdefault(
            cur_file_path, {"graph_identifier": graph_identifier, "triples": []}
        )
        entry["triples"].append(triple)

    for file_path, data in modifications_by_file.items():
        stored_g = load_graph(file_path) if os.path.exists(file_path) else Dataset()
        stored_g = store(data["triples"], data["graph_identifier"], stored_g)
        store_in_file(stored_g, file_path, zip_output)
    return triples
def merge_files(output_root, base_file_name, file_extension, zip_output):
    """Merge the files generated by the different processes into a single one."""
    matching_names = [
        name for name in os.listdir(output_root)
        if name.startswith(base_file_name) and name.endswith(file_extension)
    ]

    merged_graph = Dataset()
    for name in matching_names:
        merged_graph += load_graph(os.path.join(output_root, name))

    final_file_path = os.path.join(output_root, base_file_name + file_extension)
    store_in_file(merged_graph, final_file_path, zip_output)  # type: ignore[arg-type]
def merge_files_in_directory(directory, zip_output, stop_file):
    """Function to merge files in a specific directory"""
    if check_stop_file(stop_file):
        logging.info("Stop file detected. Stopping merge process.")
        return

    suffix = '.zip' if zip_output else '.json'
    candidates = [name for name in os.listdir(directory) if name.endswith(suffix)]

    # Group files by their base name (number without the unique ID)
    file_groups = {}
    for name in candidates:
        match = re.match(r'^((?:\d+)|(?:se))(?:_[^.]+)?\.', name)
        if match:
            file_groups.setdefault(match.group(1), []).append(name)

    for base_file_name, files_to_merge in file_groups.items():
        if check_stop_file(stop_file):
            logging.info("Stop file detected. Stopping merge process.")
            return

        # Only merge groups that contain at least one per-process file
        # (those carry an underscore before their unique suffix)
        if not any('_' in name for name in files_to_merge):
            continue

        merged_graph = Dataset()
        for name in files_to_merge:
            loaded_graph = load_graph(os.path.join(directory, name))
            for context in loaded_graph.graphs():
                graph_identifier = context.identifier
                for triple in context:
                    merged_graph.add(triple + (graph_identifier,))  # type: ignore[arg-type]

        final_file_path = os.path.join(directory, f"{base_file_name}" + suffix)
        store_in_file(merged_graph, final_file_path, zip_output)

        # Remove the per-process inputs now that they have been merged
        for name in files_to_merge:
            if name != os.path.basename(final_file_path):
                os.remove(os.path.join(directory, name))
def generate_unique_id():
    """Return a "<unix-timestamp>-<uuid4>" token, unique across processes."""
    timestamp = int(time.time())
    token = uuid.uuid4()
    return f"{timestamp}-{token}"
def merge_files_wrapper(args):
    """Unpack the pooled argument tuple and delegate to ``merge_files_in_directory``."""
    directory, zip_output, stop_file = args
    merge_files_in_directory(directory, zip_output, stop_file)
def merge_all_files_parallel(output_root, zip_output, stop_file):
    """Function to merge files in parallel"""
    if check_stop_file(stop_file):
        logging.info("Stop file detected. Stopping merge process.")
        return

    suffix = '.zip' if zip_output else '.json'
    directories_to_process = [
        root for root, _dirs, files in os.walk(output_root)
        if any(name.endswith(suffix) for name in files)
    ]

    tasks = [(directory, zip_output, stop_file) for directory in directories_to_process]
    with multiprocessing.Pool() as pool:
        list(tqdm(pool.imap(merge_files_wrapper, tasks),
                  total=len(tasks),
                  desc="Merging files in directories"))
def process_file_content(file_path, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format):
    """Parse one gzipped RDF file and route each named graph to output files."""
    with gzip.open(file_path, 'rb') as f:
        raw = f.read()

    assert isinstance(raw, bytes)
    graph = Dataset()
    try:
        graph.parse(data=raw.decode('utf-8'), format=rdf_format)
    except ParserError as e:
        logging.error(f"Failed to parse {file_path}: {e}")
        return

    for context in graph.graphs():
        process_graph(context, context.identifier, output_root, base_iri,
                      file_limit, item_limit, zip_output)
def process_file_wrapper(args):
    """Pool entry point: process one file unless stopped or already cached."""
    (file_path, output_root, base_iri, file_limit, item_limit,
     zip_output, rdf_format, cache_file, stop_file) = args
    if check_stop_file(stop_file):
        return
    if is_file_processed(file_path, cache_file):
        return
    process_file_content(file_path, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format)
    mark_file_as_processed(file_path, cache_file)
def process_chunk(chunk, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format, cache_file, stop_file):
    """Process a chunk of input files in parallel worker processes."""
    tasks = [
        (file_path, output_root, base_iri, file_limit, item_limit,
         zip_output, rdf_format, cache_file, stop_file)
        for file_path in chunk
    ]
    with multiprocessing.Pool() as pool:
        list(tqdm(pool.imap(process_file_wrapper, tasks),
                  total=len(chunk),
                  desc="Processing files"))
def create_cache_file(cache_file):
    """Ensure the cache file exists without touching existing content.

    Opening in append mode creates the file when missing and leaves any
    existing entries intact, avoiding the check-then-create race of an
    ``os.path.exists`` guard.
    """
    if cache_file:
        with open(cache_file, 'a', encoding='utf8'):
            pass  # creation is the only side effect we want
    else:
        logging.info("No cache file specified. Skipping cache creation.")
def is_file_processed(file_path, cache_file):
    """Return True when ``file_path`` is recorded in the cache file.

    A missing or unspecified cache file means nothing has been processed
    (the original raised FileNotFoundError when the cache did not exist yet).
    """
    if not cache_file:
        return False
    if not os.path.exists(cache_file):
        return False
    with open(cache_file, 'r', encoding='utf8') as f:
        processed_files = f.read().splitlines()
    return file_path in processed_files
def mark_file_as_processed(file_path, cache_file):
    """Append ``file_path`` to the cache file, one path per line."""
    if not cache_file:
        logging.debug(f"No cache file specified. Skipping marking {file_path} as processed.")
        return
    with open(cache_file, 'a', encoding='utf8') as f:
        f.write(f"{file_path}\n")
def check_stop_file(stop_file):
    # True when the stop-file path exists, signalling a graceful shutdown request.
    return os.path.exists(stop_file)
def main():
    """CLI entry point: convert gzipped RDF exports into the OC Meta RDF layout."""
    # Make the module's logging.info progress messages visible on the console
    logging.basicConfig(level=logging.INFO)

    parser = argparse.ArgumentParser(
        description="Process gzipped input files into OC Meta RDF",
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument('input_folder', type=str, help='Input folder containing gzipped input files')
    parser.add_argument('output_root', type=str, help='Root folder for output OC Meta RDF files')
    parser.add_argument('--base_iri', type=str, default='https://w3id.org/oc/meta/', help='The base URI of entities on Meta')
    parser.add_argument('--file_limit', type=int, default=10000, help='Number of files per folder')
    parser.add_argument('--item_limit', type=int, default=1000, help='Number of items per file')
    # '-v' was a no-op (store_true with default=True can never change the value);
    # it is kept for backward compatibility, and '--no_zip_output' now provides
    # a real way to disable zipping.
    parser.add_argument('-v', '--zip_output', default=True, dest='zip_output', action='store_true', help='Zip output json files (default)')
    parser.add_argument('--no_zip_output', dest='zip_output', action='store_false', help='Write plain .json output files instead of zipped ones')
    parser.add_argument('--input_format', type=str, default='jsonld', choices=['jsonld', 'nquads'], help='Format of the input files')
    parser.add_argument('--chunk_size', type=int, default=1000, help='Number of files to process before merging')
    parser.add_argument('--cache_file', type=str, default=None, help='File to store processed file names (optional)')
    parser.add_argument('--stop_file', type=str, default='./.stop', help='File to signal process termination')
    args = parser.parse_args()

    create_cache_file(args.cache_file)

    file_extension = '.nq.gz' if args.input_format == 'nquads' else '.jsonld.gz'
    rdf_format = 'nquads' if args.input_format == 'nquads' else 'json-ld'

    files_to_process = [os.path.join(args.input_folder, file) for file in os.listdir(args.input_folder) if file.endswith(file_extension)]
    # Work in chunks so partially-processed output gets merged periodically
    chunks = [files_to_process[i:i + args.chunk_size] for i in range(0, len(files_to_process), args.chunk_size)]
    for i, chunk in enumerate(tqdm(chunks, desc="Processing chunks")):
        if check_stop_file(args.stop_file):
            logging.info("Stop file detected. Gracefully terminating the process.")
            break
        logging.info(f"Processing chunk {i+1}/{len(chunks)}")
        process_chunk(chunk, args.output_root, args.base_iri, args.file_limit, args.item_limit, args.zip_output, rdf_format, args.cache_file, args.stop_file)
        logging.info(f"Merging files for chunk {i+1}")
        merge_all_files_parallel(args.output_root, args.zip_output, args.stop_file)

    logging.info("Processing complete")
# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()