Coverage for oc_meta/plugins/csv_generator_lite/csv_generator_lite.py: 36%

356 statements  

coverage.py v6.5.0, created at 2026-01-15 10:29 +0000

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2024 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.


from __future__ import annotations

import csv
import json
import multiprocessing
import os
import re
from typing import Dict, List, Optional
from zipfile import ZipFile

import redis
from pebble import ProcessPool
from tqdm import tqdm

csv.field_size_limit(2**31 - 1)


FIELDNAMES = [
    "id",
    "title",
    "author",
    "issue",
    "volume",
    "venue",
    "page",
    "pub_date",
    "type",
    "publisher",
    "editor",
]


URI_TYPE_DICT = {
    "http://purl.org/spar/doco/Abstract": "abstract",
    "http://purl.org/spar/fabio/ArchivalDocument": "archival document",
    "http://purl.org/spar/fabio/AudioDocument": "audio document",
    "http://purl.org/spar/fabio/Book": "book",
    "http://purl.org/spar/fabio/BookChapter": "book chapter",
    "http://purl.org/spar/fabio/ExpressionCollection": "book section",
    "http://purl.org/spar/fabio/BookSeries": "book series",
    "http://purl.org/spar/fabio/BookSet": "book set",
    "http://purl.org/spar/fabio/ComputerProgram": "computer program",
    "http://purl.org/spar/doco/Part": "book part",
    "http://purl.org/spar/fabio/Expression": "",
    "http://purl.org/spar/fabio/DataFile": "dataset",
    "http://purl.org/spar/fabio/DataManagementPlan": "data management plan",
    "http://purl.org/spar/fabio/Thesis": "dissertation",
    "http://purl.org/spar/fabio/Editorial": "editorial",
    "http://purl.org/spar/fabio/Journal": "journal",
    "http://purl.org/spar/fabio/JournalArticle": "journal article",
    "http://purl.org/spar/fabio/JournalEditorial": "journal editorial",
    "http://purl.org/spar/fabio/JournalIssue": "journal issue",
    "http://purl.org/spar/fabio/JournalVolume": "journal volume",
    "http://purl.org/spar/fabio/Newspaper": "newspaper",
    "http://purl.org/spar/fabio/NewspaperArticle": "newspaper article",
    "http://purl.org/spar/fabio/NewspaperIssue": "newspaper issue",
    "http://purl.org/spar/fr/ReviewVersion": "peer review",
    "http://purl.org/spar/fabio/AcademicProceedings": "proceedings",
    "http://purl.org/spar/fabio/Preprint": "preprint",
    "http://purl.org/spar/fabio/Presentation": "presentation",
    "http://purl.org/spar/fabio/ProceedingsPaper": "proceedings article",
    "http://purl.org/spar/fabio/ReferenceBook": "reference book",
    "http://purl.org/spar/fabio/ReferenceEntry": "reference entry",
    "http://purl.org/spar/fabio/ReportDocument": "report",
    "http://purl.org/spar/fabio/RetractionNotice": "retraction notice",
    "http://purl.org/spar/fabio/Series": "series",
    "http://purl.org/spar/fabio/SpecificationDocument": "standard",
    "http://purl.org/spar/fabio/WebContent": "web content",
}


def init_redis_connection(
    host: str = "localhost", port: int = 6379, db: int = 2
) -> redis.Redis:
    client = redis.Redis(host=host, port=port, db=db, decode_responses=True)
    client.ping()
    return client



def load_processed_omids_to_redis(output_dir: str, redis_client: redis.Redis) -> int:
    """Populate the 'processed_omids' Redis set with every omid:br/ identifier
    found in the CSV files already present in output_dir, so that previously
    exported resources can be skipped on resume."""
    if not os.path.exists(output_dir):
        return 0

    redis_client.delete("processed_omids")

    count = 0
    BATCH_SIZE = 1000
    csv_files = [f for f in os.listdir(output_dir) if f.endswith(".csv")]

    for filename in tqdm(csv_files, desc="Loading existing identifiers"):
        filepath = os.path.join(output_dir, filename)
        with open(filepath, "r", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            batch_pipe = redis_client.pipeline()
            batch_count = 0

            for row in reader:
                omids = [
                    id_part.strip()
                    for id_part in row["id"].split()
                    if id_part.startswith("omid:br/")
                ]
                for omid in omids:
                    batch_pipe.sadd("processed_omids", omid)
                    batch_count += 1
                    count += 1

                    if batch_count >= BATCH_SIZE:
                        batch_pipe.execute()
                        batch_pipe = redis_client.pipeline()
                        batch_count = 0

            if batch_count > 0:
                batch_pipe.execute()

    return count



def is_omid_processed(omid: str, redis_client: redis.Redis) -> bool:
    return redis_client.sismember("processed_omids", omid)



def find_file(
    rdf_dir: str, dir_split_number: int, items_per_file: int, uri: str
) -> Optional[str]:
    """Resolve an OpenCitations Meta entity URI to the zipped JSON-LD file that
    should contain it, following the dump's directory/file splitting scheme;
    return None if the expected file does not exist."""
    entity_regex: str = (
        r"^(https:\/\/w3id\.org\/oc\/meta)\/([a-z][a-z])\/(0[1-9]+0)?([1-9][0-9]*)$"
    )
    entity_match = re.match(entity_regex, uri)
    if entity_match:
        cur_number = int(entity_match.group(4))
        cur_file_split = (
            (cur_number - 1) // items_per_file
        ) * items_per_file + items_per_file
        cur_split = (
            (cur_number - 1) // dir_split_number
        ) * dir_split_number + dir_split_number

        short_name = entity_match.group(2)
        sub_folder = entity_match.group(3) or ""
        cur_dir_path = os.path.join(rdf_dir, short_name, sub_folder, str(cur_split))
        cur_file_path = os.path.join(cur_dir_path, str(cur_file_split)) + ".zip"

        return cur_file_path if os.path.exists(cur_file_path) else None
    return None


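# Illustration (not part of the original source; the split sizes below are an
# assumption): with dir_split_number=10000 and items_per_file=1000, the URI
#     https://w3id.org/oc/meta/br/0601234
# parses as short_name="br", sub_folder="060", cur_number=1234, which gives
# cur_split=10000 and cur_file_split=2000, so find_file looks for
#     <rdf_dir>/br/060/10000/2000.zip
# and returns that path only if the zip file actually exists on disk.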

def load_json_from_file(filepath: str) -> dict:
    with ZipFile(filepath, "r") as zip_file:
        json_filename = zip_file.namelist()[0]
        with zip_file.open(json_filename) as json_file:
            json_content = json_file.read().decode("utf-8")
            return json.loads(json_content)



def process_identifier(id_data: dict) -> Optional[str]:
    try:
        id_schema = id_data["http://purl.org/spar/datacite/usesIdentifierScheme"][0][
            "@id"
        ].split("/datacite/")[1]
        literal_value = id_data[
            "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue"
        ][0]["@value"]
        return f"{id_schema}:{literal_value}"
    except (KeyError, IndexError):
        return None



def process_responsible_agent(
    ra_data: dict, ra_uri: str, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[str]:
    try:
        family_name = ra_data.get("http://xmlns.com/foaf/0.1/familyName", [{}])[0].get(
            "@value", ""
        )
        given_name = ra_data.get("http://xmlns.com/foaf/0.1/givenName", [{}])[0].get(
            "@value", ""
        )
        foaf_name = ra_data.get("http://xmlns.com/foaf/0.1/name", [{}])[0].get(
            "@value", ""
        )

        if family_name or given_name:
            if family_name and given_name:
                name = f"{family_name}, {given_name}"
            elif family_name:
                name = f"{family_name},"
            else:
                name = f", {given_name}"
        elif foaf_name:
            name = foaf_name
        else:
            return None

        omid = ra_uri.split("/")[-1]
        identifiers = [f"omid:ra/{omid}"]

        if "http://purl.org/spar/datacite/hasIdentifier" in ra_data:
            for identifier in ra_data["http://purl.org/spar/datacite/hasIdentifier"]:
                id_uri = identifier["@id"]
                id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
                if id_file:
                    id_data = load_json_from_file(id_file)
                    for graph in id_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == id_uri:
                                id_value = process_identifier(entity)
                                if id_value:
                                    identifiers.append(id_value)

        if identifiers:
            return f"{name} [{' '.join(identifiers)}]"
        return name
    except (KeyError, IndexError):
        return None



def process_venue_title(
    venue_data: dict,
    venue_uri: str,
    rdf_dir: str,
    dir_split_number: int,
    items_per_file: int,
) -> str:
    venue_title = venue_data.get("http://purl.org/dc/terms/title", [{}])[0].get(
        "@value", ""
    )
    if not venue_title:
        return ""

    omid = venue_uri.split("/")[-1]
    identifiers = [f"omid:br/{omid}"]

    if "http://purl.org/spar/datacite/hasIdentifier" in venue_data:
        for identifier in venue_data["http://purl.org/spar/datacite/hasIdentifier"]:
            id_uri = identifier["@id"]
            id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
            if id_file:
                id_data = load_json_from_file(id_file)
                for graph in id_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == id_uri:
                            id_value = process_identifier(entity)
                            if id_value:
                                identifiers.append(id_value)

    return f"{venue_title} [{' '.join(identifiers)}]" if identifiers else venue_title



def process_hierarchical_venue(
    entity: dict, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Dict[str, str]:
    result = {"volume": "", "issue": "", "venue": ""}
    entity_types = entity.get("@type", [])

    if "http://purl.org/spar/fabio/JournalIssue" in entity_types:
        result["issue"] = entity.get(
            "http://purl.org/spar/fabio/hasSequenceIdentifier", [{}]
        )[0].get("@value", "")
    elif "http://purl.org/spar/fabio/JournalVolume" in entity_types:
        result["volume"] = entity.get(
            "http://purl.org/spar/fabio/hasSequenceIdentifier", [{}]
        )[0].get("@value", "")
    else:
        result["venue"] = process_venue_title(
            entity, entity["@id"], rdf_dir, dir_split_number, items_per_file
        )
        return result

    if "http://purl.org/vocab/frbr/core#partOf" in entity:
        parent_uri = entity["http://purl.org/vocab/frbr/core#partOf"][0]["@id"]
        parent_file = find_file(rdf_dir, dir_split_number, items_per_file, parent_uri)
        if parent_file:
            parent_data = load_json_from_file(parent_file)
            for graph in parent_data:
                for parent_entity in graph.get("@graph", []):
                    if parent_entity["@id"] == parent_uri:
                        parent_info = process_hierarchical_venue(
                            parent_entity, rdf_dir, dir_split_number, items_per_file
                        )
                        for key, value in parent_info.items():
                            if not result[key]:
                                result[key] = value

    return result



def find_first_ar_by_role(
    agent_roles: Dict, next_relations: Dict, role_type: str
) -> Optional[str]:
    """Return the agent role of the given type that heads the hasNext chain,
    i.e. the role of that type that no other role of the same type points to;
    if every role is referenced (a cycle), fall back to an arbitrary one."""
    role_ars = {
        ar_uri: ar_data
        for ar_uri, ar_data in agent_roles.items()
        if role_type
        in ar_data.get("http://purl.org/spar/pro/withRole", [{}])[0].get("@id", "")
    }

    role_next_relations = {
        ar_uri: next_ar
        for ar_uri, next_ar in next_relations.items()
        if ar_uri in role_ars and next_ar in role_ars
    }

    referenced_ars = set(role_next_relations.values())
    for ar_uri in role_ars:
        if ar_uri not in referenced_ars:
            return ar_uri

    return next(iter(role_ars)) if role_ars else None


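# Worked example (illustrative; the URIs are hypothetical): given two author
# roles where ar/1 points to ar/2 via oco:hasNext,
#     agent_roles    = {"ar/1": {...withRole pro/author...},
#                       "ar/2": {...withRole pro/author...}}
#     next_relations = {"ar/1": "ar/2"}
# only "ar/2" appears as a hasNext target, so
# find_first_ar_by_role(agent_roles, next_relations, "author") returns "ar/1",
# the head of the chain; process_bibliographic_resource then walks the chain
# from there so the author order of the original record is preserved.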

def process_bibliographic_resource(
    br_data: dict, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[Dict[str, str]]:
    """Flatten a single bibliographic resource into a CSV row, resolving its
    identifiers, agent roles (authors, editors, publishers), venue hierarchy
    and page range from the surrounding RDF dump files."""
    br_types = br_data.get("@type", [])
    if (
        "http://purl.org/spar/fabio/JournalVolume" in br_types
        or "http://purl.org/spar/fabio/JournalIssue" in br_types
    ):
        return None

    output = {field: "" for field in FIELDNAMES}

    try:
        entity_id = br_data.get("@id", "")
        identifiers = [f'omid:br/{entity_id.split("/")[-1]}'] if entity_id else []

        output["title"] = br_data.get("http://purl.org/dc/terms/title", [{}])[0].get(
            "@value", ""
        )
        output["pub_date"] = br_data.get(
            "http://prismstandard.org/namespaces/basic/2.0/publicationDate", [{}]
        )[0].get("@value", "")

        br_types = [
            t
            for t in br_data.get("@type", [])
            if t != "http://purl.org/spar/fabio/Expression"
        ]
        output["type"] = URI_TYPE_DICT.get(br_types[0], "") if br_types else ""

        if "http://purl.org/spar/datacite/hasIdentifier" in br_data:
            for identifier in br_data["http://purl.org/spar/datacite/hasIdentifier"]:
                id_uri = identifier["@id"]
                id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
                if id_file:
                    id_data = load_json_from_file(id_file)
                    for graph in id_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == id_uri:
                                id_value = process_identifier(entity)
                                if id_value:
                                    identifiers.append(id_value)
        output["id"] = " ".join(identifiers)

        authors = []
        editors = []
        publishers = []
        agent_roles = {}
        next_relations = {}

        if "http://purl.org/spar/pro/isDocumentContextFor" in br_data:
            for ar_data in br_data["http://purl.org/spar/pro/isDocumentContextFor"]:
                ar_uri = ar_data["@id"]
                ar_file = find_file(rdf_dir, dir_split_number, items_per_file, ar_uri)
                if ar_file:
                    ar_data = load_json_from_file(ar_file)
                    for graph in ar_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == ar_uri:
                                agent_roles[ar_uri] = entity
                                if "https://w3id.org/oc/ontology/hasNext" in entity:
                                    next_ar = entity[
                                        "https://w3id.org/oc/ontology/hasNext"
                                    ][0]["@id"]
                                    next_relations[ar_uri] = next_ar

        for role_type, role_list in [
            ("author", authors),
            ("editor", editors),
            ("publisher", publishers),
        ]:
            first_ar = find_first_ar_by_role(agent_roles, next_relations, role_type)
            if not first_ar:
                continue

            current_ar = first_ar
            processed_ars = set()
            max_iterations = len(agent_roles)
            iterations = 0

            while current_ar and current_ar in agent_roles:
                if current_ar in processed_ars or iterations >= max_iterations:
                    print(
                        f"Warning: Detected cycle in hasNext relations or exceeded maximum iterations at AR: {current_ar}"
                    )
                    break

                processed_ars.add(current_ar)
                iterations += 1

                entity = agent_roles[current_ar]
                role = entity.get("http://purl.org/spar/pro/withRole", [{}])[0].get(
                    "@id", ""
                )

                if role_type in role:
                    if "http://purl.org/spar/pro/isHeldBy" in entity:
                        ra_uri = entity["http://purl.org/spar/pro/isHeldBy"][0][
                            "@id"
                        ]
                        ra_file = find_file(
                            rdf_dir, dir_split_number, items_per_file, ra_uri
                        )
                        if ra_file:
                            ra_data = load_json_from_file(ra_file)
                            for ra_graph in ra_data:
                                for ra_entity in ra_graph.get("@graph", []):
                                    if ra_entity["@id"] == ra_uri:
                                        agent_name = process_responsible_agent(
                                            ra_entity,
                                            ra_uri,
                                            rdf_dir,
                                            dir_split_number,
                                            items_per_file,
                                        )
                                        if agent_name:
                                            role_list.append(agent_name)

                current_ar = next_relations.get(current_ar)

        output["author"] = "; ".join(authors)
        output["editor"] = "; ".join(editors)
        output["publisher"] = "; ".join(publishers)

        if "http://purl.org/vocab/frbr/core#partOf" in br_data:
            venue_uri = br_data["http://purl.org/vocab/frbr/core#partOf"][0]["@id"]
            venue_file = find_file(rdf_dir, dir_split_number, items_per_file, venue_uri)
            if venue_file:
                venue_data = load_json_from_file(venue_file)
                for graph in venue_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == venue_uri:
                            venue_info = process_hierarchical_venue(
                                entity, rdf_dir, dir_split_number, items_per_file
                            )
                            output.update(venue_info)

        if "http://purl.org/vocab/frbr/core#embodiment" in br_data:
            page_uri = br_data["http://purl.org/vocab/frbr/core#embodiment"][0]["@id"]
            page_file = find_file(rdf_dir, dir_split_number, items_per_file, page_uri)
            if page_file:
                page_data = load_json_from_file(page_file)
                for graph in page_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == page_uri:
                            start_page = entity.get(
                                "http://prismstandard.org/namespaces/basic/2.0/startingPage",
                                [{}],
                            )[0].get("@value", "")
                            end_page = entity.get(
                                "http://prismstandard.org/namespaces/basic/2.0/endingPage",
                                [{}],
                            )[0].get("@value", "")
                            if start_page or end_page:
                                output["page"] = f"{start_page}-{end_page}"

    except (KeyError, IndexError) as e:
        print(f"Error processing bibliographic resource: {e}")

    return output



def process_single_file(args):
    """Worker entry point: extract CSV rows for every unprocessed bibliographic
    resource contained in one zipped JSON-LD file."""
    filepath, input_dir, dir_split_number, items_per_file, redis_params = args
    results = []

    redis_client = redis.Redis(
        host=redis_params["host"],
        port=redis_params["port"],
        db=redis_params["db"],
        decode_responses=True,
    )

    data = load_json_from_file(filepath)
    for graph in data:
        for entity in graph.get("@graph", []):
            entity_types = entity.get("@type", [])
            if (
                "http://purl.org/spar/fabio/JournalVolume" in entity_types
                or "http://purl.org/spar/fabio/JournalIssue" in entity_types
            ):
                continue

            entity_id = entity.get("@id", "")
            if entity_id:
                omid = f"omid:br/{entity_id.split('/')[-1]}"
                if is_omid_processed(omid, redis_client):
                    continue

            br_data = process_bibliographic_resource(
                entity, input_dir, dir_split_number, items_per_file
            )
            if br_data:
                results.append(br_data)

    return results



class ResultBuffer:
    """Buffer CSV rows and flush them to numbered output_<n>.csv files once
    max_rows rows have accumulated, resuming the numbering from any output
    files already present in output_dir."""

    def __init__(self, output_dir: str, max_rows: int = 3000):
        self.buffer = []
        self.output_dir = output_dir
        self.max_rows = max_rows
        self.file_counter = self._get_last_file_number() + 1
        self.pbar = None

    def _get_last_file_number(self) -> int:
        if not os.path.exists(self.output_dir):
            return -1

        max_number = -1
        for filename in os.listdir(self.output_dir):
            if filename.startswith("output_") and filename.endswith(".csv"):
                try:
                    number = int(filename[7:-4])
                    max_number = max(max_number, number)
                except ValueError:
                    continue
        return max_number

    def set_progress_bar(self, total: int) -> None:
        self.pbar = tqdm(total=total, desc="Processing files")

    def update_progress(self) -> None:
        if self.pbar:
            self.pbar.update(1)

    def close_progress_bar(self) -> None:
        if self.pbar:
            self.pbar.close()

    def add_results(self, results: List[Dict[str, str]]) -> None:
        self.buffer.extend(results)
        while len(self.buffer) >= self.max_rows:
            self._write_buffer_chunk()

    def _write_buffer_chunk(self) -> None:
        chunk = self.buffer[: self.max_rows]
        output_file = os.path.join(self.output_dir, f"output_{self.file_counter}.csv")
        write_csv(output_file, chunk)
        self.buffer = self.buffer[self.max_rows :]
        self.file_counter += 1

    def flush(self) -> None:
        if self.buffer:
            output_file = os.path.join(
                self.output_dir, f"output_{self.file_counter}.csv"
            )
            write_csv(output_file, self.buffer)
            self.buffer = []
            self.file_counter += 1



def generate_csv(
    input_dir: str,
    output_dir: str,
    dir_split_number: int,
    items_per_file: int,
    redis_host: str = "localhost",
    redis_port: int = 6379,
    redis_db: int = 2,
) -> None:
    """Walk the zipped JSON-LD "br" files under input_dir and write the flattened
    metadata to numbered CSV files in output_dir, skipping resources already
    present in previous output (tracked via Redis)."""
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    redis_client = init_redis_connection(redis_host, redis_port, redis_db)
    load_processed_omids_to_redis(output_dir, redis_client)

    br_dir = os.path.join(input_dir, "br")
    if not os.path.exists(br_dir):
        print(f"Error: bibliographic resources directory not found at {br_dir}")
        return

    all_files = []
    for root, _, files in os.walk(br_dir):
        if "prov" in root:
            continue
        all_files.extend(os.path.join(root, f) for f in files if f.endswith(".zip"))

    if not all_files:
        print("No files found to process")
        return

    print(f"Processing {len(all_files)} files...")

    redis_params = {"host": redis_host, "port": redis_port, "db": redis_db}
    result_buffer = ResultBuffer(output_dir)
    result_buffer.set_progress_bar(len(all_files))

    with ProcessPool(
        max_workers=os.cpu_count(),
        max_tasks=1,
        context=multiprocessing.get_context("spawn"),
    ) as executor:
        for filepath in all_files:
            future = executor.schedule(
                function=process_single_file,
                args=(
                    (
                        filepath,
                        input_dir,
                        dir_split_number,
                        items_per_file,
                        redis_params,
                    ),
                ),
            )
            try:
                results = future.result()
                if results:
                    result_buffer.add_results(results)
            except Exception as e:
                print(f"Error processing file {filepath}: {e}")
            result_buffer.update_progress()

    result_buffer.flush()
    result_buffer.close_progress_bar()
    redis_client.delete("processed_omids")
    print("Processing complete. Redis cache cleared.")



def write_csv(filepath: str, data: List[Dict[str, str]]) -> None:
    with open(filepath, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDNAMES)
        writer.writeheader()
        writer.writerows(data)
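
# Usage sketch (not part of the original module; paths and split sizes are
# illustrative assumptions): generate_csv is the public entry point and can be
# invoked roughly like this, with Redis running locally on the default port.
#
# if __name__ == "__main__":
#     generate_csv(
#         input_dir="rdf",          # directory that contains the "br" subfolder
#         output_dir="csv_output",  # rows are written here as output_N.csv
#         dir_split_number=10000,   # assumed directory split size of the dump
#         items_per_file=1000,      # assumed number of entities per file
#         redis_host="localhost",
#         redis_port=6379,
#         redis_db=2,
#     )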