Coverage for oc_meta/plugins/multiprocess/resp_agents_curator.py: 70%

286 statements  


#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

from __future__ import annotations

import json
import os
import re
from typing import Dict, List, Tuple

import redis
from oc_meta.core.curator import Curator
from oc_meta.lib.cleaner import Cleaner
from oc_meta.lib.file_manager import *
from oc_meta.lib.finder import *
from oc_meta.lib.master_of_regex import *

from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler



class RespAgentsCurator(Curator):
    def __init__(self, data:List[dict], ts:str, prov_config:str, counter_handler:RedisCounterHandler, base_iri:str='https://w3id.org/oc/meta', prefix:str='060', separator:str=None, settings:dict|None=None, meta_config_path: str = None):
        self.everything_everywhere_allatonce = Graph()
        self.finder = ResourceFinder(ts, base_iri, self.everything_everywhere_allatonce, settings=settings, meta_config_path=meta_config_path)
        self.base_iri = base_iri  # stored because id_worker builds entity URIs from it
        self.prov_config = prov_config
        self.separator = separator
        self.data = [{field:value.strip() for field,value in row.items()} for row in data]
        self.prefix = prefix
        self.counter_handler = counter_handler
        self.radict:Dict[str, Dict[str, list]] = {}
        self.idra = {}  # key: external identifier; value: MetaID of the identifier associated with the responsible agent
        self.conflict_ra = {}
        self.rameta = dict()
        self.wnb_cnt = 0  # wannabe counter
        self.rowcnt = 0
        self.log = dict()
        self.preexisting_entities = set()
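
    # A minimal usage sketch (illustrative only, not part of the original module).
    # All values are hypothetical; it assumes a reachable SPARQL endpoint and a
    # Redis instance backing the counters:
    #
    #     counter_handler = RedisCounterHandler(host='localhost', port=6379, db=0)
    #     curator = RespAgentsCurator(
    #         data=list(csv.DictReader(open('resp_agents.csv'))),
    #         ts='http://localhost:8890/sparql',
    #         prov_config='prov_config.json',
    #         counter_handler=counter_handler)
    #     curator.curator(filename='output', path_csv='out_csv', path_index='out_index')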

    def curator(self, filename:str=None, path_csv:str=None, path_index:str=None):
        metavals, identifiers, vvis = self.collect_identifiers(valid_dois_cache=dict())
        self.finder.get_everything_about_res(metavals=metavals, identifiers=identifiers, vvis=vvis)
        for row in self.data:
            self.log[self.rowcnt] = {
                'id': {},
                'author': {},
                'venue': {},
                'editor': {},
                'publisher': {},
                'page': {},
                'volume': {},
                'issue': {},
                'pub_date': {},
                'type': {}
            }
            self.clean_ra(row, 'author')
            self.clean_ra(row, 'publisher')
            self.clean_ra(row, 'editor')
            self.rowcnt += 1
        self.radict.update(self.conflict_ra)
        self.meta_maker()
        self.log = self.log_update()
        self.enrich()
        if path_index:
            path_index = os.path.join(path_index, filename)
        self.filename = filename
        self.indexer(path_index, path_csv)
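
    # The pipeline above runs in a fixed order: identifiers are collected and
    # fetched in bulk from the triplestore, each row's responsible agents are
    # deduplicated (clean_ra), MetaIDs are minted (meta_maker), rows are
    # rewritten with the final IDs (enrich), and the index files are written
    # (indexer). Note that path_index is joined with filename, so the two
    # parameters are meant to be supplied together.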

    def collect_identifiers(self, valid_dois_cache):
        """
        Override of the parent method, kept signature-compatible: RespAgentsCurator does not need venue IDs.
        """
        all_metavals = set()
        all_idslist = set()
        all_vvis = set()
        for row in self.data:
            metavals, idslist, vvis = self.extract_identifiers_and_metavals(
                row, valid_dois_cache=valid_dois_cache
            )
            all_metavals.update(metavals)
            all_idslist.update(idslist)
            all_vvis.update(vvis)
        return all_metavals, all_idslist, all_vvis
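
    # Illustrative return shape (hypothetical values): for a row whose 'author'
    # column is 'Doe, Jane [orcid:0000-0000-0000-0000]', the aggregated result
    # would be roughly (set(), {'orcid:0000-0000-0000-0000'}, set()); the vvis
    # set concerns venues, which this curator does not handle.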

    def clean_ra(self, row, col_name):
        '''
        This method performs the deduplication process for responsible agents (authors, publishers and editors).

        :param row: a dictionary representing a CSV row
        :type row: Dict[str, str]
        :param col_name: the CSV column name. It can be 'author', 'publisher', or 'editor'
        :type col_name: str
        :returns: None -- This method modifies self.radict and self.idra, and returns None.
        '''
        if row[col_name]:
            if col_name in {'author', 'editor'}:
                ra_list = re.split(semicolon_in_people_field, row[col_name])
            elif col_name == 'publisher':
                ra_list = [row[col_name]]
            ra_metaids = list()
            ra_list = Cleaner.clean_ra_list(ra_list)
            for ra in ra_list:
                ra_id = re.search(name_and_ids, ra)
                name = Cleaner(ra_id.group(1)).clean_name()
                ra_id = ra_id.group(2)
                if self.separator:
                    ra_id_list = re.sub(colon_and_spaces, ':', ra_id).split(self.separator)
                else:
                    ra_id_list = re.split(one_or_more_spaces, re.sub(colon_and_spaces, ':', ra_id))
                ra_id_list, metaval = self.clean_id_list(ra_id_list)
                if col_name == 'publisher':
                    metaval = self.id_worker('publisher', name, ra_id_list, metaval, publ_entity=True)
                else:
                    metaval = self.id_worker(col_name, name, ra_id_list, metaval, publ_entity=False)
                if col_name != 'publisher' and metaval in self.radict:
                    full_name:str = self.radict[metaval]['title']
                    if ',' in name and ',' in full_name:
                        first_name = name.split(',')[1].strip()
                        if not full_name.split(',')[1].strip() and first_name:  # the stored name lacks a given name and the new one provides it
                            family_name = full_name.split(',')[0]
                            self.radict[metaval]['title'] = family_name + ', ' + first_name
                ra_metaids.append(f'{name} [omid:ra/{metaval}]')
            row[col_name] = '; '.join(ra_metaids)
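
    # Worked example (hypothetical values): given a row with
    #     row['author'] == 'Doe, Jane [orcid:0000-0000-0000-0000]'
    # clean_ra splits the field, resolves or mints a MetaID through id_worker,
    # and rewrites the cell as something like
    #     'Doe, Jane [omid:ra/0601]'
    # (or '... [omid:ra/wannabe_0]' when the agent is new and not yet minted).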

    def meta_maker(self):
        '''
        The MetaID dictionary 'rameta' is created from 'radict'.
        '''
        for identifier in self.radict:
            if 'wannabe' in identifier:
                other = identifier
                count = self.counter_handler.increment_counter('ra', supplier_prefix=self.prefix)
                meta = self.prefix + str(count)
                self.rameta[meta] = self.radict[identifier]
                self.rameta[meta]['others'].append(other)
                self.rameta[meta]['ids'].append('omid:ra/' + meta)
            else:
                self.rameta[identifier] = self.radict[identifier]
                self.preexisting_entities.add(f'ra/{identifier}')
                self.rameta[identifier]['ids'].append('omid:ra/' + identifier)

        for _, omid in self.idra.items():
            self.preexisting_entities.add(f'id/{omid}')
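
    # Example of the minting step above (hypothetical counter value): with
    # prefix '060' and the Redis counter returning 42, a 'wannabe_0' entry is
    # re-keyed as '06042' and gains the identifier 'omid:ra/06042', while its
    # old key is remembered in the 'others' list for enrich() to resolve.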

    def indexer(self, path_index:str, path_csv:str) -> None:
        '''
        This method serializes idra so that it can be saved as a CSV file.
        Finally, it generates the enriched CSV and saves it.

        :param path_index: a directory path. It will contain the indexes
        :type path_index: str
        :param path_csv: a file path. It will be the output enriched CSV
        :type path_csv: str
        '''
        # ID
        self.index_id_ra = list()
        cur_index = self.idra
        if cur_index:
            for literal in cur_index:
                row = dict()
                row['id'] = str(literal)
                row['meta'] = str(cur_index[literal])
                self.index_id_ra.append(row)
        else:
            row = dict()
            row['id'] = ''
            row['meta'] = ''
            self.index_id_ra.append(row)
        if self.filename:
            if not os.path.exists(path_index):
                os.makedirs(path_index)
            ra_path = os.path.join(path_index, 'index_id_ra.csv')
            write_csv(ra_path, self.index_id_ra)
            if self.log:
                log_file = os.path.join(path_index, 'log.json')  # join with a separator, not string concatenation
                with open(log_file, 'w') as lf:
                    json.dump(self.log, lf)
            if self.data:
                name = self.filename + '.csv'
                data_file = os.path.join(path_csv, name)
                write_csv(data_file, self.data)
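
    # The resulting index_id_ra.csv maps each external identifier to its MetaID,
    # one pair per row (hypothetical values):
    #
    #     id,meta
    #     orcid:0000-0000-0000-0000,0601
    #     crossref:78,0602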

    @staticmethod
    def clean_id_list(id_list:List[str], br=None, valid_dois_cache=dict()) -> Tuple[list, str]:
        '''
        Clean the IDs in the input list and check whether a MetaID is present.

        :param id_list: a list of IDs
        :type id_list: List[str]
        :param br: True if the IDs in id_list refer to bibliographic resources, False otherwise
        :type br: bool
        :returns: Tuple[list, str] -- a two-element tuple, where the first element is the list of cleaned IDs and the second is the MetaID, if any was found.
        '''
        pattern = 'ra/'
        metaid = ''
        id_list = list(filter(None, id_list))
        clean_list = list()
        for elem in id_list:
            if elem in clean_list:
                continue
            elem = Cleaner(elem).normalize_hyphens()
            identifier = elem.split(':', 1)
            schema = identifier[0].lower()
            value = identifier[1]
            if schema == 'omid':
                metaid = value.replace(pattern, '')
            else:
                normalized_id = Cleaner(elem).normalize_id(valid_dois_cache=valid_dois_cache)  # pass the shared cache through instead of a fresh dict
                if normalized_id:
                    clean_list.append(normalized_id)
        how_many_meta = [i for i in id_list if i.lower().startswith('omid')]
        if len(how_many_meta) > 1:
            clean_list = [i for i in clean_list if not i.lower().startswith('omid')]
        return clean_list, metaid
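
    # Example call (hypothetical values, assuming the ORCID passes validation):
    #
    #     RespAgentsCurator.clean_id_list(['orcid:0000-0000-0000-0000', 'omid:ra/0601'])
    #     # -> (['orcid:0000-0000-0000-0000'], '0601')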

    def id_worker(self, col_name, name, idslist:List[str], metaval: str, publ_entity:bool):
        id_dict = self.idra
        entity_dict = self.radict
        # A MetaID was provided
        if metaval:
            # Does the MetaID already exist among the data?
            # MetaID already in entity_dict (conflicts do not matter: a MetaID was explicitly specified)
            if metaval in entity_dict:
                self.merge_entities_in_csv(idslist, metaval, name, entity_dict, id_dict)
            else:
                found_meta_ts = self.finder.retrieve_ra_from_meta(metaval)
                # MetaID found in the triplestore
                # 2 Retrieve EntityA data in the triplestore to update EntityA inside the CSV
                if found_meta_ts:
                    entity_dict[metaval] = dict()
                    entity_dict[metaval]['ids'] = list()
                    if col_name == 'author' or col_name == 'editor':
                        entity_dict[metaval]['title'] = self.name_check(found_meta_ts[0], name)
                    else:
                        entity_dict[metaval]['title'] = found_meta_ts[0]
                    entity_dict[metaval]['others'] = list()
                    self.merge_entities_in_csv(idslist, metaval, name, entity_dict, id_dict)
                    existing_ids = found_meta_ts[1]
                    for identifier in existing_ids:
                        if identifier[1] not in id_dict:
                            id_dict[identifier[1]] = identifier[0]
                        if identifier[1] not in entity_dict[metaval]['ids']:
                            entity_dict[metaval]['ids'].append(identifier[1])
                # Look for the MetaID in the provenance
                else:
                    metaid_uri = f'{self.base_iri}/ra/{str(metaval)}'
                    # The entity MetaID after merge if it was merged, None otherwise. If None, the MetaID is considered invalid
                    metaval = self.finder.retrieve_metaid_from_merged_entity(metaid_uri=metaid_uri, prov_config=self.prov_config)
        # There is no MetaID, or there was one but it did not exist
        # Are there other IDs?
        if idslist and not metaval:
            local_match = self.__local_match(idslist, entity_dict)
            # Do the IDs already exist among the data?
            # Check in entity_dict
            if local_match['existing']:
                # The IDs refer to multiple existing entities
                if len(local_match['existing']) > 1:
                    # Conflict: the same IDs point to more than one existing entity
                    return self.conflict(idslist, name, id_dict, col_name)
                # The IDs refer to ONE existing entity
                elif len(local_match['existing']) == 1:
                    metaval = str(local_match['existing'][0])
                    suspect_ids = list()
                    for identifier in idslist:
                        if identifier not in entity_dict[metaval]['ids']:
                            suspect_ids.append(identifier)
                    if suspect_ids:
                        sparql_match = self.finder_sparql(suspect_ids, br=False, ra=True, vvi=False, publ=publ_entity)
                        if len(sparql_match) > 1:
                            # Conflict: the unseen IDs match multiple entities on the triplestore
                            return self.conflict(idslist, name, id_dict, col_name)
            # The IDs refer to one or more wannabe entities
            elif local_match['wannabe']:
                metaval = str(local_match['wannabe'].pop(0))
                # 5 Merge data from EntityA (CSV) with data from EntityX (CSV)
                for old_meta in local_match['wannabe']:
                    self.merge(entity_dict, metaval, old_meta, name)
                suspect_ids = list()
                for identifier in idslist:
                    if identifier not in entity_dict[metaval]['ids']:
                        suspect_ids.append(identifier)
                if suspect_ids:
                    sparql_match = self.finder_sparql(suspect_ids, br=False, ra=True, vvi=False, publ=publ_entity)
                    if sparql_match:
                        if 'wannabe' not in metaval or len(sparql_match) > 1:
                            # Conflict: two entities previously disconnected on the triplestore would now become connected
                            return self.conflict(idslist, name, id_dict, col_name)
                        else:
                            existing_ids = sparql_match[0][2]
                            new_idslist = [x[1] for x in existing_ids]
                            new_sparql_match = self.finder_sparql(new_idslist, br=False, ra=True, vvi=False, publ=publ_entity)
                            if len(new_sparql_match) > 1:
                                # Conflict: two entities previously disconnected on the triplestore would now become connected
                                return self.conflict(idslist, name, id_dict, col_name)
                            else:
                                # 4 Merge data from EntityA (CSV) with data from EntityX (CSV) (it has already happened in # 5), update both with data from EntityA (RDF)
                                old_metaval = metaval
                                metaval = sparql_match[0][0]
                                entity_dict[metaval] = dict()
                                entity_dict[metaval]['ids'] = list()
                                entity_dict[metaval]['others'] = list()
                                entity_dict[metaval]['title'] = sparql_match[0][1] if sparql_match[0][1] else ''
                                self.merge(entity_dict, metaval, old_metaval, sparql_match[0][1])
                                for identifier in existing_ids:
                                    if identifier[1] not in id_dict:
                                        id_dict[identifier[1]] = identifier[0]
                                    if identifier[1] not in entity_dict[metaval]['ids']:
                                        entity_dict[metaval]['ids'].append(identifier[1])
            else:
                sparql_match = self.finder_sparql(idslist, br=False, ra=True, vvi=False, publ=publ_entity)
                if len(sparql_match) > 1:
                    # Conflict: the IDs match multiple entities on the triplestore
                    return self.conflict(idslist, name, id_dict, col_name)
                elif len(sparql_match) == 1:
                    existing_ids = sparql_match[0][2]
                    new_idslist = [x[1] for x in existing_ids]
                    new_sparql_match = self.finder_sparql(new_idslist, br=False, ra=True, vvi=False, publ=publ_entity)
                    if len(new_sparql_match) > 1:
                        # Conflict: two entities previously disconnected on the triplestore would now become connected
                        return self.conflict(idslist, name, id_dict, col_name)
                    # 2 Retrieve EntityA data in the triplestore to update EntityA inside the CSV
                    # 3 CONFLICT between MetaIDs. The MetaID specified in EntityA inside the CSV has precedence.
                    elif len(new_sparql_match) == 1:
                        metaval = sparql_match[0][0]
                        entity_dict[metaval] = dict()
                        entity_dict[metaval]['ids'] = list()
                        entity_dict[metaval]['others'] = list()
                        if col_name == 'author' or col_name == 'editor':
                            entity_dict[metaval]['title'] = self.name_check(sparql_match[0][1], name)
                        else:
                            entity_dict[metaval]['title'] = sparql_match[0][1]

                        if not entity_dict[metaval]['title'] and name:
                            entity_dict[metaval]['title'] = name

                        for identifier in existing_ids:
                            if identifier[1] not in id_dict:
                                id_dict[identifier[1]] = identifier[0]
                            if identifier[1] not in entity_dict[metaval]['ids']:
                                entity_dict[metaval]['ids'].append(identifier[1])
                else:
                    # 1 EntityA is a new one
                    metaval = self.new_entity(entity_dict, name)
            for identifier in idslist:
                if identifier not in id_dict:
                    self.__update_id_count(id_dict, identifier)
                if identifier not in entity_dict[metaval]['ids']:
                    entity_dict[metaval]['ids'].append(identifier)
            if not entity_dict[metaval]['title'] and name:
                entity_dict[metaval]['title'] = name
        # 1 EntityA is a new one
        if not idslist and not metaval:
            metaval = self.new_entity(entity_dict, name)
        return metaval
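
    # Decision tree implemented above, in brief: (a) a MetaID given in the CSV
    # wins -- it is merged locally, fetched from the triplestore, or, failing
    # both, looked up among merged entities in the provenance; (b) otherwise the
    # external IDs are matched first against in-memory entities (existing or
    # wannabe), then against the triplestore; (c) any case where the same IDs
    # resolve to more than one entity is routed to self.conflict(); (d) with no
    # match at all, a new wannabe entity is created via self.new_entity().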

    def enrich(self):
        '''
        This method replaces the wannabe ID placeholders with the
        actual data and MetaIDs resulting from the deduplication process.
        '''
        for row in self.data:
            for field in {'author', 'editor', 'publisher'}:
                if row[field]:
                    ras_list = list()
                    if field in {'author', 'editor'}:
                        ra_list = re.split(semicolon_in_people_field, row[field])
                    else:
                        ra_list = [row[field]]
                    for ra_entity in ra_list:
                        metaid = re.search(name_and_ids, ra_entity).group(2).replace('omid:ra/', '')
                        if 'wannabe' in metaid:
                            for ra_metaid in self.rameta:
                                if metaid in self.rameta[ra_metaid]['others']:
                                    metaid = ra_metaid
                        ra_name = self.rameta[metaid]['title']
                        ra_ids = ' '.join(self.rameta[metaid]['ids'])
                        ra = ra_name + ' [' + ra_ids + ']'
                        ras_list.append(ra)
                    row[field] = '; '.join(ras_list)
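
    # Example of the rewriting above (hypothetical values): a cell that clean_ra
    # left as 'Doe, Jane [omid:ra/wannabe_0]' becomes, after meta_maker has run,
    # 'Doe, Jane [omid:ra/06042 orcid:0000-0000-0000-0000]'.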

    @staticmethod
    def __local_match(list_to_match, dict_to_match:dict):
        match_elem = dict()
        match_elem['existing'] = list()
        match_elem['wannabe'] = list()
        for elem in list_to_match:
            for k, va in dict_to_match.items():
                if elem in va['ids']:
                    if 'wannabe' in k:
                        if k not in match_elem['wannabe']:
                            match_elem['wannabe'].append(k)  # TODO: consider using a set
                    else:
                        if k not in match_elem['existing']:
                            match_elem['existing'].append(k)
        return match_elem
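
    # Example (hypothetical values):
    #
    #     __local_match(['orcid:0000-0000-0000-0000'],
    #                   {'wannabe_0': {'ids': ['orcid:0000-0000-0000-0000']}})
    #     # -> {'existing': [], 'wannabe': ['wannabe_0']}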

    def __update_id_count(self, id_dict, identifier):
        count = self.counter_handler.increment_counter('id', supplier_prefix=self.prefix)
        id_dict[identifier] = self.prefix + str(count)