Coverage for oc_ocdm / support / support.py: 88%

225 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-28 18:52 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2022-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7# -*- coding: utf-8 -*- 

8from __future__ import annotations 

9 

10import os 

11import re 

12from dataclasses import dataclass 

13from datetime import datetime 

14from functools import lru_cache 

15from typing import TYPE_CHECKING 

16from rdflib import URIRef, Graph 

17 

18if TYPE_CHECKING: 

19 from typing import Optional, List, Tuple, Match, Dict, Set 

20 from oc_ocdm.graph.entities.bibliographic.bibliographic_resource import BibliographicResource 

21 from oc_ocdm.graph.entities.bibliographic.responsible_agent import ResponsibleAgent 

22 from oc_ocdm.graph.entities.bibliographic.agent_role import AgentRole 

23 

24from urllib.parse import quote 

25 

26 

@dataclass
class ParsedURI:
    """Decomposed view of an OCDM IRI, as produced by parse_uri().

    For a plain entity IRI, short_name/prefix/count describe the entity and
    the prov_subject_* fields are empty, with is_prov False. For a provenance
    IRI (one containing "/prov/"), short_name/count describe the provenance
    entity itself while the prov_subject_* fields describe the entity the
    provenance is about. An IRI matching neither pattern yields an instance
    whose string fields are all "" and whose is_prov is False.
    """
    base_iri: str  # IRI portion preceding the short name ("" when unparsable)
    short_name: str  # two-letter entity-type code matched by the IRI regexes
    prefix: str  # supplier prefix (the "0...0" group); "" when absent or for prov IRIs
    count: str  # entity number: a positive integer, or an "N-M" range, as a string
    is_prov: bool  # True when the IRI matched the provenance pattern
    prov_subject_short_name: str  # short name of the entity the provenance is about
    prov_subject_prefix: str  # supplier prefix of the provenance subject; "" when absent
    prov_subject_count: str  # count of the provenance subject entity

37 

38from rdflib import RDF, XSD, Literal 

39from typing import Union 

40 

41 

def sparql_binding_to_term(binding: dict) -> Union[URIRef, Literal]:
    """Turn one SPARQL JSON result binding into the corresponding rdflib term.

    Bindings of type 'uri' become URIRef instances; everything else is treated
    as a literal. Following RDF 1.1, a simple literal — one carrying neither a
    datatype nor a language tag — is normalized to xsd:string.
    """
    if binding['type'] == 'uri':
        return URIRef(binding['value'])

    language = binding.get('xml:lang')
    raw_datatype = binding.get('datatype')
    if raw_datatype is not None:
        datatype = URIRef(raw_datatype)
    elif language is None:
        # Simple literal: normalize to xsd:string per RDF 1.1.
        datatype = XSD.string
    else:
        # Language-tagged literal: no explicit datatype is passed.
        datatype = None
    return Literal(binding['value'], datatype=datatype, lang=language)

56 

57 

def normalize_graph_literals(g: Graph) -> None:
    """Rewrite, in place, every plain literal object of *g* as an xsd:string literal.

    A plain literal is one with neither a datatype nor a language tag. The
    triples to change are collected first so the graph is never mutated while
    it is being iterated.
    """
    plain_literal_triples = [
        (s, p, o)
        for s, p, o in g
        if isinstance(o, Literal) and o.datatype is None and o.language is None
    ]
    for s, p, o in plain_literal_triples:
        g.remove((s, p, o))
        g.add((s, p, Literal(str(o), datatype=XSD.string)))

66 

67 

def create_date(date_list: Optional[List[Optional[int]]] = None) -> Optional[str]:
    """Build a date string from a [year, month, day] list of optional ints.

    Returns 'YYYY-MM-DD' when all three parts are present and (month, day) is
    not the placeholder pair (1, 1); 'YYYY-MM' when at least a month is
    available; 'YYYY' when only the year is usable; None when no year is given.
    """
    if date_list is None or len(date_list) == 0 or date_list[0] is None:
        return None

    year = date_list[0]
    month = date_list[1] if len(date_list) >= 2 else None
    day = date_list[2] if len(date_list) >= 3 else None

    if len(date_list) == 3 and month is not None and day is not None \
            and (month, day) != (1, 1):
        return datetime(year, month, day).strftime('%Y-%m-%d')
    if month is not None:
        # Day missing (or the placeholder Jan 1st): keep year-month precision.
        return datetime(year, month, 1).strftime('%Y-%m')
    return datetime(year, 1, 1).strftime('%Y')

81 

82 

def get_datatype_from_iso_8601(string: str) -> Tuple[URIRef, str]:
    """Parse an ISO-8601 date string into an (XSD datatype, normalized value) pair.

    Only the leading "yyyy-mm-dd" portion of *string* is considered. Depending
    on how many dash-separated parts are present, returns:
      * (XSD.date, 'YYYY-MM-DD') for three parts,
      * (XSD.gYearMonth, 'YYYY-MM') for two parts,
      * (XSD.gYear, 'YYYY') for one part.

    Raises:
        ValueError: if the string is not ISO-8601 compliant. This now also
            covers out-of-range components (e.g. "2020-13" or "2020-02-30"),
            which previously escaped as a raw datetime ValueError instead of
            the compliance message below.
    """
    # Keep only the "yyyy-mm-dd" part of the string
    string = string[:10]

    try:
        date_parts: List[int] = [int(s) for s in string.split(sep='-', maxsplit=2)]
        num_of_parts: int = len(date_parts)
        if num_of_parts == 3:
            return XSD.date, datetime(date_parts[0], date_parts[1], date_parts[2]).strftime('%Y-%m-%d')
        elif num_of_parts == 2:
            return XSD.gYearMonth, datetime(date_parts[0], date_parts[1], 1).strftime('%Y-%m')
        else:
            return XSD.gYear, datetime(date_parts[0], 1, 1).strftime('%Y')
    except ValueError:
        # Raised both when int() fails on a non-numeric part and when datetime
        # rejects an out-of-range month/day; report one consistent message.
        raise ValueError("The provided date string is not ISO-8601 compliant!") from None

99 

def get_ordered_contributors_from_br(br: BibliographicResource,
                                     contributor_type: URIRef) -> List[ResponsibleAgent]:
    """Reconstruct the ordered list of ResponsibleAgent entities of *br* having
    the given role type.

    AgentRole entities form a linked list via get_next(); since get_contributors()
    gives them in no particular order, this function incrementally merges them
    into partial sub-lists (tracked by their head and tail AgentRole IRIs) and
    finally stitches the sub-lists together by following the tail links.

    Returns an empty list when no AgentRole matches *contributor_type*.

    Raises:
        ValueError: if the AgentRole chain is malformed (cycles, forks, or
            disconnected fragments that cannot be merged into a single list).
    """
    ar_list: List[AgentRole] = br.get_contributors()

    list_id: int = 0
    # Maps from the head/tail AgentRole IRI of each partial list to that list.
    heads: Dict[URIRef, Dict] = {}
    tails: Dict[URIRef, Dict] = {}
    sub_lists: List[Dict] = []
    # Reverse index: sub-list id -> IRI currently registered as its head.
    from_id_to_res_in_heads: Dict[int, URIRef] = {}
    for ar in ar_list:
        role_type: Optional[URIRef] = ar.get_role_type()
        ra: Optional[ResponsibleAgent] = ar.get_is_held_by()
        next_ar: Optional[AgentRole] = ar.get_next()
        if next_ar is not None:
            next_ar_res: Optional[URIRef] = next_ar.res
        else:
            next_ar_res: Optional[URIRef] = None

        if role_type is not None and role_type == contributor_type and ra is not None:
            if next_ar_res is not None and next_ar_res in heads:
                # This AR precedes an existing sub-list: prepend and move the head.
                sub_list: Dict = heads[next_ar_res]
                sub_list['list'].insert(0, ra)
                del heads[next_ar_res]
                heads[ar.res] = sub_list
                from_id_to_res_in_heads[sub_list['id']] = ar.res
            elif ar.res is not None and ar.res in tails:
                # This AR follows an existing sub-list: append and move the tail.
                sub_list: Dict = tails[ar.res]
                sub_list['list'].append(ra)
                del tails[ar.res]

                if next_ar_res is not None:
                    tails[next_ar_res] = sub_list
            else:
                # This AR cannot be inserted into any list, so
                # we need to create an entirely new list for it:
                sub_list: Dict = {'id': list_id, 'list': [ra]}
                list_id += 1
                sub_lists.append(sub_list)

                heads[ar.res] = sub_list
                from_id_to_res_in_heads[sub_list['id']] = ar.res
                if next_ar_res is not None:
                    tails[next_ar_res] = sub_list

    ids_in_heads: Set[int] = {val['id'] for val in heads.values()}
    ids_in_tails: Set[int] = {val['id'] for val in tails.values()}
    # Exactly one sub-list should be a head without being some other
    # sub-list's continuation: that one starts the final ordered chain.
    diff_set: Set[int] = ids_in_heads - ids_in_tails
    if len(diff_set) == 0:
        # No contributor was found!
        return []
    elif len(diff_set) != 1:
        raise ValueError('A malformed list of AgentRole entities was given.')
    else:
        result_list: List[ResponsibleAgent] = []
        cur_id: int = diff_set.pop()
        already_merged_list_ids: Set[int] = set()
        finished: bool = False
        while not finished:
            found: bool = False
            if cur_id in from_id_to_res_in_heads:
                res: URIRef = from_id_to_res_in_heads[cur_id]
                subl: Dict = heads[res]
                subl_id: int = subl['id']
                if subl_id not in already_merged_list_ids:
                    found = True
                    already_merged_list_ids.add(subl_id)
                    # NOTE: sub-lists are discovered tail-first here, hence prepend.
                    result_list = subl['list'] + result_list

                    # Now we need to get the next cur_id value:
                    if res in tails:
                        cur_id = tails[res]['id']
                    else:
                        finished = True

            if not found:
                # Either an unknown id or a cycle revisiting a merged sub-list.
                raise ValueError('A malformed list of AgentRole entities was given.')

        unmerged_list_ids: Set[int] = ids_in_heads - already_merged_list_ids
        if len(unmerged_list_ids) != 0:
            # Some fragments were never reachable from the chain start.
            raise ValueError('A malformed list of AgentRole entities was given.')

        return result_list

183 

184 

def encode_url(u: str) -> str:
    """Percent-encode *u*, leaving ':' and '/' (and the usual unreserved characters) intact."""
    safe_characters = "://"
    return quote(u, safe_characters)

187 

188 

def create_literal(g: Graph, res: URIRef, p: URIRef, s: str, dt: Optional[URIRef] = None, nor: bool = True) -> None:
    """Add the triple (res, p, Literal(s)) to *g*, unless *s* is empty or whitespace-only.

    When *dt* is omitted the literal is typed as xsd:string; *nor* is forwarded
    as rdflib's lexical-normalization flag.
    """
    if is_string_empty(s):
        return
    datatype = XSD.string if dt is None else dt
    g.add((res, p, Literal(s, datatype=datatype, normalize=nor)))

193 

194 

def create_type(g: Graph, res: URIRef, res_type: URIRef) -> None:
    """State in *g* that *res* has rdf:type *res_type*."""
    type_triple = (res, RDF.type, res_type)
    g.add(type_triple)

197 

198 

def is_string_empty(string: Optional[str]) -> bool:
    """Return True when *string* is None, empty, or contains only whitespace."""
    if string is None:
        return True
    return not string.strip()

201 

202 

# IRI patterns shared by several functions below (see parse_uri).
# Capture groups of entity_regex:
#   (1) base IRI, (2) two-letter short name, (3) optional supplier prefix
#   of the form "0...0", (4) count: a positive integer or an "N-M" range.
entity_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))$"
# prov_regex extends entity_regex with a "/prov/<short name>/<count>" suffix:
# groups (1)-(4) describe the provenance subject, (5)-(6) the provenance entity.
prov_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))/prov/([a-z][a-z])/([1-9][0-9]*)$"

# Compiled once at import time; reused by parse_uri() on every call.
_compiled_entity_regex = re.compile(entity_regex)
_compiled_prov_regex = re.compile(prov_regex)

209 

210 

@lru_cache(maxsize=4096)
def parse_uri(res: URIRef) -> ParsedURI:
    """Split *res* into its OCDM components, memoizing recent results.

    Provenance IRIs (containing "/prov/") are matched against the provenance
    pattern; all others against the entity pattern. An IRI matching neither
    yields a ParsedURI whose string fields are all empty.
    """
    string_iri = str(res)
    if "/prov/" in string_iri:
        prov_match = _compiled_prov_regex.match(string_iri)
        if prov_match is not None:
            base, subj_sn, subj_prefix, subj_count, prov_sn, prov_count = prov_match.groups()
            return ParsedURI(
                base_iri=base,
                short_name=prov_sn,
                prefix="",
                count=prov_count,
                is_prov=True,
                prov_subject_short_name=subj_sn,
                prov_subject_prefix=subj_prefix or "",
                prov_subject_count=subj_count,
            )
    else:
        entity_match = _compiled_entity_regex.match(string_iri)
        if entity_match is not None:
            base, sn, prefix, count = entity_match.groups()
            return ParsedURI(
                base_iri=base,
                short_name=sn,
                prefix=prefix or "",
                count=count,
                is_prov=False,
                prov_subject_short_name="",
                prov_subject_prefix="",
                prov_subject_count="",
            )
    # Unrecognized IRI shape: return the all-empty sentinel.
    return ParsedURI("", "", "", "", False, "", "", "")

241 

242 

def get_base_iri(res: URIRef) -> str:
    """Return the base IRI portion of *res* ("" when the IRI is unparsable)."""
    parsed = parse_uri(res)
    return parsed.base_iri

245 

246 

def get_short_name(res: URIRef) -> str:
    """Return the two-letter short name of *res* ("" when the IRI is unparsable)."""
    parsed = parse_uri(res)
    return parsed.short_name

249 

250 

def get_prefix(res: URIRef) -> str:
    """Return the supplier prefix of *res* ("" when absent or unparsable)."""
    parsed = parse_uri(res)
    return parsed.prefix

253 

254 

def get_count(res: URIRef) -> str:
    """Return the count component of *res* as a string ("" when unparsable)."""
    parsed = parse_uri(res)
    return parsed.count

257 

258 

def get_resource_number(res: URIRef) -> int:
    """Return the numeric identifier of *res* as an int (0 when unparsable).

    For provenance IRIs the number of the provenance *subject* is used,
    since that is what drives file/directory placement.
    """
    parsed = parse_uri(res)
    if parsed.is_prov:
        count = parsed.prov_subject_count
    else:
        count = parsed.count
    return int(count) if count else 0

263 

264 

def find_local_line_id(res: URIRef, n_file_item: int = 1) -> int:
    """Return the offset of *res* within its bucket of n_file_item resources."""
    cur_number: int = get_resource_number(res)

    # Advance to the smallest multiple of n_file_item that is >= cur_number,
    # then step back one bucket to obtain the bucket's base offset.
    bucket_ceiling: int = 0
    while cur_number > bucket_ceiling:
        bucket_ceiling += n_file_item
    bucket_base: int = bucket_ceiling - n_file_item

    return cur_number - bucket_base

277 

278 

def find_paths(res: URIRef, base_dir: str, base_iri: str, default_dir: str, dir_split: int,
               n_file_item: int, is_json: bool = True, process_id: int|str|None = None) -> Tuple[str, str]:
    """
    This function is responsible for looking for the correct JSON file that contains the data related to the
    resource identified by the variable 'string_iri'. This search takes into account the organisation in
    directories and files, as well as the particular supplier prefix for bibliographic entities, if specified.
    In case no supplier prefix is specified, the 'default_dir' (usually set to "_") is used instead.

    Returns a (directory_path, file_path) pair. When 'process_id' is truthy,
    "_<process_id>" is appended to the file name before the extension.
    """
    string_iri: str = str(res)
    # Suffix that keeps files written by different processes distinct.
    process_id_str: str = f"_{process_id}" if process_id else ""

    if is_dataset(res):
        # Dataset IRIs map to a directory mirroring the IRI path (the [:-1]
        # slice drops the trailing separator) plus an "index" file inside it.
        cur_dir_path: str = (base_dir + re.sub(r"^%s(.*)$" % base_iri, r"\1", string_iri))[:-1]
        cur_file_path: str = cur_dir_path + os.sep + "index" + process_id_str + ".json"
        return cur_dir_path, cur_file_path

    parsed = parse_uri(res)
    # For provenance IRIs, placement is driven by the number of the entity the
    # provenance is about, not by the provenance entity's own count.
    cur_number: int = int(parsed.prov_subject_count) if parsed.is_prov else int(parsed.count)

    # Smallest multiple of n_file_item that is >= cur_number: the file bucket.
    cur_file_split: int = 0
    while cur_number > cur_file_split:
        cur_file_split += n_file_item

    if dir_split and not string_iri.startswith(base_iri + "prov/"):
        # Directory splitting enabled: also compute the directory bucket,
        # the smallest multiple of dir_split that is >= cur_number.
        cur_split: int = 0
        while cur_number > cur_split:
            cur_split += dir_split

        if parsed.is_prov:
            # Provenance layout: <subject short name>/<prefix>/<dir>/<file>/prov/<prov short name>.<ext>
            sub_folder = parsed.prov_subject_prefix or default_dir or "_"
            file_extension = '.json' if is_json else '.nq'
            cur_dir_path = base_dir + parsed.prov_subject_short_name + os.sep + sub_folder + \
                os.sep + str(cur_split) + os.sep + str(cur_file_split) + os.sep + "prov"
            cur_file_path = cur_dir_path + os.sep + parsed.short_name + process_id_str + file_extension
        else:
            # Entity layout: <short name>/<prefix>/<dir>/<file>.<ext>
            sub_folder = parsed.prefix or default_dir or "_"
            file_extension = '.json' if is_json else '.nt'
            cur_dir_path = base_dir + parsed.short_name + os.sep + sub_folder + os.sep + str(cur_split)
            cur_file_path = cur_dir_path + os.sep + str(cur_file_split) + process_id_str + file_extension
    elif dir_split == 0:
        # No directory splitting: only the file bucket level is used.
        if parsed.is_prov:
            sub_folder = parsed.prov_subject_prefix or default_dir or "_"
            file_extension = '.json' if is_json else '.nq'
            cur_dir_path = base_dir + parsed.prov_subject_short_name + os.sep + sub_folder + \
                os.sep + str(cur_file_split) + os.sep + "prov"
            cur_file_path = cur_dir_path + os.sep + parsed.short_name + process_id_str + file_extension
        else:
            sub_folder = parsed.prefix or default_dir or "_"
            file_extension = '.json' if is_json else '.nt'
            cur_dir_path = base_dir + parsed.short_name + os.sep + sub_folder
            cur_file_path = cur_dir_path + os.sep + str(cur_file_split) + process_id_str + file_extension
    else:
        # dir_split != 0 but the IRI starts with base_iri + "prov/": flat layout
        # directly under the short-name directory, one file per entity.
        file_extension = '.json' if is_json else '.nq'
        cur_dir_path = base_dir + parsed.short_name
        cur_file_path = cur_dir_path + os.sep + parsed.prefix + parsed.count + process_id_str + file_extension

    return cur_dir_path, cur_file_path

336 

def has_supplier_prefix(res: URIRef, base_iri: str) -> bool:
    """Return True when *res* carries a supplier prefix, i.e. its path segment
    right after *base_iri* is a two-letter short name followed by "/0"."""
    pattern = r"^%s[a-z][a-z]/0" % base_iri
    return re.search(pattern, str(res)) is not None

340 

def build_graph_from_results(results: List[Dict]) -> Graph:
    """Assemble an rdflib Graph from SPARQL JSON rows carrying 's', 'p', 'o' bindings."""
    graph = Graph()
    for row in results:
        subject_term = sparql_binding_to_term(row['s'])
        predicate_term = sparql_binding_to_term(row['p'])
        object_term = sparql_binding_to_term(row['o'])
        graph.add((subject_term, predicate_term, object_term))
    return graph

349 

350 

def is_dataset(res: URIRef) -> bool:
    """Return True when *res* does NOT end with a numeric entity suffix,
    which is how dataset IRIs are distinguished from entity IRIs here."""
    numeric_suffix = re.search(r"^.+/[0-9]+(-[0-9]+)?(/[0-9]+)?$", str(res))
    return numeric_suffix is None