Coverage for oc_ds_converter / crossref / extract_crossref_publishers.py: 100%

83 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it>

#

# SPDX-License-Identifier: ISC

4 

5 

6import html 

7import os 

8from csv import QUOTE_NONNUMERIC, DictReader, DictWriter 

9from json import loads 

10from os.path import exists 

11from time import sleep, time 

12 

13from requests import get 

14 

15from oc_ds_converter.lib.console import create_progress 

16 

MAX_TRY = 5        # maximum number of HTTP attempts before giving up on a URL
SLEEPING_TIME = 5  # seconds to wait between retry attempts
# Column order of the output CSV produced by store_csv_on_file.
csv_headers = (
    "id", "name", "prefix"
)
# Polite User-Agent identifying OpenCitations to the Crossref API,
# as recommended by Crossref's "polite pool" etiquette.
headers = {
    "User-Agent":
        "OpenCitations "
        "(http://opencitations.net; mailto:contact@opencitations.net)"
}

27 

28 

def get_via_requests(get_url: str) -> dict | None:
    """Fetch *get_url* and decode its JSON body, retrying on transient failures.

    Returns the parsed JSON payload on HTTP 200, ``None`` on HTTP 404, and
    sleeps ``SLEEPING_TIME`` seconds before retrying on any other status or
    on a network/parse error. Raises ``ConnectionError`` once ``MAX_TRY``
    attempts have been exhausted.
    """
    attempts = 0
    while attempts < MAX_TRY:
        attempts += 1
        try:
            response = get(get_url, headers=headers, timeout=10)
            status = response.status_code
            if status == 200:
                response.encoding = "utf-8"
                return loads(response.text)
            if status == 404:
                return None
            # Unexpected status (rate limiting, 5xx, ...): back off and retry.
            sleep(SLEEPING_TIME)
        except Exception:
            # Network error or malformed body: back off and retry.
            sleep(SLEEPING_TIME)
    raise ConnectionError(f"Failed to fetch {get_url} after {MAX_TRY} attempts")

43 

44 

def is_stale(filepath: str, max_age_days: int) -> bool:
    """Report whether *filepath* is missing or older than *max_age_days* days."""
    if not exists(filepath):
        return True
    seconds_per_day = 60 * 60 * 24
    file_age_days = (time() - os.path.getmtime(filepath)) / seconds_per_day
    return file_age_days > max_age_days

52 

53 

def get_publishers(offset: int) -> tuple[list, int, int] | None:
    """Download one page (1000 rows) of Crossref members starting at *offset*.

    Returns ``(items, next_offset, total_results)`` where *next_offset* is
    the offset for the following page, or ``None`` when the API response is
    missing or lacks a ``"message"`` payload.
    """
    page_url = f"https://api.crossref.org/members?rows=1000&offset={offset}"
    payload = get_via_requests(page_url)
    if payload is None:
        return None
    message = payload.get("message")
    if message is None:
        return None
    total_results = int(message.get("total-results"))
    return message.get("items"), offset + 1000, total_results

66 

67 

def process(out_path: str, max_age_days: int = 30, force: bool = False) -> bool:
    """Download the full Crossref members list into the CSV at *out_path*.

    The download is skipped (returning ``False``) when the file already
    exists and is younger than *max_age_days* days, unless *force* is True.
    Publisher ids already stored in the file are preserved and only new
    publishers are appended, one row per distinct (publisher, prefix) pair.
    Returns ``True`` when a download pass was performed.
    """
    if not force and not is_stale(out_path, max_age_days):
        return False

    pub_ids = set()

    if exists(out_path):
        with open(out_path, encoding="utf8") as f:
            # Let DictReader take the field names from the file's header row.
            # Passing csv_headers explicitly would make the header row itself
            # be parsed as data, polluting pub_ids with the literal "id".
            csv_reader = DictReader(f)
            for row in csv_reader:
                pub_ids.add(row["id"])

    offset = 0
    # Sentinel total; replaced by the API's "total-results" after page one.
    tot = 10000000000

    with create_progress() as progress:
        task = progress.add_task("[green]Downloading publishers", total=tot)

        while offset < tot:
            response = get_publishers(offset)
            if response is None:
                # API error or empty payload: keep what was stored so far.
                break
            result, offset, tot = response
            progress.update(task, total=tot)

            if result is not None:
                for publisher in result:
                    progress.update(task, advance=1)
                    cur_id = str(publisher["id"])
                    if cur_id not in pub_ids:
                        pub_ids.add(cur_id)
                        cur_name = html.unescape(publisher["primary-name"])
                        prefixes = set()
                        for prefix in publisher["prefix"]:
                            prefix_value = prefix["value"]
                            # Emit one row per distinct prefix of this publisher.
                            if prefix_value not in prefixes:
                                prefixes.add(prefix_value)
                                store_csv_on_file(out_path, csv_headers, {
                                    "id": cur_id, "name": cur_name, "prefix": prefix_value})
    return True

108 

109 

def store_csv_on_file(f_path: str, header: tuple, json_obj: dict) -> None:
    """Append *json_obj* as one CSV row to *f_path*.

    A header row is written first when the file does not exist yet; all
    non-numeric values are quoted.
    """
    write_header = not exists(f_path)
    with open(f_path, "a", encoding="utf8", newline='') as out_file:
        writer = DictWriter(
            f=out_file,
            fieldnames=header,
            delimiter=',',
            quotechar='"',
            quoting=QUOTE_NONNUMERIC,
        )
        if write_header:
            writer.writeheader()
        writer.writerow(json_obj)