Coverage for oc_meta / run / meta / generate_csv.py: 35%
394 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
7from __future__ import annotations
9import csv
10import os
11import re
12from argparse import ArgumentParser
13from functools import lru_cache
14import multiprocessing
15from typing import Dict, List, Optional, Tuple
16from zipfile import ZipFile
18import orjson
19import redis
20import yaml
22from oc_meta.lib.console import create_progress
23from oc_meta.lib.file_manager import collect_zip_files
# Raise the CSV field size limit to the 32-bit maximum: some fields in the
# dump (e.g. concatenated identifier/author strings) may exceed the default.
csv.field_size_limit(2**31 - 1)

# Column order of every CSV row emitted by this script.
FIELDNAMES = [
    "id",
    "title",
    "author",
    "issue",
    "volume",
    "venue",
    "page",
    "pub_date",
    "type",
    "publisher",
    "editor",
]
# Maps SPAR/FaBiO class URIs to the human-readable "type" labels used in the
# CSV output. fabio/Expression deliberately maps to "" (it is the generic
# superclass and is filtered out before type selection — see
# process_bibliographic_resource).
URI_TYPE_DICT = {
    "http://purl.org/spar/doco/Abstract": "abstract",
    "http://purl.org/spar/fabio/ArchivalDocument": "archival document",
    "http://purl.org/spar/fabio/AudioDocument": "audio document",
    "http://purl.org/spar/fabio/Book": "book",
    "http://purl.org/spar/fabio/BookChapter": "book chapter",
    "http://purl.org/spar/fabio/ExpressionCollection": "book section",
    "http://purl.org/spar/fabio/BookSeries": "book series",
    "http://purl.org/spar/fabio/BookSet": "book set",
    "http://purl.org/spar/fabio/ComputerProgram": "computer program",
    "http://purl.org/spar/doco/Part": "book part",
    "http://purl.org/spar/fabio/Expression": "",
    "http://purl.org/spar/fabio/DataFile": "dataset",
    "http://purl.org/spar/fabio/DataManagementPlan": "data management plan",
    "http://purl.org/spar/fabio/Thesis": "dissertation",
    "http://purl.org/spar/fabio/Editorial": "editorial",
    "http://purl.org/spar/fabio/Journal": "journal",
    "http://purl.org/spar/fabio/JournalArticle": "journal article",
    "http://purl.org/spar/fabio/JournalEditorial": "journal editorial",
    "http://purl.org/spar/fabio/JournalIssue": "journal issue",
    "http://purl.org/spar/fabio/JournalVolume": "journal volume",
    "http://purl.org/spar/fabio/Newspaper": "newspaper",
    "http://purl.org/spar/fabio/NewspaperArticle": "newspaper article",
    "http://purl.org/spar/fabio/NewspaperIssue": "newspaper issue",
    "http://purl.org/spar/fr/ReviewVersion": "peer review",
    "http://purl.org/spar/fabio/AcademicProceedings": "proceedings",
    "http://purl.org/spar/fabio/Preprint": "preprint",
    "http://purl.org/spar/fabio/Presentation": "presentation",
    "http://purl.org/spar/fabio/ProceedingsPaper": "proceedings article",
    "http://purl.org/spar/fabio/ReferenceBook": "reference book",
    "http://purl.org/spar/fabio/ReferenceEntry": "reference entry",
    "http://purl.org/spar/fabio/ReportDocument": "report",
    "http://purl.org/spar/fabio/RetractionNotice": "retraction notice",
    "http://purl.org/spar/fabio/Series": "series",
    "http://purl.org/spar/fabio/SpecificationDocument": "standard",
    "http://purl.org/spar/fabio/WebContent": "web content",
}
# Per-process state populated by _init_worker (multiprocessing pool initializer);
# None until the worker has been initialised.
_worker_redis: Optional[redis.Redis] = None
# (input_dir, dir_split_number, items_per_file) for the current worker process.
_worker_config: Optional[Tuple[str, int, int]] = None
def _init_worker(
    redis_host: str, redis_port: int, redis_db: int, input_dir: str, dir_split_number: int, items_per_file: int
) -> None:
    """Pool initializer: stash shared config and open a per-process Redis client."""
    global _worker_redis, _worker_config
    _worker_config = (input_dir, dir_split_number, items_per_file)
    _worker_redis = redis.Redis(
        host=redis_host, port=redis_port, db=redis_db, decode_responses=True
    )
def _process_file_worker(filepath: str) -> Tuple[str, List[Dict[str, str]]]:
    """Extract CSV rows from one zipped JSON-LD file inside a pool worker.

    Journal volumes/issues are skipped, as are resources whose OMID is
    already recorded in the Redis ``processed_omids`` set. Returns the file
    path together with the rows produced from it.
    """
    assert _worker_redis is not None and _worker_config is not None
    input_dir, dir_split_number, items_per_file = _worker_config

    skip_types = (
        "http://purl.org/spar/fabio/JournalVolume",
        "http://purl.org/spar/fabio/JournalIssue",
    )
    rows: List[Dict[str, str]] = []
    for graph in load_json_from_file(filepath):
        for entity in graph.get("@graph", []):
            types = entity.get("@type", [])
            if any(t in types for t in skip_types):
                continue
            uri = entity.get("@id", "")
            if uri and _worker_redis.sismember(
                "processed_omids", f"omid:br/{uri.split('/')[-1]}"
            ):
                # Already emitted in a previous run — skip.
                continue
            row = process_bibliographic_resource(
                entity, input_dir, dir_split_number, items_per_file
            )
            if row:
                rows.append(row)
    return (filepath, rows)
def init_redis_connection(
    host: str = "localhost", port: int = 6379, db: int = 2
) -> redis.Redis:
    """Create a Redis client with decoded responses and verify it is reachable."""
    connection = redis.Redis(host=host, port=port, db=db, decode_responses=True)
    connection.ping()  # fail fast if the server is down
    return connection
def is_omid_processed(omid: str, redis_client: redis.Redis) -> bool:
    """Return True when *omid* is already in the Redis ``processed_omids`` set."""
    membership = redis_client.sismember("processed_omids", omid)
    return bool(membership)
def load_processed_omids_to_redis(output_dir: str, redis_client: redis.Redis) -> int:
    """Rebuild the Redis ``processed_omids`` set from the CSVs already written.

    Scans every ``*.csv`` in *output_dir*, collects all ``omid:br/...`` tokens
    from each row's ``id`` column and stores them via pipelined SADDs.
    Returns the number of OMIDs loaded (duplicates counted per occurrence).
    """
    # Start from a clean slate so the set exactly mirrors the CSVs on disk.
    redis_client.delete("processed_omids")

    if not os.path.exists(output_dir):
        return 0

    # Re-applied here because this may run in a fresh process/context.
    csv.field_size_limit(2**31 - 1)

    count = 0
    BATCH_SIZE = 1000  # SADDs per pipeline round-trip
    csv_files = [f for f in os.listdir(output_dir) if f.endswith(".csv")]

    with create_progress() as progress:
        task = progress.add_task("Loading existing identifiers", total=len(csv_files))

        for filename in csv_files:
            filepath = os.path.join(output_dir, filename)
            with open(filepath, "r", encoding="utf-8") as f:
                reader = csv.DictReader(f)
                batch_pipe = redis_client.pipeline()
                batch_count = 0

                for row in reader:
                    # The "id" column is a space-separated identifier list;
                    # only OMID bibliographic-resource ids matter here.
                    omids = [
                        id_part.strip()
                        for id_part in row["id"].split()
                        if id_part.startswith("omid:br/")
                    ]
                    for omid in omids:
                        batch_pipe.sadd("processed_omids", omid)
                        batch_count += 1
                        count += 1

                        # Flush and start a fresh pipeline every BATCH_SIZE adds.
                        if batch_count >= BATCH_SIZE:
                            batch_pipe.execute()
                            batch_pipe = redis_client.pipeline()
                            batch_count = 0

                # Flush the final partial batch for this file.
                if batch_count > 0:
                    batch_pipe.execute()

            progress.update(task, advance=1)

    return count
def load_checkpoint(checkpoint_file: str) -> set:
    """Return the set of file paths recorded in *checkpoint_file*.

    A missing checkpoint file yields an empty set, so a fresh run starts
    cleanly. Blank lines are ignored.
    """
    if not os.path.exists(checkpoint_file):
        return set()
    # Explicit UTF-8 for consistency with every other open() in this module.
    with open(checkpoint_file, "r", encoding="utf-8") as f:
        return {line.strip() for line in f if line.strip()}
def mark_file_processed(checkpoint_file: str, filepath: str) -> None:
    """Append *filepath* to the checkpoint file so it is skipped on restart."""
    # Explicit UTF-8 keeps the checkpoint round-trippable with load_checkpoint
    # regardless of the system locale.
    with open(checkpoint_file, "a", encoding="utf-8") as f:
        f.write(filepath + "\n")
def find_file(
    rdf_dir: str, dir_split_number: int, items_per_file: int, uri: str
) -> Optional[str]:
    """Map an OpenCitations Meta entity URI to its on-disk zip file.

    Returns the zip path when the file exists, otherwise None (including for
    URIs that do not match the Meta URI scheme).
    """
    pattern = (
        r"^(https:\/\/w3id\.org\/oc\/meta)\/([a-z][a-z])\/(0[1-9]+0)?([1-9][0-9]*)$"
    )
    match = re.match(pattern, uri)
    if match is None:
        return None

    number = int(match.group(4))
    # Upper bound of the file bucket, and of the directory bucket holding it.
    file_bucket = ((number - 1) // items_per_file) * items_per_file + items_per_file
    dir_bucket = (
        (number - 1) // dir_split_number
    ) * dir_split_number + dir_split_number

    candidate = os.path.join(
        rdf_dir,
        match.group(2),          # two-letter entity short name (e.g. "br")
        match.group(3) or "",    # optional supplier-prefix sub-folder
        str(dir_bucket),
        str(file_bucket) + ".zip",
    )
    return candidate if os.path.exists(candidate) else None
@lru_cache(maxsize=2000)
def load_json_from_file(filepath: str) -> list:
    """Load and parse the single JSON member of the zip archive at *filepath*.

    Results are memoized (up to 2000 files) because the same venue/agent
    files are looked up repeatedly across resources.

    NOTE(review): the cached list is returned by reference — callers must not
    mutate it, or later cache hits would observe the mutation.
    """
    with ZipFile(filepath, "r") as zip_file:
        # Archives are read via the first member only — assumes one JSON file
        # per zip (TODO confirm against the dump layout).
        json_filename = zip_file.namelist()[0]
        with zip_file.open(json_filename) as json_file:
            return orjson.loads(json_file.read())
def process_identifier(id_data: dict) -> Optional[str]:
    """Build a ``scheme:value`` string from a datacite Identifier entity.

    Returns None when the entity lacks the expected scheme or literal value.
    """
    scheme_key = "http://purl.org/spar/datacite/usesIdentifierScheme"
    value_key = "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue"
    try:
        scheme_uri = id_data[scheme_key][0]["@id"]
        scheme = scheme_uri.split("/datacite/")[1]
        value = id_data[value_key][0]["@value"]
    except (KeyError, IndexError):
        return None
    return f"{scheme}:{value}"
def process_responsible_agent(
    ra_data: dict, ra_uri: str, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[str]:
    """Format a responsible agent as ``Name [omid:ra/... other-ids...]``.

    The name is ``family, given`` when structured name parts exist (either
    part may be missing), falling back to foaf:name. Returns None when no
    usable name exists or the entity is malformed.
    """
    try:
        family_name = ra_data.get("http://xmlns.com/foaf/0.1/familyName", [{}])[0].get(
            "@value", ""
        )
        given_name = ra_data.get("http://xmlns.com/foaf/0.1/givenName", [{}])[0].get(
            "@value", ""
        )
        foaf_name = ra_data.get("http://xmlns.com/foaf/0.1/name", [{}])[0].get(
            "@value", ""
        )

        if family_name or given_name:
            if family_name and given_name:
                name = f"{family_name}, {given_name}"
            elif family_name:
                name = f"{family_name},"
            else:
                name = f", {given_name}"
        elif foaf_name:
            name = foaf_name
        else:
            return None

        omid = ra_uri.split("/")[-1]
        identifiers = [f"omid:ra/{omid}"]

        # Resolve any external identifiers (DOI, ORCID, ...) from their files.
        if "http://purl.org/spar/datacite/hasIdentifier" in ra_data:
            for identifier in ra_data["http://purl.org/spar/datacite/hasIdentifier"]:
                id_uri = identifier["@id"]
                id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
                if id_file:
                    id_data = load_json_from_file(id_file)
                    for graph in id_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == id_uri:
                                id_value = process_identifier(entity)
                                if id_value:
                                    identifiers.append(id_value)

        # identifiers always contains at least the omid, so the bracketed
        # suffix is always emitted (the former bare-name fallback was dead code).
        return f"{name} [{' '.join(identifiers)}]"
    except (KeyError, IndexError):
        return None
def process_venue_title(
    venue_data: dict,
    venue_uri: str,
    rdf_dir: str,
    dir_split_number: int,
    items_per_file: int,
) -> str:
    """Format a venue as ``Title [omid:br/... other-ids...]``.

    Returns the empty string when the venue has no dcterms:title.
    """
    venue_title = venue_data.get("http://purl.org/dc/terms/title", [{}])[0].get(
        "@value", ""
    )
    if not venue_title:
        return ""

    omid = venue_uri.split("/")[-1]
    identifiers = [f"omid:br/{omid}"]

    # Resolve any external identifiers (ISSN, ISBN, ...) from their files.
    if "http://purl.org/spar/datacite/hasIdentifier" in venue_data:
        for identifier in venue_data["http://purl.org/spar/datacite/hasIdentifier"]:
            id_uri = identifier["@id"]
            id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
            if id_file:
                id_data = load_json_from_file(id_file)
                for graph in id_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == id_uri:
                            id_value = process_identifier(entity)
                            if id_value:
                                identifiers.append(id_value)

    # identifiers always holds at least the omid, so the conditional fallback
    # to the bare title in the original was unreachable and has been removed.
    return f"{venue_title} [{' '.join(identifiers)}]"
def process_hierarchical_venue(
    entity: dict,
    rdf_dir: str,
    dir_split_number: int,
    items_per_file: int,
    visited: Optional[set] = None,
    depth: int = 0,
) -> Dict[str, str]:
    """Climb frbr:partOf from an issue/volume, collecting issue, volume, venue.

    Recurses upwards until the venue itself is reached; a repeated entity id
    or a chain deeper than 5 stops the climb (cycle protection).
    """
    result = {"volume": "", "issue": "", "venue": ""}

    if visited is None:
        visited = set()

    entity_id = entity.get("@id", "")
    if entity_id in visited or depth > 5:
        print(f"Warning: Cycle detected in venue hierarchy at: {entity_id}")
        return result
    visited.add(entity_id)

    entity_types = entity.get("@type", [])
    sequence = entity.get(
        "http://purl.org/spar/fabio/hasSequenceIdentifier", [{}]
    )[0].get("@value", "")

    if "http://purl.org/spar/fabio/JournalIssue" in entity_types:
        result["issue"] = sequence
    elif "http://purl.org/spar/fabio/JournalVolume" in entity_types:
        result["volume"] = sequence
    else:
        # Reached the venue itself — no need to climb any further.
        result["venue"] = process_venue_title(
            entity, entity["@id"], rdf_dir, dir_split_number, items_per_file
        )
        return result

    if "http://purl.org/vocab/frbr/core#partOf" in entity:
        parent_uri = entity["http://purl.org/vocab/frbr/core#partOf"][0]["@id"]
        parent_file = find_file(rdf_dir, dir_split_number, items_per_file, parent_uri)
        if parent_file:
            for graph in load_json_from_file(parent_file):
                for parent_entity in graph.get("@graph", []):
                    if parent_entity["@id"] != parent_uri:
                        continue
                    parent_info = process_hierarchical_venue(
                        parent_entity,
                        rdf_dir,
                        dir_split_number,
                        items_per_file,
                        visited,
                        depth + 1,
                    )
                    # Keep values found lower in the hierarchy; fill gaps only.
                    for key, value in parent_info.items():
                        result[key] = result[key] or value

    return result
def find_first_ar_by_role(
    agent_roles: Dict, next_relations: Dict, role_type: str
) -> Optional[str]:
    """Pick the head of the hasNext chain among agent roles of *role_type*.

    The head is a candidate that no other same-typed candidate points to.
    If every candidate is referenced (a pure cycle) an arbitrary one is
    returned; None when there are no candidates at all.
    """
    def has_role(ar_data: Dict) -> bool:
        role_uri = ar_data.get("http://purl.org/spar/pro/withRole", [{}])[0].get(
            "@id", ""
        )
        return role_type in role_uri

    candidates = {uri: data for uri, data in agent_roles.items() if has_role(data)}

    # Only hasNext links whose both ends are candidates count as "referenced".
    internal_targets = {
        nxt
        for uri, nxt in next_relations.items()
        if uri in candidates and nxt in candidates
    }

    for uri in candidates:
        if uri not in internal_targets:
            return uri

    return next(iter(candidates)) if candidates else None
def process_bibliographic_resource(
    br_data: dict, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[Dict[str, str]]:
    """Convert one bibliographic-resource JSON-LD entity into a flat CSV row.

    Returns a dict keyed by FIELDNAMES, or None for journal volumes/issues
    (those only contribute volume/issue numbers to their articles' rows).
    On any error the row collected so far is returned, not discarded.
    """
    br_types = br_data.get("@type", [])
    if (
        "http://purl.org/spar/fabio/JournalVolume" in br_types
        or "http://purl.org/spar/fabio/JournalIssue" in br_types
    ):
        return None

    output = {field: "" for field in FIELDNAMES}

    try:
        # --- identifiers: OMID first, then any resolved external ids ---
        entity_id = br_data.get("@id", "")
        identifiers = [f'omid:br/{entity_id.split("/")[-1]}'] if entity_id else []

        output["title"] = br_data.get("http://purl.org/dc/terms/title", [{}])[0].get(
            "@value", ""
        )
        output["pub_date"] = br_data.get(
            "http://prismstandard.org/namespaces/basic/2.0/publicationDate", [{}]
        )[0].get("@value", "")

        # Drop the generic Expression superclass and map the first remaining
        # type URI to its CSV label.
        br_types = [
            t
            for t in br_data.get("@type", [])
            if t != "http://purl.org/spar/fabio/Expression"
        ]
        output["type"] = URI_TYPE_DICT.get(br_types[0], "") if br_types else ""

        if "http://purl.org/spar/datacite/hasIdentifier" in br_data:
            for identifier in br_data["http://purl.org/spar/datacite/hasIdentifier"]:
                id_uri = identifier["@id"]
                id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
                if id_file:
                    id_data = load_json_from_file(id_file)
                    for graph in id_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == id_uri:
                                id_value = process_identifier(entity)
                                if id_value:
                                    identifiers.append(id_value)
        output["id"] = " ".join(identifiers)

        # --- agent roles: load every AR entity and its hasNext link ---
        authors = []
        editors = []
        publishers = []
        agent_roles = {}     # AR uri -> AR entity
        next_relations = {}  # AR uri -> next AR uri in the ordered chain

        if "http://purl.org/spar/pro/isDocumentContextFor" in br_data:
            for ar_data in br_data["http://purl.org/spar/pro/isDocumentContextFor"]:
                ar_uri = ar_data["@id"]
                ar_file = find_file(rdf_dir, dir_split_number, items_per_file, ar_uri)
                if ar_file:
                    # Note: rebinds the loop variable with the loaded file data.
                    ar_data = load_json_from_file(ar_file)
                    for graph in ar_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == ar_uri:
                                agent_roles[ar_uri] = entity
                                if "https://w3id.org/oc/ontology/hasNext" in entity:
                                    next_ar = entity[
                                        "https://w3id.org/oc/ontology/hasNext"
                                    ][0]["@id"]
                                    next_relations[ar_uri] = next_ar

        # Walk each role's hasNext chain in order, resolving agent names.
        for role_type, role_list in [
            ("author", authors),
            ("editor", editors),
            ("publisher", publishers),
        ]:
            first_ar = find_first_ar_by_role(agent_roles, next_relations, role_type)
            if not first_ar:
                continue

            current_ar = first_ar
            processed_ars = set()
            # Guard against cycles/runaway chains in hasNext.
            max_iterations = len(agent_roles)
            iterations = 0

            while current_ar and current_ar in agent_roles:
                if current_ar in processed_ars or iterations >= max_iterations:
                    print(
                        f"Warning: Detected cycle in hasNext relations or exceeded maximum iterations at AR: {current_ar}"
                    )
                    break

                processed_ars.add(current_ar)
                iterations += 1

                entity = agent_roles[current_ar]
                role = entity.get("http://purl.org/spar/pro/withRole", [{}])[0].get(
                    "@id", ""
                )

                if role_type in role:
                    if "http://purl.org/spar/pro/isHeldBy" in entity:
                        ra_uri = entity["http://purl.org/spar/pro/isHeldBy"][0][
                            "@id"
                        ]
                        ra_file = find_file(
                            rdf_dir, dir_split_number, items_per_file, ra_uri
                        )
                        if ra_file:
                            ra_data = load_json_from_file(ra_file)
                            for ra_graph in ra_data:
                                for ra_entity in ra_graph.get("@graph", []):
                                    if ra_entity["@id"] == ra_uri:
                                        agent_name = process_responsible_agent(
                                            ra_entity,
                                            ra_uri,
                                            rdf_dir,
                                            dir_split_number,
                                            items_per_file,
                                        )
                                        if agent_name:
                                            role_list.append(agent_name)

                current_ar = next_relations.get(current_ar)

        output["author"] = "; ".join(authors)
        output["editor"] = "; ".join(editors)
        output["publisher"] = "; ".join(publishers)

        # --- venue hierarchy: issue/volume/venue via frbr:partOf ---
        if "http://purl.org/vocab/frbr/core#partOf" in br_data:
            venue_uri = br_data["http://purl.org/vocab/frbr/core#partOf"][0]["@id"]
            venue_file = find_file(rdf_dir, dir_split_number, items_per_file, venue_uri)
            if venue_file:
                venue_data = load_json_from_file(venue_file)
                for graph in venue_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == venue_uri:
                            venue_info = process_hierarchical_venue(
                                entity, rdf_dir, dir_split_number, items_per_file
                            )
                            output.update(venue_info)

        # --- page range via frbr:embodiment ---
        if "http://purl.org/vocab/frbr/core#embodiment" in br_data:
            page_uri = br_data["http://purl.org/vocab/frbr/core#embodiment"][0]["@id"]
            page_file = find_file(rdf_dir, dir_split_number, items_per_file, page_uri)
            if page_file:
                page_data = load_json_from_file(page_file)
                for graph in page_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == page_uri:
                            start_page = entity.get(
                                "http://prismstandard.org/namespaces/basic/2.0/startingPage",
                                [{}],
                            )[0].get("@value", "")
                            end_page = entity.get(
                                "http://prismstandard.org/namespaces/basic/2.0/endingPage",
                                [{}],
                            )[0].get("@value", "")
                            if start_page or end_page:
                                output["page"] = f"{start_page}-{end_page}"

    # Deliberately broad: a malformed entity should not abort the whole file,
    # so the partially-filled row is still returned below.
    except Exception as e:
        print(f"Error processing bibliographic resource: {type(e).__name__}: {e}")

    return output
class ResultBuffer:
    """Accumulates CSV rows and writes them out as numbered output_N.csv files."""

    def __init__(self, output_dir: str, max_rows: int = 3000):
        self.buffer: List[Dict[str, str]] = []
        self.output_dir = output_dir
        self.max_rows = max_rows
        # Resume numbering after any output_N.csv files already on disk.
        self.file_counter = self._get_last_file_number() + 1

    def _get_last_file_number(self) -> int:
        """Highest N among existing output_N.csv files, or -1 when none exist."""
        if not os.path.exists(self.output_dir):
            return -1
        numbers = []
        for filename in os.listdir(self.output_dir):
            if filename.startswith("output_") and filename.endswith(".csv"):
                try:
                    numbers.append(int(filename[7:-4]))
                except ValueError:
                    continue
        return max(numbers, default=-1)

    def _emit(self, rows: List[Dict[str, str]]) -> None:
        """Write *rows* to the next numbered output file and advance the counter."""
        target = os.path.join(self.output_dir, f"output_{self.file_counter}.csv")
        write_csv(target, rows)
        self.file_counter += 1

    def add_results(self, results: List[Dict[str, str]]) -> None:
        """Queue rows, writing out full chunks of max_rows as they accumulate."""
        self.buffer.extend(results)
        while len(self.buffer) >= self.max_rows:
            chunk = self.buffer[: self.max_rows]
            self.buffer = self.buffer[self.max_rows :]
            self._emit(chunk)

    def flush(self) -> None:
        """Write any remaining buffered rows to a final (possibly short) file."""
        if self.buffer:
            self._emit(self.buffer)
            self.buffer = []
def generate_csv(
    input_dir: str,
    output_dir: str,
    dir_split_number: int,
    items_per_file: int,
    redis_host: str = "localhost",
    redis_port: int = 6379,
    redis_db: int = 2,
    workers: int = 4,
) -> None:
    """Convert the Meta RDF 'br' dump under *input_dir* into CSVs in *output_dir*.

    Resumable: a per-file checkpoint (processed_br_files.txt in *output_dir*)
    skips whole files, and a Redis set of OMIDs rebuilt from existing CSVs
    skips individual resources. Files are processed by a pool of *workers*.
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    checkpoint_file = os.path.join(output_dir, "processed_br_files.txt")
    processed_br_files = load_checkpoint(checkpoint_file)

    # Rebuild the processed-OMID set from CSVs already written, so partially
    # processed files do not produce duplicate rows.
    redis_client = init_redis_connection(redis_host, redis_port, redis_db)
    load_processed_omids_to_redis(output_dir, redis_client)

    br_dir = os.path.join(input_dir, "br")
    if not os.path.exists(br_dir):
        print(f"Error: directory not found at {br_dir}")
        return

    all_files = collect_zip_files(br_dir, only_data=True)
    files_to_process = [f for f in all_files if f not in processed_br_files]

    if not files_to_process:
        print("All files already processed")
        return

    print(f"Skipping {len(processed_br_files)} already processed files")
    print(f"Processing {len(files_to_process)} remaining files with {workers} workers...")

    result_buffer = ResultBuffer(output_dir)

    # Use forkserver to avoid deadlocks when forking in a multi-threaded environment
    ctx = multiprocessing.get_context('forkserver')
    with ctx.Pool(
        workers,
        _init_worker,
        (redis_host, redis_port, redis_db, input_dir, dir_split_number, items_per_file),
    ) as pool:
        with create_progress() as progress:
            task = progress.add_task("Processing files", total=len(files_to_process))

            # imap_unordered: checkpoint each file as soon as its rows land.
            for filepath, results in pool.imap_unordered(_process_file_worker, files_to_process):
                if results:
                    result_buffer.add_results(results)
                mark_file_processed(checkpoint_file, filepath)
                progress.update(task, advance=1)

    # Write out whatever is left below the per-file row threshold.
    result_buffer.flush()
    print("Processing complete.")
def write_csv(filepath: str, data: List[Dict[str, str]]) -> None:
    """Write *data* rows to *filepath* using the module-wide FIELDNAMES order."""
    with open(filepath, "w", newline="", encoding="utf-8") as out:
        csv_writer = csv.DictWriter(out, fieldnames=FIELDNAMES)
        csv_writer.writeheader()
        for row in data:
            csv_writer.writerow(row)
if __name__ == '__main__':
    parser = ArgumentParser('generate_csv.py',
                            description='Generate CSV files from OpenCitations Meta RDF dump')
    parser.add_argument('-c', '--config', required=True,
                        help='OpenCitations Meta configuration file location')
    parser.add_argument('-o', '--output_dir', required=True,
                        help='Directory where CSV files will be stored')
    parser.add_argument('--redis-host', default='localhost',
                        help='Redis host (default: localhost)')
    parser.add_argument('--redis-port', type=int, default=6379,
                        help='Redis port (default: 6379)')
    parser.add_argument('--redis-db', type=int, default=2,
                        help='Redis database number (default: 2)')
    parser.add_argument('--workers', type=int, default=4,
                        help='Number of parallel workers (default: 4)')
    parser.add_argument('--clean', action='store_true',
                        help='Clear checkpoint file and Redis cache before starting')
    args = parser.parse_args()

    # Trusted local configuration file (paths and split parameters).
    with open(args.config, encoding='utf-8') as f:
        settings = yaml.full_load(f)

    rdf_dir = os.path.join(settings['output_rdf_dir'], 'rdf')
    dir_split_number = settings['dir_split_number']
    items_per_file = settings['items_per_file']

    if args.clean:
        # Fix: the checkpoint lives inside the output directory (generate_csv
        # writes it there), not in the current working directory — the old
        # code removed a file from the CWD and never cleared the real one.
        checkpoint_file = os.path.join(args.output_dir, 'processed_br_files.txt')
        if os.path.exists(checkpoint_file):
            os.remove(checkpoint_file)
            print(f"Removed checkpoint file: {checkpoint_file}")
        redis_client = redis.Redis(
            host=args.redis_host, port=args.redis_port, db=args.redis_db
        )
        deleted = redis_client.delete('processed_omids')
        if deleted:
            print("Cleared Redis processed_omids cache")

    generate_csv(
        input_dir=rdf_dir,
        output_dir=args.output_dir,
        dir_split_number=dir_split_number,
        items_per_file=items_per_file,
        redis_host=args.redis_host,
        redis_port=args.redis_port,
        redis_db=args.redis_db,
        workers=args.workers,
    )