Coverage for oc_meta/plugins/multiprocess/resp_agents_curator.py: 71%
277 statements
coverage.py v6.5.0, created at 2025-12-20 08:55 +0000

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.
from __future__ import annotations

import os
import re
from typing import Dict, List, Tuple

from oc_meta.core.curator import Curator
from oc_meta.lib.cleaner import Cleaner
from oc_meta.lib.file_manager import *
from oc_meta.lib.finder import *
from oc_meta.lib.master_of_regex import *

from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler


class RespAgentsCurator(Curator):
    def __init__(self, data:List[dict], ts:str, prov_config:str, counter_handler:RedisCounterHandler, base_iri:str='https://w3id.org/oc/meta', prefix:str='060', separator:str=None, settings:dict|None=None, meta_config_path: str = None, timer=None):
        self.timer = timer
        self.everything_everywhere_allatonce = Graph()
        self.finder = ResourceFinder(ts, base_iri, self.everything_everywhere_allatonce, settings=settings, meta_config_path=meta_config_path)
        self.prov_config = prov_config
        self.separator = separator
        self.data = [{field: value.strip() for field, value in row.items()} for row in data]
        self.prefix = prefix
        self.counter_handler = counter_handler
        self.radict: Dict[str, Dict[str, list]] = {}
        self.idra = {}  # key: identifier literal; value: MetaID of the identifier related to a responsible agent
        self.conflict_ra = {}
        self.rameta = dict()
        self.wnb_cnt = 0  # wannabe counter
        self.rowcnt = 0
        self.log = dict()
        self.preexisting_entities = set()
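    # Instantiation sketch (illustrative only): the SPARQL endpoint, the YAML
    # path and the Redis arguments below are hypothetical placeholders, and the
    # RedisCounterHandler constructor arguments are an assumption, not values
    # taken from this module.
    #
    #   handler = RedisCounterHandler(host='localhost', port=6379, db=0)
    #   curator = RespAgentsCurator(
    #       data=[{'author': 'Doe, Jane [orcid:0000-0000-0000-0001]'}],
    #       ts='http://localhost:9999/sparql',
    #       prov_config='prov_config.yaml',
    #       counter_handler=handler)
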
    def curator(self, filename: str = None, path_csv: str = None):
        # Phase 1: Collect identifiers and SPARQL prefetch
        with self._timed("curation__collect_identifiers"):
            metavals, identifiers, vvis = self.collect_identifiers(valid_dois_cache=dict())
            self.finder.get_everything_about_res(metavals=metavals, identifiers=identifiers, vvis=vvis)

        # Phase 5: Clean RA (author + publisher + editor aggregated)
        with self._timed("curation__clean_ra"):
            for row in self.data:
                self.log[self.rowcnt] = {
                    'id': {},
                    'author': {},
                    'venue': {},
                    'editor': {},
                    'publisher': {},
                    'page': {},
                    'volume': {},
                    'issue': {},
                    'pub_date': {},
                    'type': {}
                }
                self.clean_ra(row, 'author')
                self.clean_ra(row, 'publisher')
                self.clean_ra(row, 'editor')
                self.rowcnt += 1
            self.radict.update(self.conflict_ra)

        # Phase 6: Finalize (meta_maker + enrich + indexer)
        with self._timed("curation__finalize"):
            self.meta_maker()
            self.log = self.log_update()
            self.enrich()
            self.filename = filename
            self.indexer(path_csv=path_csv)
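    # Usage sketch (continuing the instantiation sketch above; 'output_dir' is
    # a hypothetical directory):
    #
    #   curator.curator(filename='0', path_csv='output_dir')
    #   # afterwards, curator.rameta maps MetaIDs to deduplicated agents and
    #   # 'output_dir/0.csv' holds the enriched rows
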
    def collect_identifiers(self, valid_dois_cache):
        """
        Override of the parent method, kept signature-compatible: RespAgentsCurator
        does not need venue_ids, so only MetaIDs and plain identifiers matter here.
        """
        all_metavals = set()
        all_idslist = set()
        all_vvis = set()
        for row in self.data:
            metavals, idslist, vvis = self.extract_identifiers_and_metavals(
                row, valid_dois_cache=valid_dois_cache
            )
            all_metavals.update(metavals)
            all_idslist.update(idslist)
            all_vvis.update(vvis)
        return all_metavals, all_idslist, all_vvis
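    # Return-shape sketch (hypothetical values): for a row such as
    #   {'author': 'Doe, Jane [orcid:0000-0000-0000-0001 omid:ra/0601]'}
    # the aggregated sets would look roughly like
    #   metavals    -> {'0601'}
    #   identifiers -> {'orcid:0000-0000-0000-0001'}
    #   vvis        -> set()   (no venue/volume/issue data for responsible agents)
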
    def clean_ra(self, row, col_name):
        '''
        This method performs the deduplication process for responsible agents
        (authors, publishers and editors).

        :param row: a dictionary representing a CSV row
        :type row: Dict[str, str]
        :param col_name: the CSV column name; one of 'author', 'publisher' or 'editor'
        :type col_name: str
        :returns: None -- This method modifies self.radict and self.idra in place, and returns None.
        '''
        if row[col_name]:
            if col_name in {'author', 'editor'}:
                ra_list = re.split(semicolon_in_people_field, row[col_name])
            elif col_name == 'publisher':
                ra_list = [row[col_name]]
            ra_metaids = list()
            ra_list = Cleaner.clean_ra_list(ra_list)
            for ra in ra_list:
                ra_id = re.search(name_and_ids, ra)
                name = Cleaner(ra_id.group(1)).clean_name()
                ra_id = ra_id.group(2)
                if self.separator:
                    ra_id_list = re.sub(colon_and_spaces, ':', ra_id).split(self.separator)
                else:
                    ra_id_list = re.split(one_or_more_spaces, re.sub(colon_and_spaces, ':', ra_id))
                ra_id_list, metaval = self.clean_id_list(ra_id_list)
                if col_name == 'publisher':
                    metaval = self.id_worker('publisher', name, ra_id_list, metaval, publ_entity=True)
                else:
                    metaval = self.id_worker(col_name, name, ra_id_list, metaval, publ_entity=False)
                if col_name != 'publisher' and metaval in self.radict:
                    full_name: str = self.radict[metaval]['title']
                    if ',' in name and ',' in full_name:
                        first_name = name.split(',')[1].strip()
                        if not full_name.split(',')[1].strip() and first_name:  # first name found!
                            family_name = full_name.split(',')[0]
                            self.radict[metaval]['title'] = family_name + ', ' + first_name
                ra_metaids.append(f'{name} [omid:ra/{metaval}]')
            row[col_name] = '; '.join(ra_metaids)
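    # Transformation sketch (hypothetical ORCID): given
    #   row = {'author': 'Doe, Jane [orcid:0000-0000-0000-0001]'}
    # clean_ra(row, 'author') rewrites the cell as
    #   'Doe, Jane [omid:ra/wannabe_0]'
    # on first sight of the identifier, or reuses the matching MetaID/wannabe
    # if the ORCID was already seen.
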
    def meta_maker(self):
        '''
        The MetaID dictionary 'rameta' is created from 'radict': 'wannabe' placeholders
        receive a newly minted MetaID, while existing MetaIDs are kept as they are.
        '''
        for identifier in self.radict:
            if 'wannabe' in identifier:
                other = identifier
                count = self.counter_handler.increment_counter('ra', supplier_prefix=self.prefix)
                meta = self.prefix + str(count)
                self.rameta[meta] = self.radict[identifier]
                self.rameta[meta]['others'].append(other)
                self.rameta[meta]['ids'].append('omid:ra/' + meta)
            else:
                self.rameta[identifier] = self.radict[identifier]
                self.preexisting_entities.add(f'ra/{identifier}')
                self.rameta[identifier]['ids'].append('omid:ra/' + identifier)
        for _, omid in self.idra.items():
            self.preexisting_entities.add(f'id/{omid}')
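    # Mapping sketch (prefix '060', hypothetical counter value 42):
    #   radict: {'wannabe_0': {'title': 'Doe, Jane', 'ids': ['orcid:...'], 'others': []}}
    #   rameta: {'06042': {'title': 'Doe, Jane',
    #                      'ids': ['orcid:...', 'omid:ra/06042'],
    #                      'others': ['wannabe_0']}}
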
    def indexer(self, path_csv: str = None) -> None:
        """
        Transform the idra dict into the list-of-dicts format consumed by Creator,
        and optionally save the enriched CSV file.

        :param path_csv: directory path for the enriched CSV output (optional)
        :type path_csv: str
        """
        # ID
        self.index_id_ra = list()
        cur_index = self.idra
        if cur_index:
            for literal in cur_index:
                row = dict()
                row['id'] = str(literal)
                row['meta'] = str(cur_index[literal])
                self.index_id_ra.append(row)
        else:
            row = dict()
            row['id'] = ''
            row['meta'] = ''
            self.index_id_ra.append(row)
        # Save the enriched CSV if a path was provided
        if self.filename and path_csv and self.data:
            name = self.filename + '.csv'
            data_file = os.path.join(path_csv, name)
            write_csv(data_file, self.data)
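    # Shape sketch (hypothetical values):
    #   idra        -> {'orcid:0000-0000-0000-0001': '06011'}
    #   index_id_ra -> [{'id': 'orcid:0000-0000-0000-0001', 'meta': '06011'}]
    # An empty idra still yields one row of empty strings, keeping the index well-formed.
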
    @staticmethod
    def clean_id_list(id_list:List[str], br=None, valid_dois_cache:dict=None) -> Tuple[list, str]:
        '''
        Clean the IDs in the input list and check whether a MetaID is present.

        :param id_list: a list of IDs
        :type id_list: List[str]
        :param br: True if the IDs in id_list refer to bibliographic resources, False otherwise
        :type br: bool
        :returns: Tuple[list, str] -- a two-element tuple whose first element is the list of cleaned IDs and whose second element is the MetaID, if any was found.
        '''
        pattern = 'ra/'
        metaid = ''
        valid_dois_cache = valid_dois_cache if valid_dois_cache is not None else dict()
        id_list = list(filter(None, id_list))
        clean_list = list()
        for elem in id_list:
            if elem in clean_list:
                continue
            elem = Cleaner(elem).normalize_hyphens()
            identifier = elem.split(':', 1)
            schema = identifier[0].lower()
            value = identifier[1]
            if schema == 'omid':
                metaid = value.replace(pattern, '')
            else:
                normalized_id = Cleaner(elem).normalize_id(valid_dois_cache=valid_dois_cache)
                if normalized_id:
                    clean_list.append(normalized_id)
        how_many_meta = [i for i in id_list if i.lower().startswith('omid')]
        if len(how_many_meta) > 1:
            clean_list = [i for i in clean_list if not i.lower().startswith('omid')]
        return clean_list, metaid
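    # Behaviour sketch (hypothetical identifiers):
    #   clean_id_list(['orcid:0000-0000-0000-0001', 'omid:ra/0601'])
    #   -> (['orcid:0000-0000-0000-0001'], '0601')
    # The omid is removed from the cleaned list and returned separately as the MetaID.
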
    def id_worker(self, col_name, name, idslist:List[str], metaval: str, publ_entity:bool):
        id_dict = self.idra
        entity_dict = self.radict
        # there's meta
        if metaval:
            # MetaID exists among data?
            # meta already in entity_dict (conflicts don't matter here: a MetaID was specified explicitly)
            if metaval in entity_dict:
                self.merge_entities_in_csv(idslist, metaval, name, entity_dict, id_dict)
            else:
                found_meta_ts = self.finder.retrieve_ra_from_meta(metaval)
                # meta in triplestore
                # 2 Retrieve EntityA data in triplestore to update EntityA inside CSV
                if found_meta_ts:
                    entity_dict[metaval] = dict()
                    entity_dict[metaval]['ids'] = list()
                    if col_name == 'author' or col_name == 'editor':
                        entity_dict[metaval]['title'] = self.name_check(found_meta_ts[0], name)
                    else:
                        entity_dict[metaval]['title'] = found_meta_ts[0]
                    entity_dict[metaval]['others'] = list()
                    self.merge_entities_in_csv(idslist, metaval, name, entity_dict, id_dict)
                    existing_ids = found_meta_ts[1]
                    for identifier in existing_ids:
                        if identifier[1] not in id_dict:
                            id_dict[identifier[1]] = identifier[0]
                        if identifier[1] not in entity_dict[metaval]['ids']:
                            entity_dict[metaval]['ids'].append(identifier[1])
                # Look for the MetaID in the provenance
                else:
                    metaid_uri = f'{self.base_iri}/ra/{str(metaval)}'
                    # The MetaID of the entity after a merge, if one took place; None otherwise.
                    # A None value marks the MetaID as invalid.
                    metaval = self.finder.retrieve_metaid_from_merged_entity(metaid_uri=metaid_uri, prov_config=self.prov_config)
        # there's no meta, or there was one but it didn't exist
        # Are there other IDs?
        if idslist and not metaval:
            local_match = self.__local_match(idslist, entity_dict)
            # IDs already exist among data?
            # check in entity_dict
            if local_match['existing']:
                # ids refer to multiple existing entities
                if len(local_match['existing']) > 1:
                    # !
                    return self.conflict(idslist, name, id_dict, col_name)
                # ids refer to ONE existing entity
                elif len(local_match['existing']) == 1:
                    metaval = str(local_match['existing'][0])
                    suspect_ids = list()
                    for identifier in idslist:
                        if identifier not in entity_dict[metaval]['ids']:
                            suspect_ids.append(identifier)
                    if suspect_ids:
                        sparql_match = self.finder_sparql(suspect_ids, br=False, ra=True, vvi=False, publ=publ_entity)
                        if len(sparql_match) > 1:
                            # !
                            return self.conflict(idslist, name, id_dict, col_name)
            # ids refer to one or more wannabe entities
            elif local_match['wannabe']:
                metaval = str(local_match['wannabe'].pop(0))
                # 5 Merge data from EntityA (CSV) with data from EntityX (CSV)
                for old_meta in local_match['wannabe']:
                    self.merge(entity_dict, metaval, old_meta, name)
                suspect_ids = list()
                for identifier in idslist:
                    if identifier not in entity_dict[metaval]['ids']:
                        suspect_ids.append(identifier)
                if suspect_ids:
                    sparql_match = self.finder_sparql(suspect_ids, br=False, ra=True, vvi=False, publ=publ_entity)
                    if sparql_match:
                        if 'wannabe' not in metaval or len(sparql_match) > 1:
                            # Two entities previously disconnected on the triplestore now become connected
                            # !
                            return self.conflict(idslist, name, id_dict, col_name)
                        else:
                            existing_ids = sparql_match[0][2]
                            new_idslist = [x[1] for x in existing_ids]
                            new_sparql_match = self.finder_sparql(new_idslist, br=False, ra=True, vvi=False, publ=publ_entity)
                            if len(new_sparql_match) > 1:
                                # Two entities previously disconnected on the triplestore now become connected
                                # !
                                return self.conflict(idslist, name, id_dict, col_name)
                            else:
                                # 4 Merge data from EntityA (CSV) with data from EntityX (CSV) (it has already happened in # 5), update both with data from EntityA (RDF)
                                old_metaval = metaval
                                metaval = sparql_match[0][0]
                                entity_dict[metaval] = dict()
                                entity_dict[metaval]['ids'] = list()
                                entity_dict[metaval]['others'] = list()
                                entity_dict[metaval]['title'] = sparql_match[0][1] if sparql_match[0][1] else ''
                                self.merge(entity_dict, metaval, old_metaval, sparql_match[0][1])
                                for identifier in existing_ids:
                                    if identifier[1] not in id_dict:
                                        id_dict[identifier[1]] = identifier[0]
                                    if identifier[1] not in entity_dict[metaval]['ids']:
                                        entity_dict[metaval]['ids'].append(identifier[1])
            else:
                sparql_match = self.finder_sparql(idslist, br=False, ra=True, vvi=False, publ=publ_entity)
                if len(sparql_match) > 1:
                    # !
                    return self.conflict(idslist, name, id_dict, col_name)
                elif len(sparql_match) == 1:
                    existing_ids = sparql_match[0][2]
                    new_idslist = [x[1] for x in existing_ids]
                    new_sparql_match = self.finder_sparql(new_idslist, br=False, ra=True, vvi=False, publ=publ_entity)
                    if len(new_sparql_match) > 1:
                        # Two entities previously disconnected on the triplestore now become connected
                        # !
                        return self.conflict(idslist, name, id_dict, col_name)
                    # 2 Retrieve EntityA data in triplestore to update EntityA inside CSV
                    # 3 CONFLICT between MetaIDs. The MetaID specified in EntityA inside the CSV takes precedence.
                    elif len(new_sparql_match) == 1:
                        metaval = sparql_match[0][0]
                        entity_dict[metaval] = dict()
                        entity_dict[metaval]['ids'] = list()
                        entity_dict[metaval]['others'] = list()
                        if col_name == 'author' or col_name == 'editor':
                            entity_dict[metaval]['title'] = self.name_check(sparql_match[0][1], name)
                        else:
                            entity_dict[metaval]['title'] = sparql_match[0][1]
                        if not entity_dict[metaval]['title'] and name:
                            entity_dict[metaval]['title'] = name
                        for identifier in existing_ids:
                            if identifier[1] not in id_dict:
                                id_dict[identifier[1]] = identifier[0]
                            if identifier[1] not in entity_dict[metaval]['ids']:
                                entity_dict[metaval]['ids'].append(identifier[1])
                else:
                    # 1 EntityA is a new one
                    metaval = self.new_entity(entity_dict, name)
            for identifier in idslist:
                if identifier not in id_dict:
                    self.__update_id_count(id_dict, identifier)
                if identifier not in entity_dict[metaval]['ids']:
                    entity_dict[metaval]['ids'].append(identifier)
            if not entity_dict[metaval]['title'] and name:
                entity_dict[metaval]['title'] = name
        # 1 EntityA is a new one
        if not idslist and not metaval:
            metaval = self.new_entity(entity_dict, name)
        return metaval
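    # Resolution-order sketch (a summary of the branches above, not extra behaviour):
    #   1. explicit MetaID in the CSV        -> reuse it (CSV first, then triplestore, then provenance)
    #   2. IDs matching one local entity     -> adopt its metaval, probe the triplestore for conflicts
    #   3. IDs matching wannabe entities     -> merge them, possibly promote to a triplestore MetaID
    #   4. IDs matching a triplestore entity -> import its data
    #   5. nothing matches                   -> mint a new wannabe entity
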
    def enrich(self):
        '''
        This method replaces the 'wannabe' ID placeholders with the
        actual data and MetaIDs resulting from the deduplication process.
        '''
        for row in self.data:
            for field in {'author', 'editor', 'publisher'}:
                if row[field]:
                    ras_list = list()
                    if field in {'author', 'editor'}:
                        ra_list = re.split(semicolon_in_people_field, row[field])
                    else:
                        ra_list = [row[field]]
                    for ra_entity in ra_list:
                        metaid = re.search(name_and_ids, ra_entity).group(2).replace('omid:ra/', '')
                        if 'wannabe' in metaid:
                            for ra_metaid in self.rameta:
                                if metaid in self.rameta[ra_metaid]['others']:
                                    metaid = ra_metaid
                        ra_name = self.rameta[metaid]['title']
                        ra_ids = ' '.join(self.rameta[metaid]['ids'])
                        ra = ra_name + ' [' + ra_ids + ']'
                        ras_list.append(ra)
                    row[field] = '; '.join(ras_list)
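    # Before/after sketch (hypothetical MetaID 06042): a cell holding
    #   'Doe, Jane [omid:ra/wannabe_0]'
    # becomes
    #   'Doe, Jane [orcid:0000-0000-0000-0001 omid:ra/06042]'
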
    @staticmethod
    def __local_match(list_to_match, dict_to_match:dict):
        match_elem = dict()
        match_elem['existing'] = list()
        match_elem['wannabe'] = list()
        for elem in list_to_match:
            for k, va in dict_to_match.items():
                if elem in va['ids']:
                    if 'wannabe' in k:
                        if k not in match_elem['wannabe']:
                            match_elem['wannabe'].append(k)  # TODO: consider using a set
                    else:
                        if k not in match_elem['existing']:
                            match_elem['existing'].append(k)
        return match_elem
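    # Output-shape sketch (hypothetical keys): matching against a dict containing
    # both '0601' and 'wannabe_3' could return
    #   {'existing': ['0601'], 'wannabe': ['wannabe_3']}
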
    def __update_id_count(self, id_dict, identifier):
        count = self.counter_handler.increment_counter('id', supplier_prefix=self.prefix)
        id_dict[identifier] = self.prefix + str(count)