Coverage for oc_meta/run/meta/generate_csv.py: 35%

394 statements

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-21 09:24 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7from __future__ import annotations 

8 

9import csv 

10import os 

11import re 

12from argparse import ArgumentParser 

13from functools import lru_cache 

14import multiprocessing 

15from typing import Dict, List, Optional, Tuple 

16from zipfile import ZipFile 

17 

18import orjson 

19import redis 

20import yaml 

21 

22from oc_meta.lib.console import create_progress 

23from oc_meta.lib.file_manager import collect_zip_files 

24 

25csv.field_size_limit(2**31 - 1) 

26 

# Column order of every output CSV row; matches the OpenCitations Meta
# CSV dump layout, so downstream consumers can rely on this exact header.
FIELDNAMES = [
    "id",
    "title",
    "author",
    "issue",
    "volume",
    "venue",
    "page",
    "pub_date",
    "type",
    "publisher",
    "editor",
]

40 

# Maps fabio/doco/fr class URIs to the human-readable labels written to the
# CSV "type" column.  fabio:Expression maps to "" because it is the generic
# superclass: it is filtered out before type selection and carries no
# specific type information.
URI_TYPE_DICT = {
    "http://purl.org/spar/doco/Abstract": "abstract",
    "http://purl.org/spar/fabio/ArchivalDocument": "archival document",
    "http://purl.org/spar/fabio/AudioDocument": "audio document",
    "http://purl.org/spar/fabio/Book": "book",
    "http://purl.org/spar/fabio/BookChapter": "book chapter",
    "http://purl.org/spar/fabio/ExpressionCollection": "book section",
    "http://purl.org/spar/fabio/BookSeries": "book series",
    "http://purl.org/spar/fabio/BookSet": "book set",
    "http://purl.org/spar/fabio/ComputerProgram": "computer program",
    "http://purl.org/spar/doco/Part": "book part",
    "http://purl.org/spar/fabio/Expression": "",
    "http://purl.org/spar/fabio/DataFile": "dataset",
    "http://purl.org/spar/fabio/DataManagementPlan": "data management plan",
    "http://purl.org/spar/fabio/Thesis": "dissertation",
    "http://purl.org/spar/fabio/Editorial": "editorial",
    "http://purl.org/spar/fabio/Journal": "journal",
    "http://purl.org/spar/fabio/JournalArticle": "journal article",
    "http://purl.org/spar/fabio/JournalEditorial": "journal editorial",
    "http://purl.org/spar/fabio/JournalIssue": "journal issue",
    "http://purl.org/spar/fabio/JournalVolume": "journal volume",
    "http://purl.org/spar/fabio/Newspaper": "newspaper",
    "http://purl.org/spar/fabio/NewspaperArticle": "newspaper article",
    "http://purl.org/spar/fabio/NewspaperIssue": "newspaper issue",
    "http://purl.org/spar/fr/ReviewVersion": "peer review",
    "http://purl.org/spar/fabio/AcademicProceedings": "proceedings",
    "http://purl.org/spar/fabio/Preprint": "preprint",
    "http://purl.org/spar/fabio/Presentation": "presentation",
    "http://purl.org/spar/fabio/ProceedingsPaper": "proceedings article",
    "http://purl.org/spar/fabio/ReferenceBook": "reference book",
    "http://purl.org/spar/fabio/ReferenceEntry": "reference entry",
    "http://purl.org/spar/fabio/ReportDocument": "report",
    "http://purl.org/spar/fabio/RetractionNotice": "retraction notice",
    "http://purl.org/spar/fabio/Series": "series",
    "http://purl.org/spar/fabio/SpecificationDocument": "standard",
    "http://purl.org/spar/fabio/WebContent": "web content",
}

78 

# Per-process state for multiprocessing workers, populated by _init_worker()
# via the Pool initializer; each worker process holds its own copies.
_worker_redis: Optional[redis.Redis] = None
# (input_dir, dir_split_number, items_per_file) as passed to the initializer.
_worker_config: Optional[Tuple[str, int, int]] = None

81 

82 

def _init_worker(
    redis_host: str, redis_port: int, redis_db: int, input_dir: str, dir_split_number: int, items_per_file: int
) -> None:
    """Pool initializer: create this worker's Redis client and stash config.

    Runs once in each worker process; _process_file_worker reads the two
    module-level globals set here.
    """
    global _worker_redis, _worker_config
    _worker_config = (input_dir, dir_split_number, items_per_file)
    _worker_redis = redis.Redis(
        host=redis_host, port=redis_port, db=redis_db, decode_responses=True
    )

89 

90 

def _process_file_worker(filepath: str) -> Tuple[str, List[Dict[str, str]]]:
    """Worker entry point: build CSV rows for every unprocessed BR in one zip.

    Journal volumes/issues are skipped (they become the volume/issue columns
    of their articles), as are entities whose OMID is already in the Redis
    'processed_omids' set.  Returns (filepath, rows) so the parent process
    can checkpoint the file once its rows are safely buffered.
    """
    assert _worker_redis is not None and _worker_config is not None
    input_dir, dir_split_number, items_per_file = _worker_config

    skip_types = (
        "http://purl.org/spar/fabio/JournalVolume",
        "http://purl.org/spar/fabio/JournalIssue",
    )
    rows: List[Dict[str, str]] = []
    for graph in load_json_from_file(filepath):
        for entity in graph.get("@graph", []):
            types = entity.get("@type", [])
            if any(t in types for t in skip_types):
                continue
            uri = entity.get("@id", "")
            if uri:
                omid = f"omid:br/{uri.split('/')[-1]}"
                if _worker_redis.sismember("processed_omids", omid):
                    continue
            row = process_bibliographic_resource(entity, input_dir, dir_split_number, items_per_file)
            if row:
                rows.append(row)
    return (filepath, rows)

113 

114 

def init_redis_connection(
    host: str = "localhost", port: int = 6379, db: int = 2
) -> redis.Redis:
    """Create a decoding Redis client and verify the server is reachable.

    The ping() makes connection problems surface immediately at start-up
    rather than on the first real command.
    """
    connection = redis.Redis(host=host, port=port, db=db, decode_responses=True)
    connection.ping()
    return connection

121 

122 

def is_omid_processed(omid: str, redis_client: redis.Redis) -> bool:
    """Return True when *omid* is already in the 'processed_omids' Redis set."""
    membership = redis_client.sismember("processed_omids", omid)
    return bool(membership)

125 

126 

def load_processed_omids_to_redis(output_dir: str, redis_client: redis.Redis) -> int:
    """Rebuild the Redis 'processed_omids' set from the CSVs already written.

    Scans every .csv file in *output_dir*, extracts the omid:br/... tokens
    from each row's space-separated 'id' column and SADDs them through a
    pipeline in batches.  The set is cleared first so Redis always mirrors
    what is actually on disk.

    Returns the number of OMIDs added (occurrences, not distinct members).
    """
    # Clean slate: stale members must not mask rows missing from disk.
    redis_client.delete("processed_omids")

    if not os.path.exists(output_dir):
        return 0

    # Rows can exceed csv's default field limit (very long id/author lists).
    csv.field_size_limit(2**31 - 1)

    count = 0
    BATCH_SIZE = 1000  # pipeline flush threshold
    csv_files = [f for f in os.listdir(output_dir) if f.endswith(".csv")]

    with create_progress() as progress:
        task = progress.add_task("Loading existing identifiers", total=len(csv_files))

        for filename in csv_files:
            filepath = os.path.join(output_dir, filename)
            with open(filepath, "r", encoding="utf-8") as f:
                reader = csv.DictReader(f)
                batch_pipe = redis_client.pipeline()
                batch_count = 0

                for row in reader:
                    # 'id' holds several identifiers; only OMIDs mark a BR
                    # as processed.
                    omids = [
                        id_part.strip()
                        for id_part in row["id"].split()
                        if id_part.startswith("omid:br/")
                    ]
                    for omid in omids:
                        batch_pipe.sadd("processed_omids", omid)
                        batch_count += 1
                        count += 1

                        # Flush periodically to bound pipeline memory.
                        if batch_count >= BATCH_SIZE:
                            batch_pipe.execute()
                            batch_pipe = redis_client.pipeline()
                            batch_count = 0

                # Flush the final partial batch for this file.
                if batch_count > 0:
                    batch_pipe.execute()

            progress.update(task, advance=1)

    return count

171 

172 

def load_checkpoint(checkpoint_file: str) -> set:
    """Return the set of file paths already processed in a previous run.

    A missing checkpoint file means a fresh run: an empty set is returned.
    Blank lines in the file are ignored.
    """
    if not os.path.exists(checkpoint_file):
        return set()
    with open(checkpoint_file, "r") as f:
        stripped = (line.strip() for line in f)
        return {entry for entry in stripped if entry}

178 

179 

def mark_file_processed(checkpoint_file: str, filepath: str) -> None:
    """Append *filepath* (one path per line) to the checkpoint file."""
    with open(checkpoint_file, "a") as f:
        f.write(f"{filepath}\n")

183 

184 

def find_file(
    rdf_dir: str, dir_split_number: int, items_per_file: int, uri: str
) -> Optional[str]:
    """Locate the zip archive that stores the entity identified by *uri*.

    OC Meta shards entities by numeric id: directories group ids in chunks
    of *dir_split_number*, files in chunks of *items_per_file*, under
    <rdf_dir>/<short_name>/<supplier_prefix>/<dir_chunk>/<file_chunk>.zip.
    Returns the path only when the archive exists on disk, else None (also
    None when *uri* is not an OC Meta entity URI).
    """
    pattern = (
        r"^(https:\/\/w3id\.org\/oc\/meta)\/([a-z][a-z])\/(0[1-9]+0)?([1-9][0-9]*)$"
    )
    match = re.match(pattern, uri)
    if not match:
        return None

    number = int(match.group(4))
    # Round `number` up to the next multiple of each chunk size.
    file_chunk = ((number - 1) // items_per_file) * items_per_file + items_per_file
    dir_chunk = ((number - 1) // dir_split_number) * dir_split_number + dir_split_number

    short_name = match.group(2)
    supplier_prefix = match.group(3) or ""
    candidate = os.path.join(
        rdf_dir, short_name, supplier_prefix, str(dir_chunk), f"{file_chunk}.zip"
    )
    return candidate if os.path.exists(candidate) else None

208 

209 

@lru_cache(maxsize=2000)
def load_json_from_file(filepath: str) -> list:
    """Load and parse the JSON-LD payload of one zip archive.

    Each archive is expected to contain a single JSON member; only the first
    namelist entry is read.  Results are memoised (up to 2000 archives)
    because identifier/agent/venue resolution repeatedly re-opens the same
    files.  NOTE(review): callers receive the cached list object itself and
    must treat it as read-only, or the cache is silently corrupted.
    """
    with ZipFile(filepath, "r") as zip_file:
        json_filename = zip_file.namelist()[0]
        with zip_file.open(json_filename) as json_file:
            return orjson.loads(json_file.read())

216 

217 

def process_identifier(id_data: dict) -> Optional[str]:
    """Render a datacite identifier entity as 'scheme:value' (e.g. 'doi:10.1/x').

    The scheme is the last path segment of the usesIdentifierScheme URI.
    Returns None when either expected property is missing or empty.
    """
    scheme_key = "http://purl.org/spar/datacite/usesIdentifierScheme"
    value_key = "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue"
    try:
        scheme = id_data[scheme_key][0]["@id"].split("/datacite/")[1]
        literal = id_data[value_key][0]["@value"]
    except (KeyError, IndexError):
        return None
    return f"{scheme}:{literal}"

229 

230 

def process_responsible_agent(
    ra_data: dict, ra_uri: str, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[str]:
    """Format a responsible agent as 'Name [omid:ra/... other-ids...]'.

    The display name is, in order of preference:
    - foaf familyName/givenName rendered "Family, Given" (the comma is kept
      even when one part is missing), or
    - foaf name verbatim.
    External identifiers attached via datacite:hasIdentifier are resolved
    from their RDF archives and appended after the agent's own OMID.

    Returns None when no name is available or the data is malformed
    (missing keys / empty lists are treated as "no agent").
    """
    try:
        family_name = ra_data.get("http://xmlns.com/foaf/0.1/familyName", [{}])[0].get(
            "@value", ""
        )
        given_name = ra_data.get("http://xmlns.com/foaf/0.1/givenName", [{}])[0].get(
            "@value", ""
        )
        foaf_name = ra_data.get("http://xmlns.com/foaf/0.1/name", [{}])[0].get(
            "@value", ""
        )

        if family_name and given_name:
            name = f"{family_name}, {given_name}"
        elif family_name:
            name = f"{family_name},"
        elif given_name:
            name = f", {given_name}"
        elif foaf_name:
            name = foaf_name
        else:
            return None

        omid = ra_uri.split("/")[-1]
        identifiers = [f"omid:ra/{omid}"]

        if "http://purl.org/spar/datacite/hasIdentifier" in ra_data:
            for identifier in ra_data["http://purl.org/spar/datacite/hasIdentifier"]:
                id_uri = identifier["@id"]
                id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
                if id_file:
                    for graph in load_json_from_file(id_file):
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == id_uri:
                                id_value = process_identifier(entity)
                                if id_value:
                                    identifiers.append(id_value)

        # `identifiers` always contains at least the OMID, so the bracketed
        # suffix is always emitted (the previous bare-name fallback was
        # unreachable dead code and has been removed).
        return f"{name} [{' '.join(identifiers)}]"
    except (KeyError, IndexError):
        return None

278 

279 

def process_venue_title(
    venue_data: dict,
    venue_uri: str,
    rdf_dir: str,
    dir_split_number: int,
    items_per_file: int,
) -> str:
    """Format a venue as 'Title [omid:br/... other-ids...]'.

    Returns "" when the venue has no dcterms:title.  External identifiers
    attached via datacite:hasIdentifier are resolved from their RDF archives
    and appended after the venue's own OMID.
    """
    venue_title = venue_data.get("http://purl.org/dc/terms/title", [{}])[0].get(
        "@value", ""
    )
    if not venue_title:
        return ""

    omid = venue_uri.split("/")[-1]
    identifiers = [f"omid:br/{omid}"]

    if "http://purl.org/spar/datacite/hasIdentifier" in venue_data:
        for identifier in venue_data["http://purl.org/spar/datacite/hasIdentifier"]:
            id_uri = identifier["@id"]
            id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
            if id_file:
                for graph in load_json_from_file(id_file):
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == id_uri:
                            id_value = process_identifier(entity)
                            if id_value:
                                identifiers.append(id_value)

    # `identifiers` is never empty (seeded with the OMID), so the bracketed
    # form is always used; the previous bare-title fallback was unreachable
    # dead code and has been removed.
    return f"{venue_title} [{' '.join(identifiers)}]"

310 

311 

def process_hierarchical_venue(
    entity: dict,
    rdf_dir: str,
    dir_split_number: int,
    items_per_file: int,
    visited: Optional[set] = None,
    depth: int = 0,
) -> Dict[str, str]:
    """Climb the frbr:partOf chain from *entity*, collecting volume/issue/venue.

    Issue and volume nodes contribute their sequence identifier and the walk
    continues upward; any other node type is treated as the venue itself and
    terminates the climb.  A shared *visited* set plus a depth cap of 5 guard
    against cyclic hierarchies; on detection a warning is printed and the
    values gathered so far are returned.
    """
    result = {"volume": "", "issue": "", "venue": ""}

    if visited is None:
        visited = set()

    entity_id = entity.get("@id", "")
    if entity_id in visited or depth > 5:
        print(f"Warning: Cycle detected in venue hierarchy at: {entity_id}")
        return result
    visited.add(entity_id)

    seq_key = "http://purl.org/spar/fabio/hasSequenceIdentifier"
    entity_types = entity.get("@type", [])

    if "http://purl.org/spar/fabio/JournalIssue" in entity_types:
        result["issue"] = entity.get(seq_key, [{}])[0].get("@value", "")
    elif "http://purl.org/spar/fabio/JournalVolume" in entity_types:
        result["volume"] = entity.get(seq_key, [{}])[0].get("@value", "")
    else:
        # Reached the venue itself: format it and stop climbing.
        result["venue"] = process_venue_title(
            entity, entity["@id"], rdf_dir, dir_split_number, items_per_file
        )
        return result

    if "http://purl.org/vocab/frbr/core#partOf" in entity:
        parent_uri = entity["http://purl.org/vocab/frbr/core#partOf"][0]["@id"]
        parent_file = find_file(rdf_dir, dir_split_number, items_per_file, parent_uri)
        if parent_file:
            for graph in load_json_from_file(parent_file):
                for parent_entity in graph.get("@graph", []):
                    if parent_entity["@id"] != parent_uri:
                        continue
                    parent_info = process_hierarchical_venue(
                        parent_entity,
                        rdf_dir,
                        dir_split_number,
                        items_per_file,
                        visited,
                        depth + 1,
                    )
                    # Child values win; parents only fill still-empty slots.
                    for key, value in parent_info.items():
                        result[key] = result[key] or value

    return result

368 

369 

def find_first_ar_by_role(
    agent_roles: Dict, next_relations: Dict, role_type: str
) -> Optional[str]:
    """Return the head of the hasNext chain among ARs holding *role_type*.

    The head is a candidate that no other same-role AR references via
    hasNext.  If every candidate is referenced (cyclic chain), the first
    candidate in insertion order is returned; None when there are no
    candidates at all.
    """
    def _holds_role(ar: dict) -> bool:
        role_uri = ar.get("http://purl.org/spar/pro/withRole", [{}])[0].get("@id", "")
        return role_type in role_uri

    candidates = {uri: data for uri, data in agent_roles.items() if _holds_role(data)}

    referenced = {
        nxt
        for uri, nxt in next_relations.items()
        if uri in candidates and nxt in candidates
    }

    for uri in candidates:
        if uri not in referenced:
            return uri

    return next(iter(candidates)) if candidates else None

392 

393 

def process_bibliographic_resource(
    br_data: dict, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[Dict[str, str]]:
    """Build one CSV row (FIELDNAMES keys) for a bibliographic resource.

    Resolves identifiers, agent roles (author/editor/publisher chains),
    venue hierarchy and page extent by locating and loading the related
    entities' RDF archives via find_file/load_json_from_file.

    Returns None for journal volumes/issues (they are represented as the
    volume/issue columns of their articles, not as rows of their own).
    NOTE(review): any other error is caught by the broad except at the
    bottom, printed, and a partially-filled row is still returned.
    """
    br_types = br_data.get("@type", [])
    if (
        "http://purl.org/spar/fabio/JournalVolume" in br_types
        or "http://purl.org/spar/fabio/JournalIssue" in br_types
    ):
        return None

    output = {field: "" for field in FIELDNAMES}

    try:
        entity_id = br_data.get("@id", "")
        # The entity's own OMID always leads the id column.
        identifiers = [f'omid:br/{entity_id.split("/")[-1]}'] if entity_id else []

        output["title"] = br_data.get("http://purl.org/dc/terms/title", [{}])[0].get(
            "@value", ""
        )
        output["pub_date"] = br_data.get(
            "http://prismstandard.org/namespaces/basic/2.0/publicationDate", [{}]
        )[0].get("@value", "")

        # Drop the generic fabio:Expression superclass before picking the
        # first (most specific) type for the CSV label.
        br_types = [
            t
            for t in br_data.get("@type", [])
            if t != "http://purl.org/spar/fabio/Expression"
        ]
        output["type"] = URI_TYPE_DICT.get(br_types[0], "") if br_types else ""

        # External identifiers (doi, pmid, ...) resolved from their archives.
        if "http://purl.org/spar/datacite/hasIdentifier" in br_data:
            for identifier in br_data["http://purl.org/spar/datacite/hasIdentifier"]:
                id_uri = identifier["@id"]
                id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
                if id_file:
                    id_data = load_json_from_file(id_file)
                    for graph in id_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == id_uri:
                                id_value = process_identifier(entity)
                                if id_value:
                                    identifiers.append(id_value)
        output["id"] = " ".join(identifiers)

        authors = []
        editors = []
        publishers = []
        agent_roles = {}   # AR uri -> AR entity dict
        next_relations = {}  # AR uri -> next AR uri (oc:hasNext ordering)

        # First pass: load every agent-role entity and record the hasNext links.
        if "http://purl.org/spar/pro/isDocumentContextFor" in br_data:
            for ar_data in br_data["http://purl.org/spar/pro/isDocumentContextFor"]:
                ar_uri = ar_data["@id"]
                ar_file = find_file(rdf_dir, dir_split_number, items_per_file, ar_uri)
                if ar_file:
                    # NOTE(review): `ar_data` is rebound here, shadowing the
                    # loop variable; harmless because the original reference
                    # is not read again, but worth renaming at some point.
                    ar_data = load_json_from_file(ar_file)
                    for graph in ar_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == ar_uri:
                                agent_roles[ar_uri] = entity
                                if "https://w3id.org/oc/ontology/hasNext" in entity:
                                    next_ar = entity[
                                        "https://w3id.org/oc/ontology/hasNext"
                                    ][0]["@id"]
                                    next_relations[ar_uri] = next_ar

        # Second pass: walk each role's hasNext chain in order, resolving
        # the responsible agent of every matching AR.
        for role_type, role_list in [
            ("author", authors),
            ("editor", editors),
            ("publisher", publishers),
        ]:
            first_ar = find_first_ar_by_role(agent_roles, next_relations, role_type)
            if not first_ar:
                continue

            current_ar = first_ar
            processed_ars = set()
            # Cap iterations so a malformed chain cannot loop forever.
            max_iterations = len(agent_roles)
            iterations = 0

            while current_ar and current_ar in agent_roles:
                if current_ar in processed_ars or iterations >= max_iterations:
                    print(
                        f"Warning: Detected cycle in hasNext relations or exceeded maximum iterations at AR: {current_ar}"
                    )
                    break

                processed_ars.add(current_ar)
                iterations += 1

                entity = agent_roles[current_ar]
                role = entity.get("http://purl.org/spar/pro/withRole", [{}])[0].get(
                    "@id", ""
                )

                if role_type in role:
                    if "http://purl.org/spar/pro/isHeldBy" in entity:
                        ra_uri = entity["http://purl.org/spar/pro/isHeldBy"][0][
                            "@id"
                        ]
                        ra_file = find_file(
                            rdf_dir, dir_split_number, items_per_file, ra_uri
                        )
                        if ra_file:
                            ra_data = load_json_from_file(ra_file)
                            for ra_graph in ra_data:
                                for ra_entity in ra_graph.get("@graph", []):
                                    if ra_entity["@id"] == ra_uri:
                                        agent_name = process_responsible_agent(
                                            ra_entity,
                                            ra_uri,
                                            rdf_dir,
                                            dir_split_number,
                                            items_per_file,
                                        )
                                        if agent_name:
                                            role_list.append(agent_name)

                current_ar = next_relations.get(current_ar)

        output["author"] = "; ".join(authors)
        output["editor"] = "; ".join(editors)
        output["publisher"] = "; ".join(publishers)

        # Venue / volume / issue come from the frbr:partOf hierarchy.
        if "http://purl.org/vocab/frbr/core#partOf" in br_data:
            venue_uri = br_data["http://purl.org/vocab/frbr/core#partOf"][0]["@id"]
            venue_file = find_file(rdf_dir, dir_split_number, items_per_file, venue_uri)
            if venue_file:
                venue_data = load_json_from_file(venue_file)
                for graph in venue_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == venue_uri:
                            venue_info = process_hierarchical_venue(
                                entity, rdf_dir, dir_split_number, items_per_file
                            )
                            output.update(venue_info)

        # Page extent comes from the frbr:embodiment (manifestation) entity.
        if "http://purl.org/vocab/frbr/core#embodiment" in br_data:
            page_uri = br_data["http://purl.org/vocab/frbr/core#embodiment"][0]["@id"]
            page_file = find_file(rdf_dir, dir_split_number, items_per_file, page_uri)
            if page_file:
                page_data = load_json_from_file(page_file)
                for graph in page_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == page_uri:
                            start_page = entity.get(
                                "http://prismstandard.org/namespaces/basic/2.0/startingPage",
                                [{}],
                            )[0].get("@value", "")
                            end_page = entity.get(
                                "http://prismstandard.org/namespaces/basic/2.0/endingPage",
                                [{}],
                            )[0].get("@value", "")
                            if start_page or end_page:
                                output["page"] = f"{start_page}-{end_page}"

    except Exception as e:
        # Deliberate best-effort: log and fall through so a partial row is
        # still emitted rather than losing the entity entirely.
        print(f"Error processing bibliographic resource: {type(e).__name__}: {e}")

    return output

554 

555 

class ResultBuffer:
    """Accumulates CSV rows and writes them out in fixed-size numbered files.

    Output files are named output_<n>.csv; numbering resumes after the
    highest existing file so a restarted run never overwrites earlier
    results.
    """

    def __init__(self, output_dir: str, max_rows: int = 3000):
        self.buffer = []
        self.output_dir = output_dir
        self.max_rows = max_rows
        self.file_counter = self._get_last_file_number() + 1

    def _get_last_file_number(self) -> int:
        """Highest <n> among existing output_<n>.csv files, or -1 if none."""
        if not os.path.exists(self.output_dir):
            return -1
        highest = -1
        for filename in os.listdir(self.output_dir):
            if not (filename.startswith("output_") and filename.endswith(".csv")):
                continue
            stem = filename[len("output_"):-len(".csv")]
            try:
                highest = max(highest, int(stem))
            except ValueError:
                # Unrelated file that merely matches the prefix/suffix.
                continue
        return highest

    def _next_output_path(self) -> str:
        """Return the path for the next chunk file and advance the counter."""
        path = os.path.join(self.output_dir, f"output_{self.file_counter}.csv")
        self.file_counter += 1
        return path

    def add_results(self, results: List[Dict[str, str]]) -> None:
        """Buffer *results*, writing out full chunks of max_rows rows."""
        self.buffer.extend(results)
        while len(self.buffer) >= self.max_rows:
            chunk = self.buffer[: self.max_rows]
            self.buffer = self.buffer[self.max_rows :]
            write_csv(self._next_output_path(), chunk)

    def flush(self) -> None:
        """Write any remaining buffered rows to a final (possibly short) file."""
        if self.buffer:
            write_csv(self._next_output_path(), self.buffer)
            self.buffer = []

597 

598 

def generate_csv(
    input_dir: str,
    output_dir: str,
    dir_split_number: int,
    items_per_file: int,
    redis_host: str = "localhost",
    redis_port: int = 6379,
    redis_db: int = 2,
    workers: int = 4,
) -> None:
    """Convert the OC Meta 'br' RDF dump under *input_dir* into CSV files.

    Resumable: per-file progress is checkpointed to
    <output_dir>/processed_br_files.txt, and per-entity progress is mirrored
    in the Redis 'processed_omids' set, rebuilt from the existing CSVs on
    start-up.  Work is spread over *workers* processes; rows come back to
    this process and are written in fixed-size chunks by ResultBuffer.
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    checkpoint_file = os.path.join(output_dir, "processed_br_files.txt")
    processed_br_files = load_checkpoint(checkpoint_file)

    redis_client = init_redis_connection(redis_host, redis_port, redis_db)
    load_processed_omids_to_redis(output_dir, redis_client)

    # Only the bibliographic-resource shard of the dump is traversed here;
    # related entities (ids, agents, venues) are loaded on demand by workers.
    br_dir = os.path.join(input_dir, "br")
    if not os.path.exists(br_dir):
        print(f"Error: directory not found at {br_dir}")
        return

    all_files = collect_zip_files(br_dir, only_data=True)
    files_to_process = [f for f in all_files if f not in processed_br_files]

    if not files_to_process:
        print("All files already processed")
        return

    print(f"Skipping {len(processed_br_files)} already processed files")
    print(f"Processing {len(files_to_process)} remaining files with {workers} workers...")

    result_buffer = ResultBuffer(output_dir)

    # Use forkserver to avoid deadlocks when forking in a multi-threaded environment
    ctx = multiprocessing.get_context('forkserver')
    with ctx.Pool(
        workers,
        _init_worker,
        (redis_host, redis_port, redis_db, input_dir, dir_split_number, items_per_file),
    ) as pool:
        with create_progress() as progress:
            task = progress.add_task("Processing files", total=len(files_to_process))

            # imap_unordered lets each file be checkpointed as soon as its
            # rows are buffered, regardless of completion order.
            for filepath, results in pool.imap_unordered(_process_file_worker, files_to_process):
                if results:
                    result_buffer.add_results(results)
                mark_file_processed(checkpoint_file, filepath)
                progress.update(task, advance=1)

    # Write out any remaining partially-filled chunk.
    result_buffer.flush()
    print("Processing complete.")

654 

def write_csv(filepath: str, data: List[Dict[str, str]]) -> None:
    """Write *data* rows to *filepath* using the standard FIELDNAMES header."""
    with open(filepath, "w", newline="", encoding="utf-8") as out:
        writer = csv.DictWriter(out, fieldnames=FIELDNAMES)
        writer.writeheader()
        for row in data:
            writer.writerow(row)

660 

661 

if __name__ == '__main__':
    parser = ArgumentParser('generate_csv.py',
                            description='Generate CSV files from OpenCitations Meta RDF dump')
    parser.add_argument('-c', '--config', required=True,
                        help='OpenCitations Meta configuration file location')
    parser.add_argument('-o', '--output_dir', required=True,
                        help='Directory where CSV files will be stored')
    parser.add_argument('--redis-host', default='localhost',
                        help='Redis host (default: localhost)')
    parser.add_argument('--redis-port', type=int, default=6379,
                        help='Redis port (default: 6379)')
    parser.add_argument('--redis-db', type=int, default=2,
                        help='Redis database number (default: 2)')
    parser.add_argument('--workers', type=int, default=4,
                        help='Number of parallel workers (default: 4)')
    parser.add_argument('--clean', action='store_true',
                        help='Clear checkpoint file and Redis cache before starting')
    args = parser.parse_args()

    with open(args.config, encoding='utf-8') as f:
        settings = yaml.full_load(f)

    # The dump layout parameters must match those used when the RDF was written.
    rdf_dir = os.path.join(settings['output_rdf_dir'], 'rdf')
    dir_split_number = settings['dir_split_number']
    items_per_file = settings['items_per_file']

    if args.clean:
        # Fix: the checkpoint file lives inside the output directory (see
        # generate_csv), not in the current working directory — the old code
        # removed a file from the CWD and left the real checkpoint intact.
        checkpoint_file = os.path.join(args.output_dir, 'processed_br_files.txt')
        if os.path.exists(checkpoint_file):
            os.remove(checkpoint_file)
            print(f"Removed checkpoint file: {checkpoint_file}")
        redis_client = redis.Redis(
            host=args.redis_host, port=args.redis_port, db=args.redis_db
        )
        deleted = redis_client.delete('processed_omids')
        if deleted:
            print("Cleared Redis processed_omids cache")

    generate_csv(
        input_dir=rdf_dir,
        output_dir=args.output_dir,
        dir_split_number=dir_split_number,
        items_per_file=items_per_file,
        redis_host=args.redis_host,
        redis_port=args.redis_port,
        redis_db=args.redis_db,
        workers=args.workers,
    )