Coverage for oc_ds_converter / crossref / extract_crossref_publishers.py: 100%
83 statements
« prev ^ index » next — coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
6import html
7import os
8from csv import QUOTE_NONNUMERIC, DictReader, DictWriter
9from json import loads
10from os.path import exists
11from time import sleep, time
13from requests import get
15from oc_ds_converter.lib.console import create_progress
17MAX_TRY = 5
18SLEEPING_TIME = 5
19csv_headers = (
20 "id", "name", "prefix"
21)
22headers = {
23 "User-Agent":
24 "OpenCitations "
25 "(http://opencitations.net; mailto:contact@opencitations.net)"
26}
def get_via_requests(get_url: str) -> dict | None:
    """GET *get_url* and return its JSON body parsed into a dict.

    Behavior per attempt:
      * HTTP 200 -> force UTF-8 decoding and return the parsed JSON.
      * HTTP 404 -> return None (missing resource is an expected outcome).
      * any other status, network error, or JSON parse error -> retry,
        sleeping SLEEPING_TIME seconds between attempts.

    Raises:
        ConnectionError: when all MAX_TRY attempts have failed.
    """
    for attempt in range(1, MAX_TRY + 1):
        try:
            r = get(get_url, headers=headers, timeout=10)
            if r.status_code == 200:
                r.encoding = "utf-8"
                return loads(r.text)
            if r.status_code == 404:
                return None
        except Exception:
            # Deliberately broad: requests raises several exception types
            # (timeouts, connection resets) and loads() can fail on a
            # truncated body; every failure mode is retried the same way.
            pass
        # Fix: only sleep between attempts — the original also slept after
        # the final failure, wasting SLEEPING_TIME before raising.
        if attempt < MAX_TRY:
            sleep(SLEEPING_TIME)
    raise ConnectionError(f"Failed to fetch {get_url} after {MAX_TRY} attempts")
def is_stale(filepath: str, max_age_days: int) -> bool:
    """Report whether *filepath* is missing or was last modified more than
    *max_age_days* days ago."""
    if not exists(filepath):
        return True
    elapsed_days = (time() - os.path.getmtime(filepath)) / (60 * 60 * 24)
    return elapsed_days > max_age_days
def get_publishers(offset: int) -> tuple[list, int, int] | None:
    """Fetch one page (1000 rows) of Crossref members starting at *offset*.

    Returns a ``(items, next_offset, total_results)`` triple on success,
    or None when the request yields nothing usable (404 or a payload
    without a "message" envelope).
    """
    page_url = "https://api.crossref.org/members?rows=1000&offset=" + str(offset)
    payload = get_via_requests(page_url)
    if payload is None:
        return None
    message = payload.get("message")
    if message is None:
        return None
    return message.get("items"), offset + 1000, int(message.get("total-results"))
def process(out_path: str, max_age_days: int = 30, force: bool = False) -> bool:
    """Download the Crossref members list into the CSV at *out_path*.

    Skips the download entirely (returning False) when the file already
    exists, is younger than *max_age_days*, and *force* is not set.
    Publisher IDs already present in the file are not re-written, so an
    interrupted run can be resumed. Returns True when a download ran.
    """
    if not force and not is_stale(out_path, max_age_days):
        return False

    # Seed the dedup set with IDs from a previous (possibly partial) run.
    known_ids = set()
    if exists(out_path):
        with open(out_path, encoding="utf8") as existing:
            for record in DictReader(existing, csv_headers):
                known_ids.add(record["id"])

    cursor = 0
    total = 10000000000  # placeholder until the API reports the real total
    with create_progress() as progress:
        task = progress.add_task("[green]Downloading publishers", total=total)
        while cursor < total:
            page = get_publishers(cursor)
            if page is None:
                break
            members, cursor, total = page
            progress.update(task, total=total)
            if members is None:
                continue
            for member in members:
                progress.update(task, advance=1)
                member_id = str(member["id"])
                if member_id in known_ids:
                    continue
                known_ids.add(member_id)
                member_name = html.unescape(member["primary-name"])
                seen_prefixes = set()
                for entry in member["prefix"]:
                    doi_prefix = entry["value"]
                    if doi_prefix not in seen_prefixes:
                        seen_prefixes.add(doi_prefix)
                        store_csv_on_file(out_path, csv_headers, {
                            "id": member_id, "name": member_name, "prefix": doi_prefix})
    return True
def store_csv_on_file(f_path: str, header: tuple, json_obj: dict) -> None:
    """Append one row to the CSV at *f_path*, writing the header line first
    when the file does not exist yet."""
    needs_header = not exists(f_path)
    with open(f_path, "a", encoding="utf8", newline='') as out:
        writer = DictWriter(f=out, fieldnames=header, delimiter=',',
                            quotechar='"', quoting=QUOTE_NONNUMERIC)
        if needs_header:
            writer.writeheader()
        writer.writerow(json_obj)