Coverage for oc_meta/run/migration/rdf_from_export.py: 0%

332 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-21 09:24 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7import argparse 

8import gzip 

9import logging 

10import multiprocessing 

11import os 

12import re 

13import time 

14import uuid 

15from functools import lru_cache 

16from zipfile import ZIP_DEFLATED, ZipFile 

17 

18import orjson 

19from rdflib import Dataset, URIRef 

20from rdflib.exceptions import ParserError 

21from rich_argparse import RichHelpFormatter 

22from tqdm import tqdm 

23 

# Regexes used by several functions below to pick apart OC Meta IRIs.
# entity_regex groups: 1 = base IRI, 2 = two-letter short name (e.g. "br"),
# 3 = optional supplier prefix (shape 0...0), 4 = entity number (or a N-M range).
entity_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))$"
# prov_regex groups: 1-4 as above for the described entity, then
# 5 = provenance short name (e.g. "se"), 6 = provenance snapshot number.
prov_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))/prov/([a-z][a-z])/([1-9][0-9]*)$"

27 

28@lru_cache(maxsize=1024) 

29def _get_match_cached(regex: str, group: int, string: str) -> str: 

30 match = re.match(regex, string) 

31 if match is not None: 

32 return match.group(group) 

33 else: 

34 return "" 

35 

def get_base_iri(res: URIRef) -> str:
    """Return the base IRI of *res* (capture group 1 of the matching regex)."""
    string_iri: str = str(res)
    pattern = prov_regex if "/prov/" in string_iri else entity_regex
    return _get_match_cached(pattern, 1, string_iri)

42 

def get_short_name(res: URIRef) -> str:
    """Return the two-letter short name of *res* (the provenance-level one for prov IRIs)."""
    string_iri: str = str(res)
    if "/prov/" in string_iri:
        return _get_match_cached(prov_regex, 5, string_iri)
    return _get_match_cached(entity_regex, 2, string_iri)

49 

def get_prov_subject_short_name(prov_res: URIRef) -> str:
    """Return the short name of the entity described by a provenance IRI ("" otherwise)."""
    string_iri: str = str(prov_res)
    if "/prov/" not in string_iri:
        return ""  # non-provenance entities do not have a prov subject
    return _get_match_cached(prov_regex, 2, string_iri)

56 

def get_prefix(res: URIRef) -> str:
    """Return the supplier prefix of *res*, or "" for provenance IRIs (which have none)."""
    string_iri: str = str(res)
    if "/prov/" in string_iri:
        return ""  # provenance entities cannot have a supplier prefix
    return _get_match_cached(entity_regex, 3, string_iri)

63 

def get_prov_subject_prefix(prov_res: URIRef) -> str:
    """Return the supplier prefix of the entity a provenance IRI describes ("" otherwise)."""
    string_iri: str = str(prov_res)
    # Non-provenance entities do not have a prov subject at all.
    return _get_match_cached(prov_regex, 3, string_iri) if "/prov/" in string_iri else ""

70 

def get_count(res: URIRef) -> str:
    """Return the numeric part of *res* as a string (snapshot number for prov IRIs)."""
    string_iri: str = str(res)
    if "/prov/" in string_iri:
        return _get_match_cached(prov_regex, 6, string_iri)
    return _get_match_cached(entity_regex, 4, string_iri)

77 

def get_prov_subject_count(prov_res: URIRef) -> str:
    """Return the number of the entity described by a provenance IRI ("" otherwise)."""
    string_iri: str = str(prov_res)
    if "/prov/" not in string_iri:
        return ""  # non-provenance entities do not have a prov subject
    return _get_match_cached(prov_regex, 4, string_iri)

84 

def get_resource_number(res: URIRef) -> int:
    """Return the numeric identifier of *res* as an int, or -1 when it cannot be parsed.

    For provenance IRIs the number of the *described* entity (group 4) is used,
    not the snapshot number.
    """
    string_iri: str = str(res)
    pattern = prov_regex if "/prov/" in string_iri else entity_regex
    extracted = _get_match_cached(pattern, 4, string_iri)
    if not extracted:
        logging.warning(f"Could not extract resource number from URI: {string_iri}")
        return -1  # sentinel: callers treat -1 as "unparseable"
    try:
        return int(extracted)
    except ValueError:
        logging.error(f"Invalid resource number in URI: {string_iri}, extracted: {extracted}")
        return -1

99 

def find_local_line_id(res: URIRef, n_file_item: int = 1) -> int:
    """Return the position of *res* relative to its file bucket of size *n_file_item*."""
    cur_number: int = get_resource_number(res)

    # Smallest multiple of n_file_item >= cur_number, then step back one bucket
    # so the result is the offset inside that bucket.
    upper_bound: int = 0
    while cur_number > upper_bound:
        upper_bound += n_file_item

    return cur_number - (upper_bound - n_file_item)

112 

def _resolve_sub_folder(prefix: str, default_dir: str) -> str:
    """Return *prefix*, falling back to *default_dir* and finally to "_" when both are empty."""
    if prefix == "":
        prefix = default_dir
    if prefix == "":
        prefix = "_"  # enforce default value
    return prefix

def find_paths(res: URIRef, base_dir: str, base_iri: str, default_dir: str, dir_split: int,
               n_file_item: int, is_json: bool = True):
    """
    Look for the correct file that contains the data related to the resource *res*.

    The search takes into account the organisation in directories and files
    (controlled by *dir_split* and *n_file_item*), as well as the particular
    supplier prefix for bibliographic entities, if specified. In case no supplier
    prefix is specified, *default_dir* (usually "_") is used instead.

    Returns a ``(dir_path, file_path)`` pair, or ``(None, None)`` when the
    resource number cannot be extracted from the IRI.
    """
    string_iri: str = str(res)

    cur_number: int = get_resource_number(res)
    if cur_number == -1:
        logging.error(f"Could not process URI: {string_iri}")
        return None, None

    # Smallest multiple of n_file_item >= cur_number: the file bucket of the resource.
    cur_file_split: int = 0
    while cur_number > cur_file_split:
        cur_file_split += n_file_item

    if dir_split and not string_iri.startswith(base_iri + "prov/"):
        # The data have been split in multiple directories and it is not something
        # related to the provenance of the whole corpus (e.g. provenance agents).
        cur_split: int = 0
        while cur_number > cur_split:
            cur_split += dir_split

        if "/prov/" in string_iri:  # provenance file of a bibliographic entity
            sub_folder = _resolve_sub_folder(get_prov_subject_prefix(res), default_dir)
            file_extension = '.json' if is_json else '.nq'
            cur_dir_path = (base_dir + get_prov_subject_short_name(res) + os.sep + sub_folder +
                            os.sep + str(cur_split) + os.sep + str(cur_file_split) + os.sep + "prov")
            cur_file_path = cur_dir_path + os.sep + get_short_name(res) + file_extension
        else:  # regular bibliographic entity
            sub_folder = _resolve_sub_folder(get_prefix(res), default_dir)
            file_extension = '.json' if is_json else '.nt'
            cur_dir_path = base_dir + get_short_name(res) + os.sep + sub_folder + os.sep + str(cur_split)
            cur_file_path = cur_dir_path + os.sep + str(cur_file_split) + file_extension
    elif dir_split == 0:
        # No directory split is needed.
        if "/prov/" in string_iri:  # provenance file of a bibliographic entity
            sub_folder = _resolve_sub_folder(get_prov_subject_prefix(res), default_dir)
            file_extension = '.json' if is_json else '.nq'
            cur_dir_path = (base_dir + get_prov_subject_short_name(res) + os.sep + sub_folder +
                            os.sep + str(cur_file_split) + os.sep + "prov")
            cur_file_path = cur_dir_path + os.sep + get_short_name(res) + file_extension
        else:  # regular bibliographic entity
            sub_folder = _resolve_sub_folder(get_prefix(res), default_dir)
            file_extension = '.json' if is_json else '.nt'
            cur_dir_path = base_dir + get_short_name(res) + os.sep + sub_folder
            cur_file_path = cur_dir_path + os.sep + str(cur_file_split) + file_extension
    else:
        # Data about a provenance agent of the whole corpus, e.g. <base_iri>prov/...
        file_extension = '.json' if is_json else '.nq'
        cur_dir_path = base_dir + get_short_name(res)
        cur_file_path = cur_dir_path + os.sep + get_prefix(res) + get_count(res) + file_extension

    return cur_dir_path, cur_file_path

210 

def store(triples, graph_identifier, stored_g: Dataset) -> Dataset:
    """Add every (s, p, o) of *triples* to *stored_g* as a quad under *graph_identifier*."""
    for spo in triples:
        stored_g.add(tuple(spo[:3]) + (graph_identifier,))
    return stored_g

215 

def store_in_file(cur_g: Dataset, cur_file_path: str, zip_output: bool) -> None:
    """Serialise *cur_g* as JSON-LD to *cur_file_path*, optionally inside a ZIP archive.

    Parent directories are created on demand. When *zip_output* is true the JSON
    document is stored inside the archive under the same base name with a
    ``.json`` extension.
    """
    parent_dir = os.path.dirname(cur_file_path)
    if not os.path.exists(parent_dir):
        os.makedirs(parent_dir, exist_ok=True)

    serialised = orjson.loads(cur_g.serialize(format="json-ld"))

    if not zip_output:
        with open(cur_file_path, 'wb') as out:
            out.write(orjson.dumps(serialised))
        return

    with ZipFile(cur_file_path, mode="w", compression=ZIP_DEFLATED, allowZip64=True) as archive:
        inner_name = os.path.basename(cur_file_path.replace('.zip', '.json'))
        archive.writestr(inner_name, orjson.dumps(serialised).decode('utf-8'))

230 

def _parse_into(graph: Dataset, stream, cur_format: str) -> None:
    """Parse the content of *stream* into *graph*.

    JSON-LD payloads are normalised to a list and parsed resource by resource;
    any other format is handed to rdflib as a raw file stream.
    """
    if cur_format == "json-ld":
        payload = orjson.loads(stream.read())
        if isinstance(payload, dict):
            payload = [payload]
        for resource in payload:
            graph.parse(data=orjson.dumps(resource).decode('utf-8'), format=cur_format)
    else:
        graph.parse(file=stream, format=cur_format)  # type: ignore[arg-type]

def load_graph(file_path: str, cur_format: str = 'json-ld'):
    """Load *file_path* (a plain file or a ZIP archive of files) into a Dataset.

    The zip and non-zip branches previously duplicated the whole parsing logic;
    both now delegate to ``_parse_into``.
    """
    loaded_graph = Dataset(default_union=True)
    if file_path.endswith('.zip'):
        with ZipFile(file=file_path, mode="r", compression=ZIP_DEFLATED, allowZip64=True) as archive:
            for zf_name in archive.namelist():
                with archive.open(zf_name) as f:
                    _parse_into(loaded_graph, f, cur_format)
    else:
        with open(file_path, 'rb') as f:
            _parse_into(loaded_graph, f, cur_format)

    return loaded_graph

257 

def process_graph(context, graph_identifier, output_root, base_iri, file_limit, item_limit, zip_output):
    """Distribute the triples of *context* into per-entity output files.

    Each triple is routed to the file computed by ``find_paths`` for its subject;
    a per-call unique id is appended to every file name so parallel workers never
    write to the same file (the pieces are merged later by the merge step).
    Returns the running counter described below.
    """
    per_file = {}
    counter = 0
    run_id = generate_unique_id()

    for triple in context:
        # NOTE(review): this adds len(triple) (i.e. 3) per triple — a term count,
        # not a triple count — and also counts triples that are skipped below.
        # Return value is unused by visible callers; confirm intent before changing.
        counter += len(triple)
        subject = triple[0]
        _, target_path = find_paths(subject, output_root, base_iri, '_', file_limit, item_limit, True)
        if target_path is None:
            logging.warning(f"Skipping triple due to invalid URI: {subject}")
            continue

        # Extract the base file name (the number) and append the unique id.
        stem = os.path.splitext(os.path.basename(target_path))[0]
        target_path = os.path.join(os.path.dirname(target_path),
                                   f"{stem}_{run_id}" + ('.zip' if zip_output else '.json'))

        bucket = per_file.setdefault(target_path, {"graph_identifier": graph_identifier, "triples": []})
        bucket["triples"].append(triple)

    for file_path, data in per_file.items():
        stored_g = load_graph(file_path) if os.path.exists(file_path) else Dataset()
        stored_g = store(data["triples"], data["graph_identifier"], stored_g)
        store_in_file(stored_g, file_path, zip_output)
    return counter

289 

def merge_files(output_root, base_file_name, file_extension, zip_output):
    """Merge the files generated by the different processes into a single file."""
    candidates = [name for name in os.listdir(output_root)
                  if name.startswith(base_file_name) and name.endswith(file_extension)]

    combined = Dataset()
    for name in candidates:
        combined += load_graph(os.path.join(output_root, name))

    final_file_path = os.path.join(output_root, base_file_name + file_extension)
    store_in_file(combined, final_file_path, zip_output)  # type: ignore[arg-type]

303 

def merge_files_in_directory(directory, zip_output, stop_file):
    """Merge the per-process files inside *directory* back into their base-numbered file.

    Files are grouped by base name (the file number, or "se", without the
    per-process unique id); a group is merged only when at least one
    "_"-suffixed per-process file is present. Honours the stop file between
    groups.
    """
    if check_stop_file(stop_file):
        logging.info("Stop file detected. Stopping merge process.")
        return

    suffix = '.zip' if zip_output else '.json'
    candidates = [name for name in os.listdir(directory) if name.endswith(suffix)]

    # Group files by their base name (number without the unique ID).
    groups = {}
    for name in candidates:
        matched = re.match(r'^((?:\d+)|(?:se))(?:_[^.]+)?\.', name)
        if matched:
            groups.setdefault(matched.group(1), []).append(name)

    for base_file_name, members in groups.items():
        if check_stop_file(stop_file):
            logging.info("Stop file detected. Stopping merge process.")
            return

        # Nothing to merge unless at least one per-process (underscored) file exists.
        if not any('_' in member for member in members):
            continue

        combined = Dataset()
        for member in members:
            partial = load_graph(os.path.join(directory, member))
            for context in partial.graphs():
                graph_identifier = context.identifier
                for triple in context:
                    combined.add(triple + (graph_identifier,))  # type: ignore[arg-type]

        final_file_path = os.path.join(directory, f"{base_file_name}" + suffix)
        store_in_file(combined, final_file_path, zip_output)

        # Drop the now-merged per-process files (keep the final one).
        final_name = os.path.basename(final_file_path)
        for member in members:
            if member != final_name:
                os.remove(os.path.join(directory, member))

348 

def generate_unique_id():
    """Return a collision-resistant id: unix-seconds timestamp plus a random UUID4."""
    timestamp = int(time.time())
    return f"{timestamp}-{uuid.uuid4()}"

351 

def merge_files_wrapper(args):
    """Pool entry point: unpack the argument tuple and run the per-directory merge."""
    target_dir, zip_output, stop_file = args
    merge_files_in_directory(target_dir, zip_output, stop_file)

355 

def merge_all_files_parallel(output_root, zip_output, stop_file):
    """Walk *output_root* and merge each directory's output files in a process pool."""
    if check_stop_file(stop_file):
        logging.info("Stop file detected. Stopping merge process.")
        return

    suffix = '.zip' if zip_output else '.json'
    targets = [root for root, _dirs, files in os.walk(output_root)
               if any(name.endswith(suffix) for name in files)]

    with multiprocessing.Pool() as pool:
        jobs = pool.imap(merge_files_wrapper, [(d, zip_output, stop_file) for d in targets])
        # Drain the iterator so all work completes, with a progress bar.
        for _ in tqdm(jobs, total=len(targets), desc="Merging files in directories"):
            pass

372 

def process_file_content(file_path, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format):
    """Parse one gzipped RDF file and route each of its named graphs into the output tree."""
    with gzip.open(file_path, 'rb') as gz:
        raw = gz.read()
    assert isinstance(raw, bytes)
    graph = Dataset()
    try:
        graph.parse(data=raw.decode('utf-8'), format=rdf_format)
    except ParserError as e:
        logging.error(f"Failed to parse {file_path}: {e}")
        return

    for context in graph.graphs():
        graph_identifier = context.identifier
        process_graph(context, graph_identifier, output_root, base_iri, file_limit, item_limit, zip_output)

388 

def process_file_wrapper(args):
    """Pool entry point: honour the stop file, skip cached files, process the rest."""
    (file_path, output_root, base_iri, file_limit, item_limit,
     zip_output, rdf_format, cache_file, stop_file) = args
    if check_stop_file(stop_file):
        return
    if is_file_processed(file_path, cache_file):
        return
    process_file_content(file_path, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format)
    mark_file_as_processed(file_path, cache_file)

396 

def process_chunk(chunk, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format, cache_file, stop_file):
    """Process every file of *chunk* in a multiprocessing pool with a progress bar."""
    job_args = [(file_path, output_root, base_iri, file_limit, item_limit,
                 zip_output, rdf_format, cache_file, stop_file)
                for file_path in chunk]
    with multiprocessing.Pool() as pool:
        for _ in tqdm(pool.imap(process_file_wrapper, job_args),
                      total=len(chunk), desc="Processing files"):
            pass

404 

def create_cache_file(cache_file):
    """Create an empty cache file at *cache_file* if one was requested and is missing."""
    if not cache_file:
        logging.info("No cache file specified. Skipping cache creation.")
        return
    if not os.path.exists(cache_file):
        with open(cache_file, 'w', encoding='utf8'):
            pass  # just create the empty file

412 

def is_file_processed(file_path, cache_file):
    """Return True when *file_path* is already listed in *cache_file* (False if no cache)."""
    if not cache_file:
        return False
    with open(cache_file, 'r', encoding='utf8') as cache:
        # The cache is re-read on every call on purpose: other worker processes
        # may have appended entries since the last read.
        return file_path in cache.read().splitlines()

419 

def mark_file_as_processed(file_path, cache_file):
    """Append *file_path* on its own line to *cache_file*, if a cache was specified."""
    if not cache_file:
        logging.debug(f"No cache file specified. Skipping marking {file_path} as processed.")
        return
    with open(cache_file, 'a', encoding='utf8') as cache:
        cache.write(f"{file_path}\n")

426 

def check_stop_file(stop_file):
    """Return True when the stop-signal file exists on disk."""
    stop_requested = os.path.exists(stop_file)
    return stop_requested

429 

def main():
    """CLI entry point: convert gzipped RDF dump files into the OC Meta RDF layout.

    Files are processed in chunks (each chunk in a multiprocessing pool), and the
    per-process partial files are merged after every chunk. A stop file allows
    graceful termination, and an optional cache file makes the run resumable.
    """
    parser = argparse.ArgumentParser(
        description="Process gzipped input files into OC Meta RDF",
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument('input_folder', type=str, help='Input folder containing gzipped input files')
    parser.add_argument('output_root', type=str, help='Root folder for output OC Meta RDF files')
    parser.add_argument('--base_iri', type=str, default='https://w3id.org/oc/meta/', help='The base URI of entities on Meta')
    parser.add_argument('--file_limit', type=int, default=10000, help='Number of files per folder')
    parser.add_argument('--item_limit', type=int, default=1000, help='Number of items per file')
    # BUGFIX: the original used action='store_true' together with default=True,
    # which made the flag a no-op — zipping could never be disabled.
    # BooleanOptionalAction keeps -v/--zip_output and the True default
    # backward-compatible while adding --no-zip_output to turn zipping off.
    parser.add_argument('-v', '--zip_output', default=True, dest='zip_output',
                        action=argparse.BooleanOptionalAction, help='Zip output json files')
    parser.add_argument('--input_format', type=str, default='jsonld', choices=['jsonld', 'nquads'], help='Format of the input files')
    parser.add_argument('--chunk_size', type=int, default=1000, help='Number of files to process before merging')
    parser.add_argument('--cache_file', type=str, default=None, help='File to store processed file names (optional)')
    parser.add_argument('--stop_file', type=str, default='./.stop', help='File to signal process termination')
    args = parser.parse_args()

    # Without configuration the root logger only emits WARNING and above, so all
    # the logging.info progress messages below were silently dropped.
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    create_cache_file(args.cache_file)

    file_extension = '.nq.gz' if args.input_format == 'nquads' else '.jsonld.gz'
    rdf_format = 'nquads' if args.input_format == 'nquads' else 'json-ld'

    files_to_process = [os.path.join(args.input_folder, file)
                        for file in os.listdir(args.input_folder) if file.endswith(file_extension)]
    chunks = [files_to_process[i:i + args.chunk_size]
              for i in range(0, len(files_to_process), args.chunk_size)]
    for i, chunk in enumerate(tqdm(chunks, desc="Processing chunks")):
        if check_stop_file(args.stop_file):
            logging.info("Stop file detected. Gracefully terminating the process.")
            break
        logging.info(f"Processing chunk {i+1}/{len(chunks)}")
        process_chunk(chunk, args.output_root, args.base_iri, args.file_limit, args.item_limit,
                      args.zip_output, rdf_format, args.cache_file, args.stop_file)
        logging.info(f"Merging files for chunk {i+1}")
        merge_all_files_parallel(args.output_root, args.zip_output, args.stop_file)

    logging.info("Processing complete")

if __name__ == "__main__":
    main()