Coverage for metaapi.py: 100%

75 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-02 10:19 +0000

1#!/usr/bin/python 

2# -*- coding: utf-8 -*- 

3# Copyright (c) 2022, Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# Permission to use, copy, modify, and/or distribute this software for any purpose 

6# with or without fee is hereby granted, provided that the above copyright notice 

7# and this permission notice appear in all copies. 

8# 

9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, 

12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, 

13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 

14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS 

15# SOFTWARE. 

16 

17__author__ = 'Arcangelo Massari' 

18 

19import re 

20from difflib import get_close_matches 

21from typing import List, Tuple 

22from urllib.parse import quote 

23 

24# from publishers import PUBLISHERS 

# The import of the publisher list is commented out above, so PUBLISHERS is
# just an empty list here.
PUBLISHERS = list()

# Lookup table from a resource-type class URI to the human-readable type label.
# __postprocess_type indexes this table directly, so a URI missing from it
# raises KeyError there.
URI_TYPE_DICT = {
    'http://purl.org/spar/doco/Abstract': 'abstract',
    'http://purl.org/spar/fabio/ArchivalDocument': 'archival document',
    'http://purl.org/spar/fabio/AudioDocument': 'audio document',
    'http://purl.org/spar/fabio/Book': 'book',
    'http://purl.org/spar/fabio/BookChapter': 'book chapter',
    'http://purl.org/spar/fabio/ExpressionCollection': 'book section',
    'http://purl.org/spar/fabio/BookSeries': 'book series',
    'http://purl.org/spar/fabio/BookSet': 'book set',
    'http://purl.org/spar/fabio/ComputerProgram': 'computer program',
    'http://purl.org/spar/doco/Part': 'book part',
    # The generic Expression class maps to an empty label.
    'http://purl.org/spar/fabio/Expression': '',
    'http://purl.org/spar/fabio/DataFile': 'dataset',
    'http://purl.org/spar/fabio/DataManagementPlan': 'data management plan',
    'http://purl.org/spar/fabio/Thesis': 'dissertation',
    'http://purl.org/spar/fabio/Editorial': 'editorial',
    'http://purl.org/spar/fabio/Journal': 'journal',
    'http://purl.org/spar/fabio/JournalArticle': 'journal article',
    'http://purl.org/spar/fabio/JournalEditorial': 'journal editorial',
    'http://purl.org/spar/fabio/JournalIssue': 'journal issue',
    'http://purl.org/spar/fabio/JournalVolume': 'journal volume',
    'http://purl.org/spar/fabio/Newspaper': 'newspaper',
    'http://purl.org/spar/fabio/NewspaperArticle': 'newspaper article',
    'http://purl.org/spar/fabio/NewspaperIssue': 'newspaper issue',
    'http://purl.org/spar/fr/ReviewVersion': 'peer review',
    'http://purl.org/spar/fabio/AcademicProceedings': 'proceedings',
    'http://purl.org/spar/fabio/Preprint': 'preprint',
    'http://purl.org/spar/fabio/Presentation': 'presentation',
    'http://purl.org/spar/fabio/ProceedingsPaper': 'proceedings article',
    'http://purl.org/spar/fabio/ReferenceBook': 'reference book',
    'http://purl.org/spar/fabio/ReferenceEntry': 'reference entry',
    'http://purl.org/spar/fabio/ReportDocument': 'report',
    'http://purl.org/spar/fabio/RetractionNotice': 'retraction notice',
    'http://purl.org/spar/fabio/Series': 'series',
    'http://purl.org/spar/fabio/SpecificationDocument': 'standard',
    'http://purl.org/spar/fabio/WebContent': 'web content'
}

64 

65 

66# def generate_id_search(ids: str) -> Tuple[str]: 

67# id_searches = list() 

68# for identifier in ids.split('__'): 

69# scheme_literal_value = identifier.split(':', maxsplit=1) 

70# scheme = scheme_literal_value[0].lower() 

71# literal_value = quote(scheme_literal_value[1]) 

72# literal_value = literal_value.lower() if scheme == 'doi' else literal_value 

73 

def _escape_sparql_literal(value: str) -> str:
    """Escape ``value`` for use inside a double-quoted SPARQL string literal."""
    return (
        value.replace('\\', '\\\\')
        .replace('"', '\\"')
        .replace('\n', '\\n')
        .replace('\r', '\\r')
    )


def _is_safe_iri_suffix(value: str) -> bool:
    """Tell whether ``value`` can be placed inside ``<...>`` without breaking the IRI."""
    # SPARQL's IRIREF production forbids these characters and anything <= U+0020.
    return not any(ch in '<>"{}|^`\\' or ch <= ' ' for ch in value)


def generate_id_search(ids: str) -> Tuple[str]:
    """Build the SPARQL pattern that selects ``?res`` by one or more identifiers.

    Args:
        ids: identifiers in ``scheme:value`` form, separated by ``__``.
            The scheme is matched case-insensitively; DOI values are
            lower-cased.

    Returns:
        A one-element tuple holding the SPARQL fragment. OMID identifiers
        become ``BIND`` clauses on the ``https://w3id.org/oc/meta/`` IRI,
        the other supported schemes become identifier look-ups, and all
        alternatives are joined with ``UNION``. The fragment is an empty
        string when no identifier is usable.

    Identifiers with an unsupported scheme are ignored. Identifiers with no
    ``scheme:`` prefix, and OMID values that cannot be written as an IRI,
    are ignored as well instead of raising or corrupting the query.
    """
    # NOTE(review): the doubled braces are emitted verbatim; presumably a later
    # str.format-style step collapses them -- confirm against the caller.
    omid_values = []
    other_values = []

    for identifier in ids.split('__'):
        scheme, separator, literal_value = identifier.partition(':')
        if not separator:
            # No "scheme:" prefix, so there is nothing to search for.
            continue
        scheme = scheme.lower()
        if scheme == 'doi':
            literal_value = literal_value.lower()
        if scheme == 'omid':
            if _is_safe_iri_suffix(literal_value):
                omid_values.append("{{ BIND(<https://w3id.org/oc/meta/"+literal_value+"> AS ?res) }}")
        elif scheme in {'doi', 'issn', 'isbn', 'openalex', 'pmid', 'pmcid', 'url', 'wikidata', 'wikipedia'}:
            # The value comes from the request, so it is escaped before being
            # embedded in the quoted literal.
            other_values.append('''
                {{
                    ?identifier literal:hasLiteralValue "'''+_escape_sparql_literal(literal_value)+'''";
                                datacite:usesIdentifierScheme datacite:'''+scheme+''';
                                ^datacite:hasIdentifier ?res.
                    ?res a fabio:Expression.
                }}
            ''')

    id_searches = []
    if omid_values:
        id_searches.append("?res a fabio:Expression."+" UNION ".join(omid_values))
    if other_values:
        id_searches.append(" UNION ".join(other_values))

    ids_search = " UNION ".join(id_searches)
    return ids_search,

104 

def generate_ra_search(identifier: str) -> Tuple[str]:
    """Build the SPARQL pattern that binds ``?knownRole`` for a responsible agent.

    Args:
        identifier: either ``scheme:value`` or a bare value. A bare value
            (no colon) is treated as an ORCID.

    Returns:
        A one-element tuple holding the SPARQL fragment. For the ``omid``
        scheme the agent is addressed directly by its
        ``https://w3id.org/oc/meta/`` IRI; for any other scheme the agent is
        found through its identifier literal and scheme.

    The identifier is split on the first colon only, so values that contain
    colons themselves (e.g. URLs) keep their scheme instead of being mistaken
    for a bare ORCID. The scheme is lower-cased, as in ``generate_id_search``.
    """
    scheme, separator, literal_value = identifier.partition(':')
    if separator:
        scheme = scheme.lower()
    else:
        # No scheme given: the whole string is the ORCID value.
        scheme, literal_value = 'orcid', identifier

    if scheme == 'omid':
        return '<https://w3id.org/oc/meta/{0}> ^pro:isHeldBy ?knownRole.'.format(literal_value),

    # NOTE(review): literal_value is interpolated as-is; callers passing
    # untrusted text should make sure it cannot contain a double quote.
    return '''
        ?knownPersonIdentifier literal:hasLiteralValue "{0}"^^<http://www.w3.org/2001/XMLSchema#string>;
                            datacite:usesIdentifierScheme datacite:{1};
                            ^datacite:hasIdentifier ?knownPerson.
        ?knownPerson ^pro:isHeldBy ?knownRole.
    '''.format(literal_value, scheme),

122 

def create_metadata_output(results):
    """Post-process result rows: prettify the type and order the agent lists.

    Args:
        results: a sequence whose first element is the header (a list of
            field names) and whose remaining elements are rows. Each row
            holds one two-item entry per header column; the second item of
            the entry is the value that gets rewritten.

    Returns:
        A ``(rows, True)`` tuple. ``rows`` starts with the unchanged header.
        In every following row the ``type`` entry has its second item
        replaced by ``__postprocess_type`` and the ``author``, ``editor``
        and ``publisher`` entries have theirs replaced by
        ``process_ordered_list``. All other entries are passed through.

    Columns absent from the header are simply not rewritten, and an empty
    ``results`` yields an empty row list.
    """
    if not results:
        return [], True

    header = results[0]

    def column_of(field):
        # First position of the field, as header.index would report it.
        return header.index(field) if field in header else None

    # Resolve the special columns once instead of rescanning the header for
    # every cell of every row.
    type_column = column_of('type')
    ordered_columns = {column_of(field) for field in ('author', 'editor', 'publisher')}
    ordered_columns.discard(None)

    output_results = [header]
    for result in results[1:]:
        output_result = []
        for i, data in enumerate(result):
            if i == type_column:
                output_result.append((data[0], __postprocess_type(data[1])))
            elif i in ordered_columns:
                output_result.append((data[0], process_ordered_list(data[1])))
            else:
                output_result.append(data)
        output_results.append(output_result)
    return output_results, True

139 

def __postprocess_type(type_uri: str) -> str:
    """Translate a type URI into its label from ``URI_TYPE_DICT``.

    A falsy ``type_uri`` (empty string or ``None``) gives an empty string.
    A non-empty URI that is not a key of ``URI_TYPE_DICT`` raises ``KeyError``.
    """
    return URI_TYPE_DICT[type_uri] if type_uri else ''

146 

def process_ordered_list(items):
    """Turn a ``|``-separated linked list of agents into an ordered string.

    Each item has the form ``name:role:next_role``. The name itself may
    contain colons, so only the last two colon-separated fields are taken as
    the role and as the role that follows it (empty when there is none).

    Args:
        items: the raw ``|``-separated string; falsy values are returned
            unchanged.

    Returns:
        The names joined with ``"; "``, starting from the first role that no
        other item points to and following the ``next_role`` links.

    The walk stops at a link to a role that is not in the list and never
    visits a role twice, so broken or cyclic chains yield the names reached
    so far instead of raising or looping forever. If every role is pointed to
    by another one, the walk starts from the first item.
    """
    if not items:
        return items

    next_role_of = {}
    role_to_name = {}
    for item in items.split('|'):
        parts = item.split(':')
        current_role = parts[-2]
        next_role_of[current_role] = parts[-1] if parts[-1] != '' else None
        role_to_name[current_role] = ':'.join(parts[:-2])

    # Build the set of pointed-to roles once rather than scanning the dict
    # values for every candidate.
    pointed_to = set(next_role_of.values())
    start_role = next(
        (role for role in next_role_of if role not in pointed_to),
        next(iter(next_role_of)),
    )

    ordered_items = []
    visited = set()
    current_role = start_role
    while current_role and current_role in role_to_name and current_role not in visited:
        visited.add(current_role)
        ordered_items.append(role_to_name[current_role])
        current_role = next_role_of[current_role]

    return "; ".join(ordered_items)

169 

170# def clean_name(name: str) -> str: 

171# if ',' in name: 

172# split_name = re.split('\s*,\s*', name) 

173# first_name = split_name[1].split() 

174# for i, w in enumerate(first_name): 

175# first_name[i] = clean_title(w) 

176# new_first_name = ' '.join(first_name) 

177# surname = split_name[0].split() 

178# for i, w in enumerate(surname): 

179# surname[i] = clean_title(w) 

180# new_surname = ' '.join(surname) 

181# if new_surname and new_first_name: 

182# new_name = new_surname + ', ' + new_first_name 

183# elif not new_surname and new_first_name: 

184# new_name = ', ' + new_first_name 

185# else: 

186# new_name = '' 

187# else: 

188# split_name = name.split() 

189# for i, w in enumerate(split_name): 

190# split_name[i] = clean_title(w) 

191# new_name = ' '.join(split_name) 

192# return new_name 

193 

194# def clean_title(title: str) -> str: 

195# if title.isupper(): 

196# title = title.lower() 

197# words = title.split() 

198# for i, w in enumerate(words): 

199# if not any(x.isupper() for x in w): 

200# words[i] = w.title() 

201# new_title = ' '.join(words) 

202# return new_title 

203 

204# class TextSearch(): 

205# def __init__(self, text:str): 

206# self.text = text 

207 

208# def get_text_search_on_id(self, ts_index:bool) -> str: 

209# schema_and_literal_value = self.text.split(':') 

210# schema = self.text = schema_and_literal_value[0].lower() 

211# literal_value = schema_and_literal_value[1] 

212# literal_value = literal_value.lower() if schema == 'doi' else literal_value 

213# return f''' 

214# {self.__gen_text_search(f'tsId{ts_index}', literal_value, True, ts_index)} 

215# ?tsIdentifier{ts_index} literal:hasLiteralValue ?tsId{ts_index}; 

216# datacite:usesIdentifierScheme datacite:{schema}. 

217# ?res datacite:hasIdentifier ?tsIdentifier{ts_index}; 

218# a fabio:Expression. 

219# ''' 

220 

221# def get_text_search_on_title(self, ts_index:bool) -> str: 

222# return f''' 

223# {self.__gen_text_search(f'tsTitle{ts_index}', self.text, False, ts_index)} 

224# ?res dcterm:title ?tsTitle{ts_index}; 

225# a fabio:Expression. 

226# ''' 

227 

228# def get_text_search_on_person(self, role:str, ts_index:bool) -> str: 

229# family_name = None 

230# given_name = None 

231# name = None 

232# clean_test = clean_name(self.text) 

233# if ',' in clean_test: 

234# name_parts = [part.strip() for part in clean_test.split(',')] 

235# if name_parts: 

236# family_name = name_parts[0] 

237# if len(name_parts) == 2: 

238# given_name = name_parts[1] 

239# given_name = '. '.join(given_name.split('.')) 

240# given_name = ' '.join([f"{name_part.rstrip('.')}.+?" if len(name_part.rstrip('.')) == 1 else name_part for name_part in given_name.split()]) 

241# given_name = given_name.replace('*', '.*?') 

242# else: 

243# name = clean_test 

244# role = role.title() 

245# text_search = '' 

246# base_query = f''' 

247# ?ts{role}{ts_index} pro:isHeldBy ?ts{role}Ra{ts_index}; 

248# pro:withRole pro:{role.lower()}. 

249# ?res pro:isDocumentContextFor ?ts{role}{ts_index}; 

250# a fabio:Expression. 

251# ''' 

252# if name: 

253# base_query = f''' 

254# ?ts{role}Ra{ts_index} ?namePredicate '{name}'. 

255# VALUES (?namePredicate) {{(foaf:name) (foaf:familyName)}}''' + base_query 

256# else: 

257# if family_name and given_name: 

258# base_query = f''' 

259# ?ts{role}Ra{ts_index} foaf:familyName '{family_name}'; 

260# foaf:givenName ?ts{role}Gn{ts_index}. 

261# FILTER(REGEX(?ts{role}Gn{ts_index}, '{given_name}'))''' + base_query 

262# elif family_name and not given_name: 

263# base_query = f"?ts{role}Ra{ts_index} foaf:familyName '{family_name}'." + base_query 

264# elif not family_name and given_name: 

265# base_query = f"?ts{role}Ra{ts_index} foaf:givenName '{given_name}'." + base_query 

266# return text_search + base_query 

267 

268# def get_text_search_on_publisher(self, ts_index:bool) -> str: 

269# close_match = get_close_matches(self.text.lower(), PUBLISHERS, n=1) 

270# if close_match: 

271# publisher = clean_name(close_match[0]) 

272# text_search_on_publisher = f''' 

273# ?tsPublisherRa{ts_index} foaf:name '{publisher}'. 

274# ?tsPublisher{ts_index} pro:isHeldBy ?tsPublisherRa{ts_index}; 

275# pro:withRole pro:publisher. 

276# ?res pro:isDocumentContextFor ?tsPublisher{ts_index}; 

277# a fabio:Expression. 

278# ''' 

279# else: 

280# text_search_on_publisher = f''' 

281# {self.__gen_text_search(f'tsPublisherName{ts_index}', self.text, False, ts_index)} 

282# ?tsPublisherRa{ts_index} foaf:name ?tsPublisherName{ts_index}. 

283# ?tsPublisher{ts_index} pro:isHeldBy ?tsPublisherRa{ts_index}; 

284# pro:withRole pro:publisher. 

285# ?res pro:isDocumentContextFor ?tsPublisher{ts_index}; 

286# a fabio:Expression. 

287# ''' 

288# return text_search_on_publisher 

289 

290# def get_text_search_on_vi(self, vi:str, ts_index:bool) -> str: 

291# v_or_i = vi.title() 

292# return f''' 

293# {self.__gen_text_search(f'ts{v_or_i}Number{ts_index}', self.text, False, ts_index)} 

294# ?ts{v_or_i}{ts_index} fabio:hasSequenceIdentifier ?ts{v_or_i}Number{ts_index}; 

295# a fabio:Journal{v_or_i}. 

296# ?res frbr:partOf+ ?ts{v_or_i}{ts_index}; 

297# a fabio:Expression. 

298# ''' 

299 

300# def get_text_search_on_venue(self, ts_index:bool) -> str: 

301# return f''' 

302# {self.__gen_text_search(f'tsVenueTitle{ts_index}', self.text, False, ts_index)} 

303# ?tsVenue{ts_index} dcterm:title ?tsVenueTitle{ts_index}. 

304# ?res frbr:partOf+ ?tsVenue{ts_index}. 

305# FILTER NOT EXISTS {{?res a fabio:JournalVolume}} 

306# FILTER NOT EXISTS {{?res a fabio:JournalIssue}} 

307# ''' 

308 

309# def __gen_text_search(self, variable:str, text:str, perfect_match:bool, ts_index:int) -> str: 

310# if str(ts_index).startswith('0'): 

311# min_relevance = f"bds:minRelevance '0.6'; bds:matchAllTerms 'true'." if not perfect_match else f"bds:matchRegex '^{text}$'." 

312# text_search = f"?{variable} bds:search '{text}'; {min_relevance}" 

313# else: 

314# pattern = f'^{text}$' if perfect_match else text 

315# text_search = f"FILTER REGEX (?{variable}, '{pattern}', 'i')" 

316# return text_search 

317 

318 

319# def to_text_search(request:str, ts_index:bool) -> Tuple[str, str]: 

320# text_search = None 

321# field = request[0] 

322# value = request[1] 

323# ts = TextSearch(value) 

324# if field in {'editor', 'author'}: 

325# text_search = getattr(ts, f'get_text_search_on_person')(field, ts_index) 

326# elif field in {'volume', 'issue'}: 

327# text_search = getattr(ts, f'get_text_search_on_vi')(field, ts_index) 

328# else: 

329# text_search = getattr(ts, f'get_text_search_on_{field}')(ts_index) 

330# return text_search 

331 

332# def generate_text_search(text_search:str) -> str: 

333# requests = reorder_requests(text_search) 

334# text_searches = [] 

335# for or_request in requests: 

336# and_text_search = '' 

337# for i, and_request in enumerate(or_request): 

338# and_text_search += f'{to_text_search(and_request, i)}' 

339# text_searches.append(and_text_search) 

340# if len(text_searches) > 1: 

341# query = '{' + '} UNION {'.join(text_searches) + '}' 

342# elif len(text_searches) == 1: 

343# query = text_searches[0] 

344# return query, 

345 

346# def reorder_requests(text_search:str) -> list: 

347# preferred_order = ['id', 'editor', 'author', 'title', 'venue', 'publisher', 'volume', 'issue'] 

348# reordered_requests = [] 

349# split_by_or = text_search.split('||') 

350# for or_request in split_by_or: 

351# split_by_and = or_request.split('&&') 

352# parsed_and_requests = parse_requests(split_by_and) 

353# sorted_and_requests = sorted(parsed_and_requests, key=lambda x: preferred_order.index(x[0])) 

354# reordered_requests.append(sorted_and_requests) 

355# return reordered_requests 

356 

357# def parse_requests(requests:list) -> List[Tuple]: 

358 # parsed_requests = list() 

359 # for request in requests: 

360 # field_value = re.search(r'(id|title|author|editor|publisher|venue|volume|issue)=((?:(?!&&|\|\|).)+)', request) 

361 # field = field_value.group(1) 

362 # value = field_value.group(2) 

363 # parsed_requests.append((field, value)) 

364 # if (any(field in {'volume', 'issue'} for field, _ in parsed_requests) and not any(field == 'venue' for field, _ in parsed_requests)): 

365 # raise(ValueError('Specify the venue if you want to search a volume or an issue')) 

366 # return parsed_requests