Coverage for oc_ocdm / support / support.py: 90%
217 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-08 20:23 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-08 20:23 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2022-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
7# -*- coding: utf-8 -*-
8from __future__ import annotations
10import os
11import re
12from dataclasses import dataclass
13from datetime import datetime
14from functools import lru_cache
15from typing import TYPE_CHECKING
16from urllib.parse import quote
18from rdflib import Graph, Literal
19from rdflib.namespace import XSD as _RDFLIB_XSD
20from triplelite import XSD_STRING, RDFTerm, TripleLite
22from oc_ocdm.constants import RDF_TYPE, XSD_DATE, XSD_GYEAR, XSD_GYEARMONTH, XSD_STRING
24_RDFLIB_XSD_STRING = _RDFLIB_XSD.string
26if TYPE_CHECKING:
27 from typing import Dict, List, Optional, Set, Tuple
29 from oc_ocdm.graph.entities.bibliographic.agent_role import AgentRole
30 from oc_ocdm.graph.entities.bibliographic.bibliographic_resource import BibliographicResource
31 from oc_ocdm.graph.entities.bibliographic.responsible_agent import ResponsibleAgent
@dataclass
class ParsedURI:
    """
    Components extracted from an entity IRI or from a provenance IRI.

    For a plain entity IRI the ``prov_subject_*`` fields are empty strings and
    ``is_prov`` is False. For a provenance IRI (one containing "/prov/") the
    ``short_name`` and ``count`` fields describe the provenance entity itself,
    while the ``prov_subject_*`` fields describe the entity it refers to.
    When an IRI cannot be parsed, every string field is empty.
    """
    # Everything that precedes the "/<short name>/<number>" part of the IRI.
    base_iri: str
    # Two-letter segment identifying the kind of entity.
    short_name: str
    # Optional "0...0" prefix that precedes the counter ("" when absent).
    prefix: str
    # Counter of the entity, kept as a string.
    count: str
    # True only when the IRI matched the provenance pattern.
    is_prov: bool
    # Short name, prefix and counter of the entity described by a provenance IRI.
    prov_subject_short_name: str
    prov_subject_prefix: str
    prov_subject_count: str
def sparql_binding_to_rdfterm(binding: dict) -> RDFTerm:
    """
    Convert one binding of a SPARQL JSON result into an RDFTerm.

    A binding whose 'type' is 'uri' becomes a "uri" term; any other binding
    becomes a "literal" term. A literal carrying neither a datatype nor a
    language tag is given the XSD_STRING datatype.
    """
    if binding['type'] == 'uri':
        return RDFTerm("uri", binding['value'])

    declared_datatype = binding.get('datatype', '')
    language = binding.get('xml:lang', '')
    # Only plain literals (no datatype, no language) fall back to xsd:string.
    is_plain = not (declared_datatype or language)
    effective_datatype = XSD_STRING if is_plain else declared_datatype
    return RDFTerm("literal", binding['value'], effective_datatype, language)
def normalize_graph_literals(g: Graph) -> None:
    """
    Rewrite, in place, every literal of the graph that has neither a datatype
    nor a language tag so that it explicitly carries the xsd:string datatype.
    """
    # Collect first: the graph must not be modified while it is being iterated.
    untyped = [
        (subj, pred, obj)
        for subj, pred, obj in g
        if isinstance(obj, Literal) and obj.datatype is None and obj.language is None
    ]
    for subj, pred, obj in untyped:
        typed_obj = Literal(str(obj), datatype=_RDFLIB_XSD_STRING)
        g.remove((subj, pred, obj))
        g.add((subj, pred, typed_obj))
def create_date(date_list: Optional[List[Optional[int]]] = None) -> Optional[str]:
    """
    Build a date string from a ``[year, month, day]`` list.

    The result is "YYYY-MM-DD", "YYYY-MM" or "YYYY" depending on how many
    leading components are available. A complete date that falls on the
    1st of January is reduced to "YYYY-01", and a list that does not have
    exactly three items never produces a day. None is returned when the list
    is None, empty, or has no year.

    The year is always rendered with four digits: the string is taken from
    ``isoformat()`` because ``strftime('%Y')`` does not zero-pad years below
    1000 on every platform.

    Raises ValueError (from ``datetime``) when the components do not form a
    valid date.
    """
    if not date_list or date_list[0] is None:
        return None

    size = len(date_list)
    year = date_list[0]
    month = date_list[1] if size >= 2 else None
    # The day is taken into account only for lists of exactly three items.
    day = date_list[2] if size == 3 else None

    if month is not None and day is not None and (month != 1 or day != 1):
        return datetime(year, month, day).date().isoformat()
    if month is not None:
        return datetime(year, month, 1).date().isoformat()[:7]
    return datetime(year, 1, 1).date().isoformat()[:4]
def get_datatype_from_iso_8601(string: str) -> Tuple[str, str]:
    """
    Return the XSD datatype matching the precision of a date string, together
    with the normalised date.

    Only the first ten characters of the input are considered. Depending on
    whether they hold three, two or one dash-separated integers, the result
    is (XSD_DATE, "YYYY-MM-DD"), (XSD_GYEARMONTH, "YYYY-MM") or
    (XSD_GYEAR, "YYYY").

    The year is always rendered with four digits: the string is taken from
    ``isoformat()`` because ``strftime('%Y')`` does not zero-pad years below
    1000 on every platform.

    Raises ValueError when a component is not an integer, or when the
    components do not form a valid date.
    """
    # Keep only the "yyyy-mm-dd" part of the string
    date_text = string[:10]

    try:
        date_parts = [int(part) for part in date_text.split(sep='-', maxsplit=2)]
    except ValueError as err:
        # Keep the original parsing error attached as the cause.
        raise ValueError("The provided date string is not ISO-8601 compliant!") from err

    num_of_parts = len(date_parts)
    if num_of_parts == 3:
        return XSD_DATE, datetime(date_parts[0], date_parts[1], date_parts[2]).date().isoformat()
    if num_of_parts == 2:
        return XSD_GYEARMONTH, datetime(date_parts[0], date_parts[1], 1).date().isoformat()[:7]
    return XSD_GYEAR, datetime(date_parts[0], 1, 1).date().isoformat()[:4]
def get_ordered_contributors_from_br(br: BibliographicResource,
                                     contributor_type: str):
    """
    Return the responsible agents of ``br`` holding a role of type
    ``contributor_type``, ordered by following the "next" links of their
    agent roles.

    The agent roles returned by ``br.get_contributors()`` may come in any
    order: partial chains are built while scanning them and are then stitched
    together, starting from the chain whose last role has no successor.

    An empty list is returned when no matching contributor exists. ValueError
    is raised when the roles do not form exactly one well-formed chain.
    """
    malformed_msg = 'A malformed list of AgentRole entities was given.'

    next_chain_id = 0
    # Partial chains indexed by the IRI of their first agent role.
    chain_by_head = {}
    # Partial chains indexed by the IRI of the agent role they are waiting
    # for, i.e. the successor of their current last role.
    chain_by_awaited = {}
    # For each chain id, the IRI of its current first agent role.
    head_of_chain = {}

    for role in br.get_contributors():
        role_type = role.get_role_type()
        agent = role.get_is_held_by()
        successor = role.get_next()
        successor_res = None if successor is None else successor.res

        if role_type is None or role_type != str(contributor_type) or agent is None:
            continue

        if successor_res is not None and successor_res in chain_by_head:
            # The successor starts an existing chain: prepend this role.
            chain = chain_by_head.pop(successor_res)
            chain['list'].insert(0, agent)
            chain_by_head[role.res] = chain
            head_of_chain[chain['id']] = role.res
        elif role.res is not None and role.res in chain_by_awaited:
            # An existing chain was waiting for this role: append it.
            chain = chain_by_awaited.pop(role.res)
            chain['list'].append(agent)
            if successor_res is not None:
                chain_by_awaited[successor_res] = chain
        else:
            # This role cannot be attached to any chain yet: start a new one.
            chain = {'id': next_chain_id, 'list': [agent]}
            next_chain_id += 1
            chain_by_head[role.res] = chain
            head_of_chain[chain['id']] = role.res
            if successor_res is not None:
                chain_by_awaited[successor_res] = chain

    head_ids = {chain['id'] for chain in chain_by_head.values()}
    awaited_ids = {chain['id'] for chain in chain_by_awaited.values()}
    # Chains that are not waiting for any successor: there must be exactly one.
    terminal_ids = head_ids - awaited_ids
    if len(terminal_ids) == 0:
        # No contributor was found!
        return []
    if len(terminal_ids) != 1:
        raise ValueError(malformed_msg)

    ordered = []
    merged_ids = set()
    current_id = terminal_ids.pop()
    while True:
        if current_id not in head_of_chain:
            raise ValueError(malformed_msg)
        head_res = head_of_chain[current_id]
        chain = chain_by_head[head_res]
        chain_id = chain['id']
        if chain_id in merged_ids:
            # Visiting the same chain twice means the links form a cycle.
            raise ValueError(malformed_msg)
        merged_ids.add(chain_id)
        ordered = chain['list'] + ordered

        # Move to the chain that was waiting for the head of this one, if any.
        if head_res not in chain_by_awaited:
            break
        current_id = chain_by_awaited[head_res]['id']

    if len(head_ids - merged_ids) != 0:
        # Some chain could not be connected to the final one.
        raise ValueError(malformed_msg)

    return ordered
def encode_url(u: str) -> str:
    """Percent-encode ``u``, leaving the ':' and '/' characters untouched."""
    encoded = quote(u, safe="://")
    return encoded
def create_literal(g: TripleLite, res: str, p: str, s: str, dt: str | None = None, nor: bool = True) -> None:
    """
    Add the triple ``(res, p, literal)`` to ``g``, where the literal holds
    ``s`` typed with ``dt`` (XSD_STRING when ``dt`` is None).

    Nothing is added when ``s`` is None or contains only whitespace.
    The ``nor`` argument is accepted but not used by this function.
    """
    if is_string_empty(s):
        return
    datatype = XSD_STRING if dt is None else dt
    g.add((res, p, RDFTerm("literal", s, datatype)))
def create_type(g: TripleLite, res: str, res_type: str) -> None:
    """Add to ``g`` the triple stating that ``res`` has RDF type ``res_type``."""
    type_term = RDFTerm("uri", res_type)
    g.add((res, RDF_TYPE, type_term))
def is_string_empty(string: Optional[str]) -> bool:
    """Tell whether ``string`` is None or made only of whitespace."""
    if string is None:
        return True
    return string.strip() == ""
# Patterns used by several functions of this module.
#
# Both patterns start with the same groups:
#   1. base IRI, 2. two-letter short name, 3. optional "0...0" prefix,
#   4. counter (either a positive number or a "<digits>-<digits>" pair).
# The provenance pattern then adds: 5. two-letter short name and 6. counter
# of the provenance entity.
_entity_regex_core: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))"
entity_regex: str = _entity_regex_core + r"$"
prov_regex: str = _entity_regex_core + r"/prov/([a-z][a-z])/([1-9][0-9]*)$"

# Compiled once at import time, since parsing IRIs is a frequent operation.
_compiled_entity_regex = re.compile(entity_regex)
_compiled_prov_regex = re.compile(prov_regex)
@lru_cache(maxsize=4096)
def parse_uri(res: str) -> ParsedURI:
    """
    Split an IRI into the components described by ParsedURI.

    IRIs containing "/prov/" are matched against the provenance pattern, all
    the others against the entity pattern. When the chosen pattern does not
    match, a ParsedURI with empty strings and ``is_prov`` set to False is
    returned.

    Results are memoised, so the same ParsedURI instance can be handed to
    several callers.
    """
    iri = str(res)
    looks_like_prov = "/prov/" in iri
    pattern = _compiled_prov_regex if looks_like_prov else _compiled_entity_regex

    found = pattern.match(iri)
    if found is None:
        return ParsedURI("", "", "", "", False, "", "", "")

    if looks_like_prov:
        base, subject_short_name, subject_prefix, subject_count, short_name, count = found.groups()
        return ParsedURI(
            base_iri=base,
            short_name=short_name,
            prefix="",
            count=count,
            is_prov=True,
            prov_subject_short_name=subject_short_name,
            # The prefix group is optional and is None when absent.
            prov_subject_prefix=subject_prefix or "",
            prov_subject_count=subject_count,
        )

    base, short_name, prefix, count = found.groups()
    return ParsedURI(
        base_iri=base,
        short_name=short_name,
        # The prefix group is optional and is None when absent.
        prefix=prefix or "",
        count=count,
        is_prov=False,
        prov_subject_short_name="",
        prov_subject_prefix="",
        prov_subject_count="",
    )
def get_base_iri(res: str) -> str:
    """Return the base IRI of ``res`` ("" when ``res`` cannot be parsed)."""
    parsed = parse_uri(res)
    return parsed.base_iri
def get_short_name(res: str) -> str:
    """Return the two-letter short name of ``res`` ("" when ``res`` cannot be parsed)."""
    parsed = parse_uri(res)
    return parsed.short_name
def get_prefix(res: str) -> str:
    """Return the prefix found in ``res`` ("" when absent or when ``res`` cannot be parsed)."""
    parsed = parse_uri(res)
    return parsed.prefix
def get_count(res: str) -> str:
    """Return the counter of ``res`` as a string ("" when ``res`` cannot be parsed)."""
    parsed = parse_uri(res)
    return parsed.count
def get_resource_number(res: str) -> int:
    """
    Return the counter of ``res`` as an integer.

    For a provenance IRI the counter of the described entity is used.
    0 is returned when no counter is available.
    """
    parsed = parse_uri(res)
    if parsed.is_prov:
        count = parsed.prov_subject_count
    else:
        count = parsed.count
    if not count:
        return 0
    return int(count)
def find_local_line_id(res: str, n_file_item: int = 1) -> int:
    """
    Return the 1-based position of ``res`` inside the group of ``n_file_item``
    consecutive resource numbers it belongs to.

    With ``n_file_item`` equal to 3, the resource numbers 1, 2, 3, 4, 5, 6
    yield 1, 2, 3, 1, 2, 3. When no resource number can be extracted from
    ``res`` (i.e. it is 0), ``n_file_item`` is returned.

    The position is computed arithmetically rather than by stepping through
    the multiples of ``n_file_item``, which never terminated for a
    non-positive ``n_file_item``.

    Raises ValueError when ``n_file_item`` is not positive and ``res`` has a
    positive resource number.
    """
    cur_number: int = get_resource_number(res)

    if cur_number <= 0:
        # No group precedes the resource: the offset is one (missing) group.
        return cur_number + n_file_item
    if n_file_item <= 0:
        raise ValueError("n_file_item must be a positive integer")

    return (cur_number - 1) % n_file_item + 1
def find_paths(res: str, base_dir: str, base_iri: str, default_dir: str, dir_split: int,
               n_file_item: int, is_json: bool = True, process_id: int|str|None = None) -> Tuple[str, str]:
    """
    This function is responsible for looking for the correct file that contains the data related to the
    resource identified by 'res'. This search takes into account the organisation in directories and
    files, as well as the particular supplier prefix for bibliographic entities, if specified.
    In case no supplier prefix is specified, the 'default_dir' (usually set to "_") is used instead.

    The returned tuple is (directory path, file path). Paths are built by plain string concatenation
    starting from 'base_dir', which is used as given. The file extension is '.json' when 'is_json' is
    True; otherwise it is '.nt' for entity files and '.nq' for the other files. When 'process_id' is
    truthy, "_<process_id>" is inserted right before the extension.

    Three layouts are supported:
      * 'dir_split' is non-zero and 'res' does not start with base_iri + "prov/": files are grouped in
        directories covering 'dir_split' resources each;
      * 'dir_split' is 0: no such intermediate directory is used;
      * any other case: the file is named after the prefix and the counter of the resource.
    Dataset IRIs (see 'is_dataset') are always mapped to an "index" JSON file.

    Raises ValueError when the counter of a non-dataset 'res' cannot be converted to an integer.
    """
    string_iri: str = str(res)
    # NOTE(review): a falsy process_id (0 or "") produces no suffix -- confirm this is intended.
    process_id_str: str = f"_{process_id}" if process_id else ""

    if is_dataset(res):
        # base_iri is escaped so that characters such as '.' or '+' are matched literally.
        relative_part: str = re.sub(r"^%s(.*)$" % re.escape(base_iri), r"\1", string_iri)
        # The last character (expected to be the trailing slash) is dropped.
        cur_dir_path: str = (base_dir + relative_part)[:-1]
        cur_file_path: str = cur_dir_path + os.sep + "index" + process_id_str + ".json"
        return cur_dir_path, cur_file_path

    parsed = parse_uri(res)
    cur_number: int = int(parsed.prov_subject_count) if parsed.is_prov else int(parsed.count)

    # Upper bound of the group of n_file_item resources that contains cur_number.
    cur_file_split: int = ((cur_number - 1) // n_file_item + 1) * n_file_item if cur_number > 0 else n_file_item

    if dir_split and not string_iri.startswith(base_iri + "prov/"):
        # Upper bound of the group of dir_split resources that contains cur_number.
        cur_split: int = ((cur_number - 1) // dir_split + 1) * dir_split if cur_number > 0 else dir_split
        split_dir: str = os.sep + str(cur_split)
    elif dir_split == 0:
        split_dir = ""
    else:
        file_extension = '.json' if is_json else '.nq'
        cur_dir_path = base_dir + parsed.short_name
        cur_file_path = cur_dir_path + os.sep + parsed.prefix + parsed.count + process_id_str + file_extension
        return cur_dir_path, cur_file_path

    if parsed.is_prov:
        # Provenance data lives in a "prov" directory next to the data of the described entity.
        sub_folder = parsed.prov_subject_prefix or default_dir or "_"
        file_extension = '.json' if is_json else '.nq'
        cur_dir_path = base_dir + parsed.prov_subject_short_name + os.sep + sub_folder + split_dir + \
            os.sep + str(cur_file_split) + os.sep + "prov"
        cur_file_path = cur_dir_path + os.sep + parsed.short_name + process_id_str + file_extension
    else:
        sub_folder = parsed.prefix or default_dir or "_"
        file_extension = '.json' if is_json else '.nt'
        cur_dir_path = base_dir + parsed.short_name + os.sep + sub_folder + split_dir
        cur_file_path = cur_dir_path + os.sep + str(cur_file_split) + process_id_str + file_extension

    return cur_dir_path, cur_file_path
def has_supplier_prefix(res: str, base_iri: str) -> bool:
    """
    Tell whether ``res`` starts with ``base_iri`` followed by a two-letter
    short name, a slash and a "0", i.e. whether its counter begins with "0".

    ``base_iri`` is matched literally: it is escaped before being placed in
    the regular expression, so characters such as '.' or '+' have no special
    meaning.
    """
    string_iri: str = str(res)
    pattern: str = r"^%s[a-z][a-z]/0" % re.escape(base_iri)
    return re.search(pattern, string_iri) is not None
def build_graph_from_results(results: List[Dict]) -> TripleLite:
    """
    Build a TripleLite graph from SPARQL JSON bindings.

    Each item of ``results`` must provide the 's', 'p' and 'o' bindings: the
    values of 's' and 'p' are used as they are, while 'o' is converted with
    ``sparql_binding_to_rdfterm``.
    """
    graph = TripleLite()
    for row in results:
        subject = row['s']['value']
        predicate = row['p']['value']
        obj = sparql_binding_to_rdfterm(row['o'])
        graph.add((subject, predicate, obj))
    return graph
def is_dataset(res: str) -> bool:
    """
    Tell whether ``res`` is treated as a dataset IRI, i.e. whether it does NOT
    end with "/<digits>", optionally followed by "-<digits>" and/or by a
    further "/<digits>" segment.
    """
    numbered_tail = re.search(r"^.+/[0-9]+(-[0-9]+)?(/[0-9]+)?$", str(res))
    if numbered_tail is None:
        return True
    return False