Coverage for oc_meta / run / meta / generate_csv.py: 36%
400 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 17:25 +0000
1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3# Copyright (c) 2022-2024 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# Permission to use, copy, modify, and/or distribute this software for any purpose
6# with or without fee is hereby granted, provided that the above copyright notice
7# and this permission notice appear in all copies.
8#
9# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
15# SOFTWARE.
17from __future__ import annotations
19import csv
20import json
21import os
22import re
23from argparse import ArgumentParser
24from functools import lru_cache
25from multiprocessing import Pool
26from typing import Dict, List, Optional, Tuple
27from zipfile import ZipFile
29import redis
30import yaml
31from rich.progress import (BarColumn, MofNCompleteColumn, Progress,
32 SpinnerColumn, TextColumn, TimeElapsedColumn,
33 TimeRemainingColumn)
# Raise the CSV field-size cap: "id" / "author" cells can be very long.
csv.field_size_limit(2**31 - 1)

# Column order for every CSV file produced by this script.
FIELDNAMES = [
    "id",
    "title",
    "author",
    "issue",
    "volume",
    "venue",
    "page",
    "pub_date",
    "type",
    "publisher",
    "editor",
]
# Maps SPAR/FaBiO/DoCO class URIs to the human-readable labels written in the
# "type" CSV column. fabio:Expression maps to "" (generic superclass; it
# carries no specific type information and is filtered out elsewhere).
URI_TYPE_DICT = {
    "http://purl.org/spar/doco/Abstract": "abstract",
    "http://purl.org/spar/fabio/ArchivalDocument": "archival document",
    "http://purl.org/spar/fabio/AudioDocument": "audio document",
    "http://purl.org/spar/fabio/Book": "book",
    "http://purl.org/spar/fabio/BookChapter": "book chapter",
    "http://purl.org/spar/fabio/ExpressionCollection": "book section",
    "http://purl.org/spar/fabio/BookSeries": "book series",
    "http://purl.org/spar/fabio/BookSet": "book set",
    "http://purl.org/spar/fabio/ComputerProgram": "computer program",
    "http://purl.org/spar/doco/Part": "book part",
    "http://purl.org/spar/fabio/Expression": "",
    "http://purl.org/spar/fabio/DataFile": "dataset",
    "http://purl.org/spar/fabio/DataManagementPlan": "data management plan",
    "http://purl.org/spar/fabio/Thesis": "dissertation",
    "http://purl.org/spar/fabio/Editorial": "editorial",
    "http://purl.org/spar/fabio/Journal": "journal",
    "http://purl.org/spar/fabio/JournalArticle": "journal article",
    "http://purl.org/spar/fabio/JournalEditorial": "journal editorial",
    "http://purl.org/spar/fabio/JournalIssue": "journal issue",
    "http://purl.org/spar/fabio/JournalVolume": "journal volume",
    "http://purl.org/spar/fabio/Newspaper": "newspaper",
    "http://purl.org/spar/fabio/NewspaperArticle": "newspaper article",
    "http://purl.org/spar/fabio/NewspaperIssue": "newspaper issue",
    "http://purl.org/spar/fr/ReviewVersion": "peer review",
    "http://purl.org/spar/fabio/AcademicProceedings": "proceedings",
    "http://purl.org/spar/fabio/Preprint": "preprint",
    "http://purl.org/spar/fabio/Presentation": "presentation",
    "http://purl.org/spar/fabio/ProceedingsPaper": "proceedings article",
    "http://purl.org/spar/fabio/ReferenceBook": "reference book",
    "http://purl.org/spar/fabio/ReferenceEntry": "reference entry",
    "http://purl.org/spar/fabio/ReportDocument": "report",
    "http://purl.org/spar/fabio/RetractionNotice": "retraction notice",
    "http://purl.org/spar/fabio/Series": "series",
    "http://purl.org/spar/fabio/SpecificationDocument": "standard",
    "http://purl.org/spar/fabio/WebContent": "web content",
}
# Per-process state for Pool workers, populated by _init_worker() in each
# child process (None until then, hence Optional).
_worker_redis: Optional[redis.Redis] = None
# (input_dir, dir_split_number, items_per_file) shared by all tasks of a run.
_worker_config: Optional[Tuple[str, int, int]] = None
def _init_worker(
    redis_host: str, redis_port: int, redis_db: int, input_dir: str, dir_split_number: int, items_per_file: int
) -> None:
    """Initialize per-process state for a multiprocessing Pool worker.

    Opens a dedicated Redis connection and stashes the static lookup
    configuration in module-level globals so ``_process_file_worker`` can
    reach them without passing state through every task.
    """
    global _worker_redis, _worker_config
    connection = redis.Redis(
        host=redis_host, port=redis_port, db=redis_db, decode_responses=True
    )
    _worker_redis = connection
    _worker_config = (input_dir, dir_split_number, items_per_file)
def _process_file_worker(filepath: str) -> Tuple[str, List[Dict[str, str]]]:
    """Extract CSV rows from one zipped JSON-LD file (runs in a Pool worker).

    Journal volumes/issues are skipped outright, as are entities whose OMID
    is already present in the Redis ``processed_omids`` set.

    Returns (filepath, list-of-row-dicts) so the parent can checkpoint
    the file once results are buffered.
    """
    input_dir, dir_split_number, items_per_file = _worker_config
    rows: List[Dict[str, str]] = []
    for graph in load_json_from_file(filepath):
        for entity in graph.get("@graph", []):
            types = entity.get("@type", [])
            is_container = (
                "http://purl.org/spar/fabio/JournalVolume" in types
                or "http://purl.org/spar/fabio/JournalIssue" in types
            )
            if is_container:
                continue
            uri = entity.get("@id", "")
            if uri and _worker_redis.sismember(
                "processed_omids", f"omid:br/{uri.split('/')[-1]}"
            ):
                continue
            row = process_bibliographic_resource(
                entity, input_dir, dir_split_number, items_per_file
            )
            if row:
                rows.append(row)
    return (filepath, rows)
def init_redis_connection(
    host: str = "localhost", port: int = 6379, db: int = 2
) -> redis.Redis:
    """Create a Redis client (decoded string responses) and verify connectivity.

    ``ping()`` makes the function fail fast with a connection error instead of
    deferring failures to the first real command.
    """
    connection = redis.Redis(host=host, port=port, db=db, decode_responses=True)
    connection.ping()
    return connection
def is_omid_processed(omid: str, redis_client: redis.Redis) -> bool:
    """Check whether *omid* is already in the Redis ``processed_omids`` set."""
    membership = redis_client.sismember("processed_omids", omid)
    return membership
def load_processed_omids_to_redis(output_dir: str, redis_client: redis.Redis) -> int:
    """Populate the Redis ``processed_omids`` set from existing output CSVs.

    Enables resuming an interrupted run: every ``omid:br/...`` token found in
    the ``id`` column of CSVs already written to *output_dir* is added to the
    set. If Redis already holds any OMIDs the rebuild is skipped entirely and
    the existing cardinality is returned.

    Returns the number of OMIDs added (or the pre-existing count).
    """
    existing_count = redis_client.scard("processed_omids")
    if existing_count > 0:
        print(f"Redis already has {existing_count} OMIDs, skipping rebuild")
        return existing_count

    if not os.path.exists(output_dir):
        return 0

    # Re-apply the large field limit (also set at module import time).
    csv.field_size_limit(2**31 - 1)

    count = 0
    BATCH_SIZE = 1000  # flush the pipeline every N sadd commands
    csv_files = [f for f in os.listdir(output_dir) if f.endswith(".csv")]

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TimeElapsedColumn(),
        TimeRemainingColumn(),
    ) as progress:
        task = progress.add_task("Loading existing identifiers", total=len(csv_files))

        for filename in csv_files:
            filepath = os.path.join(output_dir, filename)
            with open(filepath, "r", encoding="utf-8") as f:
                reader = csv.DictReader(f)
                # Batch sadd calls through a pipeline to cut round-trips.
                batch_pipe = redis_client.pipeline()
                batch_count = 0

                for row in reader:
                    # The "id" cell is space-separated; keep only OMID tokens.
                    omids = [
                        id_part.strip()
                        for id_part in row["id"].split()
                        if id_part.startswith("omid:br/")
                    ]
                    for omid in omids:
                        batch_pipe.sadd("processed_omids", omid)
                        batch_count += 1
                        count += 1

                    if batch_count >= BATCH_SIZE:
                        batch_pipe.execute()
                        batch_pipe = redis_client.pipeline()
                        batch_count = 0

                # Flush any partial batch for this file.
                if batch_count > 0:
                    batch_pipe.execute()

            progress.update(task, advance=1)

    return count
def load_checkpoint(checkpoint_file: str) -> set:
    """Return the set of file paths recorded in *checkpoint_file*.

    A missing checkpoint means nothing has been processed yet (empty set).
    Blank lines are ignored.
    """
    if not os.path.exists(checkpoint_file):
        return set()
    with open(checkpoint_file, "r") as handle:
        stripped = (line.strip() for line in handle)
        return {entry for entry in stripped if entry}
def mark_file_processed(checkpoint_file: str, filepath: str) -> None:
    """Append *filepath* on its own line to the checkpoint file (creating it if absent)."""
    with open(checkpoint_file, "a") as checkpoint:
        checkpoint.write(f"{filepath}\n")
def find_file(
    rdf_dir: str, dir_split_number: int, items_per_file: int, uri: str
) -> Optional[str]:
    """Map an OpenCitations Meta entity URI to its on-disk zip file.

    The dump is sharded: sequential entity numbers are grouped into files of
    ``items_per_file`` ids, which are in turn grouped into directories of
    ``dir_split_number`` ids. Returns the zip path if it exists on disk,
    otherwise None (also for URIs that do not match the expected pattern).
    """
    pattern = (
        r"^(https:\/\/w3id\.org\/oc\/meta)\/([a-z][a-z])\/(0[1-9]+0)?([1-9][0-9]*)$"
    )
    match = re.match(pattern, uri)
    if match is None:
        return None

    number = int(match.group(4))
    # Round the entity number up to its file and directory bucket boundaries.
    file_bucket = ((number - 1) // items_per_file) * items_per_file + items_per_file
    dir_bucket = (
        (number - 1) // dir_split_number
    ) * dir_split_number + dir_split_number

    short_name = match.group(2)  # e.g. "br", "ra", "id"
    supplier_prefix = match.group(3) or ""
    candidate = os.path.join(
        rdf_dir, short_name, supplier_prefix, str(dir_bucket), str(file_bucket) + ".zip"
    )
    return candidate if os.path.exists(candidate) else None
@lru_cache(maxsize=2000)
def load_json_from_file(filepath: str) -> list:
    """Read and parse the single JSON document stored inside zip *filepath*.

    Results are memoized (LRU, 2000 entries) because the same venue/agent/id
    files are looked up repeatedly while processing related resources.
    """
    with ZipFile(filepath, "r") as archive:
        inner_name = archive.namelist()[0]  # each archive holds one JSON file
        with archive.open(inner_name) as handle:
            return json.loads(handle.read().decode("utf-8"))
def process_identifier(id_data: dict) -> Optional[str]:
    """Render a datacite identifier entity as ``schema:value`` (e.g. ``doi:10.1/x``).

    Returns None when the entity lacks the expected scheme or literal value.
    """
    scheme_key = "http://purl.org/spar/datacite/usesIdentifierScheme"
    value_key = "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue"
    try:
        scheme = id_data[scheme_key][0]["@id"].split("/datacite/")[1]
        literal = id_data[value_key][0]["@value"]
    except (KeyError, IndexError):
        return None
    return f"{scheme}:{literal}"
def process_responsible_agent(
    ra_data: dict, ra_uri: str, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[str]:
    """Format a responsible agent as ``Name [omid:ra/... other-ids]``.

    Name preference: "family, given" (either half may be missing), falling
    back to the plain foaf:name. External identifiers are resolved from the
    dump and appended after the agent's OMID. Returns None when no usable
    name exists or the data is malformed.
    """
    try:
        def first_value(prop: str) -> str:
            return ra_data.get(prop, [{}])[0].get("@value", "")

        family = first_value("http://xmlns.com/foaf/0.1/familyName")
        given = first_value("http://xmlns.com/foaf/0.1/givenName")
        plain = first_value("http://xmlns.com/foaf/0.1/name")

        if family and given:
            name = f"{family}, {given}"
        elif family:
            name = f"{family},"
        elif given:
            name = f", {given}"
        elif plain:
            name = plain
        else:
            return None

        identifiers = [f"omid:ra/{ra_uri.split('/')[-1]}"]
        for identifier in ra_data.get(
            "http://purl.org/spar/datacite/hasIdentifier", []
        ):
            id_uri = identifier["@id"]
            id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
            if not id_file:
                continue
            for graph in load_json_from_file(id_file):
                for entity in graph.get("@graph", []):
                    if entity["@id"] == id_uri:
                        id_value = process_identifier(entity)
                        if id_value:
                            identifiers.append(id_value)

        # identifiers always contains at least the OMID, so the bracketed
        # form is always produced.
        return f"{name} [{' '.join(identifiers)}]"
    except (KeyError, IndexError):
        return None
def process_venue_title(
    venue_data: dict,
    venue_uri: str,
    rdf_dir: str,
    dir_split_number: int,
    items_per_file: int,
) -> str:
    """Return ``Title [omid:br/... other-ids]`` for a venue entity.

    Returns "" when the venue has no dcterms:title. External identifiers are
    resolved from the dump and appended after the venue's OMID.
    """
    title = venue_data.get("http://purl.org/dc/terms/title", [{}])[0].get(
        "@value", ""
    )
    if not title:
        return ""

    identifiers = [f"omid:br/{venue_uri.split('/')[-1]}"]
    for identifier in venue_data.get(
        "http://purl.org/spar/datacite/hasIdentifier", []
    ):
        id_uri = identifier["@id"]
        id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
        if not id_file:
            continue
        for graph in load_json_from_file(id_file):
            for entity in graph.get("@graph", []):
                if entity["@id"] == id_uri:
                    id_value = process_identifier(entity)
                    if id_value:
                        identifiers.append(id_value)

    # identifiers is never empty here (it always contains the OMID).
    return f"{title} [{' '.join(identifiers)}]"
def process_hierarchical_venue(
    entity: dict,
    rdf_dir: str,
    dir_split_number: int,
    items_per_file: int,
    visited: Optional[set] = None,
    depth: int = 0,
) -> Dict[str, str]:
    """Climb frbr:partOf from an issue/volume up to its venue.

    Issue and volume sequence identifiers are collected along the way; the
    first ancestor that is neither is treated as the venue itself. Recursion
    stops on revisited entities or beyond depth 5 so malformed cyclic
    hierarchies cannot loop forever.
    """
    result = {"volume": "", "issue": "", "venue": ""}
    visited = set() if visited is None else visited

    entity_id = entity.get("@id", "")
    if entity_id in visited or depth > 5:
        print(f"Warning: Cycle detected in venue hierarchy at: {entity_id}")
        return result
    visited.add(entity_id)

    types = entity.get("@type", [])
    if "http://purl.org/spar/fabio/JournalIssue" in types:
        result["issue"] = entity.get(
            "http://purl.org/spar/fabio/hasSequenceIdentifier", [{}]
        )[0].get("@value", "")
    elif "http://purl.org/spar/fabio/JournalVolume" in types:
        result["volume"] = entity.get(
            "http://purl.org/spar/fabio/hasSequenceIdentifier", [{}]
        )[0].get("@value", "")
    else:
        # Reached the venue itself: resolve its title and stop climbing.
        result["venue"] = process_venue_title(
            entity, entity["@id"], rdf_dir, dir_split_number, items_per_file
        )
        return result

    if "http://purl.org/vocab/frbr/core#partOf" in entity:
        parent_uri = entity["http://purl.org/vocab/frbr/core#partOf"][0]["@id"]
        parent_file = find_file(rdf_dir, dir_split_number, items_per_file, parent_uri)
        if parent_file:
            for graph in load_json_from_file(parent_file):
                for parent_entity in graph.get("@graph", []):
                    if parent_entity["@id"] != parent_uri:
                        continue
                    parent_info = process_hierarchical_venue(
                        parent_entity,
                        rdf_dir,
                        dir_split_number,
                        items_per_file,
                        visited,
                        depth + 1,
                    )
                    # Values found closer to the leaf take precedence;
                    # parents only fill in the gaps.
                    for key, value in parent_info.items():
                        if not result[key]:
                            result[key] = value

    return result
def find_first_ar_by_role(
    agent_roles: Dict, next_relations: Dict, role_type: str
) -> Optional[str]:
    """Pick the head of the hasNext chain among agent roles of *role_type*.

    The head is a role of the requested type that no other same-typed role
    points to via hasNext. If every candidate is referenced (a cycle), an
    arbitrary candidate is returned; with no candidates at all, None.
    """
    candidates = {}
    for ar_uri, ar_data in agent_roles.items():
        role = ar_data.get("http://purl.org/spar/pro/withRole", [{}])[0].get(
            "@id", ""
        )
        if role_type in role:
            candidates[ar_uri] = ar_data

    # hasNext edges whose endpoints both carry the requested role.
    pointed_to = {
        target
        for source, target in next_relations.items()
        if source in candidates and target in candidates
    }

    for ar_uri in candidates:
        if ar_uri not in pointed_to:
            return ar_uri

    # Every candidate is referenced: degenerate (cyclic) chain.
    return next(iter(candidates)) if candidates else None
def process_bibliographic_resource(
    br_data: dict, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[Dict[str, str]]:
    """Build one CSV row (keyed by FIELDNAMES) from a bibliographic resource.

    Resolves the resource's identifiers, agents (authors/editors/publishers,
    ordered via the hasNext chain), venue hierarchy, and page range by loading
    the referenced entities from the sharded dump via find_file().

    Returns None for journal volumes/issues (they are venue containers, not
    rows). On any other error a partially filled row is still returned:
    processing is deliberately best-effort per resource.
    """
    br_types = br_data.get("@type", [])
    if (
        "http://purl.org/spar/fabio/JournalVolume" in br_types
        or "http://purl.org/spar/fabio/JournalIssue" in br_types
    ):
        return None

    output = {field: "" for field in FIELDNAMES}

    try:
        entity_id = br_data.get("@id", "")
        identifiers = [f'omid:br/{entity_id.split("/")[-1]}'] if entity_id else []

        output["title"] = br_data.get("http://purl.org/dc/terms/title", [{}])[0].get(
            "@value", ""
        )

        output["pub_date"] = br_data.get(
            "http://prismstandard.org/namespaces/basic/2.0/publicationDate", [{}]
        )[0].get("@value", "")

        # Drop the generic fabio:Expression class; the first remaining type
        # (if any) determines the human-readable label.
        br_types = [
            t
            for t in br_data.get("@type", [])
            if t != "http://purl.org/spar/fabio/Expression"
        ]
        output["type"] = URI_TYPE_DICT.get(br_types[0], "") if br_types else ""

        # Resolve external identifiers (doi, issn, ...) from the dump.
        if "http://purl.org/spar/datacite/hasIdentifier" in br_data:
            for identifier in br_data["http://purl.org/spar/datacite/hasIdentifier"]:
                id_uri = identifier["@id"]
                id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
                if id_file:
                    id_data = load_json_from_file(id_file)
                    for graph in id_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == id_uri:
                                id_value = process_identifier(entity)
                                if id_value:
                                    identifiers.append(id_value)
        output["id"] = " ".join(identifiers)

        authors = []
        editors = []
        publishers = []
        agent_roles = {}   # AR uri -> full AR entity loaded from the dump
        next_relations = {}  # AR uri -> next AR uri (oco:hasNext ordering)

        if "http://purl.org/spar/pro/isDocumentContextFor" in br_data:
            # First pass: load every agent-role entity and its hasNext edge.
            for ar_data in br_data["http://purl.org/spar/pro/isDocumentContextFor"]:
                ar_uri = ar_data["@id"]
                ar_file = find_file(rdf_dir, dir_split_number, items_per_file, ar_uri)
                if ar_file:
                    ar_data = load_json_from_file(ar_file)
                    for graph in ar_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == ar_uri:
                                agent_roles[ar_uri] = entity
                                if "https://w3id.org/oc/ontology/hasNext" in entity:
                                    next_ar = entity[
                                        "https://w3id.org/oc/ontology/hasNext"
                                    ][0]["@id"]
                                    next_relations[ar_uri] = next_ar

            # Second pass: walk the hasNext chain per role, collecting names
            # in declared order; guarded against cycles/runaway chains.
            for role_type, role_list in [
                ("author", authors),
                ("editor", editors),
                ("publisher", publishers),
            ]:
                first_ar = find_first_ar_by_role(agent_roles, next_relations, role_type)
                if not first_ar:
                    continue

                current_ar = first_ar
                processed_ars = set()
                max_iterations = len(agent_roles)
                iterations = 0

                while current_ar and current_ar in agent_roles:
                    if current_ar in processed_ars or iterations >= max_iterations:
                        print(
                            f"Warning: Detected cycle in hasNext relations or exceeded maximum iterations at AR: {current_ar}"
                        )
                        break

                    processed_ars.add(current_ar)
                    iterations += 1

                    entity = agent_roles[current_ar]
                    role = entity.get("http://purl.org/spar/pro/withRole", [{}])[0].get(
                        "@id", ""
                    )

                    if role_type in role:
                        if "http://purl.org/spar/pro/isHeldBy" in entity:
                            ra_uri = entity["http://purl.org/spar/pro/isHeldBy"][0][
                                "@id"
                            ]
                            ra_file = find_file(
                                rdf_dir, dir_split_number, items_per_file, ra_uri
                            )
                            if ra_file:
                                ra_data = load_json_from_file(ra_file)
                                for ra_graph in ra_data:
                                    for ra_entity in ra_graph.get("@graph", []):
                                        if ra_entity["@id"] == ra_uri:
                                            agent_name = process_responsible_agent(
                                                ra_entity,
                                                ra_uri,
                                                rdf_dir,
                                                dir_split_number,
                                                items_per_file,
                                            )
                                            if agent_name:
                                                role_list.append(agent_name)

                    current_ar = next_relations.get(current_ar)

        output["author"] = "; ".join(authors)
        output["editor"] = "; ".join(editors)
        output["publisher"] = "; ".join(publishers)

        # Venue / volume / issue come from the frbr:partOf hierarchy.
        if "http://purl.org/vocab/frbr/core#partOf" in br_data:
            venue_uri = br_data["http://purl.org/vocab/frbr/core#partOf"][0]["@id"]
            venue_file = find_file(rdf_dir, dir_split_number, items_per_file, venue_uri)
            if venue_file:
                venue_data = load_json_from_file(venue_file)
                for graph in venue_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == venue_uri:
                            venue_info = process_hierarchical_venue(
                                entity, rdf_dir, dir_split_number, items_per_file
                            )
                            output.update(venue_info)

        # Page range lives on the frbr:embodiment (manifestation) entity.
        if "http://purl.org/vocab/frbr/core#embodiment" in br_data:
            page_uri = br_data["http://purl.org/vocab/frbr/core#embodiment"][0]["@id"]
            page_file = find_file(rdf_dir, dir_split_number, items_per_file, page_uri)
            if page_file:
                page_data = load_json_from_file(page_file)
                for graph in page_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == page_uri:
                            start_page = entity.get(
                                "http://prismstandard.org/namespaces/basic/2.0/startingPage",
                                [{}],
                            )[0].get("@value", "")
                            end_page = entity.get(
                                "http://prismstandard.org/namespaces/basic/2.0/endingPage",
                                [{}],
                            )[0].get("@value", "")
                            if start_page or end_page:
                                output["page"] = f"{start_page}-{end_page}"

    except Exception as e:
        # Best-effort: log and return whatever was filled in before the error.
        print(f"Error processing bibliographic resource: {type(e).__name__}: {e}")

    return output
class ResultBuffer:
    """Accumulates CSV rows and writes them out as fixed-size numbered files.

    Files are named ``output_<n>.csv``; numbering resumes after the highest
    existing file in the output directory so restarted runs never overwrite
    previously written data.
    """

    def __init__(self, output_dir: str, max_rows: int = 3000):
        self.buffer = []
        self.output_dir = output_dir
        self.max_rows = max_rows
        self.file_counter = self._get_last_file_number() + 1

    def _get_last_file_number(self) -> int:
        """Return the highest <n> among existing output_<n>.csv files, or -1."""
        if not os.path.exists(self.output_dir):
            return -1
        highest = -1
        for name in os.listdir(self.output_dir):
            if not (name.startswith("output_") and name.endswith(".csv")):
                continue
            middle = name[len("output_"):-len(".csv")]
            try:
                highest = max(highest, int(middle))
            except ValueError:
                continue  # unrelated file that happens to match prefix/suffix
        return highest

    def add_results(self, results: List[Dict[str, str]]) -> None:
        """Queue *results*, writing out full chunks of max_rows rows."""
        self.buffer.extend(results)
        while len(self.buffer) >= self.max_rows:
            self._write_buffer_chunk()

    def _write_buffer_chunk(self) -> None:
        """Write exactly max_rows rows from the front of the buffer to a new file."""
        path = os.path.join(self.output_dir, f"output_{self.file_counter}.csv")
        write_csv(path, self.buffer[: self.max_rows])
        del self.buffer[: self.max_rows]
        self.file_counter += 1

    def flush(self) -> None:
        """Write any remaining buffered rows to a final (possibly short) file."""
        if not self.buffer:
            return
        path = os.path.join(self.output_dir, f"output_{self.file_counter}.csv")
        write_csv(path, self.buffer)
        self.buffer = []
        self.file_counter += 1
def generate_csv(
    input_dir: str,
    output_dir: str,
    dir_split_number: int,
    items_per_file: int,
    redis_host: str = "localhost",
    redis_port: int = 6379,
    redis_db: int = 2,
    workers: int = 4,
) -> None:
    """Convert the ``br`` shard of a Meta RDF dump into CSV files in *output_dir*.

    Resumable at two levels: a checkpoint file records fully processed zip
    files, and Redis records individual processed OMIDs. Work is fanned out
    to a multiprocessing Pool; the parent buffers rows and checkpoints each
    file only after its results have been buffered.
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # File-level checkpoint: zip files completed in earlier runs.
    checkpoint_file = os.path.join(output_dir, "processed_br_files.txt")
    processed_br_files = load_checkpoint(checkpoint_file)

    # Entity-level checkpoint: OMIDs already present in earlier output CSVs.
    redis_client = init_redis_connection(redis_host, redis_port, redis_db)
    load_processed_omids_to_redis(output_dir, redis_client)

    br_dir = os.path.join(input_dir, "br")
    if not os.path.exists(br_dir):
        print(f"Error: directory not found at {br_dir}")
        return

    all_files = []
    for root, _, files in os.walk(br_dir):
        # Skip directories whose path contains "prov" (provenance data).
        if "prov" in root:
            continue
        all_files.extend(os.path.join(root, f) for f in files if f.endswith(".zip"))

    # Sort for a deterministic processing order across runs.
    all_files = sorted(all_files)
    files_to_process = [f for f in all_files if f not in processed_br_files]

    if not files_to_process:
        print("All files already processed")
        return

    print(f"Skipping {len(processed_br_files)} already processed files")
    print(f"Processing {len(files_to_process)} remaining files with {workers} workers...")

    result_buffer = ResultBuffer(output_dir)

    with Pool(
        workers,
        _init_worker,
        (redis_host, redis_port, redis_db, input_dir, dir_split_number, items_per_file),
    ) as pool:
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            TimeElapsedColumn(),
            TimeRemainingColumn(),
        ) as progress:
            task = progress.add_task("Processing files", total=len(files_to_process))

            # imap_unordered: results arrive as workers finish, not in order.
            for filepath, results in pool.imap_unordered(_process_file_worker, files_to_process):
                if results:
                    result_buffer.add_results(results)
                # Checkpoint only after the file's rows are safely buffered.
                mark_file_processed(checkpoint_file, filepath)
                progress.update(task, advance=1)

    # Write out any remaining partial chunk.
    result_buffer.flush()
    print("Processing complete.")
def write_csv(filepath: str, data: List[Dict[str, str]]) -> None:
    """Write *data* to *filepath* as UTF-8 CSV with the canonical FIELDNAMES header."""
    with open(filepath, "w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=FIELDNAMES)
        writer.writeheader()
        writer.writerows(data)
if __name__ == '__main__':
    # CLI entry point: read sharding parameters from the Meta config file,
    # optionally clean resume state, then run the extraction.
    parser = ArgumentParser('generate_csv.py',
                            description='Generate CSV files from OpenCitations Meta RDF dump')
    parser.add_argument('-c', '--config', required=True,
                        help='OpenCitations Meta configuration file location')
    parser.add_argument('-o', '--output_dir', required=True,
                        help='Directory where CSV files will be stored')
    parser.add_argument('--redis-host', default='localhost',
                        help='Redis host (default: localhost)')
    parser.add_argument('--redis-port', type=int, default=6379,
                        help='Redis port (default: 6379)')
    parser.add_argument('--redis-db', type=int, default=2,
                        help='Redis database number (default: 2)')
    parser.add_argument('--workers', type=int, default=4,
                        help='Number of parallel workers (default: 4)')
    parser.add_argument('--clean', action='store_true',
                        help='Clear checkpoint file and Redis cache before starting')
    args = parser.parse_args()

    with open(args.config, encoding='utf-8') as f:
        settings = yaml.full_load(f)

    rdf_dir = os.path.join(settings['output_rdf_dir'], 'rdf')
    dir_split_number = settings['dir_split_number']
    items_per_file = settings['items_per_file']

    if args.clean:
        # BUGFIX: the checkpoint lives inside the output directory (see
        # generate_csv), not in the current working directory, so it must be
        # resolved relative to args.output_dir or --clean removes nothing.
        checkpoint_file = os.path.join(args.output_dir, 'processed_br_files.txt')
        if os.path.exists(checkpoint_file):
            os.remove(checkpoint_file)
            print(f"Removed checkpoint file: {checkpoint_file}")
        redis_client = redis.Redis(
            host=args.redis_host, port=args.redis_port, db=args.redis_db
        )
        deleted = redis_client.delete('processed_omids')
        if deleted:
            print("Cleared Redis processed_omids cache")

    generate_csv(
        input_dir=rdf_dir,
        output_dir=args.output_dir,
        dir_split_number=dir_split_number,
        items_per_file=items_per_file,
        redis_host=args.redis_host,
        redis_port=args.redis_port,
        redis_db=args.redis_db,
        workers=args.workers,
    )