Coverage for oc_meta/plugins/csv_generator_lite/csv_generator_lite.py: 35%

401 statements  

coverage.py v6.5.0, created at 2025-07-14 14:06 +0000

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2024 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

from __future__ import annotations

import concurrent.futures
import csv
import json
import os
import re
import sys  # needed by increase_csv_field_limit (sys.maxsize)
from typing import Dict, List, Optional
from zipfile import ZipFile

import redis
from pebble import ProcessPool
from tqdm import tqdm

# Increase CSV field size limit to handle large fields
csv.field_size_limit(2**31 - 1)  # Set to max value for 32-bit systems

# Constants
FIELDNAMES = [
    "id",
    "title",
    "author",
    "issue",
    "volume",
    "venue",
    "page",
    "pub_date",
    "type",
    "publisher",
    "editor",
]

URI_TYPE_DICT = {
    "http://purl.org/spar/doco/Abstract": "abstract",
    "http://purl.org/spar/fabio/ArchivalDocument": "archival document",
    "http://purl.org/spar/fabio/AudioDocument": "audio document",
    "http://purl.org/spar/fabio/Book": "book",
    "http://purl.org/spar/fabio/BookChapter": "book chapter",
    "http://purl.org/spar/fabio/ExpressionCollection": "book section",
    "http://purl.org/spar/fabio/BookSeries": "book series",
    "http://purl.org/spar/fabio/BookSet": "book set",
    "http://purl.org/spar/fabio/ComputerProgram": "computer program",
    "http://purl.org/spar/doco/Part": "book part",
    "http://purl.org/spar/fabio/Expression": "",
    "http://purl.org/spar/fabio/DataFile": "dataset",
    "http://purl.org/spar/fabio/DataManagementPlan": "data management plan",
    "http://purl.org/spar/fabio/Thesis": "dissertation",
    "http://purl.org/spar/fabio/Editorial": "editorial",
    "http://purl.org/spar/fabio/Journal": "journal",
    "http://purl.org/spar/fabio/JournalArticle": "journal article",
    "http://purl.org/spar/fabio/JournalEditorial": "journal editorial",
    "http://purl.org/spar/fabio/JournalIssue": "journal issue",
    "http://purl.org/spar/fabio/JournalVolume": "journal volume",
    "http://purl.org/spar/fabio/Newspaper": "newspaper",
    "http://purl.org/spar/fabio/NewspaperArticle": "newspaper article",
    "http://purl.org/spar/fabio/NewspaperIssue": "newspaper issue",
    "http://purl.org/spar/fr/ReviewVersion": "peer review",
    "http://purl.org/spar/fabio/AcademicProceedings": "proceedings",
    "http://purl.org/spar/fabio/Preprint": "preprint",
    "http://purl.org/spar/fabio/Presentation": "presentation",
    "http://purl.org/spar/fabio/ProceedingsPaper": "proceedings article",
    "http://purl.org/spar/fabio/ReferenceBook": "reference book",
    "http://purl.org/spar/fabio/ReferenceEntry": "reference entry",
    "http://purl.org/spar/fabio/ReportDocument": "report",
    "http://purl.org/spar/fabio/RetractionNotice": "retraction notice",
    "http://purl.org/spar/fabio/Series": "series",
    "http://purl.org/spar/fabio/SpecificationDocument": "standard",
    "http://purl.org/spar/fabio/WebContent": "web content",
}

# Global cache for JSON files
_json_cache = {}

def init_redis_connection(
    host: str = "localhost", port: int = 6379, db: int = 2
) -> redis.Redis:
    """Initialize Redis connection with given parameters"""
    client = redis.Redis(host=host, port=port, db=db, decode_responses=True)
    # Test the connection by sending a PING command
    client.ping()  # This will raise redis.ConnectionError if connection fails
    return client

def increase_csv_field_limit() -> int:
    """Incrementally increase the CSV field size limit until it works.
    Returns the successful limit value."""
    current_limit = csv.field_size_limit()
    while True:
        try:
            csv.field_size_limit(current_limit)
            return current_limit
        except OverflowError:
            # If we get an overflow, try a smaller value
            if current_limit >= sys.maxsize:
                current_limit = int(sys.maxsize / 2)
            else:
                break
        except Exception:
            break

        # Try next value
        current_limit *= 2

    # If we get here, we need to try increasing from the default
    current_limit = csv.field_size_limit()
    while True:
        try:
            next_limit = current_limit * 2
            csv.field_size_limit(next_limit)
            current_limit = next_limit
            return current_limit
        except Exception:
            # If we can't increase further, return the last working value
            csv.field_size_limit(current_limit)
            return current_limit

def load_processed_omids_to_redis(output_dir: str, redis_client: redis.Redis) -> int:
    """Load all previously processed OMIDs from existing CSV files into Redis.
    This is done only once at startup to create the initial cache."""
    if not os.path.exists(output_dir):
        return 0

    # Clear any existing data in Redis
    redis_client.delete("processed_omids")

    count = 0
    BATCH_SIZE = 1000  # Process commands in batches of 1000

    # Get list of CSV files first
    csv_files = [f for f in os.listdir(output_dir) if f.endswith(".csv")]
    print(f"Loading {len(csv_files)} CSV files into Redis cache...")

    # Read all CSV files and collect OMIDs with progress bar
    for filename in tqdm(csv_files, desc="Loading existing identifiers"):
        filepath = os.path.join(output_dir, filename)
        with open(filepath, "r", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            batch_pipe = redis_client.pipeline()
            batch_count = 0

            for row in reader:
                # Extract all OMIDs from the id field
                omids = [
                    id_part.strip()
                    for id_part in row["id"].split()
                    if id_part.startswith("omid:br/")
                ]
                # Add to Redis pipeline
                for omid in omids:
                    batch_pipe.sadd("processed_omids", omid)
                    batch_count += 1
                    count += 1

                # Execute batch if we've reached the batch size
                if batch_count >= BATCH_SIZE:
                    batch_pipe.execute()
                    batch_pipe = redis_client.pipeline()
                    batch_count = 0

            # Execute any remaining commands in the pipeline
            if batch_count > 0:
                batch_pipe.execute()

    print(f"Loaded {count} identifiers into Redis cache")
    return count
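# Illustrative note on the cache layout (the identifier values below are made-up
# examples): each row's "id" column holds space-separated identifiers such as
# "omid:br/0612345 doi:10.1234/example", and only the tokens starting with
# "omid:br/" are added to the Redis set "processed_omids", which
# is_omid_processed() then consults to skip already-exported resources.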

def is_omid_processed(omid: str, redis_client: redis.Redis) -> bool:
    """Check if an OMID has been processed by looking it up in Redis"""
    return redis_client.sismember("processed_omids", omid)

def find_file(
    rdf_dir: str, dir_split_number: int, items_per_file: int, uri: str
) -> Optional[str]:
    """Find the file path for a given URI based on the directory structure"""
    entity_regex: str = (
        r"^(https:\/\/w3id\.org\/oc\/meta)\/([a-z][a-z])\/(0[1-9]+0)?([1-9][0-9]*)$"
    )
    entity_match = re.match(entity_regex, uri)
    if entity_match:
        cur_number = int(entity_match.group(4))
        cur_file_split = (
            (cur_number - 1) // items_per_file
        ) * items_per_file + items_per_file
        cur_split = (
            (cur_number - 1) // dir_split_number
        ) * dir_split_number + dir_split_number

        short_name = entity_match.group(2)
        sub_folder = entity_match.group(3) or ""
        cur_dir_path = os.path.join(rdf_dir, short_name, sub_folder, str(cur_split))
        cur_file_path = os.path.join(cur_dir_path, str(cur_file_split)) + ".zip"

        return cur_file_path if os.path.exists(cur_file_path) else None
    return None
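# Worked example of the path computation in find_file() above, using assumed
# split parameters (dir_split_number=10000, items_per_file=1000) and a made-up
# URI: for https://w3id.org/oc/meta/br/0601234 the regex yields short_name "br",
# sub_folder "060" and cur_number 1234, so
#   cur_file_split = ((1234 - 1) // 1000) * 1000 + 1000 = 2000
#   cur_split      = ((1234 - 1) // 10000) * 10000 + 10000 = 10000
# and the candidate file is <rdf_dir>/br/060/10000/2000.zip, returned only if it
# exists on disk.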

def load_json_from_file(filepath: str) -> dict:
    """Load JSON data from a ZIP file, using cache if available"""
    if filepath in _json_cache:
        return _json_cache[filepath]

    try:
        with ZipFile(filepath, "r") as zip_file:
            json_filename = zip_file.namelist()[0]
            with zip_file.open(json_filename) as json_file:
                json_content = json_file.read().decode("utf-8")
                json_data = json.loads(json_content)
                _json_cache[filepath] = json_data
                return json_data
    except Exception as e:
        print(f"Error loading file {filepath}: {e}")
        return {}

def process_identifier(id_data: dict) -> Optional[str]:
    """Process identifier data to extract schema and value"""
    try:
        id_schema = id_data["http://purl.org/spar/datacite/usesIdentifierScheme"][0][
            "@id"
        ].split("/datacite/")[1]
        literal_value = id_data[
            "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue"
        ][0]["@value"]
        return f"{id_schema}:{literal_value}"
    except (KeyError, IndexError):
        return None
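# Minimal sketch of the JSON-LD shape process_identifier() expects (the DOI value
# is a made-up example): an identifier entity carrying
#   "http://purl.org/spar/datacite/usesIdentifierScheme": [{"@id": "http://purl.org/spar/datacite/doi"}]
#   "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue": [{"@value": "10.1234/example"}]
# is rendered as the string "doi:10.1234/example".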

def process_responsible_agent(
    ra_data: dict, ra_uri: str, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[str]:
    """Process responsible agent data to extract name and identifiers"""
    try:
        # Process name
        family_name = ra_data.get("http://xmlns.com/foaf/0.1/familyName", [{}])[0].get(
            "@value", ""
        )
        given_name = ra_data.get("http://xmlns.com/foaf/0.1/givenName", [{}])[0].get(
            "@value", ""
        )
        foaf_name = ra_data.get("http://xmlns.com/foaf/0.1/name", [{}])[0].get(
            "@value", ""
        )

        # Determine name format
        if family_name or given_name:
            if family_name and given_name:
                name = f"{family_name}, {given_name}"
            elif family_name:
                name = f"{family_name},"
            else:
                name = f", {given_name}"
        elif foaf_name:
            name = foaf_name
        else:
            return None

        # Get OMID
        omid = ra_uri.split("/")[-1]
        identifiers = [f"omid:ra/{omid}"]

        # Process other identifiers
        if "http://purl.org/spar/datacite/hasIdentifier" in ra_data:
            for identifier in ra_data["http://purl.org/spar/datacite/hasIdentifier"]:
                id_uri = identifier["@id"]
                id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
                if id_file:
                    id_data = load_json_from_file(id_file)
                    for graph in id_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == id_uri:
                                id_value = process_identifier(entity)
                                if id_value:
                                    identifiers.append(id_value)

        if identifiers:
            return f"{name} [{' '.join(identifiers)}]"
        return name
    except (KeyError, IndexError):
        return None
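# Illustrative output of process_responsible_agent() above (names, URIs and
# identifiers are invented): a foaf:familyName "Doe" and foaf:givenName "Jane"
# on https://w3id.org/oc/meta/ra/0605 with an attached ORCID identifier yields
# "Doe, Jane [omid:ra/0605 orcid:0000-0002-1825-0097]"; when only foaf:name is
# present (typical for publishers), that name is used as-is, still followed by
# the bracketed identifiers.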

def process_venue_title(
    venue_data: dict,
    venue_uri: str,
    rdf_dir: str,
    dir_split_number: int,
    items_per_file: int,
) -> str:
    """Process venue title and its identifiers"""
    venue_title = venue_data.get("http://purl.org/dc/terms/title", [{}])[0].get(
        "@value", ""
    )
    if not venue_title:
        return ""

    # Get OMID
    omid = venue_uri.split("/")[-1]
    identifiers = [f"omid:br/{omid}"]

    # Process other identifiers
    if "http://purl.org/spar/datacite/hasIdentifier" in venue_data:
        for identifier in venue_data["http://purl.org/spar/datacite/hasIdentifier"]:
            id_uri = identifier["@id"]
            id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
            if id_file:
                id_data = load_json_from_file(id_file)
                for graph in id_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == id_uri:
                            id_value = process_identifier(entity)
                            if id_value:
                                identifiers.append(id_value)

    return f"{venue_title} [{' '.join(identifiers)}]" if identifiers else venue_title

def process_hierarchical_venue(
    entity: dict, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Dict[str, str]:
    """Process hierarchical venue structure recursively"""
    result = {"volume": "", "issue": "", "venue": ""}
    entity_types = entity.get("@type", [])

    if "http://purl.org/spar/fabio/JournalIssue" in entity_types:
        result["issue"] = entity.get(
            "http://purl.org/spar/fabio/hasSequenceIdentifier", [{}]
        )[0].get("@value", "")
    elif "http://purl.org/spar/fabio/JournalVolume" in entity_types:
        result["volume"] = entity.get(
            "http://purl.org/spar/fabio/hasSequenceIdentifier", [{}]
        )[0].get("@value", "")
    else:
        # It's a main venue (journal, book series, etc.)
        result["venue"] = process_venue_title(
            entity, entity["@id"], rdf_dir, dir_split_number, items_per_file
        )
        return result

    # Process parent if exists
    if "http://purl.org/vocab/frbr/core#partOf" in entity:
        parent_uri = entity["http://purl.org/vocab/frbr/core#partOf"][0]["@id"]
        parent_file = find_file(rdf_dir, dir_split_number, items_per_file, parent_uri)
        if parent_file:
            parent_data = load_json_from_file(parent_file)
            for graph in parent_data:
                for parent_entity in graph.get("@graph", []):
                    if parent_entity["@id"] == parent_uri:
                        parent_info = process_hierarchical_venue(
                            parent_entity, rdf_dir, dir_split_number, items_per_file
                        )
                        # Merge parent info, keeping our current values
                        for key, value in parent_info.items():
                            if not result[key]:  # Only update if we don't have a value
                                result[key] = value

    return result
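# Illustrative recursion of process_hierarchical_venue() above (titles and
# numbers are invented): starting from a JournalIssue with sequence identifier
# "2", whose frbr:partOf points to a JournalVolume "15", which is in turn
# partOf a Journal titled "Example Journal", the merged result is
#   {"issue": "2", "volume": "15", "venue": "Example Journal [omid:br/...]"}
# where the venue string is produced by process_venue_title().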

def find_first_ar_by_role(
    agent_roles: Dict, next_relations: Dict, role_type: str
) -> Optional[str]:
    """Find the first AR for a specific role type that isn't referenced by any other AR of the same role"""
    # Get all ARs of this role type
    role_ars = {
        ar_uri: ar_data
        for ar_uri, ar_data in agent_roles.items()
        if role_type
        in ar_data.get("http://purl.org/spar/pro/withRole", [{}])[0].get("@id", "")
    }

    # Get all "next" relations between ARs of this role type
    role_next_relations = {
        ar_uri: next_ar
        for ar_uri, next_ar in next_relations.items()
        if ar_uri in role_ars and next_ar in role_ars
    }

    # Find the AR that isn't referenced as next by any other AR of this role
    referenced_ars = set(role_next_relations.values())
    for ar_uri in role_ars:
        if ar_uri not in referenced_ars:
            return ar_uri

    # If no first AR found, take the first one from the role ARs
    return next(iter(role_ars)) if role_ars else None
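# Sketch of the head-finding logic in find_first_ar_by_role() above, with
# invented AR URIs: if ar1 --hasNext--> ar2 --hasNext--> ar3 are all author
# roles, then ar2 and ar3 appear as values in role_next_relations, so ar1 is the
# only AR never referenced as "next" and is returned as the head of the author
# chain; process_bibliographic_resource() then walks the chain via
# next_relations to keep authors in their declared order.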

def process_bibliographic_resource(
    br_data: dict, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[Dict[str, str]]:
    """Process bibliographic resource data and its related entities"""
    # Skip if the entity is a JournalVolume or JournalIssue
    br_types = br_data.get("@type", [])
    if (
        "http://purl.org/spar/fabio/JournalVolume" in br_types
        or "http://purl.org/spar/fabio/JournalIssue" in br_types
    ):
        return None

    output = {field: "" for field in FIELDNAMES}

    try:
        # Extract OMID and basic BR information
        entity_id = br_data.get("@id", "")
        identifiers = [f'omid:br/{entity_id.split("/")[-1]}'] if entity_id else []

        output["title"] = br_data.get("http://purl.org/dc/terms/title", [{}])[0].get(
            "@value", ""
        )
        output["pub_date"] = br_data.get(
            "http://prismstandard.org/namespaces/basic/2.0/publicationDate", [{}]
        )[0].get("@value", "")

        # Extract type
        br_types = [
            t
            for t in br_data.get("@type", [])
            if t != "http://purl.org/spar/fabio/Expression"
        ]
        output["type"] = URI_TYPE_DICT.get(br_types[0], "") if br_types else ""

        # Process identifiers
        if "http://purl.org/spar/datacite/hasIdentifier" in br_data:
            for identifier in br_data["http://purl.org/spar/datacite/hasIdentifier"]:
                id_uri = identifier["@id"]
                id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
                if id_file:
                    id_data = load_json_from_file(id_file)
                    for graph in id_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == id_uri:
                                id_value = process_identifier(entity)
                                if id_value:
                                    identifiers.append(id_value)
        output["id"] = " ".join(identifiers)

        # Process authors, editors and publishers through agent roles
        authors = []
        editors = []
        publishers = []

        # Create a dictionary to store agent roles and their next relations
        agent_roles = {}
        next_relations = {}

        if "http://purl.org/spar/pro/isDocumentContextFor" in br_data:
            # First pass: collect all agent roles and their next relations
            for ar_data in br_data["http://purl.org/spar/pro/isDocumentContextFor"]:
                ar_uri = ar_data["@id"]
                ar_file = find_file(rdf_dir, dir_split_number, items_per_file, ar_uri)
                if ar_file:
                    ar_data = load_json_from_file(ar_file)
                    for graph in ar_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == ar_uri:
                                # Store the agent role data
                                agent_roles[ar_uri] = entity
                                # Store the next relation if it exists
                                if "https://w3id.org/oc/ontology/hasNext" in entity:
                                    next_ar = entity[
                                        "https://w3id.org/oc/ontology/hasNext"
                                    ][0]["@id"]
                                    next_relations[ar_uri] = next_ar

            # Process each role type separately
            for role_type, role_list in [
                ("author", authors),
                ("editor", editors),
                ("publisher", publishers),
            ]:
                first_ar = find_first_ar_by_role(agent_roles, next_relations, role_type)
                if not first_ar:
                    continue

                # Process agent roles in order for this role type
                current_ar = first_ar
                processed_ars = set()
                max_iterations = len(agent_roles)
                iterations = 0

                while current_ar and current_ar in agent_roles:
                    if current_ar in processed_ars or iterations >= max_iterations:
                        print(
                            f"Warning: Detected cycle in hasNext relations or exceeded maximum iterations at AR: {current_ar}"
                        )
                        break

                    processed_ars.add(current_ar)
                    iterations += 1

                    # Process the current AR
                    entity = agent_roles[current_ar]
                    role = entity.get("http://purl.org/spar/pro/withRole", [{}])[0].get(
                        "@id", ""
                    )

                    # Only process if it matches our current role type
                    if role_type in role:
                        if "http://purl.org/spar/pro/isHeldBy" in entity:
                            ra_uri = entity["http://purl.org/spar/pro/isHeldBy"][0][
                                "@id"
                            ]
                            ra_file = find_file(
                                rdf_dir, dir_split_number, items_per_file, ra_uri
                            )
                            if ra_file:
                                ra_data = load_json_from_file(ra_file)
                                for ra_graph in ra_data:
                                    for ra_entity in ra_graph.get("@graph", []):
                                        if ra_entity["@id"] == ra_uri:
                                            agent_name = process_responsible_agent(
                                                ra_entity,
                                                ra_uri,
                                                rdf_dir,
                                                dir_split_number,
                                                items_per_file,
                                            )
                                            if agent_name:
                                                role_list.append(agent_name)

                    # Move to next agent role
                    current_ar = next_relations.get(current_ar)

        output["author"] = "; ".join(authors)
        output["editor"] = "; ".join(editors)
        output["publisher"] = "; ".join(publishers)

        # Process venue information
        if "http://purl.org/vocab/frbr/core#partOf" in br_data:
            venue_uri = br_data["http://purl.org/vocab/frbr/core#partOf"][0]["@id"]
            venue_file = find_file(rdf_dir, dir_split_number, items_per_file, venue_uri)
            if venue_file:
                venue_data = load_json_from_file(venue_file)
                for graph in venue_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == venue_uri:
                            venue_info = process_hierarchical_venue(
                                entity, rdf_dir, dir_split_number, items_per_file
                            )
                            output.update(venue_info)

        # Process page information
        if "http://purl.org/vocab/frbr/core#embodiment" in br_data:
            page_uri = br_data["http://purl.org/vocab/frbr/core#embodiment"][0]["@id"]
            page_file = find_file(rdf_dir, dir_split_number, items_per_file, page_uri)
            if page_file:
                page_data = load_json_from_file(page_file)
                for graph in page_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == page_uri:
                            start_page = entity.get(
                                "http://prismstandard.org/namespaces/basic/2.0/startingPage",
                                [{}],
                            )[0].get("@value", "")
                            end_page = entity.get(
                                "http://prismstandard.org/namespaces/basic/2.0/endingPage",
                                [{}],
                            )[0].get("@value", "")
                            if start_page or end_page:
                                output["page"] = f"{start_page}-{end_page}"

    except (KeyError, IndexError) as e:
        print(f"Error processing bibliographic resource: {e}")

    return output

def process_single_file(args):
    """Process a single file and return the results"""
    filepath, input_dir, dir_split_number, items_per_file, redis_params = args
    results = []

    # Initialize Redis client once
    redis_client = redis.Redis(
        host=redis_params["host"],
        port=redis_params["port"],
        db=redis_params["db"],
        decode_responses=True,
    )

    data = load_json_from_file(filepath)
    for graph in data:
        for entity in graph.get("@graph", []):
            # Skip if this is a JournalVolume or JournalIssue
            entity_types = entity.get("@type", [])
            if (
                "http://purl.org/spar/fabio/JournalVolume" in entity_types
                or "http://purl.org/spar/fabio/JournalIssue" in entity_types
            ):
                continue

            # Extract OMID from entity ID
            entity_id = entity.get("@id", "")
            if entity_id:
                omid = f"omid:br/{entity_id.split('/')[-1]}"
                # Skip if already processed
                if is_omid_processed(omid, redis_client):
                    continue

                br_data = process_bibliographic_resource(
                    entity, input_dir, dir_split_number, items_per_file
                )
                if br_data:
                    results.append(br_data)

    return results

class ResultBuffer:
    """Buffer to collect results and write them when reaching the row limit"""

    def __init__(self, output_dir: str, max_rows: int = 3000):
        self.buffer = []
        self.output_dir = output_dir
        self.max_rows = max_rows
        self.file_counter = self._get_last_file_number() + 1
        self.pbar = None

    def _get_last_file_number(self) -> int:
        """Find the highest file number from existing output files"""
        if not os.path.exists(self.output_dir):
            return -1

        max_number = -1
        for filename in os.listdir(self.output_dir):
            if filename.startswith("output_") and filename.endswith(".csv"):
                try:
                    number = int(filename[7:-4])  # Extract number from 'output_X.csv'
                    max_number = max(max_number, number)
                except ValueError:
                    continue
        return max_number

    def set_progress_bar(self, total: int) -> None:
        """Initialize progress bar with total number of files"""
        self.pbar = tqdm(total=total, desc="Processing files")

    def update_progress(self) -> None:
        """Update progress bar"""
        if self.pbar:
            self.pbar.update(1)

    def close_progress_bar(self) -> None:
        """Close progress bar"""
        if self.pbar:
            self.pbar.close()

    def add_results(self, results: List[Dict[str, str]]) -> None:
        """Add results to buffer and write to file if max_rows is reached"""
        self.buffer.extend(results)
        while len(self.buffer) >= self.max_rows:
            self._write_buffer_chunk()

    def _write_buffer_chunk(self) -> None:
        """Write max_rows records to a new file"""
        chunk = self.buffer[: self.max_rows]
        output_file = os.path.join(self.output_dir, f"output_{self.file_counter}.csv")
        write_csv(output_file, chunk)
        self.buffer = self.buffer[self.max_rows :]
        self.file_counter += 1

    def flush(self) -> None:
        """Write any remaining results in buffer"""
        if self.buffer:
            output_file = os.path.join(
                self.output_dir, f"output_{self.file_counter}.csv"
            )
            write_csv(output_file, self.buffer)
            self.buffer = []
            self.file_counter += 1
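# Illustrative behaviour of ResultBuffer above (the row counts are an example):
# with the default max_rows=3000, adding 7000 result rows writes output_0.csv
# and output_1.csv with 3000 rows each and keeps 1000 rows buffered; flush()
# then writes them to output_2.csv. If output_N.csv files already exist in
# output_dir, numbering resumes after the highest existing N.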

def task_done(future: concurrent.futures.Future, result_buffer: ResultBuffer):
    """Callback function for completed tasks"""
    try:
        results = future.result()
        if results:
            result_buffer.add_results(results)
        result_buffer.update_progress()  # Update progress after task completion
    except Exception as e:
        print(f"Task failed: {e}")
        result_buffer.update_progress()  # Update progress even if task fails

def generate_csv(
    input_dir: str,
    output_dir: str,
    dir_split_number: int,
    items_per_file: int,
    zip_output_rdf: bool,
    redis_host: str = "localhost",
    redis_port: int = 6379,
    redis_db: int = 2,
) -> None:
    """Generate CSV files from RDF data using Pebble for process management"""
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Initialize Redis connection and load initial cache
    redis_client = init_redis_connection(redis_host, redis_port, redis_db)
    processed_count = load_processed_omids_to_redis(output_dir, redis_client)

    # Process only files in the 'br' directory
    br_dir = os.path.join(input_dir, "br")
    if not os.path.exists(br_dir):
        print(f"Error: bibliographic resources directory not found at {br_dir}")
        return

    # Collect all ZIP files
    all_files = []
    for root, _, files in os.walk(br_dir):
        if "prov" in root:
            continue
        all_files.extend(os.path.join(root, f) for f in files if f.endswith(".zip"))

    if not all_files:
        print("No files found to process")
        return

    print(f"Processing {len(all_files)} files...")

    # Redis connection parameters to pass to worker processes
    redis_params = {"host": redis_host, "port": redis_port, "db": redis_db}

    # Create result buffer and initialize progress bar
    result_buffer = ResultBuffer(output_dir)
    result_buffer.set_progress_bar(len(all_files))

    # Process files one at a time using Pebble
    with ProcessPool(max_workers=os.cpu_count(), max_tasks=1) as executor:
        futures: List[concurrent.futures.Future] = []
        for filepath in all_files:
            future = executor.schedule(
                function=process_single_file,
                args=(
                    (
                        filepath,
                        input_dir,
                        dir_split_number,
                        items_per_file,
                        redis_params,
                    ),
                ),
            )
            future.add_done_callback(lambda f: task_done(f, result_buffer))
            futures.append(future)

        # Wait for all futures to complete
        for future in futures:
            try:
                future.result()
            except Exception as e:
                print(f"Error processing file: {e}")
                return  # Exit without clearing Redis if there's an error

    # Flush any remaining results and close progress bar
    result_buffer.flush()
    result_buffer.close_progress_bar()

    # Clear Redis cache only if we've successfully completed everything
    redis_client.delete("processed_omids")
    print("Processing complete. Redis cache cleared.")

def write_csv(filepath: str, data: List[Dict[str, str]]) -> None:
    """Write data to CSV file"""
    with open(filepath, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDNAMES)
        writer.writeheader()
        writer.writerows(data)
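# Example invocation of generate_csv() (a sketch only: the paths and split
# values below are assumptions, not taken from this repository's configuration):
#
#     generate_csv(
#         input_dir="/data/oc_meta/rdf",    # must contain a "br" subfolder of zipped JSON-LD
#         output_dir="/data/oc_meta/csv",   # output_N.csv files are written here
#         dir_split_number=10000,
#         items_per_file=1000,
#         zip_output_rdf=True,
#         redis_host="localhost",
#         redis_port=6379,
#         redis_db=2,
#     )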