Coverage for oc_meta/plugins/multiprocess/resp_agents_curator.py: 71% of 277 statements (coverage.py v6.5.0, created at 2025-12-20 08:55 +0000)

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

from __future__ import annotations

import os
import re
from typing import Dict, List, Tuple  # Dict and Tuple are used in annotations below

from oc_meta.core.curator import Curator
from oc_meta.lib.cleaner import Cleaner
from oc_meta.lib.file_manager import *
from oc_meta.lib.finder import *
from oc_meta.lib.master_of_regex import *

from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler

class RespAgentsCurator(Curator):
    def __init__(self, data: List[dict], ts: str, prov_config: str, counter_handler: RedisCounterHandler, base_iri: str = 'https://w3id.org/oc/meta', prefix: str = '060', separator: str = None, settings: dict | None = None, meta_config_path: str = None, timer=None):
        self.timer = timer
        self.everything_everywhere_allatonce = Graph()
        self.finder = ResourceFinder(ts, base_iri, self.everything_everywhere_allatonce, settings=settings, meta_config_path=meta_config_path)
        self.base_iri = base_iri  # id_worker builds entity URIs from this attribute
        self.prov_config = prov_config
        self.separator = separator
        self.data = [{field: value.strip() for field, value in row.items()} for row in data]
        self.prefix = prefix
        self.counter_handler = counter_handler
        self.radict: Dict[str, Dict[str, list]] = {}
        self.idra = {}  # key: identifier literal; value: MetaID of the ID connected to a responsible agent
        self.conflict_ra = {}
        self.rameta = dict()
        self.wnb_cnt = 0  # wannabe counter
        self.rowcnt = 0
        self.log = dict()
        self.preexisting_entities = set()

    def curator(self, filename: str = None, path_csv: str = None):
        # Phase 1: Collect identifiers and SPARQL prefetch
        with self._timed("curation__collect_identifiers"):
            metavals, identifiers, vvis = self.collect_identifiers(valid_dois_cache=dict())
            self.finder.get_everything_about_res(metavals=metavals, identifiers=identifiers, vvis=vvis)

        # Phase 5: Clean RA (author + publisher + editor aggregated)
        with self._timed("curation__clean_ra"):
            for row in self.data:
                self.log[self.rowcnt] = {
                    'id': {},
                    'author': {},
                    'venue': {},
                    'editor': {},
                    'publisher': {},
                    'page': {},
                    'volume': {},
                    'issue': {},
                    'pub_date': {},
                    'type': {}
                }
                self.clean_ra(row, 'author')
                self.clean_ra(row, 'publisher')
                self.clean_ra(row, 'editor')
                self.rowcnt += 1
            self.radict.update(self.conflict_ra)

        # Phase 6: Finalize (meta_maker + enrich + indexer)
        with self._timed("curation__finalize"):
            self.meta_maker()
            self.log = self.log_update()
            self.enrich()
            self.filename = filename
            self.indexer(path_csv=path_csv)

    def collect_identifiers(self, valid_dois_cache):
        """
        Override of the parent method, kept for signature compatibility: unlike
        Curator, RespAgentsCurator does not need venue IDs.
        """
        all_metavals = set()
        all_idslist = set()
        all_vvis = set()
        for row in self.data:
            metavals, idslist, vvis = self.extract_identifiers_and_metavals(
                row, valid_dois_cache=valid_dois_cache
            )
            all_metavals.update(metavals)
            all_idslist.update(idslist)
            all_vvis.update(vvis)
        return all_metavals, all_idslist, all_vvis
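
    # Illustrative sketch (hypothetical values, added commentary): the per-row results
    # are unioned into sets, and vvis is expected to stay empty here because responsible
    # agents carry no venue/volume/issue data:
    #   metavals, identifiers, vvis = curator.collect_identifiers(valid_dois_cache={})
    #   # e.g. metavals == {'0601'}, identifiers == {'orcid:0000-0002-8420-0696'}, vvis == set()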

    def clean_ra(self, row, col_name):
        '''
        This method performs the deduplication process for responsible agents (authors, publishers, and editors).

        :param row: a dictionary representing a CSV row
        :type row: Dict[str, str]
        :param col_name: the CSV column name. It can be 'author', 'publisher', or 'editor'
        :type col_name: str
        :returns: None -- This method modifies self.radict and self.idra, and returns None.
        '''
        if row[col_name]:
            if col_name in {'author', 'editor'}:
                ra_list = re.split(semicolon_in_people_field, row[col_name])
            elif col_name == 'publisher':
                ra_list = [row[col_name]]
            ra_metaids = list()
            ra_list = Cleaner.clean_ra_list(ra_list)
            for ra in ra_list:
                ra_id = re.search(name_and_ids, ra)
                name = Cleaner(ra_id.group(1)).clean_name()
                ra_id = ra_id.group(2)
                if self.separator:
                    ra_id_list = re.sub(colon_and_spaces, ':', ra_id).split(self.separator)
                else:
                    ra_id_list = re.split(one_or_more_spaces, re.sub(colon_and_spaces, ':', ra_id))
                ra_id_list, metaval = self.clean_id_list(ra_id_list)
                if col_name == 'publisher':
                    metaval = self.id_worker('publisher', name, ra_id_list, metaval, publ_entity=True)
                else:
                    metaval = self.id_worker(col_name, name, ra_id_list, metaval, publ_entity=False)
                if col_name != 'publisher' and metaval in self.radict:
                    full_name: str = self.radict[metaval]['title']
                    if ',' in name and ',' in full_name:
                        first_name = name.split(',')[1].strip()
                        if not full_name.split(',')[1].strip() and first_name:  # first name found!
                            given_name = full_name.split(',')[0]
                            self.radict[metaval]['title'] = given_name + ', ' + first_name
                ra_metaids.append(f'{name} [omid:ra/{metaval}]')
            row[col_name] = '; '.join(ra_metaids)
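
    # Illustrative sketch (hypothetical values, not from the original source): given
    #   row['author'] == 'Massari, Arcangelo [orcid:0000-0002-8420-0696]'
    # after clean_ra(row, 'author') the cell points at the deduplicated agent, e.g.
    #   row['author'] == 'Massari, Arcangelo [omid:ra/wannabe_0]'  # or an existing MetaID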

    def meta_maker(self):
        '''
        The MetaID dictionary 'rameta' is created from 'radict'.
        '''
        for identifier in self.radict:
            if 'wannabe' in identifier:
                other = identifier
                count = self.counter_handler.increment_counter('ra', supplier_prefix=self.prefix)
                meta = self.prefix + str(count)
                self.rameta[meta] = self.radict[identifier]
                self.rameta[meta]['others'].append(other)
                self.rameta[meta]['ids'].append('omid:ra/' + meta)
            else:
                self.rameta[identifier] = self.radict[identifier]
                self.preexisting_entities.add(f'ra/{identifier}')
                self.rameta[identifier]['ids'].append('omid:ra/' + identifier)

        for omid in self.idra.values():
            self.preexisting_entities.add(f'id/{omid}')
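
    # Illustrative sketch (hypothetical values): a provisional 'wannabe' key is promoted
    # to a freshly minted MetaID, while existing MetaIDs pass through unchanged:
    #   radict == {'wannabe_0': {'ids': ['orcid:...'], 'title': '...', 'others': []}}
    #   meta_maker()  ->  rameta == {'0601': {..., 'others': ['wannabe_0'],
    #                                         'ids': ['orcid:...', 'omid:ra/0601']}}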

    def indexer(self, path_csv: str = None) -> None:
        """
        Transform the idra dict into the list-of-dicts format consumed by Creator.
        Optionally saves the enriched CSV file.

        :param path_csv: Directory path for the enriched CSV output (optional)
        :type path_csv: str
        """
        # ID
        self.index_id_ra = list()
        cur_index = self.idra
        if cur_index:
            for literal, meta in cur_index.items():
                row = dict()
                row['id'] = str(literal)
                row['meta'] = str(meta)
                self.index_id_ra.append(row)
        else:
            row = dict()
            row['id'] = ''
            row['meta'] = ''
            self.index_id_ra.append(row)
        # Save enriched CSV if path provided
        if self.filename and path_csv and self.data:
            name = self.filename + '.csv'
            data_file = os.path.join(path_csv, name)
            write_csv(data_file, self.data)
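
    # Illustrative sketch (hypothetical values) of the resulting index:
    #   idra == {'orcid:0000-0002-8420-0696': '0602'}
    #   indexer()  ->  index_id_ra == [{'id': 'orcid:0000-0002-8420-0696', 'meta': '0602'}]
    # An empty idra still yields one blank row, presumably so downstream consumers
    # always receive a non-empty structure.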

    @staticmethod
    def clean_id_list(id_list: List[str], br=None, valid_dois_cache=None) -> Tuple[list, str]:
        '''
        Clean the IDs in the input list and check whether a MetaID is among them.

        :param id_list: a list of IDs
        :type id_list: List[str]
        :param br: True if the IDs in id_list refer to bibliographic resources, False otherwise
        :type br: bool
        :returns: Tuple[list, str] -- a two-element tuple: the list of cleaned IDs, and the MetaID if any was found.
        '''
        pattern = 'ra/'
        metaid = ''
        valid_dois_cache = valid_dois_cache if valid_dois_cache is not None else dict()  # avoid a shared mutable default
        id_list = list(filter(None, id_list))
        clean_list = list()
        for elem in id_list:
            if elem in clean_list:
                continue
            elem = Cleaner(elem).normalize_hyphens()
            identifier = elem.split(':', 1)
            schema = identifier[0].lower()
            value = identifier[1]
            if schema == 'omid':
                metaid = value.replace(pattern, '')
            else:
                # Pass the shared cache through rather than a fresh dict each time
                normalized_id = Cleaner(elem).normalize_id(valid_dois_cache=valid_dois_cache)
                if normalized_id:
                    clean_list.append(normalized_id)
        how_many_meta = [i for i in id_list if i.lower().startswith('omid')]
        if len(how_many_meta) > 1:
            clean_list = [i for i in clean_list if not i.lower().startswith('omid')]
        return clean_list, metaid
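
    # Illustrative sketch (hypothetical values, assuming Cleaner.normalize_id accepts a
    # well-formed ORCID): the MetaID is split off, external IDs are kept:
    #   RespAgentsCurator.clean_id_list(['omid:ra/0601', 'orcid:0000-0002-8420-0696'])
    #   -> (['orcid:0000-0002-8420-0696'], '0601')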

    def id_worker(self, col_name, name, idslist: List[str], metaval: str, publ_entity: bool):
        id_dict = self.idra
        entity_dict = self.radict
        # a MetaID was provided
        if metaval:
            # Does the MetaID exist among the data?
            # meta already in entity_dict (conflicts don't matter here: a MetaID was specified explicitly)
            if metaval in entity_dict:
                self.merge_entities_in_csv(idslist, metaval, name, entity_dict, id_dict)
            else:
                found_meta_ts = self.finder.retrieve_ra_from_meta(metaval)
                # meta in triplestore
                # 2 Retrieve EntityA data in triplestore to update EntityA inside CSV
                if found_meta_ts:
                    entity_dict[metaval] = dict()
                    entity_dict[metaval]['ids'] = list()
                    if col_name in {'author', 'editor'}:
                        entity_dict[metaval]['title'] = self.name_check(found_meta_ts[0], name)
                    else:
                        entity_dict[metaval]['title'] = found_meta_ts[0]
                    entity_dict[metaval]['others'] = list()
                    self.merge_entities_in_csv(idslist, metaval, name, entity_dict, id_dict)
                    existing_ids = found_meta_ts[1]
                    for identifier in existing_ids:
                        if identifier[1] not in id_dict:
                            id_dict[identifier[1]] = identifier[0]
                        if identifier[1] not in entity_dict[metaval]['ids']:
                            entity_dict[metaval]['ids'].append(identifier[1])
                # Look for the MetaID in the provenance
                else:
                    metaid_uri = f'{self.base_iri}/ra/{str(metaval)}'
                    # The entity MetaID after the merge if it was merged, None otherwise. If None, the MetaID is considered invalid
                    metaval = self.finder.retrieve_metaid_from_merged_entity(metaid_uri=metaid_uri, prov_config=self.prov_config)
        # there's no meta, or there was one but it didn't exist
        # Are there other IDs?
        if idslist and not metaval:
            local_match = self.__local_match(idslist, entity_dict)
            # Do the IDs already exist among the data?
            # check in entity_dict
            if local_match['existing']:
                # ids refer to multiple existing entities
                if len(local_match['existing']) > 1:
                    # !
                    return self.conflict(idslist, name, id_dict, col_name)
                # ids refer to ONE existing entity
                elif len(local_match['existing']) == 1:
                    metaval = str(local_match['existing'][0])
                    suspect_ids = list()
                    for identifier in idslist:
                        if identifier not in entity_dict[metaval]['ids']:
                            suspect_ids.append(identifier)
                    if suspect_ids:
                        sparql_match = self.finder_sparql(suspect_ids, br=False, ra=True, vvi=False, publ=publ_entity)
                        if len(sparql_match) > 1:
                            # !
                            return self.conflict(idslist, name, id_dict, col_name)
            # ids refer to 1 or more wannabe entities
            elif local_match['wannabe']:
                metaval = str(local_match['wannabe'].pop(0))
                # 5 Merge data from EntityA (CSV) with data from EntityX (CSV)
                for old_meta in local_match['wannabe']:
                    self.merge(entity_dict, metaval, old_meta, name)
                suspect_ids = list()
                for identifier in idslist:
                    if identifier not in entity_dict[metaval]['ids']:
                        suspect_ids.append(identifier)
                if suspect_ids:
                    sparql_match = self.finder_sparql(suspect_ids, br=False, ra=True, vvi=False, publ=publ_entity)
                    if sparql_match:
                        if 'wannabe' not in metaval or len(sparql_match) > 1:
                            # Two entities previously disconnected on the triplestore now become connected
                            # !
                            return self.conflict(idslist, name, id_dict, col_name)
                        else:
                            existing_ids = sparql_match[0][2]
                            new_idslist = [x[1] for x in existing_ids]
                            new_sparql_match = self.finder_sparql(new_idslist, br=False, ra=True, vvi=False, publ=publ_entity)
                            if len(new_sparql_match) > 1:
                                # Two entities previously disconnected on the triplestore now become connected
                                # !
                                return self.conflict(idslist, name, id_dict, col_name)
                            else:
                                # 4 Merge data from EntityA (CSV) with data from EntityX (CSV) (it has already happened in # 5), update both with data from EntityA (RDF)
                                old_metaval = metaval
                                metaval = sparql_match[0][0]
                                entity_dict[metaval] = dict()
                                entity_dict[metaval]['ids'] = list()
                                entity_dict[metaval]['others'] = list()
                                entity_dict[metaval]['title'] = sparql_match[0][1] if sparql_match[0][1] else ''
                                self.merge(entity_dict, metaval, old_metaval, sparql_match[0][1])
                                for identifier in existing_ids:
                                    if identifier[1] not in id_dict:
                                        id_dict[identifier[1]] = identifier[0]
                                    if identifier[1] not in entity_dict[metaval]['ids']:
                                        entity_dict[metaval]['ids'].append(identifier[1])
            else:
                sparql_match = self.finder_sparql(idslist, br=False, ra=True, vvi=False, publ=publ_entity)
                if len(sparql_match) > 1:
                    # !
                    return self.conflict(idslist, name, id_dict, col_name)
                elif len(sparql_match) == 1:
                    existing_ids = sparql_match[0][2]
                    new_idslist = [x[1] for x in existing_ids]
                    new_sparql_match = self.finder_sparql(new_idslist, br=False, ra=True, vvi=False, publ=publ_entity)
                    if len(new_sparql_match) > 1:
                        # Two entities previously disconnected on the triplestore now become connected
                        # !
                        return self.conflict(idslist, name, id_dict, col_name)
                    # 2 Retrieve EntityA data in triplestore to update EntityA inside CSV
                    # 3 CONFLICT between MetaIDs. The MetaID specified in EntityA inside the CSV takes precedence.
                    elif len(new_sparql_match) == 1:
                        metaval = sparql_match[0][0]
                        entity_dict[metaval] = dict()
                        entity_dict[metaval]['ids'] = list()
                        entity_dict[metaval]['others'] = list()
                        if col_name in {'author', 'editor'}:
                            entity_dict[metaval]['title'] = self.name_check(sparql_match[0][1], name)
                        else:
                            entity_dict[metaval]['title'] = sparql_match[0][1]

                        if not entity_dict[metaval]['title'] and name:
                            entity_dict[metaval]['title'] = name

                        for identifier in existing_ids:
                            if identifier[1] not in id_dict:
                                id_dict[identifier[1]] = identifier[0]
                            if identifier[1] not in entity_dict[metaval]['ids']:
                                entity_dict[metaval]['ids'].append(identifier[1])
                else:
                    # 1 EntityA is a new one
                    metaval = self.new_entity(entity_dict, name)
            for identifier in idslist:
                if identifier not in id_dict:
                    self.__update_id_count(id_dict, identifier)
                if identifier not in entity_dict[metaval]['ids']:
                    entity_dict[metaval]['ids'].append(identifier)
            if not entity_dict[metaval]['title'] and name:
                entity_dict[metaval]['title'] = name
        # 1 EntityA is a new one
        if not idslist and not metaval:
            metaval = self.new_entity(entity_dict, name)
        return metaval
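
    # Decision summary for id_worker (added commentary, paraphrasing the branches above):
    # - explicit MetaID: reuse it, pulling data from the CSV, the triplestore, or, as a
    #   last resort, the provenance of merged entities;
    # - no MetaID but known IDs: match locally against existing/wannabe entities, then
    #   against the triplestore; more than one match is a conflict;
    # - no MetaID and no IDs: mint a brand-new entity.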

    def enrich(self):
        '''
        This method replaces the wannabe-ID placeholders with the
        actual data and MetaIDs resulting from the deduplication process.
        '''
        for row in self.data:
            for field in {'author', 'editor', 'publisher'}:
                if row[field]:
                    ras_list = list()
                    if field in {'author', 'editor'}:
                        ra_list = re.split(semicolon_in_people_field, row[field])
                    else:
                        ra_list = [row[field]]
                    for ra_entity in ra_list:
                        metaid = re.search(name_and_ids, ra_entity).group(2).replace('omid:ra/', '')
                        if 'wannabe' in metaid:
                            for ra_metaid in self.rameta:
                                if metaid in self.rameta[ra_metaid]['others']:
                                    metaid = ra_metaid
                        ra_name = self.rameta[metaid]['title']
                        ra_ids = ' '.join(self.rameta[metaid]['ids'])
                        ra = ra_name + ' [' + ra_ids + ']'
                        ras_list.append(ra)
                    row[field] = '; '.join(ras_list)
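
    # Illustrative sketch (hypothetical values): a placeholder left by clean_ra,
    #   row['author'] == 'Massari, Arcangelo [omid:ra/wannabe_0]'
    # becomes, once meta_maker has promoted 'wannabe_0' to '0601',
    #   row['author'] == 'Massari, Arcangelo [orcid:0000-0002-8420-0696 omid:ra/0601]'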

    @staticmethod
    def __local_match(list_to_match, dict_to_match: dict):
        match_elem = dict()
        match_elem['existing'] = list()
        match_elem['wannabe'] = list()
        for elem in list_to_match:
            for k, va in dict_to_match.items():
                if elem in va['ids']:
                    if 'wannabe' in k:
                        if k not in match_elem['wannabe']:
                            match_elem['wannabe'].append(k)  # TODO: consider using a set
                    else:
                        if k not in match_elem['existing']:
                            match_elem['existing'].append(k)
        return match_elem
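
    # Illustrative sketch (hypothetical values): IDs are looked up in every known entity
    # and bucketed by whether the entity already has a MetaID:
    #   __local_match(['orcid:0000-...'], {'0601': {'ids': ['orcid:0000-...']},
    #                                      'wannabe_3': {'ids': ['viaf:123']}})
    #   -> {'existing': ['0601'], 'wannabe': []}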

    def __update_id_count(self, id_dict, identifier):
        count = self.counter_handler.increment_counter('id', supplier_prefix=self.prefix)
        id_dict[identifier] = self.prefix + str(count)
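
# Minimal usage sketch (added for illustration; the endpoint URL, Redis parameters,
# prov_config path, and CSV row are hypothetical, not from the original source):
if __name__ == '__main__':
    data = [{'author': 'Massari, Arcangelo [orcid:0000-0002-8420-0696]',
             'editor': '', 'publisher': ''}]
    counter_handler = RedisCounterHandler(host='localhost', port=6379, db=0)
    curator = RespAgentsCurator(data=data,
                                ts='http://localhost:8805/sparql',
                                prov_config='prov_config.json',
                                counter_handler=counter_handler)
    curator.curator(filename='enriched', path_csv='output_dir')
    print(curator.rameta)  # deduplicated responsible agents keyed by MetaID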