#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2016, Silvio Peroni <essepuntato@gmail.com>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.
from __future__ import annotations

import os
import re
from dataclasses import dataclass
from datetime import datetime
from functools import lru_cache
from typing import TYPE_CHECKING
from urllib.parse import quote

from rdflib import RDF, XSD, Graph, Literal, URIRef

if TYPE_CHECKING:
    from typing import Optional, List, Tuple, Dict, Set
    from oc_ocdm.graph.entities.bibliographic.bibliographic_resource import BibliographicResource
    from oc_ocdm.graph.entities.bibliographic.responsible_agent import ResponsibleAgent
    from oc_ocdm.graph.entities.bibliographic.agent_role import AgentRole


@dataclass
class ParsedURI:
    """Decomposed view of an OCDM entity IRI (or of a provenance IRI)."""
    base_iri: str
    short_name: str
    prefix: str
    count: str
    is_prov: bool
    prov_subject_short_name: str
    prov_subject_prefix: str
    prov_subject_count: str
def create_date(date_list: Optional[List[Optional[int]]] = None) -> Optional[str]:
    string: Optional[str] = None
    if date_list is not None:
        l_date_list: int = len(date_list)
        if l_date_list != 0 and date_list[0] is not None:
            # A full date is produced only when both month and day are known
            # and not both equal to 1 (1 January is treated as year precision).
            if l_date_list == 3 and date_list[1] is not None and date_list[2] is not None \
                    and (date_list[1] != 1 or date_list[2] != 1):
                string = datetime(date_list[0], date_list[1], date_list[2]).strftime('%Y-%m-%d')
            elif l_date_list == 2 and date_list[1] is not None:
                string = datetime(date_list[0], date_list[1], 1).strftime('%Y-%m')
            else:
                # Fall back to year precision; this branch also covers
                # three-element lists whose month or day is None, which
                # cannot be passed to datetime().
                string = datetime(date_list[0], 1, 1).strftime('%Y')
    return string
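# Usage sketch for create_date (illustrative values):
#   create_date([2016, 7, 16])  -> '2016-07-16'
#   create_date([2016, 7])      -> '2016-07'
#   create_date([2016, 1, 1])   -> '2016'  (month and day equal to 1 mean year precision)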

def get_datatype_from_iso_8601(string: str) -> Tuple[URIRef, str]:
    # Keep only the "yyyy-mm-dd" part of the string
    string = string[:10]

    try:
        date_parts: List[int] = [int(s) for s in string.split(sep='-', maxsplit=2)]
    except ValueError:
        raise ValueError("The provided date string is not ISO-8601 compliant!")

    num_of_parts: int = len(date_parts)
    if num_of_parts == 3:
        return XSD.date, datetime(*date_parts).strftime('%Y-%m-%d')
    elif num_of_parts == 2:
        return XSD.gYearMonth, datetime(*date_parts, 1).strftime('%Y-%m')
    else:
        return XSD.gYear, datetime(*date_parts, 1, 1).strftime('%Y')
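# Usage sketch (illustrative): the returned pair couples an XSD datatype with
# the normalised date string.
#   get_datatype_from_iso_8601('2016-07-16T12:00:00')  -> (XSD.date, '2016-07-16')
#   get_datatype_from_iso_8601('2016-07')              -> (XSD.gYearMonth, '2016-07')
#   get_datatype_from_iso_8601('2016')                 -> (XSD.gYear, '2016')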

def get_ordered_contributors_from_br(br: BibliographicResource,
                                     contributor_type: URIRef) -> List[ResponsibleAgent]:
    ar_list: List[AgentRole] = br.get_contributors()

    # Each AgentRole points to the next one in the contributors' order, so the
    # ARs form linked sub-lists that are first collected and then merged.
    list_id: int = 0
    heads: Dict[URIRef, Dict] = {}
    tails: Dict[URIRef, Dict] = {}
    sub_lists: List[Dict] = []
    from_id_to_res_in_heads: Dict[int, URIRef] = {}
    for ar in ar_list:
        role_type: URIRef = ar.get_role_type()
        ra: ResponsibleAgent = ar.get_is_held_by()
        next_ar: AgentRole = ar.get_next()
        if next_ar is not None:
            next_ar_res: Optional[URIRef] = next_ar.res
        else:
            next_ar_res: Optional[URIRef] = None

        if role_type is not None and role_type == contributor_type and ra is not None:
            if next_ar_res is not None and next_ar_res in heads:
                sub_list: Dict = heads[next_ar_res]
                sub_list['list'].insert(0, ra)
                del heads[next_ar_res]
                heads[ar.res] = sub_list
                from_id_to_res_in_heads[sub_list['id']] = ar.res
            elif ar.res is not None and ar.res in tails:
                sub_list: Dict = tails[ar.res]
                sub_list['list'].append(ra)
                del tails[ar.res]

                if next_ar_res is not None:
                    tails[next_ar_res] = sub_list
            else:
                # This AR cannot be inserted into any existing list, so
                # an entirely new list must be created for it:
                sub_list: Dict = {'id': list_id, 'list': [ra]}
                list_id += 1
                sub_lists.append(sub_list)

                heads[ar.res] = sub_list
                from_id_to_res_in_heads[sub_list['id']] = ar.res
                if next_ar_res is not None:
                    tails[next_ar_res] = sub_list

    ids_in_heads: Set[int] = {val['id'] for val in heads.values()}
    ids_in_tails: Set[int] = {val['id'] for val in tails.values()}
    diff_set: Set[int] = ids_in_heads - ids_in_tails
    if len(diff_set) == 0:
        # No contributor was found!
        return []
    elif len(diff_set) != 1:
        raise ValueError('A malformed list of AgentRole entities was given.')
    else:
        # Starting from the final sub-list (the only one not waiting for a
        # successor), walk backwards and prepend each predecessor until every
        # sub-list has been merged exactly once.
        result_list: List[ResponsibleAgent] = []
        cur_id: int = diff_set.pop()
        already_merged_list_ids: Set[int] = set()
        finished: bool = False
        while not finished:
            found: bool = False
            if cur_id in from_id_to_res_in_heads:
                res: URIRef = from_id_to_res_in_heads[cur_id]
                subl: Dict = heads[res]
                subl_id: int = subl['id']
                if subl_id not in already_merged_list_ids:
                    found = True
                    already_merged_list_ids.add(subl_id)
                    result_list = subl['list'] + result_list

                    # Now the next cur_id value must be obtained:
                    if res in tails:
                        cur_id = tails[res]['id']
                    else:
                        finished = True

            if not found:
                raise ValueError('A malformed list of AgentRole entities was given.')

        unmerged_list_ids: Set[int] = ids_in_heads - already_merged_list_ids
        if len(unmerged_list_ids) != 0:
            raise ValueError('A malformed list of AgentRole entities was given.')

        return result_list
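# Illustrative sketch of the merge (hypothetical entities, not library objects):
# given ARs chained as ar1 -> ar2 -> ar3 via "has next", each holding a
# ResponsibleAgent ra1/ra2/ra3, the function returns [ra1, ra2, ra3] regardless
# of the order in which the ARs are yielded by br.get_contributors().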

def encode_url(u: str) -> str:
    # Percent-encode everything except the characters ':' and '/'.
    return quote(u, "://")
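# Usage sketch (illustrative URL):
#   encode_url('https://example.org/a b')  -> 'https://example.org/a%20b'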

def create_literal(g: Graph, res: URIRef, p: URIRef, s: str, dt: Optional[URIRef] = None,
                   nor: bool = True) -> None:
    # Add the triple only when the string value is non-empty; 'dt' defaults to
    # xsd:string and 'nor' controls Unicode normalization of the literal.
    if not is_string_empty(s):
        dt = dt if dt is not None else XSD.string
        g.add((res, p, Literal(s, datatype=dt, normalize=nor)))


def create_type(g: Graph, res: URIRef, res_type: URIRef) -> None:
    g.add((res, RDF.type, res_type))


def is_string_empty(string: Optional[str]) -> bool:
    return string is None or string.strip() == ""
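# Usage sketch on an in-memory graph (hypothetical IRIs):
#   g = Graph()
#   res = URIRef('https://example.org/br/1')
#   create_type(g, res, URIRef('http://purl.org/spar/fabio/Expression'))
#   create_literal(g, res, URIRef('http://purl.org/dc/terms/title'), 'A title')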

# Regular expressions shared by several functions below: they decompose an OCDM
# entity IRI into base IRI, two-letter short name (e.g. 'br'), optional supplier
# prefix and count, plus the provenance variant ('.../prov/<short name>/<count>').
entity_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))$"
prov_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))/prov/([a-z][a-z])/([1-9][0-9]*)$"

_compiled_entity_regex = re.compile(entity_regex)
_compiled_prov_regex = re.compile(prov_regex)
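# Example decomposition (hypothetical IRI):
#   'https://example.org/br/0601' matches entity_regex with base_iri
#   'https://example.org', short_name 'br', prefix '060' and count '1'.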

@lru_cache(maxsize=4096)
def parse_uri(res: URIRef) -> ParsedURI:
    string_iri = str(res)
    if "/prov/" in string_iri:
        match = _compiled_prov_regex.match(string_iri)
        if match:
            return ParsedURI(
                base_iri=match.group(1),
                short_name=match.group(5),
                prefix="",
                count=match.group(6),
                is_prov=True,
                prov_subject_short_name=match.group(2),
                prov_subject_prefix=match.group(3) or "",
                prov_subject_count=match.group(4),
            )
    else:
        match = _compiled_entity_regex.match(string_iri)
        if match:
            return ParsedURI(
                base_iri=match.group(1),
                short_name=match.group(2),
                prefix=match.group(3) or "",
                count=match.group(4),
                is_prov=False,
                prov_subject_short_name="",
                prov_subject_prefix="",
                prov_subject_count="",
            )
    # Fallback for IRIs that do not follow the OCDM layout:
    return ParsedURI("", "", "", "", False, "", "", "")
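# Usage sketch (hypothetical IRIs):
#   parse_uri(URIRef('https://example.org/br/0601')).count                         -> '1'
#   parse_uri(URIRef('https://example.org/br/0601/prov/se/2')).count               -> '2'
#   parse_uri(URIRef('https://example.org/br/0601/prov/se/2')).prov_subject_count  -> '1'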

def get_base_iri(res: URIRef) -> str:
    return parse_uri(res).base_iri


def get_short_name(res: URIRef) -> str:
    return parse_uri(res).short_name


def get_prefix(res: URIRef) -> str:
    return parse_uri(res).prefix


def get_count(res: URIRef) -> str:
    return parse_uri(res).count


def get_resource_number(res: URIRef) -> int:
    parsed = parse_uri(res)
    count = parsed.prov_subject_count if parsed.is_prov else parsed.count
    return int(count) if count else 0
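# Usage sketch (hypothetical IRIs): for a provenance IRI, the number of the
# *described* entity is returned, not the snapshot number.
#   get_resource_number(URIRef('https://example.org/br/0601'))            -> 1
#   get_resource_number(URIRef('https://example.org/br/0601/prov/se/2'))  -> 1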

def find_local_line_id(res: URIRef, n_file_item: int = 1) -> int:
    # Return the 1-based position of the resource within its file bucket,
    # i.e. within the group of 'n_file_item' consecutive resource numbers
    # that are stored in the same file.
    cur_number: int = get_resource_number(res)

    cur_file_split: int = 0
    while True:
        if cur_number > cur_file_split:
            cur_file_split += n_file_item
        else:
            cur_file_split -= n_file_item
            break

    return cur_number - cur_file_split
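# Worked example: with n_file_item = 100, resource number 151 falls in the
# bucket (100, 200]; the loop stops with cur_file_split = 100 and the function
# returns 151 - 100 = 51.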

def find_paths(res: URIRef, base_dir: str, base_iri: str, default_dir: str, dir_split: int,
               n_file_item: int, is_json: bool = True, process_id: int | str | None = None) -> Tuple[str, str]:
    """
    This function looks for the correct file containing the data related to the resource
    identified by 'res', returning a (directory path, file path) pair. The search takes into
    account the organisation in directories and files, as well as the particular supplier
    prefix for bibliographic entities, if specified. In case no supplier prefix is specified,
    the 'default_dir' (usually set to "_") is used instead.
    """
    string_iri: str = str(res)
    process_id_str: str = f"_{process_id}" if process_id else ""

    if is_dataset(res):
        cur_dir_path: str = (base_dir + re.sub(r"^%s(.*)$" % re.escape(base_iri), r"\1", string_iri))[:-1]
        cur_file_path: str = cur_dir_path + os.sep + "index" + process_id_str + ".json"
        return cur_dir_path, cur_file_path

    parsed = parse_uri(res)
    cur_number: int = int(parsed.prov_subject_count) if parsed.is_prov else int(parsed.count)

    # Smallest multiple of 'n_file_item' that is >= cur_number: the file bucket.
    cur_file_split: int = 0
    while cur_number > cur_file_split:
        cur_file_split += n_file_item

    if dir_split and not string_iri.startswith(base_iri + "prov/"):
        # Smallest multiple of 'dir_split' that is >= cur_number: the directory bucket.
        cur_split: int = 0
        while cur_number > cur_split:
            cur_split += dir_split

        if parsed.is_prov:
            sub_folder = parsed.prov_subject_prefix or default_dir or "_"
            file_extension = '.json' if is_json else '.nq'
            cur_dir_path = base_dir + parsed.prov_subject_short_name + os.sep + sub_folder + \
                os.sep + str(cur_split) + os.sep + str(cur_file_split) + os.sep + "prov"
            cur_file_path = cur_dir_path + os.sep + parsed.short_name + process_id_str + file_extension
        else:
            sub_folder = parsed.prefix or default_dir or "_"
            file_extension = '.json' if is_json else '.nt'
            cur_dir_path = base_dir + parsed.short_name + os.sep + sub_folder + os.sep + str(cur_split)
            cur_file_path = cur_dir_path + os.sep + str(cur_file_split) + process_id_str + file_extension
    elif dir_split == 0:
        if parsed.is_prov:
            sub_folder = parsed.prov_subject_prefix or default_dir or "_"
            file_extension = '.json' if is_json else '.nq'
            cur_dir_path = base_dir + parsed.prov_subject_short_name + os.sep + sub_folder + \
                os.sep + str(cur_file_split) + os.sep + "prov"
            cur_file_path = cur_dir_path + os.sep + parsed.short_name + process_id_str + file_extension
        else:
            sub_folder = parsed.prefix or default_dir or "_"
            file_extension = '.json' if is_json else '.nt'
            cur_dir_path = base_dir + parsed.short_name + os.sep + sub_folder
            cur_file_path = cur_dir_path + os.sep + str(cur_file_split) + process_id_str + file_extension
    else:
        file_extension = '.json' if is_json else '.nq'
        cur_dir_path = base_dir + parsed.short_name
        cur_file_path = cur_dir_path + os.sep + parsed.prefix + parsed.count + process_id_str + file_extension

    return cur_dir_path, cur_file_path
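# Worked example (hypothetical layout, POSIX separators): with
# base_dir = 'corpus/', base_iri = 'https://example.org/', default_dir = '_',
# dir_split = 10000 and n_file_item = 1000, the IRI
# 'https://example.org/br/2151' yields cur_split = 10000 and
# cur_file_split = 3000, i.e. the pair
# ('corpus/br/_/10000', 'corpus/br/_/10000/3000.json').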

def has_supplier_prefix(res: URIRef, base_iri: str) -> bool:
    # The supplier prefix, when present, starts with '0' right after the
    # two-letter short name (e.g. '<base_iri>br/060...').
    string_iri: str = str(res)
    return re.search(r"^%s[a-z][a-z]/0" % re.escape(base_iri), string_iri) is not None

def build_graph_from_results(results: List[Dict]) -> Graph:
    # Build an rdflib Graph from the JSON bindings of a SPARQL query that
    # returns triples in the variables ?s, ?p and ?o.
    graph = Graph()
    for triple in results:
        s = URIRef(triple['s']['value'])
        p = URIRef(triple['p']['value'])
        if triple['o']['type'] == 'uri':
            o = URIRef(triple['o']['value'])
        else:
            datatype = triple['o'].get('datatype', None)
            datatype = URIRef(datatype) if datatype is not None else None
            o = Literal(triple['o']['value'], datatype=datatype)
        graph.add((s, p, o))
    return graph
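# Usage sketch (hypothetical binding, in the SPARQL 1.1 JSON results format):
#   build_graph_from_results([{
#       's': {'type': 'uri', 'value': 'https://example.org/br/1'},
#       'p': {'type': 'uri', 'value': 'http://purl.org/dc/terms/title'},
#       'o': {'type': 'literal', 'value': 'A title'},
#   }])
# returns a Graph containing the single corresponding triple.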

def is_dataset(res: URIRef) -> bool:
    # Entity IRIs end with a numeric count (optionally '<count>-<count>' and a
    # further snapshot number); anything else is considered a dataset IRI.
    string_iri: str = str(res)
    return re.search(r"^.+/[0-9]+(-[0-9]+)?(/[0-9]+)?$", string_iri) is None
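# Usage sketch (hypothetical IRIs):
#   is_dataset(URIRef('https://example.org/br/0601'))  -> False
#   is_dataset(URIRef('https://example.org/'))         -> True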