Coverage for oc_meta/plugins/multiprocess/resp_agents_curator.py: 70%
286 statements
coverage.py v6.5.0, created at 2025-07-14 14:06 +0000

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

from __future__ import annotations

import json
import os
import re
from typing import Dict, List, Tuple

import redis
from oc_meta.core.curator import Curator
from oc_meta.lib.cleaner import Cleaner
from oc_meta.lib.file_manager import *
from oc_meta.lib.finder import *
from oc_meta.lib.master_of_regex import *

from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler


class RespAgentsCurator(Curator):
    def __init__(self, data:List[dict], ts:str, prov_config:str, counter_handler:RedisCounterHandler, base_iri:str='https://w3id.org/oc/meta', prefix:str='060', separator:str=None, settings:dict|None=None, meta_config_path: str = None):
        self.everything_everywhere_allatonce = Graph()
        self.finder = ResourceFinder(ts, base_iri, self.everything_everywhere_allatonce, settings=settings, meta_config_path=meta_config_path)
        self.prov_config = prov_config
        self.separator = separator
        self.data = [{field:value.strip() for field,value in row.items()} for row in data]
        self.prefix = prefix
        self.counter_handler = counter_handler
        self.radict:Dict[str, Dict[str, list]] = {}
        self.idra = {}  # key: identifier literal; value: MetaID of the identifier related to the responsible agent
        self.conflict_ra = {}
        self.rameta = dict()
        self.wnb_cnt = 0  # wannabe counter
        self.rowcnt = 0
        self.log = dict()
        self.preexisting_entities = set()
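
    # Illustrative shape of an input row (all values hypothetical): the curator
    # only processes the responsible-agent columns of the Meta CSV format.
    #
    # {'id': '', 'title': '', 'author': 'Doe, Jane [orcid:0000-0002-1825-0097]',
    #  'pub_date': '', 'venue': '', 'volume': '', 'issue': '', 'page': '',
    #  'type': '', 'publisher': 'Acme Press [crossref:9999]', 'editor': ''}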

    def curator(self, filename:str=None, path_csv:str=None, path_index:str=None):
        metavals, identifiers, vvis = self.collect_identifiers(valid_dois_cache=dict())
        self.finder.get_everything_about_res(metavals=metavals, identifiers=identifiers, vvis=vvis)
        for row in self.data:
            self.log[self.rowcnt] = {
                'id': {},
                'author': {},
                'venue': {},
                'editor': {},
                'publisher': {},
                'page': {},
                'volume': {},
                'issue': {},
                'pub_date': {},
                'type': {}
            }
            self.clean_ra(row, 'author')
            self.clean_ra(row, 'publisher')
            self.clean_ra(row, 'editor')
            self.rowcnt += 1
        self.radict.update(self.conflict_ra)
        self.meta_maker()
        self.log = self.log_update()
        self.enrich()
        if path_index:
            path_index = os.path.join(path_index, filename)
        self.filename = filename
        self.indexer(path_index, path_csv)

    def collect_identifiers(self, valid_dois_cache):
        """
        Override parent method to maintain compatibility: RespAgentsCurator doesn't need venue_ids.
        """
        all_metavals = set()
        all_idslist = set()
        all_vvis = set()
        for row in self.data:
            metavals, idslist, vvis = self.extract_identifiers_and_metavals(
                row, valid_dois_cache=valid_dois_cache
            )
            all_metavals.update(metavals)
            all_idslist.update(idslist)
            all_vvis.update(vvis)
        return all_metavals, all_idslist, all_vvis

    def clean_ra(self, row, col_name):
        '''
        This method performs the deduplication process for responsible agents (authors, publishers and editors).

        :params row: a dictionary representing a CSV row
        :type row: Dict[str, str]
        :params col_name: the CSV column name. It can be 'author', 'publisher', or 'editor'
        :type col_name: str
        :returns: None -- This method modifies self.radict and self.idra, and returns None.
        '''
        if row[col_name]:
            if col_name in {'author', 'editor'}:
                ra_list = re.split(semicolon_in_people_field, row[col_name])
            elif col_name == 'publisher':
                ra_list = [row[col_name]]
            ra_metaids = list()
            ra_list = Cleaner.clean_ra_list(ra_list)
            for ra in ra_list:
                ra_id = re.search(name_and_ids, ra)
                name = Cleaner(ra_id.group(1)).clean_name()
                ra_id = ra_id.group(2)
                if self.separator:
                    ra_id_list = re.sub(colon_and_spaces, ':', ra_id).split(self.separator)
                else:
                    ra_id_list = re.split(one_or_more_spaces, re.sub(colon_and_spaces, ':', ra_id))
                ra_id_list, metaval = self.clean_id_list(ra_id_list)
                if col_name == 'publisher':
                    metaval = self.id_worker('publisher', name, ra_id_list, metaval, publ_entity=True)
                else:
                    metaval = self.id_worker(col_name, name, ra_id_list, metaval, publ_entity=False)
                if col_name != 'publisher' and metaval in self.radict:
                    full_name:str = self.radict[metaval]['title']
                    if ',' in name and ',' in full_name:
                        first_name = name.split(',')[1].strip()
                        if not full_name.split(',')[1].strip() and first_name:  # first name found!
                            given_name = full_name.split(',')[0]
                            self.radict[metaval]['title'] = given_name + ', ' + first_name
                ra_metaids.append(f'{name} [omid:ra/{metaval}]')
            row[col_name] = '; '.join(ra_metaids)
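
    # A minimal illustration of what clean_ra does to a row (values hypothetical):
    # given row['author'] == 'Doe, Jane [orcid:0000-0002-1825-0097]', the method
    # deduplicates the agent and rewrites the cell with its (possibly provisional)
    # MetaID, e.g. 'Doe, Jane [omid:ra/wannabe_0]' for a new entity, while
    # self.radict['wannabe_0'] records the agent's ids, title and 'others' list.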

    def meta_maker(self):
        '''
        The MetaID dictionary 'rameta' is created from 'radict'.
        '''
        for identifier in self.radict:
            if 'wannabe' in identifier:
                other = identifier
                count = self.counter_handler.increment_counter('ra', supplier_prefix=self.prefix)
                meta = self.prefix + str(count)
                self.rameta[meta] = self.radict[identifier]
                self.rameta[meta]['others'].append(other)
                self.rameta[meta]['ids'].append('omid:ra/' + meta)
            else:
                self.rameta[identifier] = self.radict[identifier]
                self.preexisting_entities.add(f'ra/{identifier}')
                self.rameta[identifier]['ids'].append('omid:ra/' + identifier)

        for _, omid in self.idra.items():
            self.preexisting_entities.add(f'id/{omid}')
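
    # Sketch of the meta_maker mapping (counter value hypothetical): a provisional
    # key such as 'wannabe_0' in radict becomes a freshly minted MetaID in rameta,
    # e.g. '0601' with prefix '060' and counter 1, and the old key is kept in
    # rameta['0601']['others'] so that enrich() can resolve the placeholder later.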

    def indexer(self, path_index:str, path_csv:str) -> None:
        '''
        This method serializes idra so that it can be saved as a CSV file.
        Finally, it generates the enriched CSV and saves it.

        :params path_index: a directory path. It will contain the indexes
        :type path_index: str
        :params path_csv: a directory path. It will contain the output enriched CSV
        :type path_csv: str
        '''
        # ID
        self.index_id_ra = list()
        cur_index = self.idra
        if cur_index:
            for literal in cur_index:
                row = dict()
                row['id'] = str(literal)
                row['meta'] = str(cur_index[literal])
                self.index_id_ra.append(row)
        else:
            row = dict()
            row['id'] = ''
            row['meta'] = ''
            self.index_id_ra.append(row)
        if self.filename:
            if not os.path.exists(path_index):
                os.makedirs(path_index)
            ra_path = os.path.join(path_index, 'index_id_ra.csv')
            write_csv(ra_path, self.index_id_ra)
            if self.log:
                log_file = os.path.join(path_index, 'log.json')
                with open(log_file, 'w') as lf:
                    json.dump(self.log, lf)
            if self.data:
                name = self.filename + '.csv'
                data_file = os.path.join(path_csv, name)
                write_csv(data_file, self.data)

    @staticmethod
    def clean_id_list(id_list:List[str], br=None, valid_dois_cache=None) -> Tuple[list, str]:
        '''
        Clean IDs in the input list and check if there is a MetaID.

        :params id_list: a list of IDs
        :type id_list: List[str]
        :params br: True if the IDs in id_list refer to bibliographic resources, False otherwise
        :type br: bool
        :returns: Tuple[list, str] -- it returns a two-element tuple, where the first element is the list of cleaned IDs, while the second is a MetaID if any was found.
        '''
        if valid_dois_cache is None:
            valid_dois_cache = dict()
        pattern = 'ra/'
        metaid = ''
        id_list = list(filter(None, id_list))
        clean_list = list()
        for elem in id_list:
            if elem in clean_list:
                continue
            elem = Cleaner(elem).normalize_hyphens()
            identifier = elem.split(':', 1)
            schema = identifier[0].lower()
            value = identifier[1]
            if schema == 'omid':
                metaid = value.replace(pattern, '')
            else:
                normalized_id = Cleaner(elem).normalize_id(valid_dois_cache=valid_dois_cache)
                if normalized_id:
                    clean_list.append(normalized_id)
        how_many_meta = [i for i in id_list if i.lower().startswith('omid')]
        if len(how_many_meta) > 1:
            clean_list = [i for i in clean_list if not i.lower().startswith('omid')]
        return clean_list, metaid
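
    # Illustrative call (the output shape follows from the logic above; the exact
    # normalized form depends on Cleaner.normalize_id): an 'omid' entry sets the
    # returned MetaID, while the other identifiers are normalized and deduplicated.
    #
    #   RespAgentsCurator.clean_id_list(['orcid:0000-0002-1825-0097', 'omid:ra/0601'])
    #   # -> (['orcid:0000-0002-1825-0097'], '0601')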

    def id_worker(self, col_name, name, idslist:List[str], metaval: str, publ_entity:bool):
        id_dict = self.idra
        entity_dict = self.radict
        # there's meta
        if metaval:
            # MetaID exists among data?
            # meta already in entity_dict (no care about conflicts, we have a meta specified)
            if metaval in entity_dict:
                self.merge_entities_in_csv(idslist, metaval, name, entity_dict, id_dict)
            else:
                found_meta_ts = self.finder.retrieve_ra_from_meta(metaval)
                # meta in triplestore
                # 2 Retrieve EntityA data in triplestore to update EntityA inside CSV
                if found_meta_ts:
                    entity_dict[metaval] = dict()
                    entity_dict[metaval]['ids'] = list()
                    if col_name in {'author', 'editor'}:
                        entity_dict[metaval]['title'] = self.name_check(found_meta_ts[0], name)
                    else:
                        entity_dict[metaval]['title'] = found_meta_ts[0]
                    entity_dict[metaval]['others'] = list()
                    self.merge_entities_in_csv(idslist, metaval, name, entity_dict, id_dict)
                    existing_ids = found_meta_ts[1]
                    for identifier in existing_ids:
                        if identifier[1] not in id_dict:
                            id_dict[identifier[1]] = identifier[0]
                        if identifier[1] not in entity_dict[metaval]['ids']:
                            entity_dict[metaval]['ids'].append(identifier[1])
                # Look for the MetaID in the provenance
                else:
                    metaid_uri = f'{self.base_iri}/ra/{str(metaval)}'
                    # The entity's MetaID after the merge if it was merged, None otherwise. If None, the MetaID is considered invalid
                    metaval = self.finder.retrieve_metaid_from_merged_entity(metaid_uri=metaid_uri, prov_config=self.prov_config)
        # there's no meta or there was one but it didn't exist
        # Are there other IDs?
        if idslist and not metaval:
            local_match = self.__local_match(idslist, entity_dict)
            # IDs already exist among data?
            # check in entity_dict
            if local_match['existing']:
                # ids refer to multiple existing entities
                if len(local_match['existing']) > 1:
                    # !
                    return self.conflict(idslist, name, id_dict, col_name)
                # ids refer to ONE existing entity
                elif len(local_match['existing']) == 1:
                    metaval = str(local_match['existing'][0])
                    suspect_ids = list()
                    for identifier in idslist:
                        if identifier not in entity_dict[metaval]['ids']:
                            suspect_ids.append(identifier)
                    if suspect_ids:
                        sparql_match = self.finder_sparql(suspect_ids, br=False, ra=True, vvi=False, publ=publ_entity)
                        if len(sparql_match) > 1:
                            # !
                            return self.conflict(idslist, name, id_dict, col_name)
            # ids refer to 1 or more wannabe entities
            elif local_match['wannabe']:
                metaval = str(local_match['wannabe'].pop(0))
                # 5 Merge data from EntityA (CSV) with data from EntityX (CSV)
                for old_meta in local_match['wannabe']:
                    self.merge(entity_dict, metaval, old_meta, name)
                suspect_ids = list()
                for identifier in idslist:
                    if identifier not in entity_dict[metaval]['ids']:
                        suspect_ids.append(identifier)
                if suspect_ids:
                    sparql_match = self.finder_sparql(suspect_ids, br=False, ra=True, vvi=False, publ=publ_entity)
                    if sparql_match:
                        if 'wannabe' not in metaval or len(sparql_match) > 1:
                            # Two entities previously disconnected on the triplestore now become connected
                            # !
                            return self.conflict(idslist, name, id_dict, col_name)
                        else:
                            existing_ids = sparql_match[0][2]
                            new_idslist = [x[1] for x in existing_ids]
                            new_sparql_match = self.finder_sparql(new_idslist, br=False, ra=True, vvi=False, publ=publ_entity)
                            if len(new_sparql_match) > 1:
                                # Two entities previously disconnected on the triplestore now become connected
                                # !
                                return self.conflict(idslist, name, id_dict, col_name)
                            else:
                                # 4 Merge data from EntityA (CSV) with data from EntityX (CSV) (it has already happened in # 5), update both with data from EntityA (RDF)
                                old_metaval = metaval
                                metaval = sparql_match[0][0]
                                entity_dict[metaval] = dict()
                                entity_dict[metaval]['ids'] = list()
                                entity_dict[metaval]['others'] = list()
                                entity_dict[metaval]['title'] = sparql_match[0][1] if sparql_match[0][1] else ''
                                self.merge(entity_dict, metaval, old_metaval, sparql_match[0][1])
                                for identifier in existing_ids:
                                    if identifier[1] not in id_dict:
                                        id_dict[identifier[1]] = identifier[0]
                                    if identifier[1] not in entity_dict[metaval]['ids']:
                                        entity_dict[metaval]['ids'].append(identifier[1])
            else:
                sparql_match = self.finder_sparql(idslist, br=False, ra=True, vvi=False, publ=publ_entity)
                if len(sparql_match) > 1:
                    # !
                    return self.conflict(idslist, name, id_dict, col_name)
                elif len(sparql_match) == 1:
                    existing_ids = sparql_match[0][2]
                    new_idslist = [x[1] for x in existing_ids]
                    new_sparql_match = self.finder_sparql(new_idslist, br=False, ra=True, vvi=False, publ=publ_entity)
                    if len(new_sparql_match) > 1:
                        # Two entities previously disconnected on the triplestore now become connected
                        # !
                        return self.conflict(idslist, name, id_dict, col_name)
                    # 2 Retrieve EntityA data in triplestore to update EntityA inside CSV
                    # 3 CONFLICT between MetaIDs. The MetaID specified in EntityA inside the CSV has precedence.
                    elif len(new_sparql_match) == 1:
                        metaval = sparql_match[0][0]
                        entity_dict[metaval] = dict()
                        entity_dict[metaval]['ids'] = list()
                        entity_dict[metaval]['others'] = list()
                        if col_name in {'author', 'editor'}:
                            entity_dict[metaval]['title'] = self.name_check(sparql_match[0][1], name)
                        else:
                            entity_dict[metaval]['title'] = sparql_match[0][1]

                        if not entity_dict[metaval]['title'] and name:
                            entity_dict[metaval]['title'] = name

                        for identifier in existing_ids:
                            if identifier[1] not in id_dict:
                                id_dict[identifier[1]] = identifier[0]
                            if identifier[1] not in entity_dict[metaval]['ids']:
                                entity_dict[metaval]['ids'].append(identifier[1])
                else:
                    # 1 EntityA is a new one
                    metaval = self.new_entity(entity_dict, name)
            for identifier in idslist:
                if identifier not in id_dict:
                    self.__update_id_count(id_dict, identifier)
                if identifier not in entity_dict[metaval]['ids']:
                    entity_dict[metaval]['ids'].append(identifier)
            if not entity_dict[metaval]['title'] and name:
                entity_dict[metaval]['title'] = name
        # 1 EntityA is a new one
        if not idslist and not metaval:
            metaval = self.new_entity(entity_dict, name)
        return metaval
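
    # Worked example of the id_worker flow (all values hypothetical): for an
    # author 'Doe, Jane' with idslist ['orcid:0000-0002-1825-0097'] and no
    # metaval, if the ORCID matches nothing locally (__local_match) nor on the
    # triplestore (finder_sparql), case 1 applies and a new 'wannabe_N' entity
    # is created; if it matches exactly one triplestore entity, that entity's
    # MetaID is reused; if it would connect two distinct entities, conflict()
    # is called instead.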

    def enrich(self):
        '''
        This method replaces the wannabe-ID placeholders with the
        actual data and MetaIDs resulting from the deduplication process.
        '''
        for row in self.data:
            for field in {'author', 'editor', 'publisher'}:
                if row[field]:
                    ras_list = list()
                    if field in {'author', 'editor'}:
                        ra_list = re.split(semicolon_in_people_field, row[field])
                    else:
                        ra_list = [row[field]]
                    for ra_entity in ra_list:
                        metaid = re.search(name_and_ids, ra_entity).group(2).replace('omid:ra/', '')
                        if 'wannabe' in metaid:
                            for ra_metaid in self.rameta:
                                if metaid in self.rameta[ra_metaid]['others']:
                                    metaid = ra_metaid
                        ra_name = self.rameta[metaid]['title']
                        ra_ids = ' '.join(self.rameta[metaid]['ids'])
                        ra = ra_name + ' [' + ra_ids + ']'
                        ras_list.append(ra)
                    row[field] = '; '.join(ras_list)
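
    # Example of the substitution performed by enrich (values hypothetical):
    # 'Doe, Jane [omid:ra/wannabe_0]' in a row becomes
    # 'Doe, Jane [orcid:0000-0002-1825-0097 omid:ra/0601]' once meta_maker has
    # assigned MetaID 0601 to the provisional entity 'wannabe_0'.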

    @staticmethod
    def __local_match(list_to_match, dict_to_match:dict):
        match_elem = dict()
        match_elem['existing'] = list()
        match_elem['wannabe'] = list()
        for elem in list_to_match:
            for k, va in dict_to_match.items():
                if elem in va['ids']:
                    if 'wannabe' in k:
                        if k not in match_elem['wannabe']:
                            match_elem['wannabe'].append(k)  # TODO: consider using a set
                    else:
                        if k not in match_elem['existing']:
                            match_elem['existing'].append(k)
        return match_elem

    def __update_id_count(self, id_dict, identifier):
        count = self.counter_handler.increment_counter('id', supplier_prefix=self.prefix)
        id_dict[identifier] = self.prefix + str(count)
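

# Minimal usage sketch (paths, endpoint, and Redis parameters are hypothetical;
# the RedisCounterHandler constructor may differ in your oc_ocdm version):
#
#   counter_handler = RedisCounterHandler(host='localhost', port=6379, db=0)
#   data = get_csv_data('resp_agents.csv')  # get_csv_data from oc_meta.lib.file_manager
#   curator = RespAgentsCurator(
#       data=data,
#       ts='http://localhost:8890/sparql',
#       prov_config='prov_config.json',
#       counter_handler=counter_handler)
#   curator.curator(filename='resp_agents', path_csv='output/csv', path_index='output/index')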