Coverage for oc_ocdm/support/support.py: 80%
215 statements
coverage.py v6.5.0, created at 2025-12-05 23:58 +0000

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2016, Silvio Peroni <essepuntato@gmail.com>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.
from __future__ import annotations

import os
import re
from dataclasses import dataclass
from datetime import datetime
from functools import lru_cache
from typing import TYPE_CHECKING
from urllib.parse import quote

from rdflib import RDF, XSD, Graph, Literal, URIRef

if TYPE_CHECKING:
    from typing import Optional, List, Tuple, Match, Dict, Set
    from oc_ocdm.graph.entities.bibliographic.bibliographic_resource import BibliographicResource
    from oc_ocdm.graph.entities.bibliographic.responsible_agent import ResponsibleAgent
    from oc_ocdm.graph.entities.bibliographic.agent_role import AgentRole


@dataclass
class ParsedURI:
    base_iri: str
    short_name: str
    prefix: str
    count: str
    is_prov: bool
    prov_subject_short_name: str
    prov_subject_prefix: str
    prov_subject_count: str


def create_date(date_list: List[Optional[int]] = None) -> Optional[str]:
    string: Optional[str] = None
    if date_list is not None:
        l_date_list: int = len(date_list)
        if l_date_list != 0 and date_list[0] is not None:
            if l_date_list == 3 and \
                    ((date_list[1] is not None and date_list[1] != 1) or
                     (date_list[2] is not None and date_list[2] != 1)):
                string = datetime(date_list[0], date_list[1], date_list[2]).strftime('%Y-%m-%d')
            elif l_date_list == 2 and date_list[1] is not None:
                string = datetime(date_list[0], date_list[1], 1).strftime('%Y-%m')
            else:
                string = datetime(date_list[0], 1, 1).strftime('%Y')
    return string


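# Illustrative usage (doctest-style sketch, not part of the original module).
# create_date() renders the most specific ISO-8601 form the input supports; note
# that a full date of January 1st falls through to year-only precision:
# >>> create_date([2020, 11, 25])
# '2020-11-25'
# >>> create_date([2020, 11])
# '2020-11'
# >>> create_date([2020, 1, 1])
# '2020'

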
def get_datatype_from_iso_8601(string: str) -> Tuple[URIRef, str]:
    # Keep only the "yyyy-mm-dd" part of the string
    string = string[:10]

    try:
        date_parts: List[int] = [int(s) for s in string.split(sep='-', maxsplit=2)]
    except ValueError:
        raise ValueError("The provided date string is not ISO-8601 compliant!")

    num_of_parts: int = len(date_parts)
    if num_of_parts == 3:
        return XSD.date, datetime(*date_parts).strftime('%Y-%m-%d')
    elif num_of_parts == 2:
        return XSD.gYearMonth, datetime(*date_parts, 1).strftime('%Y-%m')
    else:
        return XSD.gYear, datetime(*date_parts, 1, 1).strftime('%Y')


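# Illustrative usage (doctest-style sketch): the XSD datatype is chosen from the
# number of dash-separated parts found in the first ten characters of the input:
# >>> get_datatype_from_iso_8601('2020-11-25T10:30:00')
# (rdflib.term.URIRef('http://www.w3.org/2001/XMLSchema#date'), '2020-11-25')
# >>> get_datatype_from_iso_8601('2020-11')
# (rdflib.term.URIRef('http://www.w3.org/2001/XMLSchema#gYearMonth'), '2020-11')

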
def get_ordered_contributors_from_br(br: BibliographicResource,
                                     contributor_type: URIRef):
    ar_list: List[AgentRole] = br.get_contributors()

    list_id: int = 0
    heads: Dict[URIRef, Dict] = {}
    tails: Dict[URIRef, Dict] = {}
    sub_lists: List[Dict] = []
    from_id_to_res_in_heads: Dict[int, URIRef] = {}
    for ar in ar_list:
        role_type: URIRef = ar.get_role_type()
        ra: ResponsibleAgent = ar.get_is_held_by()
        next_ar: AgentRole = ar.get_next()
        if next_ar is not None:
            next_ar_res: Optional[URIRef] = next_ar.res
        else:
            next_ar_res: Optional[URIRef] = None

        if role_type is not None and role_type == contributor_type and ra is not None:
            if next_ar_res is not None and next_ar_res in heads:
                sub_list: Dict = heads[next_ar_res]
                sub_list['list'].insert(0, ra)
                del heads[next_ar_res]
                heads[ar.res] = sub_list
                from_id_to_res_in_heads[sub_list['id']] = ar.res
            elif ar.res is not None and ar.res in tails:
                sub_list: Dict = tails[ar.res]
                sub_list['list'].append(ra)
                del tails[ar.res]

                if next_ar_res is not None:
                    tails[next_ar_res] = sub_list
            else:
                # This AR cannot be inserted into any list, so
                # we need to create an entirely new list for it:
                sub_list: Dict = {'id': list_id, 'list': [ra]}
                list_id += 1
                sub_lists.append(sub_list)

                heads[ar.res] = sub_list
                from_id_to_res_in_heads[sub_list['id']] = ar.res
                if next_ar_res is not None:
                    tails[next_ar_res] = sub_list

    ids_in_heads: Set[int] = {val['id'] for val in heads.values()}
    ids_in_tails: Set[int] = {val['id'] for val in tails.values()}
    diff_set: Set[int] = ids_in_heads - ids_in_tails
    if len(diff_set) == 0:
        # No contributor was found!
        return []
    elif len(diff_set) != 1:
        raise ValueError('A malformed list of AgentRole entities was given.')
    else:
        result_list: List[ResponsibleAgent] = []
        cur_id: int = diff_set.pop()
        already_merged_list_ids: Set[int] = set()
        finished: bool = False
        while not finished:
            found: bool = False
            if cur_id in from_id_to_res_in_heads:
                res: URIRef = from_id_to_res_in_heads[cur_id]
                subl: Dict = heads[res]
                subl_id: int = subl['id']
                if subl_id not in already_merged_list_ids:
                    found = True
                    already_merged_list_ids.add(subl_id)
                    result_list = subl['list'] + result_list

                    # Now we need to get the next cur_id value:
                    if res in tails:
                        cur_id = tails[res]['id']
                    else:
                        finished = True

            if not found:
                raise ValueError('A malformed list of AgentRole entities was given.')

        unmerged_list_ids: Set[int] = ids_in_heads - already_merged_list_ids
        if len(unmerged_list_ids) != 0:
            raise ValueError('A malformed list of AgentRole entities was given.')

        return result_list


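# Usage sketch (illustrative; `my_br` stands for a hypothetical BibliographicResource
# whose AgentRole entities are chained through their "next" relation). The function
# stitches the partial sub-lists together head-to-tail and returns the
# ResponsibleAgent objects in publication order, raising ValueError if the chain
# has gaps or branches:
# >>> authors = get_ordered_contributors_from_br(
# ...     my_br, URIRef('http://purl.org/spar/pro/author'))

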
def encode_url(u: str) -> str:
    return quote(u, "://")


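# Illustrative usage: every character is percent-encoded except ':' and '/'
# (plus the characters quote() always leaves untouched):
# >>> encode_url('https://example.org/a b?x=1')
# 'https://example.org/a%20b%3Fx%3D1'

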
def create_literal(g: Graph, res: URIRef, p: URIRef, s: str, dt: URIRef = None, nor: bool = True) -> None:
    if not is_string_empty(s):
        dt = dt if dt is not None else XSD.string
        g.add((res, p, Literal(s, datatype=dt, normalize=nor)))


def create_type(g: Graph, res: URIRef, res_type: URIRef) -> None:
    g.add((res, RDF.type, res_type))


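# Illustrative usage (the IRIs below are examples only): create_literal() silently
# skips empty or whitespace-only strings and defaults the datatype to xsd:string:
# >>> g = Graph()
# >>> subj = URIRef('https://w3id.org/oc/meta/br/0601')
# >>> create_literal(g, subj, URIRef('http://purl.org/dc/terms/title'), 'A title')
# >>> create_type(g, subj, URIRef('http://purl.org/spar/fabio/Expression'))
# >>> len(g)
# 2

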
def is_string_empty(string: str) -> bool:
    return string is None or string.strip() == ""


# Variables used in several functions
entity_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))$"
prov_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))/prov/([a-z][a-z])/([1-9][0-9]*)$"

_compiled_entity_regex = re.compile(entity_regex)
_compiled_prov_regex = re.compile(prov_regex)


@lru_cache(maxsize=4096)
def parse_uri(res: URIRef) -> ParsedURI:
    string_iri = str(res)
    if "/prov/" in string_iri:
        match = _compiled_prov_regex.match(string_iri)
        if match:
            return ParsedURI(
                base_iri=match.group(1),
                short_name=match.group(5),
                prefix="",
                count=match.group(6),
                is_prov=True,
                prov_subject_short_name=match.group(2),
                prov_subject_prefix=match.group(3) or "",
                prov_subject_count=match.group(4),
            )
    else:
        match = _compiled_entity_regex.match(string_iri)
        if match:
            return ParsedURI(
                base_iri=match.group(1),
                short_name=match.group(2),
                prefix=match.group(3) or "",
                count=match.group(4),
                is_prov=False,
                prov_subject_short_name="",
                prov_subject_prefix="",
                prov_subject_count="",
            )
    return ParsedURI("", "", "", "", False, "", "", "")


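# Illustrative parses (OpenCitations-style example IRIs; results are memoised by
# lru_cache, so repeated look-ups of the same IRI are cheap):
# >>> p = parse_uri(URIRef('https://w3id.org/oc/meta/br/0601'))
# >>> (p.short_name, p.prefix, p.count, p.is_prov)
# ('br', '060', '1', False)
# >>> parse_uri(URIRef('https://w3id.org/oc/meta/br/0601/prov/se/1')).prov_subject_count
# '1'

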
def get_base_iri(res: URIRef) -> str:
    return parse_uri(res).base_iri


def get_short_name(res: URIRef) -> str:
    return parse_uri(res).short_name


def get_prefix(res: URIRef) -> str:
    return parse_uri(res).prefix


def get_count(res: URIRef) -> str:
    return parse_uri(res).count


def get_resource_number(res: URIRef) -> int:
    parsed = parse_uri(res)
    count = parsed.prov_subject_count if parsed.is_prov else parsed.count
    return int(count) if count else 0


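# Illustrative usage: for provenance IRIs the number of the *subject* entity is
# returned, not the snapshot number:
# >>> get_resource_number(URIRef('https://w3id.org/oc/meta/br/0602500'))
# 2500
# >>> get_resource_number(URIRef('https://w3id.org/oc/meta/br/0602500/prov/se/3'))
# 2500

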
def find_local_line_id(res: URIRef, n_file_item: int = 1) -> int:
    cur_number: int = get_resource_number(res)

    cur_file_split: int = 0
    while True:
        if cur_number > cur_file_split:
            cur_file_split += n_file_item
        else:
            cur_file_split -= n_file_item
            break

    return cur_number - cur_file_split


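# Illustrative usage: with 1000 items per file, resource number 2500 is the 500th
# entry of the file covering numbers 2001-3000:
# >>> find_local_line_id(URIRef('https://w3id.org/oc/meta/br/0602500'), n_file_item=1000)
# 500

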
def find_paths(res: URIRef, base_dir: str, base_iri: str, default_dir: str, dir_split: int,
               n_file_item: int, is_json: bool = True, process_id: int | str = None) -> Tuple[str, str]:
    """
    This function looks for the correct file that contains the data related to the
    resource identified by 'res'. This search takes into account the organisation in
    directories and files, as well as the particular supplier prefix for bibliographic entities,
    if specified. In case no supplier prefix is specified, the 'default_dir' (usually set
    to "_") is used instead.
    """
    string_iri: str = str(res)
    process_id_str: str = f"_{process_id}" if process_id else ""

    if is_dataset(res):
        cur_dir_path: str = (base_dir + re.sub(r"^%s(.*)$" % base_iri, r"\1", string_iri))[:-1]
        cur_file_path: str = cur_dir_path + os.sep + "index" + process_id_str + ".json"
        return cur_dir_path, cur_file_path

    parsed = parse_uri(res)
    cur_number: int = int(parsed.prov_subject_count) if parsed.is_prov else int(parsed.count)

    cur_file_split: int = 0
    while cur_number > cur_file_split:
        cur_file_split += n_file_item

    if dir_split and not string_iri.startswith(base_iri + "prov/"):
        cur_split: int = 0
        while cur_number > cur_split:
            cur_split += dir_split

        if parsed.is_prov:
            sub_folder = parsed.prov_subject_prefix or default_dir or "_"
            file_extension = '.json' if is_json else '.nq'
            cur_dir_path = base_dir + parsed.prov_subject_short_name + os.sep + sub_folder + \
                os.sep + str(cur_split) + os.sep + str(cur_file_split) + os.sep + "prov"
            cur_file_path = cur_dir_path + os.sep + parsed.short_name + process_id_str + file_extension
        else:
            sub_folder = parsed.prefix or default_dir or "_"
            file_extension = '.json' if is_json else '.nt'
            cur_dir_path = base_dir + parsed.short_name + os.sep + sub_folder + os.sep + str(cur_split)
            cur_file_path = cur_dir_path + os.sep + str(cur_file_split) + process_id_str + file_extension
    elif dir_split == 0:
        if parsed.is_prov:
            sub_folder = parsed.prov_subject_prefix or default_dir or "_"
            file_extension = '.json' if is_json else '.nq'
            cur_dir_path = base_dir + parsed.prov_subject_short_name + os.sep + sub_folder + \
                os.sep + str(cur_file_split) + os.sep + "prov"
            cur_file_path = cur_dir_path + os.sep + parsed.short_name + process_id_str + file_extension
        else:
            sub_folder = parsed.prefix or default_dir or "_"
            file_extension = '.json' if is_json else '.nt'
            cur_dir_path = base_dir + parsed.short_name + os.sep + sub_folder
            cur_file_path = cur_dir_path + os.sep + str(cur_file_split) + process_id_str + file_extension
    else:
        file_extension = '.json' if is_json else '.nq'
        cur_dir_path = base_dir + parsed.short_name
        cur_file_path = cur_dir_path + os.sep + parsed.prefix + parsed.count + process_id_str + file_extension

    return cur_dir_path, cur_file_path


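# Illustrative usage (POSIX paths; base_dir is expected to end with a separator).
# With dir_split=10000 and n_file_item=1000, entity br/0601 falls into directory
# bucket 10000 and file bucket 1000:
# >>> find_paths(URIRef('https://w3id.org/oc/meta/br/0601'), './data/',
# ...            'https://w3id.org/oc/meta/', '_', 10000, 1000)
# ('./data/br/060/10000', './data/br/060/10000/1000.json')

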
def has_supplier_prefix(res: URIRef, base_iri: str) -> bool:
    string_iri: str = str(res)
    return re.search(r"^%s[a-z][a-z]/0" % base_iri, string_iri) is not None


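# Illustrative usage: the check looks for a two-letter short name followed by a
# count starting with '0' (i.e. a supplier prefix) right after the base IRI:
# >>> has_supplier_prefix(URIRef('https://w3id.org/oc/meta/br/0601'), 'https://w3id.org/oc/meta/')
# True
# >>> has_supplier_prefix(URIRef('https://w3id.org/oc/meta/br/1'), 'https://w3id.org/oc/meta/')
# False

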
def build_graph_from_results(results: List[Dict]) -> Graph:
    graph = Graph()
    for triple in results:
        s = URIRef(triple['s']['value'])
        p = URIRef(triple['p']['value'])
        if triple['o']['type'] == 'uri':
            o = URIRef(triple['o']['value'])
        else:
            datatype = triple['o'].get('datatype', None)
            datatype = URIRef(datatype) if datatype is not None else None
            o = Literal(triple['o']['value'], datatype=datatype)
        graph.add((s, p, o))
    return graph


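# Illustrative usage: the expected input shape mirrors the "results"."bindings"
# array of a SPARQL 1.1 JSON response to a SELECT ?s ?p ?o query:
# >>> bindings = [{'s': {'type': 'uri', 'value': 'https://w3id.org/oc/meta/br/0601'},
# ...              'p': {'type': 'uri', 'value': 'http://purl.org/dc/terms/title'},
# ...              'o': {'type': 'literal', 'value': 'A title'}}]
# >>> len(build_graph_from_results(bindings))
# 1

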
def is_dataset(res: URIRef) -> bool:
    string_iri: str = str(res)
    return re.search(r"^.+/[0-9]+(-[0-9]+)?(/[0-9]+)?$", string_iri) is None
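

# Illustrative usage: dataset IRIs are recognised by exclusion, i.e. they do not
# end with the numeric suffixes used by entity and provenance IRIs:
# >>> is_dataset(URIRef('https://w3id.org/oc/meta/'))
# True
# >>> is_dataset(URIRef('https://w3id.org/oc/meta/br/0601'))
# False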