Coverage for oc_meta/plugins/csv_generator_lite/csv_generator_lite.py: 34%

400 statements  

coverage.py v6.5.0, created at 2025-12-20 08:55 +0000

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2024 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

from __future__ import annotations

import concurrent.futures
import csv
import json
import multiprocessing
import os
import re
import sys
from typing import Dict, List, Optional
from zipfile import ZipFile

import redis
from pebble import ProcessPool
from tqdm import tqdm

# Increase CSV field size limit to handle large fields
csv.field_size_limit(2**31 - 1)  # Set to max value for 32-bit systems

# Constants
FIELDNAMES = [
    "id",
    "title",
    "author",
    "issue",
    "volume",
    "venue",
    "page",
    "pub_date",
    "type",
    "publisher",
    "editor",
]

URI_TYPE_DICT = {
    "http://purl.org/spar/doco/Abstract": "abstract",
    "http://purl.org/spar/fabio/ArchivalDocument": "archival document",
    "http://purl.org/spar/fabio/AudioDocument": "audio document",
    "http://purl.org/spar/fabio/Book": "book",
    "http://purl.org/spar/fabio/BookChapter": "book chapter",
    "http://purl.org/spar/fabio/ExpressionCollection": "book section",
    "http://purl.org/spar/fabio/BookSeries": "book series",
    "http://purl.org/spar/fabio/BookSet": "book set",
    "http://purl.org/spar/fabio/ComputerProgram": "computer program",
    "http://purl.org/spar/doco/Part": "book part",
    "http://purl.org/spar/fabio/Expression": "",
    "http://purl.org/spar/fabio/DataFile": "dataset",
    "http://purl.org/spar/fabio/DataManagementPlan": "data management plan",
    "http://purl.org/spar/fabio/Thesis": "dissertation",
    "http://purl.org/spar/fabio/Editorial": "editorial",
    "http://purl.org/spar/fabio/Journal": "journal",
    "http://purl.org/spar/fabio/JournalArticle": "journal article",
    "http://purl.org/spar/fabio/JournalEditorial": "journal editorial",
    "http://purl.org/spar/fabio/JournalIssue": "journal issue",
    "http://purl.org/spar/fabio/JournalVolume": "journal volume",
    "http://purl.org/spar/fabio/Newspaper": "newspaper",
    "http://purl.org/spar/fabio/NewspaperArticle": "newspaper article",
    "http://purl.org/spar/fabio/NewspaperIssue": "newspaper issue",
    "http://purl.org/spar/fr/ReviewVersion": "peer review",
    "http://purl.org/spar/fabio/AcademicProceedings": "proceedings",
    "http://purl.org/spar/fabio/Preprint": "preprint",
    "http://purl.org/spar/fabio/Presentation": "presentation",
    "http://purl.org/spar/fabio/ProceedingsPaper": "proceedings article",
    "http://purl.org/spar/fabio/ReferenceBook": "reference book",
    "http://purl.org/spar/fabio/ReferenceEntry": "reference entry",
    "http://purl.org/spar/fabio/ReportDocument": "report",
    "http://purl.org/spar/fabio/RetractionNotice": "retraction notice",
    "http://purl.org/spar/fabio/Series": "series",
    "http://purl.org/spar/fabio/SpecificationDocument": "standard",
    "http://purl.org/spar/fabio/WebContent": "web content",
}

# Global cache for JSON files
_json_cache = {}

def init_redis_connection(
    host: str = "localhost", port: int = 6379, db: int = 2
) -> redis.Redis:
    """Initialize Redis connection with given parameters"""
    client = redis.Redis(host=host, port=port, db=db, decode_responses=True)
    # Test the connection by sending a PING command
    client.ping()  # This will raise redis.ConnectionError if connection fails
    return client
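# Illustrative usage, not part of the module (assumes a Redis server reachable on
# localhost:6379):
#
#     client = init_redis_connection(host="localhost", port=6379, db=2)
#     # a redis.ConnectionError has already been raised here if the server is unreachable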

def increase_csv_field_limit() -> int:
    """Incrementally increase the CSV field size limit until it works.
    Returns the successful limit value."""
    current_limit = csv.field_size_limit()
    while True:
        try:
            csv.field_size_limit(current_limit)
            return current_limit
        except OverflowError:
            # If we get an overflow, try a smaller value
            if current_limit >= sys.maxsize:
                current_limit = int(sys.maxsize / 2)
            else:
                break
        except Exception:
            break

        # Try next value
        current_limit *= 2

    # If we get here, we need to try increasing from the default
    current_limit = csv.field_size_limit()
    while True:
        try:
            next_limit = current_limit * 2
            csv.field_size_limit(next_limit)
            current_limit = next_limit
            return current_limit
        except Exception:
            # If we can't increase further, return the last working value
            csv.field_size_limit(current_limit)
            return current_limit

def load_processed_omids_to_redis(output_dir: str, redis_client: redis.Redis) -> int:
    """Load all previously processed OMIDs from existing CSV files into Redis.
    This is done only once at startup to create the initial cache."""
    if not os.path.exists(output_dir):
        return 0

    # Clear any existing data in Redis
    redis_client.delete("processed_omids")

    count = 0
    BATCH_SIZE = 1000  # Process commands in batches of 1000

    # Get the list of CSV files first
    csv_files = [f for f in os.listdir(output_dir) if f.endswith(".csv")]

    # Read all CSV files and collect OMIDs with a progress bar
    for filename in tqdm(csv_files, desc="Loading existing identifiers"):
        filepath = os.path.join(output_dir, filename)
        with open(filepath, "r", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            batch_pipe = redis_client.pipeline()
            batch_count = 0

            for row in reader:
                # Extract all OMIDs from the id field
                omids = [
                    id_part.strip()
                    for id_part in row["id"].split()
                    if id_part.startswith("omid:br/")
                ]
                # Add to Redis pipeline
                for omid in omids:
                    batch_pipe.sadd("processed_omids", omid)
                    batch_count += 1
                    count += 1

                # Execute batch if we've reached the batch size
                if batch_count >= BATCH_SIZE:
                    batch_pipe.execute()
                    batch_pipe = redis_client.pipeline()
                    batch_count = 0

            # Execute any remaining commands in the pipeline
            if batch_count > 0:
                batch_pipe.execute()

    return count
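# Illustrative sketch of the cache contents (hypothetical values): for a CSV row
# whose "id" cell is "omid:br/06101 doi:10.1000/xyz", only "omid:br/06101" is added
# to the Redis set "processed_omids"; non-OMID identifiers are ignored here.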

def is_omid_processed(omid: str, redis_client: redis.Redis) -> bool:
    """Check if an OMID has been processed by looking it up in Redis"""
    return redis_client.sismember("processed_omids", omid)


def find_file(
    rdf_dir: str, dir_split_number: int, items_per_file: int, uri: str
) -> Optional[str]:
    """Find the file path for a given URI based on the directory structure"""
    entity_regex: str = (
        r"^(https:\/\/w3id\.org\/oc\/meta)\/([a-z][a-z])\/(0[1-9]+0)?([1-9][0-9]*)$"
    )
    entity_match = re.match(entity_regex, uri)
    if entity_match:
        cur_number = int(entity_match.group(4))
        cur_file_split = (
            (cur_number - 1) // items_per_file
        ) * items_per_file + items_per_file
        cur_split = (
            (cur_number - 1) // dir_split_number
        ) * dir_split_number + dir_split_number

        short_name = entity_match.group(2)
        sub_folder = entity_match.group(3) or ""
        cur_dir_path = os.path.join(rdf_dir, short_name, sub_folder, str(cur_split))
        cur_file_path = os.path.join(cur_dir_path, str(cur_file_split)) + ".zip"

        return cur_file_path if os.path.exists(cur_file_path) else None
    return None
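# Illustrative mapping (hypothetical values, assuming dir_split_number=10000 and
# items_per_file=1000): https://w3id.org/oc/meta/br/06101 is parsed as short name
# "br", supplier prefix "0610" and sequential number 1, giving the candidate path
# <rdf_dir>/br/0610/10000/1000.zip; None is returned if that archive is missing.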

def load_json_from_file(filepath: str) -> dict:
    """Load JSON data from a ZIP file, using cache if available"""
    if filepath in _json_cache:
        return _json_cache[filepath]

    try:
        with ZipFile(filepath, "r") as zip_file:
            json_filename = zip_file.namelist()[0]
            with zip_file.open(json_filename) as json_file:
                json_content = json_file.read().decode("utf-8")
                json_data = json.loads(json_content)
                _json_cache[filepath] = json_data
                return json_data
    except Exception as e:
        print(f"Error loading file {filepath}: {e}")
        return {}

def process_identifier(id_data: dict) -> Optional[str]:
    """Process identifier data to extract schema and value"""
    try:
        id_schema = id_data["http://purl.org/spar/datacite/usesIdentifierScheme"][0][
            "@id"
        ].split("/datacite/")[1]
        literal_value = id_data[
            "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue"
        ][0]["@value"]
        return f"{id_schema}:{literal_value}"
    except (KeyError, IndexError):
        return None
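# Illustrative sketch (hypothetical values): an identifier entity whose
# usesIdentifierScheme is ".../datacite/doi" and whose hasLiteralValue is
# "10.1000/xyz" is rendered as "doi:10.1000/xyz"; None is returned when either
# property is missing from the JSON-LD.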

def process_responsible_agent(
    ra_data: dict, ra_uri: str, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[str]:
    """Process responsible agent data to extract name and identifiers"""
    try:
        # Process name
        family_name = ra_data.get("http://xmlns.com/foaf/0.1/familyName", [{}])[0].get(
            "@value", ""
        )
        given_name = ra_data.get("http://xmlns.com/foaf/0.1/givenName", [{}])[0].get(
            "@value", ""
        )
        foaf_name = ra_data.get("http://xmlns.com/foaf/0.1/name", [{}])[0].get(
            "@value", ""
        )

        # Determine name format
        if family_name or given_name:
            if family_name and given_name:
                name = f"{family_name}, {given_name}"
            elif family_name:
                name = f"{family_name},"
            else:
                name = f", {given_name}"
        elif foaf_name:
            name = foaf_name
        else:
            return None

        # Get OMID
        omid = ra_uri.split("/")[-1]
        identifiers = [f"omid:ra/{omid}"]

        # Process other identifiers
        if "http://purl.org/spar/datacite/hasIdentifier" in ra_data:
            for identifier in ra_data["http://purl.org/spar/datacite/hasIdentifier"]:
                id_uri = identifier["@id"]
                id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
                if id_file:
                    id_data = load_json_from_file(id_file)
                    for graph in id_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == id_uri:
                                id_value = process_identifier(entity)
                                if id_value:
                                    identifiers.append(id_value)

        if identifiers:
            return f"{name} [{' '.join(identifiers)}]"
        return name
    except (KeyError, IndexError):
        return None
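# Illustrative output (hypothetical values): an agent with familyName "Doe",
# givenName "Jane", OMID ra/0601 and an ORCID identifier yields
# "Doe, Jane [omid:ra/0601 orcid:0000-0000-0000-0000]".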

def process_venue_title(
    venue_data: dict,
    venue_uri: str,
    rdf_dir: str,
    dir_split_number: int,
    items_per_file: int,
) -> str:
    """Process venue title and its identifiers"""
    venue_title = venue_data.get("http://purl.org/dc/terms/title", [{}])[0].get(
        "@value", ""
    )
    if not venue_title:
        return ""

    # Get OMID
    omid = venue_uri.split("/")[-1]
    identifiers = [f"omid:br/{omid}"]

    # Process other identifiers
    if "http://purl.org/spar/datacite/hasIdentifier" in venue_data:
        for identifier in venue_data["http://purl.org/spar/datacite/hasIdentifier"]:
            id_uri = identifier["@id"]
            id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
            if id_file:
                id_data = load_json_from_file(id_file)
                for graph in id_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == id_uri:
                            id_value = process_identifier(entity)
                            if id_value:
                                identifiers.append(id_value)

    return f"{venue_title} [{' '.join(identifiers)}]" if identifiers else venue_title

def process_hierarchical_venue(
    entity: dict, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Dict[str, str]:
    """Process hierarchical venue structure recursively"""
    result = {"volume": "", "issue": "", "venue": ""}
    entity_types = entity.get("@type", [])

    if "http://purl.org/spar/fabio/JournalIssue" in entity_types:
        result["issue"] = entity.get(
            "http://purl.org/spar/fabio/hasSequenceIdentifier", [{}]
        )[0].get("@value", "")
    elif "http://purl.org/spar/fabio/JournalVolume" in entity_types:
        result["volume"] = entity.get(
            "http://purl.org/spar/fabio/hasSequenceIdentifier", [{}]
        )[0].get("@value", "")
    else:
        # It's a main venue (journal, book series, etc.)
        result["venue"] = process_venue_title(
            entity, entity["@id"], rdf_dir, dir_split_number, items_per_file
        )
        return result

    # Process parent if it exists
    if "http://purl.org/vocab/frbr/core#partOf" in entity:
        parent_uri = entity["http://purl.org/vocab/frbr/core#partOf"][0]["@id"]
        parent_file = find_file(rdf_dir, dir_split_number, items_per_file, parent_uri)
        if parent_file:
            parent_data = load_json_from_file(parent_file)
            for graph in parent_data:
                for parent_entity in graph.get("@graph", []):
                    if parent_entity["@id"] == parent_uri:
                        parent_info = process_hierarchical_venue(
                            parent_entity, rdf_dir, dir_split_number, items_per_file
                        )
                        # Merge parent info, keeping our current values
                        for key, value in parent_info.items():
                            if not result[key]:  # Only update if we don't have a value
                                result[key] = value

    return result
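# Illustrative result (hypothetical values): for an issue whose partOf chain is
# issue -> volume -> journal, the recursion yields something like
# {"issue": "4", "volume": "12", "venue": "Some Journal [omid:br/0601 issn:1234-5678]"},
# with each parent filling only the keys the child left empty.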

def find_first_ar_by_role(
    agent_roles: Dict, next_relations: Dict, role_type: str
) -> Optional[str]:
    """Find the first AR for a specific role type that isn't referenced by any other AR of the same role"""
    # Get all ARs of this role type
    role_ars = {
        ar_uri: ar_data
        for ar_uri, ar_data in agent_roles.items()
        if role_type
        in ar_data.get("http://purl.org/spar/pro/withRole", [{}])[0].get("@id", "")
    }

    # Get all "next" relations between ARs of this role type
    role_next_relations = {
        ar_uri: next_ar
        for ar_uri, next_ar in next_relations.items()
        if ar_uri in role_ars and next_ar in role_ars
    }

    # Find the AR that isn't referenced as next by any other AR of this role
    referenced_ars = set(role_next_relations.values())
    for ar_uri in role_ars:
        if ar_uri not in referenced_ars:
            return ar_uri

    # If no first AR is found, take the first one from the role ARs
    return next(iter(role_ars)) if role_ars else None
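# Illustrative sketch (hypothetical URIs): with author ARs ar/1 -> ar/2 -> ar/3
# chained via hasNext, ar/1 is returned because no other author AR points to it;
# if every AR is referenced (e.g. a circular chain), the first collected AR of
# that role is returned as a fallback.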

def process_bibliographic_resource(
    br_data: dict, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[Dict[str, str]]:
    """Process bibliographic resource data and its related entities"""
    # Skip if the entity is a JournalVolume or JournalIssue
    br_types = br_data.get("@type", [])
    if (
        "http://purl.org/spar/fabio/JournalVolume" in br_types
        or "http://purl.org/spar/fabio/JournalIssue" in br_types
    ):
        return None

    output = {field: "" for field in FIELDNAMES}

    try:
        # Extract OMID and basic BR information
        entity_id = br_data.get("@id", "")
        identifiers = [f'omid:br/{entity_id.split("/")[-1]}'] if entity_id else []

        output["title"] = br_data.get("http://purl.org/dc/terms/title", [{}])[0].get(
            "@value", ""
        )
        output["pub_date"] = br_data.get(
            "http://prismstandard.org/namespaces/basic/2.0/publicationDate", [{}]
        )[0].get("@value", "")

        # Extract type
        br_types = [
            t
            for t in br_data.get("@type", [])
            if t != "http://purl.org/spar/fabio/Expression"
        ]
        output["type"] = URI_TYPE_DICT.get(br_types[0], "") if br_types else ""

        # Process identifiers
        if "http://purl.org/spar/datacite/hasIdentifier" in br_data:
            for identifier in br_data["http://purl.org/spar/datacite/hasIdentifier"]:
                id_uri = identifier["@id"]
                id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
                if id_file:
                    id_data = load_json_from_file(id_file)
                    for graph in id_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == id_uri:
                                id_value = process_identifier(entity)
                                if id_value:
                                    identifiers.append(id_value)
        output["id"] = " ".join(identifiers)

        # Process authors, editors and publishers through agent roles
        authors = []
        editors = []
        publishers = []

        # Create dictionaries to store agent roles and their "next" relations
        agent_roles = {}
        next_relations = {}

        if "http://purl.org/spar/pro/isDocumentContextFor" in br_data:
            # First pass: collect all agent roles and their next relations
            for ar_data in br_data["http://purl.org/spar/pro/isDocumentContextFor"]:
                ar_uri = ar_data["@id"]
                ar_file = find_file(rdf_dir, dir_split_number, items_per_file, ar_uri)
                if ar_file:
                    ar_data = load_json_from_file(ar_file)
                    for graph in ar_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == ar_uri:
                                # Store the agent role data
                                agent_roles[ar_uri] = entity
                                # Store the next relation if it exists
                                if "https://w3id.org/oc/ontology/hasNext" in entity:
                                    next_ar = entity[
                                        "https://w3id.org/oc/ontology/hasNext"
                                    ][0]["@id"]
                                    next_relations[ar_uri] = next_ar

        # Process each role type separately
        for role_type, role_list in [
            ("author", authors),
            ("editor", editors),
            ("publisher", publishers),
        ]:
            first_ar = find_first_ar_by_role(agent_roles, next_relations, role_type)
            if not first_ar:
                continue

            # Process agent roles in order for this role type
            current_ar = first_ar
            processed_ars = set()
            max_iterations = len(agent_roles)
            iterations = 0

            while current_ar and current_ar in agent_roles:
                if current_ar in processed_ars or iterations >= max_iterations:
                    print(
                        f"Warning: Detected cycle in hasNext relations or exceeded maximum iterations at AR: {current_ar}"
                    )
                    break

                processed_ars.add(current_ar)
                iterations += 1

                # Process the current AR
                entity = agent_roles[current_ar]
                role = entity.get("http://purl.org/spar/pro/withRole", [{}])[0].get(
                    "@id", ""
                )

                # Only process if it matches our current role type
                if role_type in role:
                    if "http://purl.org/spar/pro/isHeldBy" in entity:
                        ra_uri = entity["http://purl.org/spar/pro/isHeldBy"][0][
                            "@id"
                        ]
                        ra_file = find_file(
                            rdf_dir, dir_split_number, items_per_file, ra_uri
                        )
                        if ra_file:
                            ra_data = load_json_from_file(ra_file)
                            for ra_graph in ra_data:
                                for ra_entity in ra_graph.get("@graph", []):
                                    if ra_entity["@id"] == ra_uri:
                                        agent_name = process_responsible_agent(
                                            ra_entity,
                                            ra_uri,
                                            rdf_dir,
                                            dir_split_number,
                                            items_per_file,
                                        )
                                        if agent_name:
                                            role_list.append(agent_name)

                # Move to the next agent role
                current_ar = next_relations.get(current_ar)

        output["author"] = "; ".join(authors)
        output["editor"] = "; ".join(editors)
        output["publisher"] = "; ".join(publishers)

        # Process venue information
        if "http://purl.org/vocab/frbr/core#partOf" in br_data:
            venue_uri = br_data["http://purl.org/vocab/frbr/core#partOf"][0]["@id"]
            venue_file = find_file(rdf_dir, dir_split_number, items_per_file, venue_uri)
            if venue_file:
                venue_data = load_json_from_file(venue_file)
                for graph in venue_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == venue_uri:
                            venue_info = process_hierarchical_venue(
                                entity, rdf_dir, dir_split_number, items_per_file
                            )
                            output.update(venue_info)

        # Process page information
        if "http://purl.org/vocab/frbr/core#embodiment" in br_data:
            page_uri = br_data["http://purl.org/vocab/frbr/core#embodiment"][0]["@id"]
            page_file = find_file(rdf_dir, dir_split_number, items_per_file, page_uri)
            if page_file:
                page_data = load_json_from_file(page_file)
                for graph in page_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == page_uri:
                            start_page = entity.get(
                                "http://prismstandard.org/namespaces/basic/2.0/startingPage",
                                [{}],
                            )[0].get("@value", "")
                            end_page = entity.get(
                                "http://prismstandard.org/namespaces/basic/2.0/endingPage",
                                [{}],
                            )[0].get("@value", "")
                            if start_page or end_page:
                                output["page"] = f"{start_page}-{end_page}"

    except (KeyError, IndexError) as e:
        print(f"Error processing bibliographic resource: {e}")

    return output

def process_single_file(args):
    """Process a single file and return the results"""
    filepath, input_dir, dir_split_number, items_per_file, redis_params = args
    results = []

    # Initialize Redis client once
    redis_client = redis.Redis(
        host=redis_params["host"],
        port=redis_params["port"],
        db=redis_params["db"],
        decode_responses=True,
    )

    data = load_json_from_file(filepath)
    for graph in data:
        for entity in graph.get("@graph", []):
            # Skip if this is a JournalVolume or JournalIssue
            entity_types = entity.get("@type", [])
            if (
                "http://purl.org/spar/fabio/JournalVolume" in entity_types
                or "http://purl.org/spar/fabio/JournalIssue" in entity_types
            ):
                continue

            # Extract OMID from entity ID
            entity_id = entity.get("@id", "")
            if entity_id:
                omid = f"omid:br/{entity_id.split('/')[-1]}"
                # Skip if already processed
                if is_omid_processed(omid, redis_client):
                    continue

                br_data = process_bibliographic_resource(
                    entity, input_dir, dir_split_number, items_per_file
                )
                if br_data:
                    results.append(br_data)

    return results
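# Illustrative call, not executed here (hypothetical paths, assumes a local Redis
# instance):
#
#     rows = process_single_file(
#         (
#             "rdf/br/0610/10000/1000.zip",                  # ZIP of JSON-LD data
#             "rdf",                                         # root RDF directory
#             10000,                                         # dir_split_number
#             1000,                                          # items_per_file
#             {"host": "localhost", "port": 6379, "db": 2},  # redis_params
#         )
#     )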

class ResultBuffer:
    """Buffer to collect results and write them when reaching the row limit"""

    def __init__(self, output_dir: str, max_rows: int = 3000):
        self.buffer = []
        self.output_dir = output_dir
        self.max_rows = max_rows
        self.file_counter = self._get_last_file_number() + 1
        self.pbar = None

    def _get_last_file_number(self) -> int:
        """Find the highest file number from existing output files"""
        if not os.path.exists(self.output_dir):
            return -1

        max_number = -1
        for filename in os.listdir(self.output_dir):
            if filename.startswith("output_") and filename.endswith(".csv"):
                try:
                    number = int(filename[7:-4])  # Extract number from 'output_X.csv'
                    max_number = max(max_number, number)
                except ValueError:
                    continue
        return max_number

    def set_progress_bar(self, total: int) -> None:
        """Initialize progress bar with total number of files"""
        self.pbar = tqdm(total=total, desc="Processing files")

    def update_progress(self) -> None:
        """Update progress bar"""
        if self.pbar:
            self.pbar.update(1)

    def close_progress_bar(self) -> None:
        """Close progress bar"""
        if self.pbar:
            self.pbar.close()

    def add_results(self, results: List[Dict[str, str]]) -> None:
        """Add results to buffer and write to file if max_rows is reached"""
        self.buffer.extend(results)
        while len(self.buffer) >= self.max_rows:
            self._write_buffer_chunk()

    def _write_buffer_chunk(self) -> None:
        """Write max_rows records to a new file"""
        chunk = self.buffer[: self.max_rows]
        output_file = os.path.join(self.output_dir, f"output_{self.file_counter}.csv")
        write_csv(output_file, chunk)
        self.buffer = self.buffer[self.max_rows :]
        self.file_counter += 1

    def flush(self) -> None:
        """Write any remaining results in buffer"""
        if self.buffer:
            output_file = os.path.join(
                self.output_dir, f"output_{self.file_counter}.csv"
            )
            write_csv(output_file, self.buffer)
            self.buffer = []
            self.file_counter += 1
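# Illustrative usage (hypothetical directory): rows are buffered in memory and
# written in max_rows-sized chunks as output_0.csv, output_1.csv, ..., continuing
# from the highest existing file number.
#
#     buffer = ResultBuffer("csv_output", max_rows=3000)
#     buffer.add_results(rows)  # writes full 3000-row chunks as they accumulate
#     buffer.flush()            # writes whatever is left to one final file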

def task_done(future: concurrent.futures.Future, result_buffer: ResultBuffer):
    """Callback function for completed tasks"""
    try:
        results = future.result()
        if results:
            result_buffer.add_results(results)
        result_buffer.update_progress()  # Update progress after task completion
    except Exception as e:
        print(f"Task failed: {e}")
        result_buffer.update_progress()  # Update progress even if task fails

def generate_csv(
    input_dir: str,
    output_dir: str,
    dir_split_number: int,
    items_per_file: int,
    zip_output_rdf: bool,
    redis_host: str = "localhost",
    redis_port: int = 6379,
    redis_db: int = 2,
) -> None:
    """Generate CSV files from RDF data using Pebble for process management"""
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Initialize Redis connection and load initial cache
    redis_client = init_redis_connection(redis_host, redis_port, redis_db)
    processed_count = load_processed_omids_to_redis(output_dir, redis_client)

    # Process only files in the 'br' directory
    br_dir = os.path.join(input_dir, "br")
    if not os.path.exists(br_dir):
        print(f"Error: bibliographic resources directory not found at {br_dir}")
        return

    # Collect all ZIP files
    all_files = []
    for root, _, files in os.walk(br_dir):
        if "prov" in root:
            continue
        all_files.extend(os.path.join(root, f) for f in files if f.endswith(".zip"))

    if not all_files:
        print("No files found to process")
        return

    print(f"Processing {len(all_files)} files...")

    # Redis connection parameters to pass to worker processes
    redis_params = {"host": redis_host, "port": redis_port, "db": redis_db}

    # Create result buffer and initialize progress bar
    result_buffer = ResultBuffer(output_dir)
    result_buffer.set_progress_bar(len(all_files))

    # Process files one at a time using Pebble
    with ProcessPool(
        max_workers=os.cpu_count(),
        max_tasks=1,
        context=multiprocessing.get_context("spawn"),
    ) as executor:
        futures: List[concurrent.futures.Future] = []
        for filepath in all_files:
            future = executor.schedule(
                function=process_single_file,
                args=(
                    (
                        filepath,
                        input_dir,
                        dir_split_number,
                        items_per_file,
                        redis_params,
                    ),
                ),
            )
            future.add_done_callback(lambda f: task_done(f, result_buffer))
            futures.append(future)

        # Wait for all futures to complete
        for future in futures:
            try:
                future.result()
            except Exception as e:
                print(f"Error processing file: {e}")
                return  # Exit without clearing Redis if there's an error

    # Flush any remaining results and close progress bar
    result_buffer.flush()
    result_buffer.close_progress_bar()

    # Clear Redis cache only if we've successfully completed everything
    redis_client.delete("processed_omids")
    print("Processing complete. Redis cache cleared.")
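# Illustrative invocation, not part of the module (hypothetical paths, default
# Redis settings):
#
#     if __name__ == "__main__":
#         generate_csv(
#             input_dir="rdf",          # root containing br/ and its sibling entity folders
#             output_dir="csv_output",
#             dir_split_number=10000,
#             items_per_file=1000,
#             zip_output_rdf=True,
#             redis_host="localhost",
#             redis_port=6379,
#             redis_db=2,
#         )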

def write_csv(filepath: str, data: List[Dict[str, str]]) -> None:
    """Write data to CSV file"""
    with open(filepath, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDNAMES)
        writer.writeheader()
        writer.writerows(data)