Coverage for oc_meta / run / meta / generate_csv.py: 36%

400 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 17:25 +0000

1#!/usr/bin/python 

2# -*- coding: utf-8 -*- 

3# Copyright (c) 2022-2024 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# Permission to use, copy, modify, and/or distribute this software for any purpose 

6# with or without fee is hereby granted, provided that the above copyright notice 

7# and this permission notice appear in all copies. 

8# 

9# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, 

12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, 

13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 

14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS 

15# SOFTWARE. 

16 

17from __future__ import annotations 

18 

19import csv 

20import json 

21import os 

22import re 

23from argparse import ArgumentParser 

24from functools import lru_cache 

25from multiprocessing import Pool 

26from typing import Dict, List, Optional, Tuple 

27from zipfile import ZipFile 

28 

29import redis 

30import yaml 

31from rich.progress import (BarColumn, MofNCompleteColumn, Progress, 

32 SpinnerColumn, TextColumn, TimeElapsedColumn, 

33 TimeRemainingColumn) 

34 

# Raise the CSV field size cap: abstracts and very long author/identifier
# lists can exceed the default ~128 KB limit. 2**31 - 1 is the maximum
# accepted value on 64-bit builds.
csv.field_size_limit(2**31 - 1)

# Column order of every CSV file produced by this script.
FIELDNAMES = [
    "id",
    "title",
    "author",
    "issue",
    "volume",
    "venue",
    "page",
    "pub_date",
    "type",
    "publisher",
    "editor",
]

# Maps FaBiO / DoCO / FR class URIs to the human-readable labels used in the
# "type" CSV column. fabio:Expression maps to "" because it is the generic
# superclass carried by every bibliographic resource and conveys no type.
URI_TYPE_DICT = {
    "http://purl.org/spar/doco/Abstract": "abstract",
    "http://purl.org/spar/fabio/ArchivalDocument": "archival document",
    "http://purl.org/spar/fabio/AudioDocument": "audio document",
    "http://purl.org/spar/fabio/Book": "book",
    "http://purl.org/spar/fabio/BookChapter": "book chapter",
    "http://purl.org/spar/fabio/ExpressionCollection": "book section",
    "http://purl.org/spar/fabio/BookSeries": "book series",
    "http://purl.org/spar/fabio/BookSet": "book set",
    "http://purl.org/spar/fabio/ComputerProgram": "computer program",
    "http://purl.org/spar/doco/Part": "book part",
    "http://purl.org/spar/fabio/Expression": "",
    "http://purl.org/spar/fabio/DataFile": "dataset",
    "http://purl.org/spar/fabio/DataManagementPlan": "data management plan",
    "http://purl.org/spar/fabio/Thesis": "dissertation",
    "http://purl.org/spar/fabio/Editorial": "editorial",
    "http://purl.org/spar/fabio/Journal": "journal",
    "http://purl.org/spar/fabio/JournalArticle": "journal article",
    "http://purl.org/spar/fabio/JournalEditorial": "journal editorial",
    "http://purl.org/spar/fabio/JournalIssue": "journal issue",
    "http://purl.org/spar/fabio/JournalVolume": "journal volume",
    "http://purl.org/spar/fabio/Newspaper": "newspaper",
    "http://purl.org/spar/fabio/NewspaperArticle": "newspaper article",
    "http://purl.org/spar/fabio/NewspaperIssue": "newspaper issue",
    "http://purl.org/spar/fr/ReviewVersion": "peer review",
    "http://purl.org/spar/fabio/AcademicProceedings": "proceedings",
    "http://purl.org/spar/fabio/Preprint": "preprint",
    "http://purl.org/spar/fabio/Presentation": "presentation",
    "http://purl.org/spar/fabio/ProceedingsPaper": "proceedings article",
    "http://purl.org/spar/fabio/ReferenceBook": "reference book",
    "http://purl.org/spar/fabio/ReferenceEntry": "reference entry",
    "http://purl.org/spar/fabio/ReportDocument": "report",
    "http://purl.org/spar/fabio/RetractionNotice": "retraction notice",
    "http://purl.org/spar/fabio/Series": "series",
    "http://purl.org/spar/fabio/SpecificationDocument": "standard",
    "http://purl.org/spar/fabio/WebContent": "web content",
}

# Per-worker-process globals, populated by _init_worker() when the
# multiprocessing Pool spawns each worker process.
_worker_redis: redis.Redis = None  # connection used to skip already-processed OMIDs
_worker_config: Tuple[str, int, int] = None  # (input_dir, dir_split_number, items_per_file)

91 

92 

def _init_worker(
    redis_host: str, redis_port: int, redis_db: int, input_dir: str, dir_split_number: int, items_per_file: int
) -> None:
    """Pool initializer: open a per-process Redis connection and stash config.

    Each worker needs its own connection (sockets cannot be shared across
    fork/spawn boundaries) plus the layout parameters of the RDF dump.
    """
    global _worker_redis, _worker_config
    _worker_config = (input_dir, dir_split_number, items_per_file)
    _worker_redis = redis.Redis(
        host=redis_host, port=redis_port, db=redis_db, decode_responses=True
    )

99 

100 

def _process_file_worker(filepath: str) -> Tuple[str, List[Dict[str, str]]]:
    """Worker task: build CSV rows for every unprocessed BR in one zip file.

    Volumes/issues are containers, not rows; entities whose OMID is already
    in the Redis 'processed_omids' set are skipped. Returns the file path
    (for checkpointing) together with the extracted rows.
    """
    input_dir, dir_split_number, items_per_file = _worker_config
    rows: List[Dict[str, str]] = []
    for graph in load_json_from_file(filepath):
        for entity in graph.get("@graph", []):
            types = entity.get("@type", [])
            is_container = (
                "http://purl.org/spar/fabio/JournalVolume" in types
                or "http://purl.org/spar/fabio/JournalIssue" in types
            )
            if is_container:
                continue
            uri = entity.get("@id", "")
            if uri and _worker_redis.sismember(
                "processed_omids", f"omid:br/{uri.split('/')[-1]}"
            ):
                continue
            row = process_bibliographic_resource(
                entity, input_dir, dir_split_number, items_per_file
            )
            if row:
                rows.append(row)
    return (filepath, rows)

122 

123 

def init_redis_connection(
    host: str = "localhost", port: int = 6379, db: int = 2
) -> redis.Redis:
    """Connect to Redis with decoded (str) responses and verify reachability.

    Raises whatever redis raises on a failed PING, so a misconfigured
    server aborts the run immediately instead of failing mid-processing.
    """
    connection = redis.Redis(host=host, port=port, db=db, decode_responses=True)
    connection.ping()  # fail fast if the server is unreachable
    return connection

130 

131 

def is_omid_processed(omid: str, redis_client: redis.Redis) -> bool:
    """Return whether *omid* is already in the Redis 'processed_omids' set."""
    already_done = redis_client.sismember("processed_omids", omid)
    return already_done

134 

135 

def load_processed_omids_to_redis(output_dir: str, redis_client: redis.Redis) -> int:
    """Populate the Redis 'processed_omids' set from previously written CSVs.

    Skips the (slow) rebuild when the set is already non-empty, making
    restarts cheap. OMIDs are pushed through pipelines in batches of 1000
    to avoid one round-trip per member. Returns the number of OMIDs loaded
    (or already present).
    """
    already_loaded = redis_client.scard("processed_omids")
    if already_loaded > 0:
        print(f"Redis already has {already_loaded} OMIDs, skipping rebuild")
        return already_loaded

    if not os.path.exists(output_dir):
        return 0

    # Re-apply the field size cap defensively; some rows carry very large
    # fields and this function may run before other module state is touched.
    csv.field_size_limit(2**31 - 1)

    total = 0
    BATCH_SIZE = 1000
    csv_files = [name for name in os.listdir(output_dir) if name.endswith(".csv")]

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TimeElapsedColumn(),
        TimeRemainingColumn(),
    ) as progress:
        task = progress.add_task("Loading existing identifiers", total=len(csv_files))

        for name in csv_files:
            with open(os.path.join(output_dir, name), "r", encoding="utf-8") as handle:
                pipe = redis_client.pipeline()
                pending = 0

                for row in csv.DictReader(handle):
                    for token in row["id"].split():
                        omid = token.strip()
                        if not omid.startswith("omid:br/"):
                            continue
                        pipe.sadd("processed_omids", omid)
                        pending += 1
                        total += 1

                        if pending >= BATCH_SIZE:
                            pipe.execute()
                            pipe = redis_client.pipeline()
                            pending = 0

                # Flush the final partial batch for this file.
                if pending:
                    pipe.execute()

            progress.update(task, advance=1)

    return total

190 

191 

def load_checkpoint(checkpoint_file: str) -> set:
    """Return the set of file paths recorded in *checkpoint_file*.

    The checkpoint is a plain text file with one path per line; blank
    lines are ignored. A missing file yields an empty set.
    """
    if not os.path.exists(checkpoint_file):
        return set()
    with open(checkpoint_file, "r") as handle:
        return {entry.strip() for entry in handle if entry.strip()}

197 

198 

def mark_file_processed(checkpoint_file: str, filepath: str) -> None:
    """Append *filepath* to the checkpoint file (one path per line).

    Append mode keeps the operation cheap and restart-safe: the file is
    only ever grown, never rewritten.
    """
    with open(checkpoint_file, "a") as handle:
        handle.write(f"{filepath}\n")

202 

203 

def find_file(
    rdf_dir: str, dir_split_number: int, items_per_file: int, uri: str
) -> Optional[str]:
    """Locate the zip archive holding *uri* in the OC Meta RDF dump layout.

    The dump shards entities by two-letter short name (e.g. "br", "ra"),
    an optional supplier prefix, a directory bucket of *dir_split_number*
    entities and a file bucket of *items_per_file* entities. Returns the
    archive path, or None when *uri* does not match the OC Meta URI
    pattern or the computed archive does not exist on disk.
    """
    pattern = (
        r"^(https:\/\/w3id\.org\/oc\/meta)\/([a-z][a-z])\/(0[1-9]+0)?([1-9][0-9]*)$"
    )
    match = re.match(pattern, uri)
    if not match:
        return None

    sequential_number = int(match.group(4))
    # Inclusive upper bound of the bucket the entity falls in, e.g. entity
    # 1 with items_per_file=1000 lives in file "1000".
    file_bucket = (
        (sequential_number - 1) // items_per_file
    ) * items_per_file + items_per_file
    dir_bucket = (
        (sequential_number - 1) // dir_split_number
    ) * dir_split_number + dir_split_number

    candidate = os.path.join(
        rdf_dir,
        match.group(2),        # short name, e.g. "br"
        match.group(3) or "",  # supplier prefix (may be absent)
        str(dir_bucket),
        f"{file_bucket}.zip",
    )
    return candidate if os.path.exists(candidate) else None

227 

228 

@lru_cache(maxsize=2000)
def load_json_from_file(filepath: str) -> list:
    """Read and parse the single JSON-LD document inside a zip archive.

    Results are memoized (LRU, 2000 entries) because venue, agent and
    identifier lookups repeatedly hit the same archives.
    """
    with ZipFile(filepath, "r") as archive:
        # Each dump archive holds exactly one JSON file.
        inner_name = archive.namelist()[0]
        with archive.open(inner_name) as member:
            return json.loads(member.read().decode("utf-8"))

236 

237 

def process_identifier(id_data: dict) -> Optional[str]:
    """Render a datacite identifier entity as 'scheme:value' (e.g. 'doi:10.1/x').

    Returns None when the entity lacks the identifier scheme or the
    literal value (malformed or incomplete data).
    """
    try:
        scheme_uri = id_data["http://purl.org/spar/datacite/usesIdentifierScheme"][0][
            "@id"
        ]
        scheme = scheme_uri.split("/datacite/")[1]
        value = id_data[
            "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue"
        ][0]["@value"]
    except (KeyError, IndexError):
        return None
    return f"{scheme}:{value}"

249 

250 

def process_responsible_agent(
    ra_data: dict, ra_uri: str, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[str]:
    """Format a responsible agent as 'Name [omid:ra/... other ids ...]'.

    Name preference: "family, given" (either part may be missing) over
    foaf:name. External identifiers referenced by the agent are resolved
    from their archives and appended after the OMID. Returns None when no
    name is available at all, or the data is malformed.
    """
    try:
        family = ra_data.get("http://xmlns.com/foaf/0.1/familyName", [{}])[0].get(
            "@value", ""
        )
        given = ra_data.get("http://xmlns.com/foaf/0.1/givenName", [{}])[0].get(
            "@value", ""
        )
        full = ra_data.get("http://xmlns.com/foaf/0.1/name", [{}])[0].get(
            "@value", ""
        )

        if family and given:
            name = f"{family}, {given}"
        elif family:
            name = f"{family},"
        elif given:
            name = f", {given}"
        elif full:
            name = full
        else:
            return None

        identifiers = [f"omid:ra/{ra_uri.split('/')[-1]}"]
        for identifier in ra_data.get(
            "http://purl.org/spar/datacite/hasIdentifier", []
        ):
            id_uri = identifier["@id"]
            id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
            if not id_file:
                continue
            for graph in load_json_from_file(id_file):
                for entity in graph.get("@graph", []):
                    if entity["@id"] != id_uri:
                        continue
                    id_value = process_identifier(entity)
                    if id_value:
                        identifiers.append(id_value)

        # `identifiers` always contains at least the OMID, so the bracketed
        # form is always produced.
        return f"{name} [{' '.join(identifiers)}]"
    except (KeyError, IndexError):
        return None

298 

299 

def process_venue_title(
    venue_data: dict,
    venue_uri: str,
    rdf_dir: str,
    dir_split_number: int,
    items_per_file: int,
) -> str:
    """Format a venue as 'Title [omid:br/... other ids ...]'.

    Returns "" for an untitled venue. External identifiers referenced by
    the venue are resolved from their archives and appended after the OMID.
    """
    title = venue_data.get("http://purl.org/dc/terms/title", [{}])[0].get(
        "@value", ""
    )
    if not title:
        return ""

    identifiers = [f"omid:br/{venue_uri.split('/')[-1]}"]
    for identifier in venue_data.get(
        "http://purl.org/spar/datacite/hasIdentifier", []
    ):
        id_uri = identifier["@id"]
        id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
        if not id_file:
            continue
        for graph in load_json_from_file(id_file):
            for entity in graph.get("@graph", []):
                if entity["@id"] != id_uri:
                    continue
                id_value = process_identifier(entity)
                if id_value:
                    identifiers.append(id_value)

    # `identifiers` always holds at least the OMID, so the bracketed form
    # is always produced.
    return f"{title} [{' '.join(identifiers)}]"

330 

331 

def process_hierarchical_venue(
    entity: dict,
    rdf_dir: str,
    dir_split_number: int,
    items_per_file: int,
    visited: Optional[set] = None,
    depth: int = 0,
) -> Dict[str, str]:
    """Walk up the frbr:partOf chain collecting issue, volume and venue.

    Issues and volumes contribute their sequence identifier and recursion
    continues upward; the first ancestor that is neither becomes the venue
    and the climb stops. A visited set plus a depth cap of 5 guards
    against cycles in the hierarchy. Child values win over parent values.
    """
    info = {"volume": "", "issue": "", "venue": ""}

    if visited is None:
        visited = set()

    current_id = entity.get("@id", "")
    if current_id in visited or depth > 5:
        print(f"Warning: Cycle detected in venue hierarchy at: {current_id}")
        return info
    visited.add(current_id)

    types = entity.get("@type", [])

    if "http://purl.org/spar/fabio/JournalIssue" in types:
        info["issue"] = entity.get(
            "http://purl.org/spar/fabio/hasSequenceIdentifier", [{}]
        )[0].get("@value", "")
    elif "http://purl.org/spar/fabio/JournalVolume" in types:
        info["volume"] = entity.get(
            "http://purl.org/spar/fabio/hasSequenceIdentifier", [{}]
        )[0].get("@value", "")
    else:
        # Neither an issue nor a volume: this entity is the venue itself,
        # so there is nothing further to climb.
        info["venue"] = process_venue_title(
            entity, entity["@id"], rdf_dir, dir_split_number, items_per_file
        )
        return info

    if "http://purl.org/vocab/frbr/core#partOf" in entity:
        parent_uri = entity["http://purl.org/vocab/frbr/core#partOf"][0]["@id"]
        parent_file = find_file(rdf_dir, dir_split_number, items_per_file, parent_uri)
        if parent_file:
            for graph in load_json_from_file(parent_file):
                for candidate in graph.get("@graph", []):
                    if candidate["@id"] != parent_uri:
                        continue
                    inherited = process_hierarchical_venue(
                        candidate,
                        rdf_dir,
                        dir_split_number,
                        items_per_file,
                        visited,
                        depth + 1,
                    )
                    # Only fill slots the child has not already claimed.
                    for key, value in inherited.items():
                        if not info[key]:
                            info[key] = value

    return info

388 

389 

def find_first_ar_by_role(
    agent_roles: Dict, next_relations: Dict, role_type: str
) -> Optional[str]:
    """Pick the head of the oco:hasNext chain for agent roles of *role_type*.

    The head is the AR that no other same-role AR points to via hasNext;
    if the chain is a pure cycle, fall back to an arbitrary member.
    Returns None when no AR carries the requested role.
    """
    matching = {}
    for ar_uri, ar_data in agent_roles.items():
        role_uri = ar_data.get("http://purl.org/spar/pro/withRole", [{}])[0].get(
            "@id", ""
        )
        if role_type in role_uri:
            matching[ar_uri] = ar_data

    if not matching:
        return None

    # hasNext targets within the same role: anything pointed to cannot be
    # the head of the chain.
    pointed_to = {
        target
        for source, target in next_relations.items()
        if source in matching and target in matching
    }
    for ar_uri in matching:
        if ar_uri not in pointed_to:
            return ar_uri

    # Pure cycle: every member is referenced; pick the first one seen.
    return next(iter(matching))

412 

413 

def process_bibliographic_resource(
    br_data: dict, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[Dict[str, str]]:
    """Build one CSV row (keyed by FIELDNAMES) for a bibliographic resource.

    Journal volumes and issues are containers, not citable resources, so
    they yield None. Identifier, agent-role, venue and page information is
    resolved by loading each referenced entity from its zip archive. Any
    exception during enrichment is printed and the partially filled row is
    returned, so one malformed entity cannot abort the whole file.
    """
    br_types = br_data.get("@type", [])
    if (
        "http://purl.org/spar/fabio/JournalVolume" in br_types
        or "http://purl.org/spar/fabio/JournalIssue" in br_types
    ):
        return None

    # Start from an all-empty row so every FIELDNAMES column is present.
    output = {field: "" for field in FIELDNAMES}

    try:
        entity_id = br_data.get("@id", "")
        # The entity's own OMID always leads the identifier list.
        identifiers = [f'omid:br/{entity_id.split("/")[-1]}'] if entity_id else []

        output["title"] = br_data.get("http://purl.org/dc/terms/title", [{}])[0].get(
            "@value", ""
        )
        output["pub_date"] = br_data.get(
            "http://prismstandard.org/namespaces/basic/2.0/publicationDate", [{}]
        )[0].get("@value", "")

        # fabio:Expression is the generic superclass on every BR; drop it
        # so the first remaining type determines the human-readable label.
        br_types = [
            t
            for t in br_data.get("@type", [])
            if t != "http://purl.org/spar/fabio/Expression"
        ]
        output["type"] = URI_TYPE_DICT.get(br_types[0], "") if br_types else ""

        # Resolve external identifiers (DOI, PMID, ...) from their archives.
        if "http://purl.org/spar/datacite/hasIdentifier" in br_data:
            for identifier in br_data["http://purl.org/spar/datacite/hasIdentifier"]:
                id_uri = identifier["@id"]
                id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
                if id_file:
                    id_data = load_json_from_file(id_file)
                    for graph in id_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == id_uri:
                                id_value = process_identifier(entity)
                                if id_value:
                                    identifiers.append(id_value)
        output["id"] = " ".join(identifiers)

        authors = []
        editors = []
        publishers = []
        agent_roles = {}  # AR URI -> AR entity loaded from disk
        next_relations = {}  # AR URI -> next AR URI (oco:hasNext ordering)

        # Load every agent role attached to this BR and record the hasNext
        # chain so contributors can later be emitted in declared order.
        if "http://purl.org/spar/pro/isDocumentContextFor" in br_data:
            for ar_data in br_data["http://purl.org/spar/pro/isDocumentContextFor"]:
                ar_uri = ar_data["@id"]
                ar_file = find_file(rdf_dir, dir_split_number, items_per_file, ar_uri)
                if ar_file:
                    # NOTE(review): rebinding `ar_data` shadows the loop
                    # variable; harmless because only "@id" (already
                    # extracted above) was needed from the original value.
                    ar_data = load_json_from_file(ar_file)
                    for graph in ar_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == ar_uri:
                                agent_roles[ar_uri] = entity
                                if "https://w3id.org/oc/ontology/hasNext" in entity:
                                    next_ar = entity[
                                        "https://w3id.org/oc/ontology/hasNext"
                                    ][0]["@id"]
                                    next_relations[ar_uri] = next_ar

        # Walk each role's hasNext chain from its head, collecting names.
        for role_type, role_list in [
            ("author", authors),
            ("editor", editors),
            ("publisher", publishers),
        ]:
            first_ar = find_first_ar_by_role(agent_roles, next_relations, role_type)
            if not first_ar:
                continue

            current_ar = first_ar
            processed_ars = set()
            # A valid chain visits each AR at most once; more is a cycle.
            max_iterations = len(agent_roles)
            iterations = 0

            while current_ar and current_ar in agent_roles:
                if current_ar in processed_ars or iterations >= max_iterations:
                    print(
                        f"Warning: Detected cycle in hasNext relations or exceeded maximum iterations at AR: {current_ar}"
                    )
                    break

                processed_ars.add(current_ar)
                iterations += 1

                entity = agent_roles[current_ar]
                role = entity.get("http://purl.org/spar/pro/withRole", [{}])[0].get(
                    "@id", ""
                )

                # The chain may interleave roles; only emit matching ones.
                if role_type in role:
                    if "http://purl.org/spar/pro/isHeldBy" in entity:
                        ra_uri = entity["http://purl.org/spar/pro/isHeldBy"][0][
                            "@id"
                        ]
                        ra_file = find_file(
                            rdf_dir, dir_split_number, items_per_file, ra_uri
                        )
                        if ra_file:
                            ra_data = load_json_from_file(ra_file)
                            for ra_graph in ra_data:
                                for ra_entity in ra_graph.get("@graph", []):
                                    if ra_entity["@id"] == ra_uri:
                                        agent_name = process_responsible_agent(
                                            ra_entity,
                                            ra_uri,
                                            rdf_dir,
                                            dir_split_number,
                                            items_per_file,
                                        )
                                        if agent_name:
                                            role_list.append(agent_name)

                current_ar = next_relations.get(current_ar)

        output["author"] = "; ".join(authors)
        output["editor"] = "; ".join(editors)
        output["publisher"] = "; ".join(publishers)

        # Venue / volume / issue come from the frbr:partOf hierarchy.
        if "http://purl.org/vocab/frbr/core#partOf" in br_data:
            venue_uri = br_data["http://purl.org/vocab/frbr/core#partOf"][0]["@id"]
            venue_file = find_file(rdf_dir, dir_split_number, items_per_file, venue_uri)
            if venue_file:
                venue_data = load_json_from_file(venue_file)
                for graph in venue_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == venue_uri:
                            venue_info = process_hierarchical_venue(
                                entity, rdf_dir, dir_split_number, items_per_file
                            )
                            output.update(venue_info)

        # Page range comes from the frbr:embodiment (manifestation) entity.
        if "http://purl.org/vocab/frbr/core#embodiment" in br_data:
            page_uri = br_data["http://purl.org/vocab/frbr/core#embodiment"][0]["@id"]
            page_file = find_file(rdf_dir, dir_split_number, items_per_file, page_uri)
            if page_file:
                page_data = load_json_from_file(page_file)
                for graph in page_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == page_uri:
                            start_page = entity.get(
                                "http://prismstandard.org/namespaces/basic/2.0/startingPage",
                                [{}],
                            )[0].get("@value", "")
                            end_page = entity.get(
                                "http://prismstandard.org/namespaces/basic/2.0/endingPage",
                                [{}],
                            )[0].get("@value", "")
                            if start_page or end_page:
                                output["page"] = f"{start_page}-{end_page}"

    except Exception as e:
        # Deliberate best-effort: log and return the row as filled so far
        # rather than aborting the whole file for one malformed entity.
        print(f"Error processing bibliographic resource: {type(e).__name__}: {e}")

    return output

574 

575 

class ResultBuffer:
    """Accumulates CSV rows and writes them out in fixed-size numbered files.

    Output files are named ``output_<n>.csv``; numbering resumes after the
    highest index already present in *output_dir*, so restarted runs never
    overwrite earlier results.
    """

    def __init__(self, output_dir: str, max_rows: int = 3000):
        self.buffer: List[Dict[str, str]] = []
        self.output_dir = output_dir
        self.max_rows = max_rows
        self.file_counter = self._get_last_file_number() + 1

    def _get_last_file_number(self) -> int:
        """Return the highest existing output_<n>.csv index, or -1 if none."""
        if not os.path.exists(self.output_dir):
            return -1

        highest = -1
        for filename in os.listdir(self.output_dir):
            if not (filename.startswith("output_") and filename.endswith(".csv")):
                continue
            stem = filename[7:-4]  # strip "output_" prefix and ".csv" suffix
            try:
                highest = max(highest, int(stem))
            except ValueError:
                # Non-numeric stem (foreign file): ignore it.
                continue
        return highest

    def add_results(self, results: List[Dict[str, str]]) -> None:
        """Queue *results*; write out full chunks of *max_rows* as they fill."""
        self.buffer.extend(results)
        while len(self.buffer) >= self.max_rows:
            self._write_buffer_chunk()

    def _write_buffer_chunk(self) -> None:
        """Write exactly *max_rows* buffered rows to the next numbered file."""
        chunk = self.buffer[: self.max_rows]
        destination = os.path.join(self.output_dir, f"output_{self.file_counter}.csv")
        write_csv(destination, chunk)
        # Only drop rows from the buffer once the write has succeeded.
        self.buffer = self.buffer[self.max_rows :]
        self.file_counter += 1

    def flush(self) -> None:
        """Write any remaining buffered rows (a final, possibly short file)."""
        if not self.buffer:
            return
        destination = os.path.join(self.output_dir, f"output_{self.file_counter}.csv")
        write_csv(destination, self.buffer)
        self.buffer = []
        self.file_counter += 1

617 

618 

def generate_csv(
    input_dir: str,
    output_dir: str,
    dir_split_number: int,
    items_per_file: int,
    redis_host: str = "localhost",
    redis_port: int = 6379,
    redis_db: int = 2,
    workers: int = 4,
) -> None:
    """Convert the OC Meta RDF dump under *input_dir* into CSV files.

    Resumable: a checkpoint file in *output_dir* lists fully processed zip
    archives, and a Redis set records every OMID already written, so an
    interrupted run picks up where it left off. Per-file work is fanned
    out to *workers* processes.
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    checkpoint_file = os.path.join(output_dir, "processed_br_files.txt")
    processed_br_files = load_checkpoint(checkpoint_file)

    redis_client = init_redis_connection(redis_host, redis_port, redis_db)
    load_processed_omids_to_redis(output_dir, redis_client)

    br_dir = os.path.join(input_dir, "br")
    if not os.path.exists(br_dir):
        print(f"Error: directory not found at {br_dir}")
        return

    zip_files = []
    for root, _, files in os.walk(br_dir):
        if "prov" in root:  # provenance snapshots are not bibliographic data
            continue
        zip_files.extend(
            os.path.join(root, name) for name in files if name.endswith(".zip")
        )

    pending = [path for path in sorted(zip_files) if path not in processed_br_files]

    if not pending:
        print("All files already processed")
        return

    print(f"Skipping {len(processed_br_files)} already processed files")
    print(f"Processing {len(pending)} remaining files with {workers} workers...")

    result_buffer = ResultBuffer(output_dir)

    init_args = (
        redis_host,
        redis_port,
        redis_db,
        input_dir,
        dir_split_number,
        items_per_file,
    )
    with Pool(workers, _init_worker, init_args) as pool:
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            TimeElapsedColumn(),
            TimeRemainingColumn(),
        ) as progress:
            task = progress.add_task("Processing files", total=len(pending))

            # imap_unordered: checkpoint each file as soon as it completes,
            # regardless of submission order.
            for filepath, rows in pool.imap_unordered(_process_file_worker, pending):
                if rows:
                    result_buffer.add_results(rows)
                mark_file_processed(checkpoint_file, filepath)
                progress.update(task, advance=1)

    result_buffer.flush()
    print("Processing complete.")

684 

685 

def write_csv(filepath: str, data: List[Dict[str, str]]) -> None:
    """Write *data* rows to *filepath* with the standard FIELDNAMES header.

    newline="" lets the csv module control line endings, per the csv docs.
    """
    with open(filepath, "w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=FIELDNAMES)
        writer.writeheader()
        writer.writerows(data)

691 

692 

if __name__ == '__main__':
    parser = ArgumentParser('generate_csv.py',
                            description='Generate CSV files from OpenCitations Meta RDF dump')
    parser.add_argument('-c', '--config', required=True,
                        help='OpenCitations Meta configuration file location')
    parser.add_argument('-o', '--output_dir', required=True,
                        help='Directory where CSV files will be stored')
    parser.add_argument('--redis-host', default='localhost',
                        help='Redis host (default: localhost)')
    parser.add_argument('--redis-port', type=int, default=6379,
                        help='Redis port (default: 6379)')
    parser.add_argument('--redis-db', type=int, default=2,
                        help='Redis database number (default: 2)')
    parser.add_argument('--workers', type=int, default=4,
                        help='Number of parallel workers (default: 4)')
    parser.add_argument('--clean', action='store_true',
                        help='Clear checkpoint file and Redis cache before starting')
    args = parser.parse_args()

    with open(args.config, encoding='utf-8') as f:
        settings = yaml.full_load(f)

    # The dump layout parameters must match the ones used to produce the RDF.
    rdf_dir = os.path.join(settings['output_rdf_dir'], 'rdf')
    dir_split_number = settings['dir_split_number']
    items_per_file = settings['items_per_file']

    if args.clean:
        # BUG FIX: the checkpoint file lives inside the output directory
        # (generate_csv writes it at <output_dir>/processed_br_files.txt);
        # the previous code looked in the current working directory and so
        # never actually removed it.
        checkpoint_file = os.path.join(args.output_dir, 'processed_br_files.txt')
        if os.path.exists(checkpoint_file):
            os.remove(checkpoint_file)
            print(f"Removed checkpoint file: {checkpoint_file}")
        redis_client = redis.Redis(
            host=args.redis_host, port=args.redis_port, db=args.redis_db
        )
        deleted = redis_client.delete('processed_omids')
        if deleted:
            print("Cleared Redis processed_omids cache")

    generate_csv(
        input_dir=rdf_dir,
        output_dir=args.output_dir,
        dir_split_number=dir_split_number,
        items_per_file=items_per_file,
        redis_host=args.redis_host,
        redis_port=args.redis_port,
        redis_db=args.redis_db,
        workers=args.workers,
    )