Coverage for oc_ds_converter / ra_processor.py: 86%

178 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2022-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# SPDX-FileCopyrightText: 2023 Arianna Moretti <arianna.moretti4@unibo.it> 

3# 

4# SPDX-License-Identifier: ISC 

5 

6import os 

7import re 

8import unicodedata 

9from csv import DictReader 

10from typing import List, Tuple 

11from zipfile import ZipFile 

12 

13from oc_ds_converter.oc_idmanager import ISBNManager, ISSNManager, ORCIDManager 

14 

15from oc_ds_converter.datasource.orcid_index import OrcidIndexInterface 

16from oc_ds_converter.lib.cleaner import Cleaner 

17from oc_ds_converter.lib.csvmanager import CSVManager 

18from oc_ds_converter.lib.master_of_regex import orcid_pattern 

19 

20 

21class RaProcessor(object): 

22 def __init__( 

23 self, 

24 orcid_index: str | OrcidIndexInterface | None = None, 

25 publishers_filepath: str | None = None, 

26 citing_entities: str | None = None, 

27 ): 

28 self.publishers_mapping = self.load_publishers_mapping(publishers_filepath) if publishers_filepath else None 

29 if orcid_index is None: 

30 self.orcid_index: OrcidIndexInterface = CSVManager(None) 

31 elif isinstance(orcid_index, str): 

32 self.orcid_index = CSVManager(orcid_index) 

33 else: 

34 self.orcid_index = orcid_index 

35 if citing_entities: 

36 self.unzip_citing_entities(citing_entities) 

37 self.citing_entities_set = CSVManager.load_csv_column_as_set(citing_entities, 'id') if citing_entities else None 

38 

39 def get_agents_strings_list(self, doi: str, agents_list: List[dict]) -> Tuple[list, list]: 

40 authors_strings_list = list() 

41 editors_string_list = list() 

42 dict_orcid = None 

43 if not all('orcid' in agent or 'ORCID' in agent for agent in agents_list): 

44 dict_orcid = self.orcid_finder(doi) 

45 agents_list = [ 

46 {k: Cleaner(v).remove_unwanted_characters() if k in {'family', 'given', 'name'} and v is not None 

47 else v for k, v in agent_dict.items()} for agent_dict in agents_list] 

48 

49 for agent in agents_list: 

50 cur_role = agent['role'] 

51 f_name = None 

52 g_name = None 

53 agent_string = None 

54 if agent.get('family') and agent.get('given'): 

55 f_name = agent['family'] 

56 g_name = agent['given'] 

57 agent_string = f_name + ', ' + g_name 

58 elif agent.get('name'): 

59 agent_string = agent['name'] 

60 f_name = agent_string.split(",")[0].strip() if "," in agent_string else None 

61 g_name = agent_string.split(",")[-1].strip() if "," in agent_string else None 

62 

63 if f_name and g_name: 

64 agent_string = f_name + ', ' + g_name 

65 if agent_string is None: 

66 if agent.get('family') and not agent.get('given'): 

67 if g_name: 

68 agent_string = agent['family'] + ', ' + g_name 

69 else: 

70 agent_string = agent['family'] + ', ' 

71 elif agent.get('given') and not agent.get('family'): 

72 if f_name: 

73 agent_string = f_name + ', ' + agent['given'] 

74 else: 

75 agent_string = ', ' + agent['given'] 

76 orcid = None 

77 if 'orcid' in agent: 

78 if isinstance(agent['orcid'], list): 

79 orcid = str(agent['orcid'][0]) 

80 else: 

81 orcid = str(agent['orcid']) 

82 elif 'ORCID' in agent: 

83 if isinstance(agent['ORCID'], list): 

84 orcid = str(agent['ORCID'][0]) 

85 else: 

86 orcid = str(agent['ORCID']) 

87 if orcid: 

88 orcid_manager = ORCIDManager(use_api_service=False) 

89 orcid = orcid_manager.normalise(orcid, include_prefix=False) 

90 orcid = orcid if orcid_manager.check_digit(orcid) else None 

91 elif dict_orcid and f_name: 

92 for ori in dict_orcid: 

93 orc_n: List[str] = dict_orcid[ori].split(', ') 

94 orc_f = orc_n[0].lower() 

95 orc_g = orc_n[1] if len(orc_n) == 2 else None 

96 if f_name.lower() in orc_f.lower() or orc_f.lower() in f_name.lower(): 

97 if g_name and orc_g: 

98 # If there are several authors with the same surname 

99 if len([person for person in agents_list if 'family' in person if person['family'] if 

100 person['family'].lower() in orc_f.lower() or orc_f.lower() in person[ 

101 'family'].lower()]) > 1: 

102 # If there are several authors with the same surname and the same given names' initials 

103 if len([person for person in agents_list if 'given' in person if person['given'] if 

104 person['given'][0].lower() == orc_g[0].lower()]) > 1: 

105 homonyms_list = [person for person in agents_list if 'given' in person if 

106 person['given'] if person['given'].lower() == orc_g.lower()] 

107 # If there are homonyms 

108 if len(homonyms_list) > 1: 

109 # If such homonyms have different roles from the current role 

110 if [person for person in homonyms_list if person['role'] != cur_role]: 

111 if orc_g.lower() == g_name.lower(): 

112 orcid = ori 

113 else: 

114 if orc_g.lower() == g_name.lower(): 

115 orcid = ori 

116 elif orc_g[0].lower() == g_name[0].lower(): 

117 orcid = ori 

118 # If there is a person whose given name is equal to the family name of the current person (a common situation for cjk names) 

119 elif any([person for person in agents_list if 'given' in person if person['given'] if 

120 person['given'].lower() == f_name.lower()]): 

121 if orc_g.lower() == g_name.lower(): 

122 orcid = ori 

123 else: 

124 orcid = ori 

125 else: 

126 orcid = ori 

127 if agent_string and orcid: 

128 agent_string += ' [' + 'orcid:' + str(orcid) + ']' 

129 if agent_string: 

130 if agent['role'] == 'author': 

131 authors_strings_list.append(agent_string) 

132 elif agent['role'] == 'editor': 

133 editors_string_list.append(agent_string) 

134 return authors_strings_list, editors_string_list 

135 

136 def orcid_finder(self, doi: str) -> dict[str, str]: 

137 found: dict[str, str] = {} 

138 doi = doi.lower() 

139 people = self.orcid_index.get_value(doi) 

140 if people: 

141 for person in people: 

142 match = re.search(orcid_pattern, person) 

143 if match: 

144 orcid = match.group(0) 

145 name: str = person[:person.find(orcid)-1] 

146 found[orcid] = name.strip().lower() 

147 return found 

148 

149 def unzip_citing_entities(self, citing_entities): 

150 for dirpath, _, filenames in os.walk(citing_entities): 

151 for filename in filenames: 

152 if filename.endswith('.zip'): 

153 with ZipFile(os.path.join(citing_entities, filename), mode='r') as zipf: 

154 zipf.extractall(citing_entities) 

155 os.remove(os.path.join(citing_entities, filename)) 

156 

157 def get_pages(self, pages_list:list) -> str: 

158 ''' 

159 This function returns the pages interval.  

160 

161 :params pages_list: a list of pages 

162 :type item: dict 

163 :returns: str -- The output is a string in the format 'START-END', for example, '583-584'. If there are no pages, the output is an empty string. 

164 ''' 

165 roman_letters = {'I', 'V', 'X', 'L', 'C', 'D', 'M'} 

166 clean_pages_list = list() 

167 for page in pages_list: 

168 # e.g. 583-584 or 1_583-1_584 

169 if all(c.isdigit() or c == "_" for c in page): 

170 clean_pages_list.append(page) 

171 # e.g. G27. It is a born digital document. PeerJ uses this approach, where G27 identifies the whole document, since it has no pages. 

172 elif len(pages_list) == 1: 

173 clean_pages_list.append(page) 

174 # e.g. iv-vii. This syntax is used in the prefaces. 

175 elif all(c.upper() in roman_letters for c in page): 

176 clean_pages_list.append(page) 

177 # 583b-584. It is an error. The b must be removed. 

178 elif any(c.isdigit() for c in page): 

179 page_without_letters = ''.join([c for c in page if c.isdigit() or c == '_']) 

180 clean_pages_list.append(page_without_letters) 

181 if clean_pages_list: 

182 if len(clean_pages_list) == 1: 

183 clean_pages_list.append(clean_pages_list[0]) 

184 return '-'.join(clean_pages_list) 

185 return '' 

186 

187 @staticmethod 

188 def normalise_unicode(metadata: dict) -> dict: 

189 return {k:unicodedata.normalize('NFKC', v) for k, v in metadata.items()} 

190 

191 @staticmethod 

192 def id_worker(field, ids:list, func) -> None: 

193 if isinstance(field, list): 

194 for i in field: 

195 func(str(i), ids) 

196 else: 

197 id = str(field) 

198 func(id, ids) 

199 

200 @staticmethod 

201 def load_publishers_mapping(publishers_filepath: str) -> dict[str, dict[str, str | set[str]]]: 

202 publishers_mapping: dict[str, dict[str, str | set[str]]] = {} 

203 with open(publishers_filepath, 'r', encoding='utf-8') as f: 

204 data = DictReader(f) 

205 for row in data: 

206 pub_id = row['id'] 

207 if pub_id not in publishers_mapping: 

208 publishers_mapping[pub_id] = {'name': row['name'], 'prefixes': set()} 

209 else: 

210 publishers_mapping[pub_id]['name'] = row['name'] 

211 prefixes = publishers_mapping[pub_id]['prefixes'] 

212 if isinstance(prefixes, set): 

213 prefixes.add(row['prefix']) 

214 return publishers_mapping 

215 

216 @staticmethod 

217 def issn_worker(issnid: str, ids: list) -> None: 

218 issn_manager = ISSNManager() 

219 norm_issnid = issn_manager.normalise(issnid, include_prefix=False) 

220 if norm_issnid and issn_manager.check_digit(norm_issnid) and f'issn:{norm_issnid}' not in ids: 

221 ids.append('issn:' + norm_issnid) 

222 

223 @staticmethod 

224 def isbn_worker(isbnid: str, ids: list) -> None: 

225 isbn_manager = ISBNManager() 

226 norm_isbnid = isbn_manager.normalise(isbnid, include_prefix=False) 

227 if norm_isbnid and isbn_manager.check_digit(norm_isbnid) and f'isbn:{norm_isbnid}' not in ids: 

228 ids.append('isbn:' + norm_isbnid) 

229 

230 @staticmethod 

231 def uppercase_initials(inp_str: str): 

232 upper_word_list = [] 

233 words_list = inp_str.split() 

234 for w in words_list: 

235 upper_word_list.append(w[0].upper() + w[1:]) if len(w)>1 else upper_word_list.append(w[0].upper()) 

236 upper_str = " ".join(upper_word_list) 

237 return upper_str