Coverage for oc_meta / run / migration / rdf_from_export.py: 0%

332 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 17:25 +0000

1#!/usr/bin/python 

2# -*- coding: utf-8 -*- 

3# Copyright (c) 2023 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# Permission to use, copy, modify, and/or distribute this software for any purpose 

6# with or without fee is hereby granted, provided that the above copyright notice 

7# and this permission notice appear in all copies. 

8# 

9# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, 

12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, 

13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 

14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS 

15# SOFTWARE 

16 

17import argparse 

18import gzip 

19import logging 

20import multiprocessing 

21import os 

22import re 

23import time 

24import uuid 

25from functools import lru_cache 

26from zipfile import ZIP_DEFLATED, ZipFile 

27 

28import orjson 

29from rdflib import Dataset, URIRef 

30from rdflib.exceptions import ParserError 

31from rich_argparse import RichHelpFormatter 

32from tqdm import tqdm 

33 

# Regexes used in several functions below: they decompose OpenCitations Meta
# IRIs into capture groups (base IRI, two-letter short name, optional supplier
# prefix, counter) for bibliographic entities and their provenance entities.
entity_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))$"
prov_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))/prov/([a-z][a-z])/([1-9][0-9]*)$"

38@lru_cache(maxsize=1024) 

39def _get_match_cached(regex: str, group: int, string: str) -> str: 

40 match = re.match(regex, string) 

41 if match is not None: 

42 return match.group(group) 

43 else: 

44 return "" 

45 

def get_base_iri(res: URIRef) -> str:
    """Return the base IRI portion (group 1) of *res*."""
    iri = str(res)
    pattern = prov_regex if "/prov/" in iri else entity_regex
    return _get_match_cached(pattern, 1, iri)

52 

def get_short_name(res: URIRef) -> str:
    """Return the two-letter short name of the entity type of *res*."""
    iri = str(res)
    pattern, group = (prov_regex, 5) if "/prov/" in iri else (entity_regex, 2)
    return _get_match_cached(pattern, group, iri)

59 

def get_prov_subject_short_name(prov_res: URIRef) -> str:
    """Short name of the entity a provenance resource describes.

    Non-provenance entities have no prov_subject, so "" is returned for them.
    """
    iri = str(prov_res)
    if "/prov/" not in iri:
        return ""
    return _get_match_cached(prov_regex, 2, iri)

66 

def get_prefix(res: URIRef) -> str:
    """Supplier prefix of *res*; provenance entities have none, so "" is returned."""
    iri = str(res)
    if "/prov/" in iri:
        return ""
    return _get_match_cached(entity_regex, 3, iri)

73 

def get_prov_subject_prefix(prov_res: URIRef) -> str:
    """Supplier prefix of the entity a provenance resource describes.

    Non-provenance entities have no prov_subject, so "" is returned for them.
    """
    iri = str(prov_res)
    if "/prov/" not in iri:
        return ""
    return _get_match_cached(prov_regex, 3, iri)

80 

def get_count(res: URIRef) -> str:
    """Return the counter segment of *res* as a string."""
    iri = str(res)
    pattern, group = (prov_regex, 6) if "/prov/" in iri else (entity_regex, 4)
    return _get_match_cached(pattern, group, iri)

87 

def get_prov_subject_count(prov_res: URIRef) -> str:
    """Counter of the entity a provenance resource describes.

    Non-provenance entities have no prov_subject, so "" is returned for them.
    """
    iri = str(prov_res)
    if "/prov/" not in iri:
        return ""
    return _get_match_cached(prov_regex, 4, iri)

94 

def get_resource_number(res: URIRef) -> int:
    """Return the numeric counter embedded in *res*, or -1 when it cannot be parsed."""
    string_iri: str = str(res)
    regex = prov_regex if "/prov/" in string_iri else entity_regex
    match = _get_match_cached(regex, 4, string_iri)
    if not match:
        logging.warning(f"Could not extract resource number from URI: {string_iri}")
        return -1  # sentinel: caller must treat -1 as "not extractable"
    try:
        return int(match)
    except ValueError:
        logging.error(f"Invalid resource number in URI: {string_iri}, extracted: {match}")
        return -1  # sentinel: caller must treat -1 as "not extractable"

109 

def find_local_line_id(res: URIRef, n_file_item: int = 1) -> int:
    """Return the position of *res* relative to the start of its file split."""
    cur_number: int = get_resource_number(res)

    # Advance the boundary in steps of n_file_item until it reaches or passes
    # cur_number, then step back once so it marks the start of the containing split.
    boundary: int = 0
    while cur_number > boundary:
        boundary += n_file_item
    boundary -= n_file_item

    return cur_number - boundary

122 

def find_paths(res: URIRef, base_dir: str, base_iri: str, default_dir: str, dir_split: int,
               n_file_item: int, is_json: bool = True):
    """
    This function is responsible for looking for the correct JSON file that contains the data related to the
    resource identified by the variable 'string_iri'. This search takes into account the organisation in
    directories and files, as well as the particular supplier prefix for bibliographic entities, if specified.
    In case no supplier prefix is specified, the 'default_dir' (usually set to "_") is used instead.

    Returns a (cur_dir_path, cur_file_path) pair, or (None, None) when the
    resource number cannot be extracted from the IRI.
    """
    string_iri: str = str(res)

    cur_number: int = get_resource_number(res)

    if cur_number == -1:
        # Handle the error case: -1 is the sentinel returned by get_resource_number
        logging.error(f"Could not process URI: {string_iri}")
        return None, None  # or some default paths

    # Find the correct file number where to save the resources:
    # the smallest multiple of n_file_item that is >= cur_number
    cur_file_split: int = 0
    while True:
        if cur_number > cur_file_split:
            cur_file_split += n_file_item
        else:
            break

    # The data have been split in multiple directories and it is not something related
    # with the provenance data of the whole corpus (e.g. provenance agents)
    if dir_split and not string_iri.startswith(base_iri + "prov/"):
        # Find the correct directory number where to save the file:
        # the smallest multiple of dir_split that is >= cur_number
        cur_split: int = 0
        while True:
            if cur_number > cur_split:
                cur_split += dir_split
            else:
                break

        if "/prov/" in string_iri:  # provenance file of a bibliographic entity
            subj_short_name: str = get_prov_subject_short_name(res)
            short_name: str = get_short_name(res)
            sub_folder: str = get_prov_subject_prefix(res)
            file_extension: str = '.json' if is_json else '.nq'
            # Fall back to default_dir, then to "_", when no supplier prefix exists
            if sub_folder == "":
                sub_folder = default_dir
            if sub_folder == "":
                sub_folder = "_"  # enforce default value

            # Layout: <base>/<subj short name>/<prefix>/<dir split>/<file split>/prov/<short name>.<ext>
            cur_dir_path: str = base_dir + subj_short_name + os.sep + sub_folder + \
                os.sep + str(cur_split) + os.sep + str(cur_file_split) + os.sep + "prov"
            cur_file_path: str = cur_dir_path + os.sep + short_name + file_extension
        else:  # regular bibliographic entity
            short_name: str = get_short_name(res)
            sub_folder: str = get_prefix(res)
            file_extension: str = '.json' if is_json else '.nt'
            if sub_folder == "":
                sub_folder = default_dir
            if sub_folder == "":
                sub_folder = "_"  # enforce default value

            # Layout: <base>/<short name>/<prefix>/<dir split>/<file split>.<ext>
            cur_dir_path: str = base_dir + short_name + os.sep + sub_folder + os.sep + str(cur_split)
            cur_file_path: str = cur_dir_path + os.sep + str(cur_file_split) + file_extension
    # Enter here if no split is needed
    elif dir_split == 0:
        if "/prov/" in string_iri:
            subj_short_name: str = get_prov_subject_short_name(res)
            short_name: str = get_short_name(res)
            sub_folder: str = get_prov_subject_prefix(res)
            file_extension: str = '.json' if is_json else '.nq'
            if sub_folder == "":
                sub_folder = default_dir
            if sub_folder == "":
                sub_folder = "_"  # enforce default value

            # Same as above but without the directory-split level
            cur_dir_path: str = base_dir + subj_short_name + os.sep + sub_folder + \
                os.sep + str(cur_file_split) + os.sep + "prov"
            cur_file_path: str = cur_dir_path + os.sep + short_name + file_extension
        else:
            short_name: str = get_short_name(res)
            sub_folder: str = get_prefix(res)
            file_extension: str = '.json' if is_json else '.nt'
            if sub_folder == "":
                sub_folder = default_dir
            if sub_folder == "":
                sub_folder = "_"  # enforce default value

            cur_dir_path: str = base_dir + short_name + os.sep + sub_folder
            cur_file_path: str = cur_dir_path + os.sep + str(cur_file_split) + file_extension
    # Enter here if the data is about a provenance agent, e.g. /corpus/prov/
    else:
        short_name: str = get_short_name(res)
        prefix: str = get_prefix(res)
        count: str = get_count(res)
        file_extension: str = '.json' if is_json else '.nq'

        cur_dir_path: str = base_dir + short_name
        cur_file_path: str = cur_dir_path + os.sep + prefix + count + file_extension

    return cur_dir_path, cur_file_path

220 

def store(triples, graph_identifier, stored_g: Dataset) -> Dataset:
    """Append every triple in *triples* to *stored_g* inside the named graph *graph_identifier*."""
    for triple in triples:
        quad = (triple[0], triple[1], triple[2], graph_identifier)
        stored_g.add(quad)
    return stored_g

225 

def store_in_file(cur_g: Dataset, cur_file_path: str, zip_output: bool) -> None:
    """
    Serialise *cur_g* as JSON-LD to *cur_file_path*.

    When *zip_output* is true, the JSON document is written as a single member
    (named with a .json extension) inside a ZIP archive at *cur_file_path*;
    otherwise the raw JSON bytes are written directly.
    """
    dir_path = os.path.dirname(cur_file_path)
    # makedirs(..., exist_ok=True) already tolerates an existing directory, so
    # the previous os.path.exists() pre-check was redundant (and racy). Guarding
    # on a non-empty dirname also avoids FileNotFoundError from os.makedirs('')
    # when cur_file_path is a bare filename.
    if dir_path:
        os.makedirs(dir_path, exist_ok=True)

    cur_json_ld = orjson.loads(cur_g.serialize(format="json-ld"))

    if zip_output:
        with ZipFile(cur_file_path, mode="w", compression=ZIP_DEFLATED, allowZip64=True) as zip_file:
            json_str = orjson.dumps(cur_json_ld).decode('utf-8')
            # The archive member keeps the file's base name with a .json extension
            zip_file.writestr(os.path.basename(cur_file_path.replace('.zip', '.json')), json_str)
    else:
        with open(cur_file_path, 'wb') as f:
            f.write(orjson.dumps(cur_json_ld))

240 

def _parse_open_stream(loaded_graph: Dataset, f, cur_format: str) -> None:
    """Parse one open binary stream into *loaded_graph*.

    JSON-LD input is loaded resource by resource (a top-level dict is wrapped in
    a list first); any other format is handed to rdflib's parser directly.
    """
    if cur_format == "json-ld":
        json_ld_file = orjson.loads(f.read())
        if isinstance(json_ld_file, dict):
            json_ld_file = [json_ld_file]
        for json_ld_resource in json_ld_file:
            loaded_graph.parse(data=orjson.dumps(json_ld_resource).decode('utf-8'), format=cur_format)
    else:
        loaded_graph.parse(file=f, format=cur_format)  # type: ignore[arg-type]


def load_graph(file_path: str, cur_format: str = 'json-ld'):
    """
    Load an RDF file into a Dataset and return it.

    *file_path* may be a plain file or a ZIP archive (detected by the .zip
    extension), in which case every archive member is parsed. The previous
    implementation duplicated the whole parsing logic across the zip and
    non-zip branches; it is now factored into _parse_open_stream.
    """
    loaded_graph = Dataset(default_union=True)
    if file_path.endswith('.zip'):
        with ZipFile(file=file_path, mode="r", compression=ZIP_DEFLATED, allowZip64=True) as archive:
            for zf_name in archive.namelist():
                with archive.open(zf_name) as f:
                    _parse_open_stream(loaded_graph, f, cur_format)
    else:
        with open(file_path, 'rb') as f:
            _parse_open_stream(loaded_graph, f, cur_format)

    return loaded_graph

267 

def process_graph(context, graph_identifier, output_root, base_iri, file_limit, item_limit, zip_output):
    """
    Distribute the triples of *context* into their destination files and persist them.

    Each triple's target file is computed from its subject IRI; a process-unique
    suffix is appended to the file name so concurrent workers never write the
    same file (the partial files are merged later). Returns the number of
    triples handled.
    """
    modifications_by_file = {}
    triples = 0
    unique_id = generate_unique_id()

    for triple in context:
        # Bug fix: the previous code did `triples += len(triple)`, which always
        # adds 3 per triple and inflated the count threefold.
        triples += 1
        entity_uri = triple[0]
        _, cur_file_path = find_paths(entity_uri, output_root, base_iri, '_', file_limit, item_limit, True)
        if cur_file_path is None:
            logging.warning(f"Skipping triple due to invalid URI: {entity_uri}")
            continue

        # Extract the base file name (the number) and append the unique ID
        base_name = os.path.splitext(os.path.basename(cur_file_path))[0]
        new_file_name = f"{base_name}_{unique_id}"

        cur_file_path = os.path.join(os.path.dirname(cur_file_path), new_file_name + ('.zip' if zip_output else '.json'))

        if cur_file_path not in modifications_by_file:
            modifications_by_file[cur_file_path] = {
                "graph_identifier": graph_identifier,
                "triples": []
            }
        modifications_by_file[cur_file_path]["triples"].append(triple)

    # Flush each destination file once, merging with any existing content
    for file_path, data in modifications_by_file.items():
        stored_g = load_graph(file_path) if os.path.exists(file_path) else Dataset()
        stored_g = store(data["triples"], data["graph_identifier"], stored_g)
        store_in_file(stored_g, file_path, zip_output)
    return triples

299 

def merge_files(output_root, base_file_name, file_extension, zip_output):
    """Merge the files generated by the different processes into a single file.

    NOTE(review): merging Datasets with ``+=`` copies triples without their
    named-graph identifiers, unlike ``merge_files_in_directory`` which preserves
    each context's identifier — confirm whether losing contexts here is intended.
    """
    files_to_merge = [f for f in os.listdir(output_root) if f.startswith(base_file_name) and f.endswith(file_extension)]

    merged_graph = Dataset()

    for file_path in files_to_merge:
        cur_full_path = os.path.join(output_root, file_path)
        loaded_graph = load_graph(cur_full_path)
        merged_graph += loaded_graph

    final_file_path = os.path.join(output_root, base_file_name + file_extension)
    store_in_file(merged_graph, final_file_path, zip_output)  # type: ignore[arg-type]

313 

def merge_files_in_directory(directory, zip_output, stop_file):
    """Function to merge files in a specific directory.

    Partial files written by worker processes carry a "_<unique id>" suffix
    (see process_graph); this function groups them by their base name, merges
    their quads into one Dataset per group (preserving graph identifiers),
    writes the merged file under the plain base name, and removes the partials.
    Checks the stop file before starting and before each group.
    """
    if check_stop_file(stop_file):
        logging.info("Stop file detected. Stopping merge process.")
        return

    files = [f for f in os.listdir(directory) if f.endswith('.zip' if zip_output else '.json')]

    # Group files by their base name (number without the unique ID);
    # "se" is also accepted as a base name alongside purely numeric ones
    file_groups = {}
    for file in files:
        match = re.match(r'^((?:\d+)|(?:se))(?:_[^.]+)?\.', file)
        if match:
            base_name = match.group(1)
            if base_name not in file_groups:
                file_groups[base_name] = []
            file_groups[base_name].append(file)

    for base_file_name, files_to_merge in file_groups.items():
        if check_stop_file(stop_file):
            logging.info("Stop file detected. Stopping merge process.")
            return

        # Only proceed with merging if there's at least one file with an underscore
        # (i.e. at least one unmerged partial file exists in this group)
        if not any('_' in file for file in files_to_merge):
            continue

        merged_graph = Dataset()

        # Copy every quad, keeping each context's graph identifier
        for file_path in files_to_merge:
            cur_full_path = os.path.join(directory, file_path)
            loaded_graph = load_graph(cur_full_path)
            for context in loaded_graph.graphs():
                graph_identifier = context.identifier
                for triple in context:
                    merged_graph.add(triple + (graph_identifier,))  # type: ignore[arg-type]

        final_file_path = os.path.join(directory, f"{base_file_name}" + ('.zip' if zip_output else '.json'))
        store_in_file(merged_graph, final_file_path, zip_output)

        # Remove the original files after merging (but never the merged file itself)
        for file_path in files_to_merge:
            if file_path != os.path.basename(final_file_path):
                os.remove(os.path.join(directory, file_path))

358 

def generate_unique_id():
    """Return a process-unique marker: '<unix timestamp>-<random UUID4>'."""
    timestamp = int(time.time())
    return f"{timestamp}-{uuid.uuid4()}"

361 

def merge_files_wrapper(args):
    """Unpack a (directory, zip_output, stop_file) tuple for pool.imap."""
    merge_files_in_directory(*args)

365 

def merge_all_files_parallel(output_root, zip_output, stop_file):
    """Merge the partial files of every output directory, one directory per worker."""
    if check_stop_file(stop_file):
        logging.info("Stop file detected. Stopping merge process.")
        return

    suffix = '.zip' if zip_output else '.json'
    # Every directory under output_root that contains at least one output file
    directories_to_process = [
        root
        for root, _dirs, files in os.walk(output_root)
        if any(name.endswith(suffix) for name in files)
    ]

    tasks = [(directory, zip_output, stop_file) for directory in directories_to_process]
    with multiprocessing.Pool() as pool:
        for _ in tqdm(pool.imap(merge_files_wrapper, tasks),
                      total=len(tasks),
                      desc="Merging files in directories"):
            pass

382 

def process_file_content(file_path, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format):
    """
    Parse one gzipped RDF file and route each of its graphs to the output tree.

    Parse failures (rdflib ParserError) are logged and skipped so one bad file
    does not abort the whole run.
    """
    # gzip.open in 'rb' mode always yields bytes, so the previous
    # `assert isinstance(content, bytes)` was validation-by-assert (stripped
    # under -O) and has been removed.
    with gzip.open(file_path, 'rb') as f:
        data = f.read().decode('utf-8')
    graph = Dataset()
    try:
        graph.parse(data=data, format=rdf_format)
    except ParserError as e:
        logging.error(f"Failed to parse {file_path}: {e}")
        return

    for context in graph.graphs():
        graph_identifier = context.identifier
        process_graph(context, graph_identifier, output_root, base_iri, file_limit, item_limit, zip_output)

398 

def process_file_wrapper(args):
    """Worker entry point: process one file unless stopped or already cached."""
    (file_path, output_root, base_iri, file_limit, item_limit,
     zip_output, rdf_format, cache_file, stop_file) = args
    if check_stop_file(stop_file):
        return
    if is_file_processed(file_path, cache_file):
        return
    process_file_content(file_path, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format)
    mark_file_as_processed(file_path, cache_file)

406 

def process_chunk(chunk, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format, cache_file, stop_file):
    """Process every file of *chunk* in parallel worker processes, with a progress bar."""
    tasks = [
        (file_path, output_root, base_iri, file_limit, item_limit,
         zip_output, rdf_format, cache_file, stop_file)
        for file_path in chunk
    ]
    with multiprocessing.Pool() as pool:
        for _ in tqdm(pool.imap(process_file_wrapper, tasks),
                      total=len(chunk),
                      desc="Processing files"):
            pass

414 

def create_cache_file(cache_file):
    """Ensure the processed-files cache exists, creating it empty when missing."""
    if not cache_file:
        logging.info("No cache file specified. Skipping cache creation.")
        return
    if not os.path.exists(cache_file):
        with open(cache_file, 'w', encoding='utf8'):
            pass  # touch: an empty file is enough

422 

def is_file_processed(file_path, cache_file):
    """Return True when *file_path* is listed in *cache_file* (False when no cache is used)."""
    if not cache_file:
        return False
    with open(cache_file, 'r', encoding='utf8') as f:
        return file_path in f.read().splitlines()

429 

def mark_file_as_processed(file_path, cache_file):
    """Append *file_path* to the cache file; no-op (debug-logged) when caching is disabled."""
    if not cache_file:
        logging.debug(f"No cache file specified. Skipping marking {file_path} as processed.")
        return
    with open(cache_file, 'a', encoding='utf8') as f:
        f.write(f"{file_path}\n")

436 

def check_stop_file(stop_file):
    """Return True when the termination-signal file exists on disk."""
    return os.path.exists(stop_file)

439 

def main():
    """Command-line entry point: convert gzipped export files into OC Meta RDF output.

    Files are processed in chunks; after each chunk the per-process partial
    files are merged, and the stop file is checked between chunks so the run
    can be terminated gracefully.
    """
    parser = argparse.ArgumentParser(
        description="Process gzipped input files into OC Meta RDF",
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument('input_folder', type=str, help='Input folder containing gzipped input files')
    parser.add_argument('output_root', type=str, help='Root folder for output OC Meta RDF files')
    parser.add_argument('--base_iri', type=str, default='https://w3id.org/oc/meta/', help='The base URI of entities on Meta')
    parser.add_argument('--file_limit', type=int, default=10000, help='Number of files per folder')
    parser.add_argument('--item_limit', type=int, default=1000, help='Number of items per file')
    # NOTE(review): default=True combined with action='store_true' means this
    # flag can never turn zipping off — presumably the default should be False
    # (or argparse.BooleanOptionalAction should be used); confirm intent.
    parser.add_argument('-v', '--zip_output', default=True, dest='zip_output', action='store_true', help='Zip output json files')
    parser.add_argument('--input_format', type=str, default='jsonld', choices=['jsonld', 'nquads'], help='Format of the input files')
    parser.add_argument('--chunk_size', type=int, default=1000, help='Number of files to process before merging')
    parser.add_argument('--cache_file', type=str, default=None, help='File to store processed file names (optional)')
    parser.add_argument('--stop_file', type=str, default='./.stop', help='File to signal process termination')
    args = parser.parse_args()

    create_cache_file(args.cache_file)

    # Derive the input file suffix and the rdflib parser format from the CLI choice
    file_extension = '.nq.gz' if args.input_format == 'nquads' else '.jsonld.gz'
    rdf_format = 'nquads' if args.input_format == 'nquads' else 'json-ld'

    files_to_process = [os.path.join(args.input_folder, file) for file in os.listdir(args.input_folder) if file.endswith(file_extension)]
    chunks = [files_to_process[i:i + args.chunk_size] for i in range(0, len(files_to_process), args.chunk_size)]
    for i, chunk in enumerate(tqdm(chunks, desc="Processing chunks")):
        if check_stop_file(args.stop_file):
            logging.info("Stop file detected. Gracefully terminating the process.")
            break
        logging.info(f"Processing chunk {i+1}/{len(chunks)}")
        process_chunk(chunk, args.output_root, args.base_iri, args.file_limit, args.item_limit, args.zip_output, rdf_format, args.cache_file, args.stop_file)
        logging.info(f"Merging files for chunk {i+1}")
        merge_all_files_parallel(args.output_root, args.zip_output, args.stop_file)

    logging.info("Processing complete")


if __name__ == "__main__":
    main()