Coverage for oc_ocdm / support / support.py: 90%

217 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-08 20:23 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2022-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7# -*- coding: utf-8 -*- 

8from __future__ import annotations 

9 

10import os 

11import re 

12from dataclasses import dataclass 

13from datetime import datetime 

14from functools import lru_cache 

15from typing import TYPE_CHECKING 

16from urllib.parse import quote 

17 

18from rdflib import Graph, Literal 

19from rdflib.namespace import XSD as _RDFLIB_XSD 

20from triplelite import XSD_STRING, RDFTerm, TripleLite 

21 

22from oc_ocdm.constants import RDF_TYPE, XSD_DATE, XSD_GYEAR, XSD_GYEARMONTH, XSD_STRING 

23 

24_RDFLIB_XSD_STRING = _RDFLIB_XSD.string 

25 

26if TYPE_CHECKING: 

27 from typing import Dict, List, Optional, Set, Tuple 

28 

29 from oc_ocdm.graph.entities.bibliographic.agent_role import AgentRole 

30 from oc_ocdm.graph.entities.bibliographic.bibliographic_resource import BibliographicResource 

31 from oc_ocdm.graph.entities.bibliographic.responsible_agent import ResponsibleAgent 

32 

33 

@dataclass
class ParsedURI:
    """
    Components of an IRI as split by ``parse_uri``.

    All textual components are kept as strings. When ``parse_uri`` cannot match
    the IRI, every string field is empty and ``is_prov`` is False.
    """
    # Part of the IRI that precedes the first "/<short name>/" segment.
    base_iri: str
    # Two-letter short name: of the entity itself, or of the provenance
    # entity when ``is_prov`` is True.
    short_name: str
    # Optional "0...0" prefix found before the count ("" when absent; always
    # "" for provenance IRIs).
    prefix: str
    # Counter of the entity (of the provenance entity when ``is_prov`` is True).
    count: str
    # True when the IRI matched the provenance pattern (".../prov/<xx>/<n>").
    is_prov: bool
    # The three fields below describe the entity a provenance IRI refers to;
    # they are empty strings for non-provenance IRIs.
    prov_subject_short_name: str
    prov_subject_prefix: str
    prov_subject_count: str

44 

45 

def sparql_binding_to_rdfterm(binding: dict) -> RDFTerm:
    """
    Convert one SPARQL JSON result binding into an ``RDFTerm``.

    A binding whose ``type`` is ``'uri'`` becomes a URI term. Anything else
    becomes a literal carrying the binding's ``datatype`` and ``xml:lang``
    (each defaulting to ""); a literal with neither is typed as ``XSD_STRING``.
    """
    is_uri = binding['type'] == 'uri'
    if is_uri:
        return RDFTerm("uri", binding['value'])

    language = binding.get('xml:lang', '')
    literal_type = binding.get('datatype', '')
    if not (literal_type or language):
        # Plain literal: make the implicit xsd:string datatype explicit.
        literal_type = XSD_STRING
    return RDFTerm("literal", binding['value'], literal_type, language)

54 

55 

def normalize_graph_literals(g: Graph) -> None:
    """
    Rewrite, in place, every literal of ``g`` that has neither a datatype nor
    a language tag so that it carries the xsd:string datatype explicitly.
    """
    # Collect first: the graph must not be mutated while it is being iterated.
    plain_literal_triples = [
        (subj, pred, obj)
        for subj, pred, obj in g
        if isinstance(obj, Literal) and obj.datatype is None and obj.language is None
    ]
    for subj, pred, obj in plain_literal_triples:
        typed_obj = Literal(str(obj), datatype=_RDFLIB_XSD_STRING)
        g.remove((subj, pred, obj))
        g.add((subj, pred, typed_obj))

64 

65 

def create_date(date_list: Optional[List[Optional[int]]] = None) -> Optional[str]:
    """
    Build a date string from a ``[year, month, day]`` list.

    Returns ``None`` when ``date_list`` is ``None``, empty, or has no year.
    Otherwise returns:

    * ``'YYYY-MM-DD'`` when the list has exactly three items, month and day
      are both set, and the date is not January 1st;
    * ``'YYYY-MM'`` when a month is available (this includes a full date
      falling on January 1st);
    * ``'YYYY'`` in every other case.

    The year is always zero-padded to four digits.

    Raises ``ValueError`` if the values do not form a valid calendar date.
    """
    if not date_list or date_list[0] is None:
        return None

    n_items: int = len(date_list)
    year = date_list[0]
    month = date_list[1] if n_items >= 2 else None
    day = date_list[2] if n_items == 3 else None

    # isoformat() is used instead of strftime('%Y...') because the padding of
    # '%Y' for years below 1000 is platform-dependent, whereas isoformat()
    # always emits a four-digit year. Building the datetime also validates
    # the month/day values.
    if month is not None and day is not None and (month != 1 or day != 1):
        return datetime(year, month, day).date().isoformat()
    if month is not None:
        return datetime(year, month, 1).date().isoformat()[:7]
    return datetime(year, 1, 1).date().isoformat()[:4]

79 

80 

def get_datatype_from_iso_8601(string: str) -> Tuple[str, str]:
    """
    Return the XSD datatype and the normalised value of an ISO-8601 date string.

    Only the first ten characters (the "yyyy-mm-dd" part) are considered.
    Depending on how many components are present the result is
    ``(XSD_DATE, 'YYYY-MM-DD')``, ``(XSD_GYEARMONTH, 'YYYY-MM')`` or
    ``(XSD_GYEAR, 'YYYY')``. The year is always zero-padded to four digits.

    Raises ``ValueError`` if a component is not an integer or if the
    components do not form a valid calendar date.
    """
    # Keep only the "yyyy-mm-dd" part of the string
    string = string[:10]

    try:
        date_parts: List[int] = [int(s) for s in string.split(sep='-', maxsplit=2)]
    except ValueError as err:
        raise ValueError("The provided date string is not ISO-8601 compliant!") from err

    # Missing month/day default to 1 so that the date can be validated.
    year, month, day = (date_parts + [1, 1])[:3]
    # isoformat() always zero-pads the year to four digits, whereas the output
    # of strftime('%Y') for years below 1000 is platform-dependent.
    iso_date: str = datetime(year, month, day).date().isoformat()

    num_of_parts: int = len(date_parts)
    if num_of_parts == 3:
        return XSD_DATE, iso_date
    elif num_of_parts == 2:
        return XSD_GYEARMONTH, iso_date[:7]
    else:
        return XSD_GYEAR, iso_date[:4]

97 

def get_ordered_contributors_from_br(br: BibliographicResource,
                                     contributor_type: str):
    """
    Return the responsible agents of ``br`` that hold the given role, ordered
    by following the ``get_next()`` links between their agent roles.

    Agent roles are visited in the order returned by ``br.get_contributors()``.
    Roles of another type, or without a responsible agent, are ignored. The
    remaining ones are assembled into partial chains which are finally
    stitched together.

    Returns an empty list when no terminal chain is found. Raises
    ``ValueError`` when the chains cannot be merged into a single sequence.
    """
    malformed_msg = 'A malformed list of AgentRole entities was given.'

    # chain_by_head: res of the first agent role of a chain -> chain
    # chain_by_awaited: res of the agent role expected right after a chain -> chain
    chain_by_head: Dict[str, Dict] = {}
    chain_by_awaited: Dict[str, Dict] = {}
    head_res_by_chain_id: Dict[int, str] = {}
    next_chain_id: int = 0

    for agent_role in br.get_contributors():
        role: Optional[str] = agent_role.get_role_type()
        agent: Optional[ResponsibleAgent] = agent_role.get_is_held_by()
        successor: Optional[AgentRole] = agent_role.get_next()
        successor_res: Optional[str] = None if successor is None else successor.res

        if role is None or role != str(contributor_type) or agent is None:
            continue

        if successor_res is not None and successor_res in chain_by_head:
            # The successor already starts a chain: this role becomes its new head.
            chain: Dict = chain_by_head.pop(successor_res)
            chain['list'].insert(0, agent)
            chain_by_head[agent_role.res] = chain
            head_res_by_chain_id[chain['id']] = agent_role.res
        elif agent_role.res is not None and agent_role.res in chain_by_awaited:
            # Some chain was waiting for this role: extend it at the end.
            chain = chain_by_awaited.pop(agent_role.res)
            chain['list'].append(agent)
            if successor_res is not None:
                chain_by_awaited[successor_res] = chain
        else:
            # This role fits no existing chain: start a new one.
            chain = {'id': next_chain_id, 'list': [agent]}
            next_chain_id += 1
            chain_by_head[agent_role.res] = chain
            head_res_by_chain_id[chain['id']] = agent_role.res
            if successor_res is not None:
                chain_by_awaited[successor_res] = chain

    head_ids: Set[int] = {chain['id'] for chain in chain_by_head.values()}
    awaiting_ids: Set[int] = {chain['id'] for chain in chain_by_awaited.values()}
    # Chains that are not waiting for any further role.
    terminal_ids: Set[int] = head_ids - awaiting_ids

    if len(terminal_ids) == 0:
        # No contributor was found!
        return []
    if len(terminal_ids) != 1:
        raise ValueError(malformed_msg)

    ordered: List[ResponsibleAgent] = []
    merged_ids: Set[int] = set()
    current_id: int = terminal_ids.pop()
    while True:
        if current_id not in head_res_by_chain_id:
            raise ValueError(malformed_msg)
        head_res: str = head_res_by_chain_id[current_id]
        chain = chain_by_head[head_res]
        chain_id: int = chain['id']
        if chain_id in merged_ids:
            raise ValueError(malformed_msg)

        merged_ids.add(chain_id)
        # Walk backwards: each newly found chain precedes what was merged so far.
        ordered = chain['list'] + ordered

        if head_res not in chain_by_awaited:
            break
        # The chain waiting for this head comes right before it.
        current_id = chain_by_awaited[head_res]['id']

    if len(head_ids - merged_ids) != 0:
        raise ValueError(malformed_msg)

    return ordered

181 

182 

def encode_url(u: str) -> str:
    """Percent-encode ``u``, leaving ':' and '/' characters untouched."""
    unescaped_chars = "://"
    return quote(u, safe=unescaped_chars)

185 

186 

def create_literal(g: TripleLite, res: str, p: str, s: str, dt: str | None = None, nor: bool = True) -> None:
    """
    Add the triple ``(res, p, literal)`` to ``g`` unless ``s`` is empty.

    The literal is typed with ``dt`` or, when ``dt`` is ``None``, with
    ``XSD_STRING``. The ``nor`` parameter is not used by this function.
    """
    if is_string_empty(s):
        return
    datatype = XSD_STRING if dt is None else dt
    g.add((res, p, RDFTerm("literal", s, datatype)))

190 

191 

def create_type(g: TripleLite, res: str, res_type: str) -> None:
    """Add to ``g`` the triple stating that ``res`` has RDF type ``res_type``."""
    type_term = RDFTerm("uri", res_type)
    g.add((res, RDF_TYPE, type_term))

194 

195 

def is_string_empty(string: Optional[str]) -> bool:
    """Return True when ``string`` is ``None`` or contains only whitespace."""
    if string is None:
        return True
    return string.strip() == ""

198 

199 

# Patterns shared by several functions of this module.
#
# entity_regex captures, in order:
#   1. everything before the "/<short name>/" segment (the base IRI);
#   2. a two-letter lowercase short name;
#   3. an optional prefix: "0", one or more digits in 1-9, then "0";
#   4. the count: either a number without leading zeros or "<digits>-<digits>".
entity_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))$"
# prov_regex captures the same four groups for the described entity, followed
# by "/prov/", then (5) the two-letter short name and (6) the count of the
# provenance entity.
prov_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))/prov/([a-z][a-z])/([1-9][0-9]*)$"

# Compiled once at import time.
_compiled_entity_regex = re.compile(entity_regex)
_compiled_prov_regex = re.compile(prov_regex)

206 

207 

@lru_cache(maxsize=4096)
def parse_uri(res: str) -> ParsedURI:
    """
    Split an IRI into its components.

    An IRI containing "/prov/" is matched only against the provenance
    pattern, any other IRI only against the entity pattern. When the relevant
    pattern does not match, a ``ParsedURI`` with empty strings and
    ``is_prov=False`` is returned. Results are memoised (up to 4096 entries).
    """
    iri = str(res)

    if "/prov/" not in iri:
        entity_match = _compiled_entity_regex.match(iri)
        if entity_match is not None:
            base, short_name, prefix, count = entity_match.groups()
            return ParsedURI(
                base_iri=base,
                short_name=short_name,
                prefix=prefix or "",
                count=count,
                is_prov=False,
                prov_subject_short_name="",
                prov_subject_prefix="",
                prov_subject_count="",
            )
    else:
        prov_match = _compiled_prov_regex.match(iri)
        if prov_match is not None:
            base, subj_short_name, subj_prefix, subj_count, short_name, count = prov_match.groups()
            return ParsedURI(
                base_iri=base,
                short_name=short_name,
                prefix="",
                count=count,
                is_prov=True,
                prov_subject_short_name=subj_short_name,
                prov_subject_prefix=subj_prefix or "",
                prov_subject_count=subj_count,
            )

    # Nothing matched: return an "empty" result.
    return ParsedURI("", "", "", "", False, "", "", "")

238 

239 

def get_base_iri(res: str) -> str:
    """Return the ``base_iri`` component of ``res`` ("" if it cannot be parsed)."""
    parsed = parse_uri(res)
    return parsed.base_iri

242 

243 

def get_short_name(res: str) -> str:
    """Return the ``short_name`` component of ``res`` ("" if it cannot be parsed)."""
    parsed = parse_uri(res)
    return parsed.short_name

246 

247 

def get_prefix(res: str) -> str:
    """Return the ``prefix`` component of ``res`` ("" if absent or unparsable)."""
    parsed = parse_uri(res)
    return parsed.prefix

250 

251 

def get_count(res: str) -> str:
    """Return the ``count`` component of ``res`` ("" if it cannot be parsed)."""
    parsed = parse_uri(res)
    return parsed.count

254 

255 

def get_resource_number(res: str) -> int:
    """
    Return the numeric counter of ``res`` as an integer.

    For a provenance IRI the counter of the described entity is used. Returns
    0 when no counter could be extracted.
    """
    parsed = parse_uri(res)
    if parsed.is_prov:
        raw_count = parsed.prov_subject_count
    else:
        raw_count = parsed.count

    if not raw_count:
        return 0
    return int(raw_count)

260 

261 

def find_local_line_id(res: str, n_file_item: int = 1) -> int:
    """
    Return the 1-based position of ``res`` inside the file that stores it,
    given that each file holds ``n_file_item`` resources.

    The result lies in ``1..n_file_item``; a resource number of 0 yields
    ``n_file_item``.

    Raises ``ValueError`` if ``n_file_item`` is not positive.
    """
    # A non-positive file size has no meaning here (the former step-by-step
    # search would never terminate in that case), so reject it explicitly.
    if n_file_item <= 0:
        raise ValueError("n_file_item must be a positive integer")

    cur_number: int = get_resource_number(res)

    # Closed form of "subtract the largest multiple of n_file_item that is
    # strictly smaller than cur_number".
    return (cur_number - 1) % n_file_item + 1

274 

275 

def _round_up_to_multiple(number: int, step: int) -> int:
    # Smallest multiple of ``step`` that is >= ``number``; ``step`` itself when number <= 0.
    return ((number - 1) // step + 1) * step if number > 0 else step


def find_paths(res: str, base_dir: str, base_iri: str, default_dir: str, dir_split: int,
               n_file_item: int, is_json: bool = True, process_id: int|str|None = None) -> Tuple[str, str]:
    """
    This function is responsible for looking for the correct file that contains the data related to the
    resource identified by 'res'. This search takes into account the organisation in directories and files,
    as well as the particular supplier prefix for bibliographic entities, if specified.
    In case no supplier prefix is specified, the 'default_dir' (usually set to "_") is used instead.

    Paths are built by plain concatenation, so 'base_dir' is expected to end with a path separator.
    When 'process_id' is truthy, "_<process_id>" is appended to the file name before the extension.

    Returns the tuple (directory path, file path).
    Raises ValueError when 'res' is not a dataset IRI and no integer counter can be extracted from it.
    """
    string_iri: str = str(res)
    # NOTE(review): any falsy process_id (None, "", but also 0) yields no suffix -
    # confirm that 0 is never used as a real process identifier.
    process_id_str: str = f"_{process_id}" if process_id else ""

    if is_dataset(res):
        # base_iri is escaped so that it is matched literally ('.', '+', '(' ... are common in IRIs).
        relative_path: str = re.sub(r"^%s(.*)$" % re.escape(base_iri), r"\1", string_iri)
        # The last character (presumably the trailing slash of the IRI) is dropped.
        cur_dir_path: str = (base_dir + relative_path)[:-1]
        cur_file_path: str = cur_dir_path + os.sep + "index" + process_id_str + ".json"
        return cur_dir_path, cur_file_path

    parsed = parse_uri(res)
    raw_count: str = parsed.prov_subject_count if parsed.is_prov else parsed.count
    try:
        cur_number: int = int(raw_count)
    except ValueError as err:
        raise ValueError(f"Cannot extract a numeric identifier from {string_iri!r}") from err

    cur_file_split: int = _round_up_to_multiple(cur_number, n_file_item)

    if dir_split and not string_iri.startswith(base_iri + "prov/"):
        # One extra directory level named after the directory split.
        split_parts: List[str] = [str(_round_up_to_multiple(cur_number, dir_split))]
    elif dir_split == 0:
        split_parts = []
    else:
        # Flat layout: one file per resource directly under the short-name directory.
        file_extension = '.json' if is_json else '.nq'
        cur_dir_path = base_dir + parsed.short_name
        cur_file_path = cur_dir_path + os.sep + parsed.prefix + parsed.count + process_id_str + file_extension
        return cur_dir_path, cur_file_path

    if parsed.is_prov:
        # Provenance lives in a "prov" folder next to the file of the described entity.
        sub_folder = parsed.prov_subject_prefix or default_dir or "_"
        file_extension = '.json' if is_json else '.nq'
        cur_dir_path = os.sep.join(
            [base_dir + parsed.prov_subject_short_name, sub_folder, *split_parts, str(cur_file_split), "prov"])
        cur_file_path = cur_dir_path + os.sep + parsed.short_name + process_id_str + file_extension
    else:
        sub_folder = parsed.prefix or default_dir or "_"
        file_extension = '.json' if is_json else '.nt'
        cur_dir_path = os.sep.join([base_dir + parsed.short_name, sub_folder, *split_parts])
        cur_file_path = cur_dir_path + os.sep + str(cur_file_split) + process_id_str + file_extension

    return cur_dir_path, cur_file_path

329 

def has_supplier_prefix(res: str, base_iri: str) -> bool:
    """
    Return True when ``res`` starts with ``base_iri`` followed by a two-letter
    lowercase short name, a slash and the digit 0 (the start of a prefix).
    """
    string_iri: str = str(res)
    # base_iri is escaped so that it is matched literally: IRIs routinely
    # contain regex metacharacters such as '.', '+' or '?'.
    pattern: str = r"^%s[a-z][a-z]/0" % re.escape(base_iri)
    return re.search(pattern, string_iri) is not None

333 

def build_graph_from_results(results: List[Dict]) -> TripleLite:
    """
    Build a ``TripleLite`` graph from SPARQL result rows.

    Each row must provide the bindings ``'s'``, ``'p'`` and ``'o'``; subject
    and predicate are taken from their ``'value'``, the object is converted
    with ``sparql_binding_to_rdfterm``.
    """
    result_graph = TripleLite()
    for row in results:
        subject = row['s']['value']
        predicate = row['p']['value']
        obj = sparql_binding_to_rdfterm(row['o'])
        result_graph.add((subject, predicate, obj))
    return result_graph

339 

340 

def is_dataset(res: str) -> bool:
    """
    Return True when ``res`` does NOT end with a numeric segment, i.e. with
    "/<digits>", optionally followed by "-<digits>" and/or "/<digits>".
    """
    numeric_tail = r"^.+/[0-9]+(-[0-9]+)?(/[0-9]+)?$"
    # The pattern is anchored with '^', so match() is equivalent to search().
    found = re.match(numeric_tail, str(res))
    return found is None