Coverage for oc_ocdm/support/support.py: 88%
225 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-28 18:52 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2022-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
7# -*- coding: utf-8 -*-
8from __future__ import annotations
import os
import re
from dataclasses import dataclass
from datetime import datetime
from functools import lru_cache
from typing import TYPE_CHECKING

from rdflib import BNode, Graph, URIRef
18if TYPE_CHECKING:
19 from typing import Optional, List, Tuple, Match, Dict, Set
20 from oc_ocdm.graph.entities.bibliographic.bibliographic_resource import BibliographicResource
21 from oc_ocdm.graph.entities.bibliographic.responsible_agent import ResponsibleAgent
22 from oc_ocdm.graph.entities.bibliographic.agent_role import AgentRole
24from urllib.parse import quote
@dataclass
class ParsedURI:
    """Components of an OCDM entity IRI as extracted by ``parse_uri``.

    Components that are absent or do not apply are empty strings
    (``parse_uri`` returns an all-empty instance for non-matching IRIs).
    """
    base_iri: str                  # IRI part before the short name (regex group 1)
    short_name: str                # two-letter entity short name (e.g. the "[a-z][a-z]" group)
    prefix: str                    # supplier prefix such as "060", or "" when absent
    count: str                     # sequential number (or "n-n" range) as a string
    is_prov: bool                  # True when the IRI matched the ".../prov/..." pattern
    prov_subject_short_name: str   # short name of the provenance subject ("" for non-prov IRIs)
    prov_subject_prefix: str       # supplier prefix of the provenance subject, or ""
    prov_subject_count: str        # count of the provenance subject, or ""
38from rdflib import RDF, XSD, Literal
39from typing import Union
def sparql_binding_to_term(binding: dict) -> Union[URIRef, Literal]:
    """Convert a SPARQL JSON result binding to an rdflib term.

    Handles 'uri', 'bnode' and 'literal'/'typed-literal' binding types.
    Per RDF 1.1, simple literals (no datatype, no language tag) are normalized to xsd:string.

    :param binding: one binding object from a SPARQL 1.1 JSON result set
    :return: the corresponding rdflib term (URIRef, BNode or Literal)
    """
    binding_type = binding['type']
    if binding_type == 'uri':
        return URIRef(binding['value'])
    if binding_type == 'bnode':
        # Previously blank nodes fell through and were wrongly coerced
        # into Literals; keep their label as a proper BNode instead.
        return BNode(binding['value'])
    # 'literal' / legacy 'typed-literal' bindings:
    datatype = binding.get('datatype')
    lang = binding.get('xml:lang')
    if datatype is not None:
        datatype = URIRef(datatype)
    elif lang is None:
        # RDF 1.1: a simple literal is identical to an xsd:string literal.
        datatype = XSD.string
    return Literal(binding['value'], datatype=datatype, lang=lang)
def normalize_graph_literals(g: Graph) -> None:
    """Rewrite, in place, every plain literal of *g* (no datatype and no
    language tag) as an explicit xsd:string literal."""
    # Collect replacements first: the graph must not be mutated while iterated.
    replacements = [
        (s, p, o, Literal(str(o), datatype=XSD.string))
        for s, p, o in g
        if isinstance(o, Literal) and o.datatype is None and o.language is None
    ]
    for s, p, plain, typed in replacements:
        g.remove((s, p, plain))
        g.add((s, p, typed))
def create_date(date_list: Optional[List[Optional[int]]] = None) -> Optional[str]:
    """Build a date string from ``[year, month, day]`` (elements optional).

    Returns 'YYYY-MM-DD', 'YYYY-MM' or 'YYYY' depending on which parts are
    present, or None when no usable year is given. Note: a full date of
    January 1st (e.g. ``[2020, 1, 1]``) falls through to month precision
    ('2020-01'), matching the branch conditions below.
    """
    if date_list is None or not date_list or date_list[0] is None:
        return None
    year = date_list[0]
    month = date_list[1] if len(date_list) >= 2 else None
    day = date_list[2] if len(date_list) >= 3 else None
    if len(date_list) == 3 and month is not None and day is not None \
            and (month != 1 or day != 1):
        return datetime(year, month, day).strftime('%Y-%m-%d')
    if month is not None:
        return datetime(year, month, 1).strftime('%Y-%m')
    return datetime(year, 1, 1).strftime('%Y')
def get_datatype_from_iso_8601(string: str) -> Tuple[URIRef, str]:
    """Return the XSD datatype and normalized value for an ISO-8601 date.

    :param string: a date string; only the leading "yyyy[-mm[-dd]]" part is used
    :return: (XSD.date | XSD.gYearMonth | XSD.gYear, normalized date string)
    :raises ValueError: if the string is not ISO-8601 compliant
    """
    # Keep only the "yyyy-mm-dd" part of the string
    string = string[:10]

    try:
        date_parts: List[int] = [int(s) for s in string.split(sep='-', maxsplit=2)]
        num_of_parts: int = len(date_parts)
        # datetime() is kept inside the try so that out-of-range components
        # (e.g. month 13, day 32) raise the same ISO-8601 error as
        # non-numeric input, instead of leaking a raw datetime ValueError.
        if num_of_parts == 3:
            return XSD.date, datetime(date_parts[0], date_parts[1], date_parts[2]).strftime('%Y-%m-%d')
        elif num_of_parts == 2:
            return XSD.gYearMonth, datetime(date_parts[0], date_parts[1], 1).strftime('%Y-%m')
        else:
            return XSD.gYear, datetime(date_parts[0], 1, 1).strftime('%Y')
    except ValueError:
        raise ValueError("The provided date string is not ISO-8601 compliant!") from None
def get_ordered_contributors_from_br(br: BibliographicResource,
                                     contributor_type: URIRef) -> List[ResponsibleAgent]:
    """Return the ResponsibleAgents of *br* with the given role, in list order.

    AgentRole entities are chained via their 'next' pointers, but may be
    returned by ``br.get_contributors()`` in any order. This function first
    stitches consecutive ARs into sub-lists as they are encountered, then
    walks the sub-lists backwards from the final one to rebuild the full
    ordered chain.

    :param br: the bibliographic resource whose contributors are wanted
    :param contributor_type: only ARs whose role type equals this are kept
    :return: ordered list of ResponsibleAgent entities ([] if none found)
    :raises ValueError: if the AR chain is malformed (cycles, forks, or
        disconnected fragments)
    """
    ar_list: List[AgentRole] = br.get_contributors()

    # Each sub-list is a dict {'id': int, 'list': [ResponsibleAgent, ...]}.
    # heads: res of the AR at the head of a sub-list -> that sub-list.
    # tails: res of the AR *expected next* after a sub-list's tail -> that sub-list.
    list_id: int = 0
    heads: Dict[URIRef, Dict] = {}
    tails: Dict[URIRef, Dict] = {}
    sub_lists: List[Dict] = []
    from_id_to_res_in_heads: Dict[int, URIRef] = {}
    for ar in ar_list:
        role_type: Optional[URIRef] = ar.get_role_type()
        ra: Optional[ResponsibleAgent] = ar.get_is_held_by()
        next_ar: Optional[AgentRole] = ar.get_next()
        if next_ar is not None:
            next_ar_res: Optional[URIRef] = next_ar.res
        else:
            next_ar_res: Optional[URIRef] = None

        # Skip ARs with a different role or with no held agent.
        if role_type is not None and role_type == contributor_type and ra is not None:
            if next_ar_res is not None and next_ar_res in heads:
                # This AR immediately precedes an existing sub-list:
                # prepend it and move the head key to this AR's res.
                sub_list: Dict = heads[next_ar_res]
                sub_list['list'].insert(0, ra)
                del heads[next_ar_res]
                heads[ar.res] = sub_list
                from_id_to_res_in_heads[sub_list['id']] = ar.res
            elif ar.res is not None and ar.res in tails:
                # This AR is the one an existing sub-list was waiting for:
                # append it and re-register the new expected successor.
                sub_list: Dict = tails[ar.res]
                sub_list['list'].append(ra)
                del tails[ar.res]

                if next_ar_res is not None:
                    tails[next_ar_res] = sub_list
            else:
                # This AR cannot be inserted into any list, so
                # we need to create an entirely new list for it:
                sub_list: Dict = {'id': list_id, 'list': [ra]}
                list_id += 1
                sub_lists.append(sub_list)

                heads[ar.res] = sub_list
                from_id_to_res_in_heads[sub_list['id']] = ar.res
                if next_ar_res is not None:
                    tails[next_ar_res] = sub_list

    # The terminal sub-list is the only one registered in 'heads' whose id
    # is not also pending in 'tails'; any other cardinality means the chain
    # is broken (0 = nothing found, >1 = disconnected fragments).
    ids_in_heads: Set[int] = {val['id'] for val in heads.values()}
    ids_in_tails: Set[int] = {val['id'] for val in tails.values()}
    diff_set: Set[int] = ids_in_heads - ids_in_tails
    if len(diff_set) == 0:
        # No contributor was found!
        return []
    elif len(diff_set) != 1:
        raise ValueError('A malformed list of AgentRole entities was given.')
    else:
        # Walk backwards: repeatedly prepend the current sub-list, then jump
        # to the sub-list whose expected successor is the current head.
        result_list: List[ResponsibleAgent] = []
        cur_id: int = diff_set.pop()
        already_merged_list_ids: Set[int] = set()
        finished: bool = False
        while not finished:
            found: bool = False
            if cur_id in from_id_to_res_in_heads:
                res: URIRef = from_id_to_res_in_heads[cur_id]
                subl: Dict = heads[res]
                subl_id: int = subl['id']
                if subl_id not in already_merged_list_ids:
                    found = True
                    already_merged_list_ids.add(subl_id)
                    result_list = subl['list'] + result_list

                    # Now we need to get the next cur_id value:
                    if res in tails:
                        cur_id = tails[res]['id']
                    else:
                        finished = True

            # Revisiting a merged sub-list (a cycle) or a dangling id.
            if not found:
                raise ValueError('A malformed list of AgentRole entities was given.')

        # Every sub-list must have been consumed by the walk above.
        unmerged_list_ids: Set[int] = ids_in_heads - already_merged_list_ids
        if len(unmerged_list_ids) != 0:
            raise ValueError('A malformed list of AgentRole entities was given.')

        return result_list
def encode_url(u: str) -> str:
    """Percent-encode *u*, keeping ':' and '/' (plus unreserved chars) as-is."""
    safe_characters = "://"
    return quote(u, safe_characters)
def create_literal(g: Graph, res: URIRef, p: URIRef, s: str, dt: Optional[URIRef] = None, nor: bool = True) -> None:
    """Add the triple (res, p, Literal(s)) to *g*, unless *s* is empty/blank.

    :param dt: literal datatype; defaults to xsd:string when None
    :param nor: passed through as rdflib's ``normalize`` flag
    """
    if is_string_empty(s):
        return
    datatype = XSD.string if dt is None else dt
    g.add((res, p, Literal(s, datatype=datatype, normalize=nor)))
def create_type(g: Graph, res: URIRef, res_type: URIRef) -> None:
    """Assert in *g* that *res* has rdf:type *res_type*."""
    type_triple = (res, RDF.type, res_type)
    g.add(type_triple)
def is_string_empty(string: Optional[str]) -> bool:
    """Return True when *string* is None, empty, or whitespace-only."""
    if string is None:
        return True
    return string.strip() == ""
# Variable used in several functions.
# Entity IRIs: <base>/<two-letter short name>/<optional supplier prefix
# "0…0"><count>, where <count> is a positive integer or an "n-n" range.
entity_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))$"
# Provenance IRIs: an entity IRI followed by "/prov/<short name>/<count>".
prov_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))/prov/([a-z][a-z])/([1-9][0-9]*)$"

# Compiled once at import time; used by parse_uri on every call.
_compiled_entity_regex = re.compile(entity_regex)
_compiled_prov_regex = re.compile(prov_regex)
@lru_cache(maxsize=4096)
def parse_uri(res: URIRef) -> ParsedURI:
    """Split an OCDM IRI into its components (results are cached).

    Provenance IRIs (containing "/prov/") are matched against the prov
    pattern and carry the provenance subject's components as well; other
    IRIs are matched against the entity pattern. IRIs matching neither
    yield a ParsedURI made of empty strings.
    """
    iri = str(res)
    if "/prov/" in iri:
        prov_match = _compiled_prov_regex.match(iri)
        if prov_match is not None:
            base, subj_sn, subj_prefix, subj_count, sn, count = prov_match.groups()
            return ParsedURI(
                base_iri=base,
                short_name=sn,
                prefix="",
                count=count,
                is_prov=True,
                prov_subject_short_name=subj_sn,
                prov_subject_prefix=subj_prefix or "",
                prov_subject_count=subj_count,
            )
    else:
        entity_match = _compiled_entity_regex.match(iri)
        if entity_match is not None:
            base, sn, prefix, count = entity_match.groups()
            return ParsedURI(
                base_iri=base,
                short_name=sn,
                prefix=prefix or "",
                count=count,
                is_prov=False,
                prov_subject_short_name="",
                prov_subject_prefix="",
                prov_subject_count="",
            )
    # Fallback for IRIs that do not follow the OCDM layout.
    return ParsedURI("", "", "", "", False, "", "", "")
def get_base_iri(res: URIRef) -> str:
    """Return the base-IRI component of *res* ("" if unparsable)."""
    parsed = parse_uri(res)
    return parsed.base_iri
def get_short_name(res: URIRef) -> str:
    """Return the two-letter short name of *res* ("" if unparsable)."""
    parsed = parse_uri(res)
    return parsed.short_name
def get_prefix(res: URIRef) -> str:
    """Return the supplier prefix of *res* ("" when absent or unparsable)."""
    parsed = parse_uri(res)
    return parsed.prefix
def get_count(res: URIRef) -> str:
    """Return the count component of *res* as a string ("" if unparsable)."""
    parsed = parse_uri(res)
    return parsed.count
def get_resource_number(res: URIRef) -> int:
    """Return the sequential number of *res* as an int (0 when unparsable).

    For provenance IRIs the count of the provenance *subject* is used.
    """
    parsed = parse_uri(res)
    if parsed.is_prov:
        count = parsed.prov_subject_count
    else:
        count = parsed.count
    return int(count) if count else 0
def find_local_line_id(res: URIRef, n_file_item: int = 1) -> int:
    """Return the offset of *res* inside its bucket of *n_file_item* entities.

    The resource number is reduced modulo-like to the start of the bucket
    that contains it, and the remaining offset is returned.
    """
    cur_number: int = get_resource_number(res)

    # Advance to the end of the bucket containing cur_number, then step
    # back one bucket to find where that bucket starts.
    bucket_end: int = 0
    while cur_number > bucket_end:
        bucket_end += n_file_item
    bucket_start = bucket_end - n_file_item

    return cur_number - bucket_start
def find_paths(res: URIRef, base_dir: str, base_iri: str, default_dir: str, dir_split: int,
               n_file_item: int, is_json: bool = True, process_id: int|str|None = None) -> Tuple[str, str]:
    """
    This function is responsible for looking for the correct JSON file that contains the data related to the
    resource identified by the variable 'string_iri'. This search takes into account the organisation in
    directories and files, as well as the particular supplier prefix for bibliographic entities, if specified.
    In case no supplier prefix is specified, the 'default_dir' (usually set to "_") is used instead.

    :param res: the resource whose storage location is wanted
    :param base_dir: filesystem root under which files are laid out
    :param base_iri: IRI prefix shared by all resources in the dataset
    :param default_dir: folder name used when the entity has no supplier prefix
    :param dir_split: number of entities per directory (0 disables dir splitting)
    :param n_file_item: number of entities per file
    :param is_json: choose '.json' vs '.nt'/'.nq' file extensions
    :param process_id: optional suffix appended to file names (e.g. per-process output)
    :return: (directory path, file path) for the resource
    """
    string_iri: str = str(res)
    process_id_str: str = f"_{process_id}" if process_id else ""

    if is_dataset(res):
        # NOTE(review): base_iri is interpolated into the regex unescaped, so
        # metacharacters in it (e.g. '.') are treated as regex syntax — confirm
        # callers always pass plain IRIs where this is harmless.
        cur_dir_path: str = (base_dir + re.sub(r"^%s(.*)$" % base_iri, r"\1", string_iri))[:-1]
        cur_file_path: str = cur_dir_path + os.sep + "index" + process_id_str + ".json"
        return cur_dir_path, cur_file_path

    parsed = parse_uri(res)
    # NOTE(review): for "n-n" range counts int() would raise ValueError —
    # presumably such IRIs only reach the is_dataset branch above; verify.
    cur_number: int = int(parsed.prov_subject_count) if parsed.is_prov else int(parsed.count)

    # Round cur_number up to the end of its file bucket.
    cur_file_split: int = 0
    while cur_number > cur_file_split:
        cur_file_split += n_file_item

    if dir_split and not string_iri.startswith(base_iri + "prov/"):
        # Directory splitting enabled: also round up to the directory bucket.
        cur_split: int = 0
        while cur_number > cur_split:
            cur_split += dir_split

        if parsed.is_prov:
            # <base>/<subj short name>/<subj prefix|default>/<dir>/<file>/prov/<short name>.<ext>
            sub_folder = parsed.prov_subject_prefix or default_dir or "_"
            file_extension = '.json' if is_json else '.nq'
            cur_dir_path = base_dir + parsed.prov_subject_short_name + os.sep + sub_folder + \
                os.sep + str(cur_split) + os.sep + str(cur_file_split) + os.sep + "prov"
            cur_file_path = cur_dir_path + os.sep + parsed.short_name + process_id_str + file_extension
        else:
            # <base>/<short name>/<prefix|default>/<dir>/<file>.<ext>
            sub_folder = parsed.prefix or default_dir or "_"
            file_extension = '.json' if is_json else '.nt'
            cur_dir_path = base_dir + parsed.short_name + os.sep + sub_folder + os.sep + str(cur_split)
            cur_file_path = cur_dir_path + os.sep + str(cur_file_split) + process_id_str + file_extension
    elif dir_split == 0:
        # No directory splitting: files live directly under the prefix folder.
        if parsed.is_prov:
            sub_folder = parsed.prov_subject_prefix or default_dir or "_"
            file_extension = '.json' if is_json else '.nq'
            cur_dir_path = base_dir + parsed.prov_subject_short_name + os.sep + sub_folder + \
                os.sep + str(cur_file_split) + os.sep + "prov"
            cur_file_path = cur_dir_path + os.sep + parsed.short_name + process_id_str + file_extension
        else:
            sub_folder = parsed.prefix or default_dir or "_"
            file_extension = '.json' if is_json else '.nt'
            cur_dir_path = base_dir + parsed.short_name + os.sep + sub_folder
            cur_file_path = cur_dir_path + os.sep + str(cur_file_split) + process_id_str + file_extension
    else:
        # Remaining case: dir_split != 0 and the IRI starts with
        # "<base_iri>prov/" — one file per entity under the short-name folder.
        file_extension = '.json' if is_json else '.nq'
        cur_dir_path = base_dir + parsed.short_name
        cur_file_path = cur_dir_path + os.sep + parsed.prefix + parsed.count + process_id_str + file_extension

    return cur_dir_path, cur_file_path
def has_supplier_prefix(res: URIRef, base_iri: str) -> bool:
    """Return True if the IRI of *res* carries a supplier prefix.

    A prefixed IRI looks like '<base_iri><two lowercase letters>/0...'.

    :param res: the resource to inspect
    :param base_iri: the dataset's base IRI, matched literally
    """
    string_iri: str = str(res)
    # re.escape prevents metacharacters in base_iri (e.g. '.') from being
    # interpreted as regex syntax, which previously allowed false positives.
    return re.search(r"^%s[a-z][a-z]/0" % re.escape(base_iri), string_iri) is not None
def build_graph_from_results(results: List[Dict]) -> Graph:
    """Assemble an rdflib Graph from SPARQL JSON rows binding ?s ?p ?o."""
    graph = Graph()
    for row in results:
        subj = sparql_binding_to_term(row['s'])
        pred = sparql_binding_to_term(row['p'])
        obj = sparql_binding_to_term(row['o'])
        graph.add((subj, pred, obj))
    return graph
def is_dataset(res: URIRef) -> bool:
    """Return True when *res* does not look like a single-entity IRI.

    Entity IRIs end in '<number>', '<number>-<number>' or
    '<number>/<number>'; anything else is treated as a dataset IRI.
    """
    iri = str(res)
    entity_like = re.search(r"^.+/[0-9]+(-[0-9]+)?(/[0-9]+)?$", iri)
    return entity_like is None