Coverage for oc_ds_converter / ra_processor.py: 86%

184 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-06-12 21:23 +0000

1# SPDX-FileCopyrightText: 2022-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# SPDX-FileCopyrightText: 2023 Arianna Moretti <arianna.moretti4@unibo.it> 

3# 

4# SPDX-License-Identifier: ISC 

5 

6import os 

7import re 

8import unicodedata 

9from csv import DictReader 

10from typing import List, Tuple 

11from zipfile import ZipFile 

12 

13from oc_ds_converter.oc_idmanager import ISBNManager, ISSNManager, ORCIDManager 

14 

15from oc_ds_converter.datasource.orcid_index import OrcidIndexInterface 

16from oc_ds_converter.lib.cleaner import Cleaner 

17from oc_ds_converter.lib.csvmanager import CSVManager 

18from oc_ds_converter.lib.master_of_regex import orcid_pattern 

19 

20 

def families_match(a: str, b: str) -> bool:
    """Tell whether two family names agree on a whitespace-token basis.

    Each name is lower-cased and split on whitespace into a set of tokens.
    The names match when both sets are non-empty and one of them is fully
    contained in the other, so a name that only adds tokens still matches.

    :param a: first family name; ``None`` or an empty string never matches
    :param b: second family name; ``None`` or an empty string never matches
    :returns: ``True`` when one token set is a subset of the other
    """
    left = set((a or "").lower().split())
    right = set((b or "").lower().split())
    if left and right:
        return left.issubset(right) or right.issubset(left)
    return False

27 

28 

class RaProcessor:
    """Helpers that turn agent, page and identifier metadata into normalised strings.

    An instance holds three pieces of state: ``publishers_mapping`` (or ``None``),
    ``orcid_index`` (an object queried through ``get_value``) and
    ``citing_entities_set`` (or ``None``).
    """

    def __init__(
        self,
        orcid_index: str | OrcidIndexInterface | CSVManager | None = None,
        publishers_filepath: str | None = None,
        citing_entities: str | None = None,
    ):
        """Set up the ORCID index, the publishers mapping and the citing entities set.

        :param orcid_index: a string is handed to ``CSVManager``; ``None`` yields
            ``CSVManager(None)``; any other object is stored as given
        :param publishers_filepath: CSV file read by :meth:`load_publishers_mapping`;
            when falsy, ``publishers_mapping`` is ``None``
        :param citing_entities: directory whose ``.zip`` files are unpacked and whose
            ``id`` column is then loaded as a set; when falsy, the set is ``None``
        """
        self.publishers_mapping = self.load_publishers_mapping(publishers_filepath) if publishers_filepath else None
        if orcid_index is None:
            self.orcid_index: OrcidIndexInterface = CSVManager(None)
        elif isinstance(orcid_index, str):
            self.orcid_index = CSVManager(orcid_index)
        else:
            self.orcid_index = orcid_index
        if citing_entities:
            # Archives must be unpacked before the CSV column is read.
            self.unzip_citing_entities(citing_entities)
            self.citing_entities_set = CSVManager.load_csv_column_as_set(citing_entities, 'id')
        else:
            self.citing_entities_set = None

    def get_agents_strings_list(self, doi: str, agents_list: list[dict]) -> tuple[list, list]:
        """Build the author and editor strings for one bibliographic resource.

        Every agent becomes ``'Family, Given'`` (or its ``name`` when family and given
        are not both available), followed by ``' [orcid:<id>]'`` when a valid ORCID is
        known. The ORCID comes from the agent's own ``orcid``/``ORCID`` field or, when
        the agent has neither key, from the ORCID index entry of ``doi``.

        :param doi: DOI used to query the ORCID index
        :param agents_list: agent dictionaries; each must have a ``role`` key and may
            have ``family``, ``given``, ``name``, ``orcid`` or ``ORCID``
        :returns: ``(authors, editors)``; agents with any other role are left out
        """
        authors_strings_list = []
        editors_string_list = []
        dict_orcid = None
        # The index is queried only if at least one agent carries no ORCID field.
        if not all('orcid' in agent or 'ORCID' in agent for agent in agents_list):
            dict_orcid = self.orcid_finder(doi)
        # Name fields are passed through Cleaner before any comparison is made.
        agents_list = [
            {k: Cleaner(v).remove_unwanted_characters() if k in {'family', 'given', 'name'} and v is not None
                else v for k, v in agent_dict.items()} for agent_dict in agents_list]

        for agent in agents_list:
            cur_role = agent['role']
            f_name = None
            g_name = None
            agent_string = None
            if agent.get('family') and agent.get('given'):
                f_name = agent['family']
                g_name = agent['given']
                agent_string = f_name + ', ' + g_name
            elif agent.get('name'):
                agent_string = agent['name']
                # A 'Family, Given' shaped name is split so it can be matched against the index.
                if "," in agent_string:
                    f_name = agent_string.split(",")[0].strip()
                    g_name = agent_string.split(",")[-1].strip()
                    if f_name and g_name:
                        agent_string = f_name + ', ' + g_name
            elif agent.get('family'):
                # Only the family name is known. f_name stays unset, so no index lookup
                # is attempted for this agent (same as before the clean-up).
                agent_string = agent['family'] + ', '
            elif agent.get('given'):
                # Only the given name is known.
                agent_string = ', ' + agent['given']

            orcid = None
            for orcid_key in ('orcid', 'ORCID'):
                if orcid_key in agent:
                    raw_orcid = agent[orcid_key]
                    if isinstance(raw_orcid, list):
                        # Only the first listed ORCID is used; an empty list means "no ORCID".
                        orcid = str(raw_orcid[0]) if raw_orcid else None
                    else:
                        orcid = str(raw_orcid)
                    break
            if orcid:
                orcid_manager = ORCIDManager(use_api_service=False)
                orcid = orcid_manager.normalise(orcid, include_prefix=False)
                orcid = orcid if orcid_manager.check_digit(orcid) else None
            elif dict_orcid and f_name:
                orcid = self._orcid_from_index(dict_orcid, agents_list, f_name, g_name, cur_role)

            if agent_string and orcid:
                agent_string += f' [orcid:{orcid}]'
            if agent_string:
                if cur_role == 'author':
                    authors_strings_list.append(agent_string)
                elif cur_role == 'editor':
                    editors_string_list.append(agent_string)
        return authors_strings_list, editors_string_list

    @staticmethod
    def _orcid_from_index(dict_orcid: dict[str, str], agents_list: list[dict], f_name: str,
                          g_name: str | None, cur_role: str) -> str | None:
        """Pick the ORCID from the index entries that matches the given agent, if any.

        All entries are examined and the last one that is accepted wins.

        :param dict_orcid: mapping from ORCID to a lower-cased ``'family, given'`` name
        :param agents_list: all agents of the resource, used to detect ambiguous names
        :param f_name: family name of the agent being resolved
        :param g_name: given name of the agent being resolved, possibly missing
        :param cur_role: role of the agent being resolved
        :returns: the matching ORCID, or ``None`` when no entry is accepted
        """
        orcid = None
        for ori, indexed_name in dict_orcid.items():
            orc_n = indexed_name.split(', ')
            orc_f = orc_n[0].lower()
            orc_g = orc_n[1] if len(orc_n) == 2 else None
            if not families_match(f_name, orc_f):
                continue
            if not (g_name and orc_g):
                # Without both given names the family name alone decides.
                orcid = ori
                continue
            same_surname = [person for person in agents_list
                            if person.get('family') and families_match(person['family'], orc_f)]
            # If there are several agents with the same surname
            if len(same_surname) > 1:
                same_initial = [person for person in agents_list
                                if person.get('given') and person['given'][0].lower() == orc_g[0].lower()]
                # If there are several agents with the same given name's initial
                if len(same_initial) > 1:
                    homonyms_list = [person for person in agents_list
                                     if person.get('given') and person['given'].lower() == orc_g.lower()]
                    # If there are homonyms
                    if len(homonyms_list) > 1:
                        # If such homonyms have different roles from the current role
                        if [person for person in homonyms_list if person['role'] != cur_role]:
                            if orc_g.lower() == g_name.lower():
                                orcid = ori
                    elif orc_g.lower() == g_name.lower():
                        orcid = ori
                elif orc_g[0].lower() == g_name[0].lower():
                    orcid = ori
            # If there is a person whose given name is equal to the family name of the
            # current person (a common situation for cjk names)
            elif any(person.get('given') and person['given'].lower() == f_name.lower()
                     for person in agents_list):
                if orc_g.lower() == g_name.lower():
                    orcid = ori
            else:
                orcid = ori
        return orcid

    def orcid_finder(self, doi: str) -> dict[str, str]:
        """Look up the people recorded in the ORCID index for a DOI.

        :param doi: DOI to look up; it is lower-cased before the query
        :returns: mapping from ORCID to the stripped, lower-cased text that precedes
            the ORCID in the index entry; empty when nothing is found
        """
        found: dict[str, str] = {}
        people = self.orcid_index.get_value(doi.lower())
        if people:
            for person in people:
                match = re.search(orcid_pattern, person)
                if match:
                    orcid = match.group(0)
                    # Drop the ORCID and the single character right before it
                    # (presumably an opening bracket -- TODO confirm). Clamping at 0
                    # keeps an entry that starts with the ORCID from being sliced
                    # with a negative index.
                    name = person[:max(match.start() - 1, 0)]
                    found[orcid] = name.strip().lower()
        return found

    def unzip_citing_entities(self, citing_entities):
        """Extract every ``.zip`` archive found under a directory, then delete it.

        Archives are located with ``os.walk`` and opened at the directory where they
        were found; their content is extracted into ``citing_entities`` itself.

        :param citing_entities: path of the directory to scan
        """
        for dirpath, _, filenames in os.walk(citing_entities):
            for filename in filenames:
                if filename.endswith('.zip'):
                    # dirpath, not the root: archives may sit in subdirectories.
                    zip_path = os.path.join(dirpath, filename)
                    with ZipFile(zip_path, mode='r') as zipf:
                        zipf.extractall(citing_entities)
                    os.remove(zip_path)

    def get_pages(self, pages_list: list) -> str:
        """Return the pages interval.

        :param pages_list: a list of page tokens, usually the start and the end page
        :type pages_list: list
        :returns: str -- a string in the format 'START-END', for example '583-584'.
            A single page is repeated ('G27-G27'). If there are no usable pages,
            the output is an empty string.
        """
        roman_letters = {'I', 'V', 'X', 'L', 'C', 'D', 'M'}
        # Blank tokens carry no page: they would otherwise pass the all-digits test
        # below and yield results such as '-' or '583-'.
        pages = [page for page in pages_list if page and page.strip()]
        clean_pages_list = []
        for page in pages:
            # e.g. 583-584 or 1_583-1_584
            if all(c.isdigit() or c == "_" for c in page):
                clean_pages_list.append(page)
            # e.g. G27. It is a born digital document. PeerJ uses this approach,
            # where G27 identifies the whole document, since it has no pages.
            elif len(pages) == 1:
                clean_pages_list.append(page)
            # e.g. iv-vii. This syntax is used in the prefaces.
            elif all(c.upper() in roman_letters for c in page):
                clean_pages_list.append(page)
            # 583b-584. It is an error. The b must be removed.
            elif any(c.isdigit() for c in page):
                clean_pages_list.append(''.join(c for c in page if c.isdigit() or c == '_'))
        if not clean_pages_list:
            return ''
        if len(clean_pages_list) == 1:
            clean_pages_list.append(clean_pages_list[0])
        return '-'.join(clean_pages_list)

    @staticmethod
    def normalise_unicode(metadata: dict) -> dict:
        """Apply NFKC normalisation to every string value of a dictionary.

        :param metadata: dictionary to normalise; it is not modified
        :returns: a new dictionary with the same keys; values that are not strings
            (for instance ``None``) are copied unchanged
        """
        return {
            key: unicodedata.normalize('NFKC', value) if isinstance(value, str) else value
            for key, value in metadata.items()
        }

    @staticmethod
    def id_worker(field, ids: list, func) -> None:
        """Call ``func`` on one identifier or on each identifier of a list.

        :param field: a single value or a list of values; each is converted to ``str``
        :param ids: list passed as the second argument of ``func``
        :param func: callable invoked as ``func(identifier, ids)``
        """
        values = field if isinstance(field, list) else [field]
        for value in values:
            func(str(value), ids)

    @staticmethod
    def load_publishers_mapping(publishers_filepath: str) -> dict[str, dict[str, str | set[str]]]:
        """Read the publishers CSV into a dictionary keyed by publisher id.

        The file must provide the columns ``id``, ``name`` and ``prefix``. Rows that
        share an id accumulate their prefixes; the name of the last such row is kept.

        :param publishers_filepath: path of the UTF-8 encoded CSV file
        :returns: ``{id: {'name': str, 'prefixes': set of str}}``
        """
        publishers_mapping: dict[str, dict[str, str | set[str]]] = {}
        # newline='' is what the csv module asks for when opening files for reading.
        with open(publishers_filepath, 'r', encoding='utf-8', newline='') as f:
            for row in DictReader(f):
                entry = publishers_mapping.setdefault(row['id'], {'name': row['name'], 'prefixes': set()})
                entry['name'] = row['name']
                prefixes = entry['prefixes']
                if isinstance(prefixes, set):
                    prefixes.add(row['prefix'])
        return publishers_mapping

    @staticmethod
    def issn_worker(issnid: str, ids: list) -> None:
        """Append ``issn:<normalised id>`` to ``ids`` when the ISSN is valid and new.

        :param issnid: raw ISSN
        :param ids: list of prefixed identifiers, extended in place
        """
        issn_manager = ISSNManager()
        norm_issnid = issn_manager.normalise(issnid, include_prefix=False)
        if norm_issnid and issn_manager.check_digit(norm_issnid):
            prefixed_issn = 'issn:' + norm_issnid
            if prefixed_issn not in ids:
                ids.append(prefixed_issn)

    @staticmethod
    def isbn_worker(isbnid: str, ids: list) -> None:
        """Append ``isbn:<normalised id>`` to ``ids`` when the ISBN is valid and new.

        :param isbnid: raw ISBN
        :param ids: list of prefixed identifiers, extended in place
        """
        isbn_manager = ISBNManager()
        norm_isbnid = isbn_manager.normalise(isbnid, include_prefix=False)
        if norm_isbnid and isbn_manager.check_digit(norm_isbnid):
            prefixed_isbn = 'isbn:' + norm_isbnid
            if prefixed_isbn not in ids:
                ids.append(prefixed_isbn)

    @staticmethod
    def uppercase_initials(inp_str: str) -> str:
        """Upper-case the first character of every whitespace-separated word.

        The remaining characters of each word are left as they are, and the words
        are joined back with single spaces.

        :param inp_str: text to transform
        :returns: the transformed text; an empty string for blank input
        """
        return " ".join(word[0].upper() + word[1:] for word in inp_str.split())