Coverage for oc_ocdm/support/support.py: 76%

250 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-05-30 22:05 +0000

1#!/usr/bin/python 

2# -*- coding: utf-8 -*- 

3# Copyright (c) 2016, Silvio Peroni <essepuntato@gmail.com> 

4# 

5# Permission to use, copy, modify, and/or distribute this software for any purpose 

6# with or without fee is hereby granted, provided that the above copyright notice 

7# and this permission notice appear in all copies. 

8# 

9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, 

12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, 

13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 

14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS 

15# SOFTWARE. 

16from __future__ import annotations 

17 

18import os 

19import re 

20from datetime import datetime 

21from typing import TYPE_CHECKING 

22from rdflib import URIRef, Graph 

23 

24if TYPE_CHECKING: 

25 from typing import Optional, List, Tuple, Match, Dict, Set 

26 from oc_ocdm.graph.entities.bibliographic.bibliographic_resource import BibliographicResource 

27 from oc_ocdm.graph.entities.bibliographic.responsible_agent import ResponsibleAgent 

28 from oc_ocdm.graph.entities.bibliographic.agent_role import AgentRole 

29 

30from urllib.parse import quote 

31 

32from rdflib import RDF, XSD, Literal 

33 

34 

def create_date(date_list: List[Optional[int]] = None) -> Optional[str]:
    """
    Compose an ISO-8601 date string ('%Y', '%Y-%m' or '%Y-%m-%d') out of a
    list of up to three integers ([year, month, day]).

    A month/day equal to 1 (or None) in a three-element list is treated as the
    "unknown" sentinel: [2020, 1, 1] yields '2020' (year precision only).

    :param date_list: a list such as [year], [year, month] or [year, month, day];
        trailing elements may be None. The year must be present for a result.
    :return: the formatted date string, or None when no year is available.
    :raises ValueError: if the components do not form a valid calendar date.
    """
    if date_list is None or len(date_list) == 0 or date_list[0] is None:
        return None

    year: int = date_list[0]
    month: Optional[int] = date_list[1] if len(date_list) >= 2 else None
    day: Optional[int] = date_list[2] if len(date_list) >= 3 else None

    if len(date_list) == 3 and ((month is not None and month != 1) or
                                (day is not None and day != 1)):
        # Substitute the sentinel 1 for a missing (None) component: the
        # previous implementation passed None straight to datetime() here,
        # which raised a TypeError for inputs like [2020, None, 5].
        return datetime(year, month or 1, day or 1).strftime('%Y-%m-%d')
    elif len(date_list) == 2 and month is not None:
        return datetime(year, month, 1).strftime('%Y-%m')
    else:
        # Year-only precision (also reached when month/day are 1 or None).
        return datetime(year, 1, 1).strftime('%Y')

49 

50 

def get_datatype_from_iso_8601(string: str) -> Tuple[URIRef, str]:
    """
    Parse an ISO-8601 date string and return both the matching XSD datatype
    (xsd:date, xsd:gYearMonth or xsd:gYear) and the normalized date string.

    :param string: an ISO-8601 date (anything after the 'yyyy-mm-dd' prefix,
        such as a time part, is ignored).
    :return: a (datatype IRI, normalized date string) tuple.
    :raises ValueError: if the string is not ISO-8601 compliant.
    """
    # Keep only the "yyyy-mm-dd" part of the string
    truncated: str = string[:10]

    try:
        parts: List[int] = [int(piece) for piece in truncated.split('-', 2)]
    except ValueError:
        raise ValueError("The provided date string is not ISO-8601 compliant!")

    if len(parts) == 3:
        return XSD.date, datetime(parts[0], parts[1], parts[2]).strftime('%Y-%m-%d')
    if len(parts) == 2:
        return XSD.gYearMonth, datetime(parts[0], parts[1], 1).strftime('%Y-%m')
    return XSD.gYear, datetime(parts[0], 1, 1).strftime('%Y')

67 

def get_ordered_contributors_from_br(br: BibliographicResource,
                                     contributor_type: URIRef) -> List[ResponsibleAgent]:
    """
    Reconstruct the ordered list of ResponsibleAgent entities that play the
    role 'contributor_type' (e.g. author) for the given BibliographicResource.

    Each AgentRole points to the next one via get_next(), so the ARs form a
    linked list. Because 'ar_list' may be in arbitrary order, this function
    first builds partial sub-lists and then merges them by matching each
    sub-list's head/tail resource, walking backwards from the final sub-list.

    :param br: the bibliographic resource whose contributors must be ordered.
    :param contributor_type: the role type IRI used to filter the AgentRoles.
    :return: the ordered list of ResponsibleAgent entities ([] if none found).
    :raises ValueError: if the AgentRole chain is malformed (cycles, forks,
        or disconnected fragments).
    """
    ar_list: List[AgentRole] = br.get_contributors()

    list_id: int = 0
    # 'heads' maps the first AR res of a sub-list to that sub-list;
    # 'tails' maps the res that should FOLLOW a sub-list's last AR to it.
    heads: Dict[URIRef, Dict] = {}
    tails: Dict[URIRef, Dict] = {}
    sub_lists: List[Dict] = []
    # Reverse index: sub-list id -> res currently at the head of that sub-list.
    from_id_to_res_in_heads: Dict[int, URIRef] = {}
    for ar in ar_list:
        role_type: URIRef = ar.get_role_type()
        ra: ResponsibleAgent = ar.get_is_held_by()
        next_ar: AgentRole = ar.get_next()
        if next_ar is not None:
            next_ar_res: Optional[URIRef] = next_ar.res
        else:
            next_ar_res: Optional[URIRef] = None

        # Only ARs of the requested role with an actual agent are considered.
        if role_type is not None and role_type == contributor_type and ra is not None:
            if next_ar_res is not None and next_ar_res in heads:
                # This AR directly precedes an existing sub-list: prepend it
                # and re-key the sub-list under the new head res.
                sub_list: Dict = heads[next_ar_res]
                sub_list['list'].insert(0, ra)
                del heads[next_ar_res]
                heads[ar.res] = sub_list
                from_id_to_res_in_heads[sub_list['id']] = ar.res
            elif ar.res is not None and ar.res in tails:
                # This AR directly follows an existing sub-list: append it
                # and move the sub-list's expected-next pointer forward.
                sub_list: Dict = tails[ar.res]
                sub_list['list'].append(ra)
                del tails[ar.res]

                if next_ar_res is not None:
                    tails[next_ar_res] = sub_list
            else:
                # This AR cannot be inserted into any list, so
                # we need to create an entirely new list for it:
                sub_list: Dict = {'id': list_id, 'list': [ra]}
                list_id += 1
                sub_lists.append(sub_list)

                heads[ar.res] = sub_list
                from_id_to_res_in_heads[sub_list['id']] = ar.res
                if next_ar_res is not None:
                    tails[next_ar_res] = sub_list

    ids_in_heads: Set[int] = {val['id'] for val in heads.values()}
    ids_in_tails: Set[int] = {val['id'] for val in tails.values()}
    # Exactly one sub-list must lack a successor: the FINAL segment of the
    # fully merged chain. Zero means no contributors; more than one means
    # the chain is broken into fragments that cannot all terminate it.
    diff_set: Set[int] = ids_in_heads - ids_in_tails
    if len(diff_set) == 0:
        # No contributor was found!
        return []
    elif len(diff_set) != 1:
        raise ValueError('A malformed list of AgentRole entities was given.')
    else:
        result_list: List[ResponsibleAgent] = []
        cur_id: int = diff_set.pop()
        already_merged_list_ids: Set[int] = set()
        finished: bool = False
        # Starting from the final sub-list, repeatedly prepend the sub-list
        # whose tail connects to the current one's head.
        while not finished:
            found: bool = False
            if cur_id in from_id_to_res_in_heads:
                res: URIRef = from_id_to_res_in_heads[cur_id]
                subl: Dict = heads[res]
                subl_id: int = subl['id']
                if subl_id not in already_merged_list_ids:
                    found = True
                    already_merged_list_ids.add(subl_id)
                    result_list = subl['list'] + result_list

                    # Now we need to get the next cur_id value:
                    if res in tails:
                        cur_id = tails[res]['id']
                    else:
                        # Nothing precedes this sub-list: the chain is complete.
                        finished = True

            if not found:
                # Either cur_id was unknown or we revisited a sub-list (cycle).
                raise ValueError('A malformed list of AgentRole entities was given.')

        # Every collected sub-list must have been merged, otherwise some
        # fragments were disconnected from the main chain.
        unmerged_list_ids: Set[int] = ids_in_heads - already_merged_list_ids
        if len(unmerged_list_ids) != 0:
            raise ValueError('A malformed list of AgentRole entities was given.')

        return result_list

151 

152 

def encode_url(u: str) -> str:
    """Percent-encode the given URL, leaving ':' and '/' untouched."""
    safe_characters: str = "://"
    return quote(u, safe_characters)

155 

156 

def create_literal(g: Graph, res: URIRef, p: URIRef, s: str, dt: URIRef = None, nor: bool = True) -> None:
    """
    Attach the literal value 's' to 'res' through predicate 'p' in graph 'g'.

    Nothing is added when 's' is None or blank. When no datatype is given,
    xsd:string is used; 'nor' is forwarded as the Literal's 'normalize' flag.
    """
    if is_string_empty(s):
        return
    datatype = XSD.string if dt is None else dt
    g.add((res, p, Literal(s, datatype=datatype, normalize=nor)))

161 

162 

def create_type(g: Graph, res: URIRef, res_type: URIRef) -> None:
    """State in graph 'g' that 'res' has rdf:type 'res_type'."""
    triple = (res, RDF.type, res_type)
    g.add(triple)

165 

166 

def is_string_empty(string: str) -> bool:
    """Return True when 'string' is None, empty, or contains only whitespace."""
    return not (string and string.strip())

169 

170 

# Variable used in several functions
# Patterns that decompose OpenCitations-style IRIs.
# entity_regex groups: 1 = base IRI, 2 = two-letter short name (e.g. "br"),
# 3 = optional supplier prefix (shaped "0...0"), 4 = sequential count
# (a plain number or a "N-N" range).
entity_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))$"
# prov_regex extends entity_regex with a "/prov/xx/N" suffix:
# 5 = provenance short name (e.g. "se"), 6 = provenance count.
prov_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))/prov/([a-z][a-z])/([1-9][0-9]*)$"

174 

175 

176def _get_match(regex: str, group: int, string: str) -> str: 

177 match: Match = re.match(regex, string) 

178 if match is not None: 

179 return match.group(group) 

180 else: 

181 return "" 

182 

183 

def get_base_iri(res: URIRef) -> str:
    """Extract the base IRI (everything before the short name) from an entity or provenance IRI."""
    string_iri: str = str(res)
    pattern: str = prov_regex if "/prov/" in string_iri else entity_regex
    return _get_match(pattern, 1, string_iri)

190 

191 

def get_short_name(res: URIRef) -> str:
    """Return the two-letter short name of the entity's type (e.g. 'br', or 'se' for a provenance entity)."""
    string_iri: str = str(res)
    # Provenance IRIs carry their own short name in a different capture group.
    pattern, group = (prov_regex, 5) if "/prov/" in string_iri else (entity_regex, 2)
    return _get_match(pattern, group, string_iri)

198 

199 

def get_prov_subject_short_name(prov_res: URIRef) -> str:
    """Return the short name of the entity a provenance entity describes ("" for non-provenance IRIs)."""
    string_iri: str = str(prov_res)
    if "/prov/" not in string_iri:
        # non-provenance entities do not have a prov_subject!
        return ""
    return _get_match(prov_regex, 2, string_iri)

206 

207 

def get_prefix(res: URIRef) -> str:
    """Return the supplier prefix of a non-provenance entity IRI ("" for provenance IRIs)."""
    string_iri: str = str(res)
    if "/prov/" in string_iri:
        # provenance entities cannot have a supplier prefix
        return ""
    return _get_match(entity_regex, 3, string_iri)

214 

215 

def get_prov_subject_prefix(prov_res: URIRef) -> str:
    """Return the supplier prefix of the entity a provenance entity describes ("" for non-provenance IRIs)."""
    string_iri: str = str(prov_res)
    if "/prov/" not in string_iri:
        # non-provenance entities do not have a prov_subject!
        return ""
    return _get_match(prov_regex, 3, string_iri)

222 

223 

def get_count(res: URIRef) -> str:
    """Return the sequential count at the end of the IRI (the provenance count for provenance IRIs)."""
    string_iri: str = str(res)
    pattern, group = (prov_regex, 6) if "/prov/" in string_iri else (entity_regex, 4)
    return _get_match(pattern, group, string_iri)

230 

231 

def get_prov_subject_count(prov_res: URIRef) -> str:
    """Return the count of the entity a provenance entity describes ("" for non-provenance IRIs)."""
    string_iri: str = str(prov_res)
    if "/prov/" not in string_iri:
        # non-provenance entities do not have a prov_subject!
        return ""
    return _get_match(prov_regex, 4, string_iri)

238 

239 

def get_resource_number(res: URIRef) -> int:
    """Return, as an int, the sequential number of the entity the IRI refers to (the prov subject's number for provenance IRIs)."""
    string_iri: str = str(res)
    # Group 4 holds the entity count in both patterns.
    pattern: str = prov_regex if "/prov/" in string_iri else entity_regex
    return int(_get_match(pattern, 4, string_iri))

246 

247 

def find_local_line_id(res: URIRef, n_file_item: int = 1) -> int:
    """
    Return the 1-based position of the resource inside its data file, i.e. the
    offset of its sequential number within its chunk of 'n_file_item' entities.

    :param res: the IRI of the entity (its trailing number is extracted).
    :param n_file_item: how many entities are stored per file (>= 1).
    :return: a value in [1, n_file_item].
    """
    cur_number: int = get_resource_number(res)

    # The original implementation looped to find the smallest multiple of
    # n_file_item >= cur_number and then stepped back one chunk; for the
    # positive counts produced by get_resource_number that is plain modular
    # arithmetic, computed here in O(1) instead of O(cur_number / n_file_item).
    return ((cur_number - 1) % n_file_item) + 1

260 

261 

def find_paths(res: URIRef, base_dir: str, base_iri: str, default_dir: str, dir_split: int,
               n_file_item: int, is_json: bool = True, process_id: int|str = None) -> Tuple[str, str]:
    """
    This function is responsible for looking for the correct JSON file that contains the data related to the
    resource identified by the variable 'string_iri'. This search takes into account the organisation in
    directories and files, as well as the particular supplier prefix for bibliographic entities, if specified.
    In case no supplier prefix is specified, the 'default_dir' (usually set to "_") is used instead.

    :param res: the IRI of the entity whose storage location must be computed.
    :param base_dir: filesystem directory under which the corpus is stored.
    :param base_iri: the IRI prefix shared by every entity of the corpus.
    :param default_dir: directory name used when the entity has no supplier prefix.
    :param dir_split: how many entities are grouped per directory (0 = no directory split).
    :param n_file_item: how many entities are stored in each file.
    :param is_json: when True use the '.json' extension, otherwise '.nt'/'.nq'.
    :param process_id: optional id appended to the file name (so that parallel
        processes write side-by-side files instead of clashing).
    :return: a (directory path, file path) tuple.
    """
    string_iri: str = str(res)
    process_id_str: str = f"_{process_id}" if process_id else ""

    if is_dataset(res):
        # Map the IRI path (after base_iri) onto the filesystem, dropping the
        # trailing slash that dataset IRIs carry.
        cur_dir_path: str = (base_dir + re.sub(r"^%s(.*)$" % base_iri, r"\1", string_iri))[:-1]
        # In case of dataset, the file path is different from regular files, e.g.
        # /corpus/br/index.json
        cur_file_path: str = cur_dir_path + os.sep + "index" + process_id_str + ".json"
    else:
        cur_number: int = get_resource_number(res)

        # Find the correct file number where to save the resources:
        # the smallest multiple of n_file_item that is >= cur_number.
        cur_file_split: int = 0
        while True:
            if cur_number > cur_file_split:
                cur_file_split += n_file_item
            else:
                break

        # The data have been split in multiple directories and it is not something related
        # with the provenance data of the whole corpus (e.g. provenance agents)
        if dir_split and not string_iri.startswith(base_iri + "prov/"):
            # Find the correct directory number where to save the file:
            # the smallest multiple of dir_split that is >= cur_number.
            cur_split: int = 0
            while True:
                if cur_number > cur_split:
                    cur_split += dir_split
                else:
                    break

            if "/prov/" in string_iri:  # provenance file of a bibliographic entity
                subj_short_name: str = get_prov_subject_short_name(res)
                short_name: str = get_short_name(res)
                sub_folder: str = get_prov_subject_prefix(res)
                file_extension: str = '.json' if is_json else '.nq'
                if sub_folder == "":
                    sub_folder = default_dir
                if sub_folder == "":
                    sub_folder = "_"  # enforce default value

                # e.g. <base_dir>/br/0xx0/<dir split>/<file split>/prov/se.json
                cur_dir_path: str = base_dir + subj_short_name + os.sep + sub_folder + \
                    os.sep + str(cur_split) + os.sep + str(cur_file_split) + os.sep + "prov"
                cur_file_path: str = cur_dir_path + os.sep + short_name + process_id_str + file_extension
            else:  # regular bibliographic entity
                short_name: str = get_short_name(res)
                sub_folder: str = get_prefix(res)
                file_extension: str = '.json' if is_json else '.nt'
                if sub_folder == "":
                    sub_folder = default_dir
                if sub_folder == "":
                    sub_folder = "_"  # enforce default value

                # e.g. <base_dir>/br/0xx0/<dir split>/<file split>.json
                cur_dir_path: str = base_dir + short_name + os.sep + sub_folder + os.sep + str(cur_split)
                cur_file_path: str = cur_dir_path + os.sep + str(cur_file_split) + process_id_str + file_extension
        # Enter here if no split is needed
        elif dir_split == 0:
            if "/prov/" in string_iri:
                subj_short_name: str = get_prov_subject_short_name(res)
                short_name: str = get_short_name(res)
                sub_folder: str = get_prov_subject_prefix(res)
                file_extension: str = '.json' if is_json else '.nq'
                if sub_folder == "":
                    sub_folder = default_dir
                if sub_folder == "":
                    sub_folder = "_"  # enforce default value

                # Same layout as above, but without the directory-split level.
                cur_dir_path: str = base_dir + subj_short_name + os.sep + sub_folder + \
                    os.sep + str(cur_file_split) + os.sep + "prov"
                cur_file_path: str = cur_dir_path + os.sep + short_name + process_id_str + file_extension
            else:
                short_name: str = get_short_name(res)
                sub_folder: str = get_prefix(res)
                file_extension: str = '.json' if is_json else '.nt'
                if sub_folder == "":
                    sub_folder = default_dir
                if sub_folder == "":
                    sub_folder = "_"  # enforce default value

                cur_dir_path: str = base_dir + short_name + os.sep + sub_folder
                cur_file_path: str = cur_dir_path + os.sep + str(cur_file_split) + process_id_str + file_extension
        # Enter here if the data is about a provenance agent, e.g. /corpus/prov/
        else:
            short_name: str = get_short_name(res)
            prefix: str = get_prefix(res)
            count: str = get_count(res)
            file_extension: str = '.json' if is_json else '.nq'

            cur_dir_path: str = base_dir + short_name
            cur_file_path: str = cur_dir_path + os.sep + prefix + count + process_id_str + file_extension

    return cur_dir_path, cur_file_path

361 

def has_supplier_prefix(res: URIRef, base_iri: str) -> bool:
    """Tell whether the IRI of 'res' carries a supplier prefix (a '0'-initial counter right after the short name)."""
    string_iri: str = str(res)
    pattern: str = f"^{base_iri}[a-z][a-z]/0"
    return re.search(pattern, string_iri) is not None

365 

def build_graph_from_results(results: List[Dict]) -> Graph:
    """
    Build an rdflib Graph out of a SPARQL result set whose rows bind the
    variables 's', 'p' and 'o' (SPARQL JSON results format: each binding is
    a dict with 'type', 'value' and, for typed literals, 'datatype').
    """
    graph = Graph()
    for row in results:
        subject = URIRef(row['s']['value'])
        predicate = URIRef(row['p']['value'])
        obj_cell = row['o']
        if obj_cell['type'] == 'uri':
            obj = URIRef(obj_cell['value'])
        else:
            # Non-IRI bindings become literals, typed when a datatype is present.
            dt_value = obj_cell.get('datatype', None)
            obj = Literal(obj_cell['value'],
                          datatype=URIRef(dt_value) if dt_value is not None else None)
        graph.add((subject, predicate, obj))
    return graph

379 

380 

def is_dataset(res: URIRef) -> bool:
    """
    Tell whether 'res' is a dataset IRI: dataset IRIs lack the trailing
    numeric component(s) that regular corpus entity IRIs always carry.
    """
    pattern: str = r"^.+/[0-9]+(-[0-9]+)?(/[0-9]+)?$"
    return re.search(pattern, str(res)) is None