Coverage for oc_meta / run / migration / rdf_from_export.py: 0%
332 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 17:25 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 17:25 +0000
1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3# Copyright (c) 2023 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# Permission to use, copy, modify, and/or distribute this software for any purpose
6# with or without fee is hereby granted, provided that the above copyright notice
7# and this permission notice appear in all copies.
8#
9# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
15# SOFTWARE
17import argparse
18import gzip
19import logging
20import multiprocessing
21import os
22import re
23import time
24import uuid
25from functools import lru_cache
26from zipfile import ZIP_DEFLATED, ZipFile
28import orjson
29from rdflib import Dataset, URIRef
30from rdflib.exceptions import ParserError
31from rich_argparse import RichHelpFormatter
32from tqdm import tqdm
# Regular expressions shared by the URI-parsing helper functions below.
# entity_regex groups: 1 = base IRI, 2 = two-letter short name,
# 3 = optional supplier prefix (0...0 digits), 4 = count (or an "N-N" merged range)
entity_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))$"
# prov_regex additionally exposes: 5 = provenance short name, 6 = provenance count
prov_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))/prov/([a-z][a-z])/([1-9][0-9]*)$"
38@lru_cache(maxsize=1024)
39def _get_match_cached(regex: str, group: int, string: str) -> str:
40 match = re.match(regex, string)
41 if match is not None:
42 return match.group(group)
43 else:
44 return ""
def get_base_iri(res: URIRef) -> str:
    """Return the base-IRI portion (group 1) of a resource URI."""
    iri = str(res)
    pattern = prov_regex if "/prov/" in iri else entity_regex
    return _get_match_cached(pattern, 1, iri)
def get_short_name(res: URIRef) -> str:
    """Return the two-letter short name of a resource URI (group 5 for
    provenance entities, group 2 otherwise)."""
    iri = str(res)
    if "/prov/" in iri:
        return _get_match_cached(prov_regex, 5, iri)
    return _get_match_cached(entity_regex, 2, iri)
def get_prov_subject_short_name(prov_res: URIRef) -> str:
    """Return the short name of the entity a provenance URI describes,
    or "" for non-provenance URIs (they have no prov subject)."""
    iri = str(prov_res)
    if "/prov/" not in iri:
        return ""  # non-provenance entities do not have a prov_subject!
    return _get_match_cached(prov_regex, 2, iri)
def get_prefix(res: URIRef) -> str:
    """Return the supplier prefix (group 3) of a non-provenance URI;
    provenance entities never carry a supplier prefix."""
    iri = str(res)
    if "/prov/" in iri:
        return ""  # provenance entities cannot have a supplier prefix
    return _get_match_cached(entity_regex, 3, iri)
def get_prov_subject_prefix(prov_res: URIRef) -> str:
    """Return the supplier prefix of the entity a provenance URI describes,
    or "" for non-provenance URIs."""
    iri = str(prov_res)
    if "/prov/" not in iri:
        return ""  # non-provenance entities do not have a prov_subject!
    return _get_match_cached(prov_regex, 3, iri)
def get_count(res: URIRef) -> str:
    """Return the count segment of a resource URI (group 6 for provenance
    entities, group 4 otherwise)."""
    iri = str(res)
    if "/prov/" in iri:
        return _get_match_cached(prov_regex, 6, iri)
    return _get_match_cached(entity_regex, 4, iri)
def get_prov_subject_count(prov_res: URIRef) -> str:
    """Return the count of the entity a provenance URI describes,
    or "" for non-provenance URIs."""
    iri = str(prov_res)
    if "/prov/" not in iri:
        return ""  # non-provenance entities do not have a prov_subject!
    return _get_match_cached(prov_regex, 4, iri)
def get_resource_number(res: URIRef) -> int:
    """Return the numeric count of a resource URI as an int.

    Returns -1 (with a log message) when the count cannot be extracted or is
    not a plain integer (e.g. an "N-N" merged-entity range).
    """
    string_iri: str = str(res)
    # Both regexes expose the subject's count as group 4.
    regex = prov_regex if "/prov/" in string_iri else entity_regex
    extracted = _get_match_cached(regex, 4, string_iri)
    if not extracted:
        logging.warning(f"Could not extract resource number from URI: {string_iri}")
        return -1  # sentinel for "no number found"
    try:
        return int(extracted)
    except ValueError:
        logging.error(f"Invalid resource number in URI: {string_iri}, extracted: {extracted}")
        return -1  # sentinel for "non-integer count"
def find_local_line_id(res: URIRef, n_file_item: int = 1) -> int:
    """Return the 1-based position of ``res`` inside its bucket of
    ``n_file_item`` resources (buckets end on multiples of n_file_item)."""
    cur_number: int = get_resource_number(res)
    # Advance to the first multiple of n_file_item that is >= cur_number,
    # then step back one bucket to get the bucket's lower bound.
    ceiling = 0
    while cur_number > ceiling:
        ceiling += n_file_item
    return cur_number - (ceiling - n_file_item)
def _split_ceiling(number: int, step: int) -> int:
    # Smallest multiple of `step` that is >= `number` (0 when number <= 0).
    split = 0
    while number > split:
        split += step
    return split


def _resolve_sub_folder(sub_folder: str, default_dir: str) -> str:
    # Fall back to default_dir when no prefix exists, and to "_" when both are empty.
    if sub_folder == "":
        sub_folder = default_dir
    if sub_folder == "":
        sub_folder = "_"  # enforce default value
    return sub_folder


def find_paths(res: URIRef, base_dir: str, base_iri: str, default_dir: str, dir_split: int,
               n_file_item: int, is_json: bool = True):
    """
    This function is responsible for looking for the correct JSON file that contains the data related to the
    resource identified by the variable 'string_iri'. This search takes into account the organisation in
    directories and files, as well as the particular supplier prefix for bibliographic entities, if specified.
    In case no supplier prefix is specified, the 'default_dir' (usually set to "_") is used instead.

    Returns a (cur_dir_path, cur_file_path) pair, or (None, None) when the
    resource number cannot be extracted from the URI.
    """
    string_iri: str = str(res)
    cur_number: int = get_resource_number(res)
    if cur_number == -1:
        logging.error(f"Could not process URI: {string_iri}")
        return None, None

    # Find the correct file number where to save the resources
    # (files group n_file_item resources each).
    cur_file_split: int = _split_ceiling(cur_number, n_file_item)

    # The data have been split in multiple directories and it is not something related
    # with the provenance data of the whole corpus (e.g. provenance agents)
    if dir_split and not string_iri.startswith(base_iri + "prov/"):
        # Find the correct directory number where to save the file
        cur_split: int = _split_ceiling(cur_number, dir_split)

        if "/prov/" in string_iri:  # provenance file of a bibliographic entity
            subj_short_name = get_prov_subject_short_name(res)
            short_name = get_short_name(res)
            sub_folder = _resolve_sub_folder(get_prov_subject_prefix(res), default_dir)
            file_extension = '.json' if is_json else '.nq'
            cur_dir_path = base_dir + subj_short_name + os.sep + sub_folder + \
                os.sep + str(cur_split) + os.sep + str(cur_file_split) + os.sep + "prov"
            cur_file_path = cur_dir_path + os.sep + short_name + file_extension
        else:  # regular bibliographic entity
            short_name = get_short_name(res)
            sub_folder = _resolve_sub_folder(get_prefix(res), default_dir)
            file_extension = '.json' if is_json else '.nt'
            cur_dir_path = base_dir + short_name + os.sep + sub_folder + os.sep + str(cur_split)
            cur_file_path = cur_dir_path + os.sep + str(cur_file_split) + file_extension
    # Enter here if no split is needed
    elif dir_split == 0:
        if "/prov/" in string_iri:
            subj_short_name = get_prov_subject_short_name(res)
            short_name = get_short_name(res)
            sub_folder = _resolve_sub_folder(get_prov_subject_prefix(res), default_dir)
            file_extension = '.json' if is_json else '.nq'
            cur_dir_path = base_dir + subj_short_name + os.sep + sub_folder + \
                os.sep + str(cur_file_split) + os.sep + "prov"
            cur_file_path = cur_dir_path + os.sep + short_name + file_extension
        else:
            short_name = get_short_name(res)
            sub_folder = _resolve_sub_folder(get_prefix(res), default_dir)
            file_extension = '.json' if is_json else '.nt'
            cur_dir_path = base_dir + short_name + os.sep + sub_folder
            cur_file_path = cur_dir_path + os.sep + str(cur_file_split) + file_extension
    # Enter here if the data is about a provenance agent, e.g. /corpus/prov/
    else:
        short_name = get_short_name(res)
        prefix = get_prefix(res)
        count = get_count(res)
        file_extension = '.json' if is_json else '.nq'
        cur_dir_path = base_dir + short_name
        cur_file_path = cur_dir_path + os.sep + prefix + count + file_extension

    return cur_dir_path, cur_file_path
def store(triples, graph_identifier, stored_g: Dataset) -> Dataset:
    """Add every (s, p, o) in ``triples`` to ``stored_g`` as a quad under
    ``graph_identifier`` and return the dataset."""
    for item in triples:
        subj, pred, obj = item[0], item[1], item[2]
        stored_g.add((subj, pred, obj, graph_identifier))
    return stored_g
def store_in_file(cur_g: Dataset, cur_file_path: str, zip_output: bool) -> None:
    """Serialise ``cur_g`` as JSON-LD to ``cur_file_path``.

    When ``zip_output`` is True the JSON payload is written inside a zip
    archive (under the same name with a .json extension); otherwise it is
    written as a plain file. The parent directory is created on demand.
    """
    dir_path = os.path.dirname(cur_file_path)
    # exist_ok already tolerates a pre-existing directory, so the previous
    # os.path.exists() check was redundant (and race-prone); also guard
    # against an empty dirname, on which makedirs would raise.
    if dir_path:
        os.makedirs(dir_path, exist_ok=True)

    # Round-trip through orjson to get a plain JSON structure to dump.
    cur_json_ld = orjson.loads(cur_g.serialize(format="json-ld"))

    if zip_output:
        with ZipFile(cur_file_path, mode="w", compression=ZIP_DEFLATED, allowZip64=True) as zip_file:
            json_str = orjson.dumps(cur_json_ld).decode('utf-8')
            # NOTE(review): replace() swaps the first '.zip' anywhere in the
            # path before taking the basename — fine for the numeric file
            # names used here, but worth confirming for exotic paths.
            zip_file.writestr(os.path.basename(cur_file_path.replace('.zip', '.json')), json_str)
    else:
        with open(cur_file_path, 'wb') as f:
            f.write(orjson.dumps(cur_json_ld))
def _parse_into(graph, f, cur_format) -> None:
    # Parse one open binary stream into `graph`. JSON-LD payloads are loaded
    # with orjson and fed resource-by-resource (a lone dict is wrapped in a
    # list first); any other format is handed to rdflib directly.
    if cur_format == "json-ld":
        payload = orjson.loads(f.read())
        if isinstance(payload, dict):
            payload = [payload]
        for resource in payload:
            graph.parse(data=orjson.dumps(resource).decode('utf-8'), format=cur_format)
    else:
        graph.parse(file=f, format=cur_format)  # type: ignore[arg-type]


def load_graph(file_path: str, cur_format: str = 'json-ld'):
    """Load an RDF file (plain or zipped) into a Dataset and return it.

    The zip branch parses every member of the archive. Parsing logic is
    shared with the plain-file branch via _parse_into (it was previously
    duplicated verbatim in both branches).
    """
    loaded_graph = Dataset(default_union=True)
    if file_path.endswith('.zip'):
        with ZipFile(file=file_path, mode="r", compression=ZIP_DEFLATED, allowZip64=True) as archive:
            for zf_name in archive.namelist():
                with archive.open(zf_name) as f:
                    _parse_into(loaded_graph, f, cur_format)
    else:
        with open(file_path, 'rb') as f:
            _parse_into(loaded_graph, f, cur_format)
    return loaded_graph
def process_graph(context, graph_identifier, output_root, base_iri, file_limit, item_limit, zip_output):
    """Distribute the triples of one named graph into their destination files.

    Each triple's subject determines its target path via find_paths(); a
    per-process unique ID is appended to every file name so that concurrent
    workers never write to the same file (files are merged later).
    Returns the number of triples examined (including skipped ones).
    """
    modifications_by_file = {}
    triples = 0
    unique_id = generate_unique_id()

    for triple in context:
        # Bug fix: the original did `triples += len(triple)`, which adds the
        # tuple arity (always 3) per triple instead of counting each one once.
        triples += 1
        entity_uri = triple[0]
        _, cur_file_path = find_paths(entity_uri, output_root, base_iri, '_', file_limit, item_limit, True)
        if cur_file_path is None:
            logging.warning(f"Skipping triple due to invalid URI: {entity_uri}")
            continue

        # Extract the base file name (the number) and append the unique ID.
        base_name = os.path.splitext(os.path.basename(cur_file_path))[0]
        new_file_name = f"{base_name}_{unique_id}"
        cur_file_path = os.path.join(os.path.dirname(cur_file_path), new_file_name + ('.zip' if zip_output else '.json'))

        if cur_file_path not in modifications_by_file:
            modifications_by_file[cur_file_path] = {
                "graph_identifier": graph_identifier,
                "triples": []
            }
        modifications_by_file[cur_file_path]["triples"].append(triple)

    # Flush each destination file once, merging with any existing content.
    for file_path, data in modifications_by_file.items():
        stored_g = load_graph(file_path) if os.path.exists(file_path) else Dataset()
        stored_g = store(data["triples"], data["graph_identifier"], stored_g)
        store_in_file(stored_g, file_path, zip_output)
    return triples
def merge_files(output_root, base_file_name, file_extension, zip_output):
    """Merge the files generated by the different worker processes into one."""
    candidates = [
        name for name in os.listdir(output_root)
        if name.startswith(base_file_name) and name.endswith(file_extension)
    ]

    merged_graph = Dataset()
    for name in candidates:
        merged_graph += load_graph(os.path.join(output_root, name))

    final_file_path = os.path.join(output_root, base_file_name + file_extension)
    store_in_file(merged_graph, final_file_path, zip_output)  # type: ignore[arg-type]
def merge_files_in_directory(directory, zip_output, stop_file):
    """Merge per-process chunk files inside one directory into a single file
    per base name, then delete the merged inputs."""
    if check_stop_file(stop_file):
        logging.info("Stop file detected. Stopping merge process.")
        return

    suffix = '.zip' if zip_output else '.json'
    candidates = [name for name in os.listdir(directory) if name.endswith(suffix)]

    # Group files by their base name: the leading number (or "se"),
    # optionally followed by a "_<unique id>" suffix added per process.
    file_groups = {}
    for name in candidates:
        match = re.match(r'^((?:\d+)|(?:se))(?:_[^.]+)?\.', name)
        if match:
            file_groups.setdefault(match.group(1), []).append(name)

    for base_file_name, group in file_groups.items():
        if check_stop_file(stop_file):
            logging.info("Stop file detected. Stopping merge process.")
            return

        # Nothing to do unless at least one per-process ("_<id>") file exists.
        if not any('_' in name for name in group):
            continue

        merged_graph = Dataset()
        for name in group:
            loaded_graph = load_graph(os.path.join(directory, name))
            for context in loaded_graph.graphs():
                graph_identifier = context.identifier
                for triple in context:
                    merged_graph.add(triple + (graph_identifier,))  # type: ignore[arg-type]

        final_file_path = os.path.join(directory, base_file_name + suffix)
        store_in_file(merged_graph, final_file_path, zip_output)

        # Drop the original per-process files now that they are merged.
        final_name = os.path.basename(final_file_path)
        for name in group:
            if name != final_name:
                os.remove(os.path.join(directory, name))
def generate_unique_id():
    """Return a collision-resistant ID: epoch seconds plus a random UUID4."""
    timestamp = int(time.time())
    return f"{timestamp}-{uuid.uuid4()}"
def merge_files_wrapper(args):
    """Pool adapter: unpack the (directory, zip_output, stop_file) tuple."""
    merge_files_in_directory(*args)
def merge_all_files_parallel(output_root, zip_output, stop_file):
    """Walk the output tree and merge every directory's files in parallel."""
    if check_stop_file(stop_file):
        logging.info("Stop file detected. Stopping merge process.")
        return

    suffix = '.zip' if zip_output else '.json'
    directories_to_process = [
        root for root, _dirs, files in os.walk(output_root)
        if any(name.endswith(suffix) for name in files)
    ]

    with multiprocessing.Pool() as pool:
        jobs = pool.imap(merge_files_wrapper,
                         [(directory, zip_output, stop_file) for directory in directories_to_process])
        list(tqdm(jobs,
                  total=len(directories_to_process),
                  desc="Merging files in directories"))
def process_file_content(file_path, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format):
    """Parse one gzipped RDF dump and dispatch each named graph to process_graph.

    Parse failures are logged and the file is skipped; no exception escapes.
    """
    # gzip.open in 'rb' mode always yields bytes, so the original
    # `assert isinstance(content, bytes)` was a no-op (and asserts are
    # stripped under -O anyway); decode directly instead.
    with gzip.open(file_path, 'rb') as f:
        data = f.read().decode('utf-8')

    graph = Dataset()
    try:
        graph.parse(data=data, format=rdf_format)
    except ParserError as e:
        logging.error(f"Failed to parse {file_path}: {e}")
        return

    for context in graph.graphs():
        graph_identifier = context.identifier
        process_graph(context, graph_identifier, output_root, base_iri, file_limit, item_limit, zip_output)
def process_file_wrapper(args):
    """Pool worker: process one input file unless stopped or already cached."""
    (file_path, output_root, base_iri, file_limit, item_limit,
     zip_output, rdf_format, cache_file, stop_file) = args
    if check_stop_file(stop_file):
        return
    if is_file_processed(file_path, cache_file):
        return
    process_file_content(file_path, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format)
    mark_file_as_processed(file_path, cache_file)
def process_chunk(chunk, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format, cache_file, stop_file):
    """Fan one chunk of input files out across a multiprocessing pool."""
    tasks = [
        (file_path, output_root, base_iri, file_limit, item_limit,
         zip_output, rdf_format, cache_file, stop_file)
        for file_path in chunk
    ]
    with multiprocessing.Pool() as pool:
        list(tqdm(pool.imap(process_file_wrapper, tasks),
                  total=len(chunk),
                  desc="Processing files"))
def create_cache_file(cache_file):
    """Create an empty cache file when a path is configured and missing."""
    if not cache_file:
        logging.info("No cache file specified. Skipping cache creation.")
        return
    if not os.path.exists(cache_file):
        with open(cache_file, 'w', encoding='utf8'):
            pass  # just touch the file
def is_file_processed(file_path, cache_file):
    """Return True when ``file_path`` is listed in the cache file.

    Always False when no cache file is configured.
    """
    if not cache_file:
        return False
    with open(cache_file, 'r', encoding='utf8') as f:
        processed = set(f.read().splitlines())
    return file_path in processed
def mark_file_as_processed(file_path, cache_file):
    """Append ``file_path`` to the cache file; no-op without a cache."""
    if not cache_file:
        logging.debug(f"No cache file specified. Skipping marking {file_path} as processed.")
        return
    with open(cache_file, 'a', encoding='utf8') as f:
        f.write(f"{file_path}\n")
def check_stop_file(stop_file):
    """Return True when the stop-signal file is present on disk."""
    return os.path.exists(stop_file)
def main():
    """CLI entry point: convert gzipped RDF export files into the OC Meta
    RDF folder layout, processing in chunks and merging after each chunk."""
    parser = argparse.ArgumentParser(
        description="Process gzipped input files into OC Meta RDF",
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument('input_folder', type=str, help='Input folder containing gzipped input files')
    parser.add_argument('output_root', type=str, help='Root folder for output OC Meta RDF files')
    parser.add_argument('--base_iri', type=str, default='https://w3id.org/oc/meta/', help='The base URI of entities on Meta')
    parser.add_argument('--file_limit', type=int, default=10000, help='Number of files per folder')
    parser.add_argument('--item_limit', type=int, default=1000, help='Number of items per file')
    # Bug fix: the original used action='store_true' with default=True, so the
    # flag could never be disabled. BooleanOptionalAction keeps --zip_output
    # working unchanged and adds --no-zip_output to turn zipping off.
    parser.add_argument('-v', '--zip_output', default=True, dest='zip_output',
                        action=argparse.BooleanOptionalAction, help='Zip output json files')
    parser.add_argument('--input_format', type=str, default='jsonld', choices=['jsonld', 'nquads'], help='Format of the input files')
    parser.add_argument('--chunk_size', type=int, default=1000, help='Number of files to process before merging')
    parser.add_argument('--cache_file', type=str, default=None, help='File to store processed file names (optional)')
    parser.add_argument('--stop_file', type=str, default='./.stop', help='File to signal process termination')
    args = parser.parse_args()

    # Without explicit configuration the root logger stays at WARNING and all
    # the logging.info progress messages below are silently dropped.
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

    create_cache_file(args.cache_file)

    file_extension = '.nq.gz' if args.input_format == 'nquads' else '.jsonld.gz'
    rdf_format = 'nquads' if args.input_format == 'nquads' else 'json-ld'

    files_to_process = [os.path.join(args.input_folder, file) for file in os.listdir(args.input_folder) if file.endswith(file_extension)]
    chunks = [files_to_process[i:i + args.chunk_size] for i in range(0, len(files_to_process), args.chunk_size)]
    for i, chunk in enumerate(tqdm(chunks, desc="Processing chunks")):
        if check_stop_file(args.stop_file):
            logging.info("Stop file detected. Gracefully terminating the process.")
            break
        logging.info(f"Processing chunk {i+1}/{len(chunks)}")
        process_chunk(chunk, args.output_root, args.base_iri, args.file_limit, args.item_limit, args.zip_output, rdf_format, args.cache_file, args.stop_file)
        logging.info(f"Merging files for chunk {i+1}")
        merge_all_files_parallel(args.output_root, args.zip_output, args.stop_file)

    logging.info("Processing complete")
# Script entry-point guard: run the pipeline only when executed directly,
# so the module can be imported without side effects.
if __name__ == "__main__":
    main()