Coverage for oc_ocdm/support/support.py: 76%
250 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-05-30 22:05 +0000
1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3# Copyright (c) 2016, Silvio Peroni <essepuntato@gmail.com>
4#
5# Permission to use, copy, modify, and/or distribute this software for any purpose
6# with or without fee is hereby granted, provided that the above copyright notice
7# and this permission notice appear in all copies.
8#
9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
15# SOFTWARE.
16from __future__ import annotations
18import os
19import re
20from datetime import datetime
21from typing import TYPE_CHECKING
22from rdflib import URIRef, Graph
24if TYPE_CHECKING:
25 from typing import Optional, List, Tuple, Match, Dict, Set
26 from oc_ocdm.graph.entities.bibliographic.bibliographic_resource import BibliographicResource
27 from oc_ocdm.graph.entities.bibliographic.responsible_agent import ResponsibleAgent
28 from oc_ocdm.graph.entities.bibliographic.agent_role import AgentRole
30from urllib.parse import quote
32from rdflib import RDF, XSD, Literal
def create_date(date_list: List[Optional[int]] = None) -> Optional[str]:
    """
    Build an ISO-8601 date string ('%Y', '%Y-%m' or '%Y-%m-%d') from a list of
    [year, month, day] integer parts.

    A month/day pair where both values are 1 (or None) is treated as
    "unspecified", so [2020, 1, 1] yields '2020' (year precision only).

    :param date_list: up to three integers [year, month, day]; may be None
    :return: the formatted date string, or None when no usable year is given
    """
    if date_list is None or len(date_list) == 0 or date_list[0] is None:
        return None
    parts_count: int = len(date_list)
    # Day precision only when a month or day different from 1 was provided
    has_day_precision: bool = parts_count == 3 and \
        ((date_list[1] is not None and date_list[1] != 1) or
         (date_list[2] is not None and date_list[2] != 1))
    if has_day_precision:
        return datetime(date_list[0], date_list[1], date_list[2]).strftime('%Y-%m-%d')
    if parts_count == 2 and date_list[1] is not None:
        return datetime(date_list[0], date_list[1], 1).strftime('%Y-%m')
    return datetime(date_list[0], 1, 1).strftime('%Y')
def get_datatype_from_iso_8601(string: str) -> Tuple[URIRef, str]:
    """
    Parse an ISO-8601 date string and return the most specific matching XSD
    datatype (xsd:date, xsd:gYearMonth or xsd:gYear) together with the
    normalized string form of the date.

    Fix: the try-block now also covers the datetime() validation, so a
    syntactically well-formed but out-of-range date (e.g. '2021-02-30')
    raises the same explanatory ValueError as a non-numeric one, instead
    of leaking datetime's internal message.

    :param string: the ISO-8601 date; only the first 10 characters
        ("yyyy-mm-dd") are considered
    :return: a (datatype IRI, normalized date string) tuple
    :raises ValueError: if the string is not ISO-8601 compliant
    """
    # Keep only the "yyyy-mm-dd" part of the string
    string = string[:10]

    try:
        date_parts: List[int] = [int(s) for s in string.split(sep='-', maxsplit=2)]
        num_of_parts: int = len(date_parts)
        if num_of_parts == 3:
            return XSD.date, datetime(*date_parts).strftime('%Y-%m-%d')
        elif num_of_parts == 2:
            return XSD.gYearMonth, datetime(*date_parts, 1).strftime('%Y-%m')
        else:
            return XSD.gYear, datetime(*date_parts, 1, 1).strftime('%Y')
    except ValueError:
        # int() fails on non-numeric fields; datetime() fails on out-of-range values
        raise ValueError("The provided date string is not ISO-8601 compliant!")
def get_ordered_contributors_from_br(br: BibliographicResource,
                                     contributor_type: URIRef):
    """
    Reconstruct the ordered list of ResponsibleAgent entities that hold the
    given role type for a BibliographicResource, following the linked list
    expressed by the AgentRole "next" relation.

    Because the AgentRole entities can arrive in any order, partial sub-lists
    are built incrementally (indexed by the AR at their head and by the AR
    expected before their tail) and finally merged by walking backwards from
    the unique sub-list whose head is not pointed to by any other sub-list.

    :param br: the bibliographic resource whose contributors must be ordered
    :param contributor_type: the role type to filter on (e.g. an author role IRI)
    :return: the ordered List[ResponsibleAgent] (empty if none was found)
    :raises ValueError: if the AgentRole chain is malformed (broken or cyclic)
    """
    ar_list: List[AgentRole] = br.get_contributors()

    list_id: int = 0
    # Sub-lists indexed by the AR resource at their head:
    heads: Dict[URIRef, Dict] = {}
    # Sub-lists indexed by the AR resource their tail points to via "next":
    tails: Dict[URIRef, Dict] = {}
    sub_lists: List[Dict] = []
    # Reverse index: sub-list id -> AR resource currently at its head
    from_id_to_res_in_heads: Dict[int, URIRef] = {}
    for ar in ar_list:
        role_type: URIRef = ar.get_role_type()
        ra: ResponsibleAgent = ar.get_is_held_by()
        next_ar: AgentRole = ar.get_next()
        if next_ar is not None:
            next_ar_res: Optional[URIRef] = next_ar.res
        else:
            next_ar_res: Optional[URIRef] = None

        # Only ARs of the requested role type with an actual agent are considered
        if role_type is not None and role_type == contributor_type and ra is not None:
            if next_ar_res is not None and next_ar_res in heads:
                # This AR precedes an existing sub-list: prepend to it
                sub_list: Dict = heads[next_ar_res]
                sub_list['list'].insert(0, ra)
                del heads[next_ar_res]
                heads[ar.res] = sub_list
                from_id_to_res_in_heads[sub_list['id']] = ar.res
            elif ar.res is not None and ar.res in tails:
                # This AR follows an existing sub-list: append to it
                sub_list: Dict = tails[ar.res]
                sub_list['list'].append(ra)
                del tails[ar.res]

                if next_ar_res is not None:
                    tails[next_ar_res] = sub_list
            else:
                # This AR cannot be inserted into any list, so
                # we need to create an entirely new list for it:
                sub_list: Dict = {'id': list_id, 'list': [ra]}
                list_id += 1
                sub_lists.append(sub_list)

                heads[ar.res] = sub_list
                from_id_to_res_in_heads[sub_list['id']] = ar.res
                if next_ar_res is not None:
                    tails[next_ar_res] = sub_list

    ids_in_heads: Set[int] = {val['id'] for val in heads.values()}
    ids_in_tails: Set[int] = {val['id'] for val in tails.values()}
    # The last sub-list of the chain is the only one that appears among the
    # heads but is not pointed to as the tail of another sub-list:
    diff_set: Set[int] = ids_in_heads - ids_in_tails
    if len(diff_set) == 0:
        # No contributor was found!
        return []
    elif len(diff_set) != 1:
        raise ValueError('A malformed list of AgentRole entities was given.')
    else:
        result_list: List[ResponsibleAgent] = []
        cur_id: int = diff_set.pop()
        already_merged_list_ids: Set[int] = set()
        finished: bool = False
        # Merge sub-lists walking backwards from the final one; each step
        # prepends the current sub-list and follows the tails index:
        while not finished:
            found: bool = False
            if cur_id in from_id_to_res_in_heads:
                res: URIRef = from_id_to_res_in_heads[cur_id]
                subl: Dict = heads[res]
                subl_id: int = subl['id']
                if subl_id not in already_merged_list_ids:
                    found = True
                    already_merged_list_ids.add(subl_id)
                    result_list = subl['list'] + result_list

                    # Now we need to get the next cur_id value:
                    if res in tails:
                        cur_id = tails[res]['id']
                    else:
                        finished = True

            # found stays False on a broken/cyclic chain (revisited sub-list)
            if not found:
                raise ValueError('A malformed list of AgentRole entities was given.')

        # Every sub-list must have been merged exactly once:
        unmerged_list_ids: Set[int] = ids_in_heads - already_merged_list_ids
        if len(unmerged_list_ids) != 0:
            raise ValueError('A malformed list of AgentRole entities was given.')

        return result_list
def encode_url(u: str) -> str:
    """Percent-encode the given URL, keeping ':' and '/' unescaped so the URL structure survives."""
    return quote(u, safe="://")
def create_literal(g: Graph, res: URIRef, p: URIRef, s: str, dt: URIRef = None, nor: bool = True) -> None:
    """
    Add the triple (res, p, Literal(s)) to graph 'g'.

    Nothing is added when 's' is None or whitespace-only. The datatype
    defaults to xsd:string; 'nor' toggles rdflib's lexical normalization.
    """
    if is_string_empty(s):
        return
    datatype: URIRef = XSD.string if dt is None else dt
    g.add((res, p, Literal(s, datatype=datatype, normalize=nor)))
def create_type(g: Graph, res: URIRef, res_type: URIRef) -> None:
    """Assert in graph 'g' that resource 'res' has rdf:type 'res_type'."""
    g.add((res, RDF.type, res_type))
def is_string_empty(string: Optional[str]) -> bool:
    """
    Check whether 'string' carries no usable content.

    Fix: the annotation declared 'str' although None is explicitly handled
    (and passed by callers such as create_literal); it is now Optional[str].

    :param string: the string to check; None is allowed
    :return: True if the string is None, empty or whitespace-only
    """
    return string is None or string.strip() == ""
# Variable used in several functions
# OCDM entity IRI, e.g. <base>/br/0701 or <base>/br/1:
#   group 1 = base IRI, group 2 = two-letter short name,
#   group 3 = optional supplier prefix ('0...0'), group 4 = counter (or 'n-m' range)
entity_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))$"
# Provenance entity IRI, e.g. <base>/br/0701/prov/se/1: groups 1-4 describe the
# subject entity as above; group 5 = provenance short name, group 6 = provenance counter
prov_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))/prov/([a-z][a-z])/([1-9][0-9]*)$"
176def _get_match(regex: str, group: int, string: str) -> str:
177 match: Match = re.match(regex, string)
178 if match is not None:
179 return match.group(group)
180 else:
181 return ""
def get_base_iri(res: URIRef) -> str:
    """Extract the base IRI (everything before the short name) from an OCDM entity/provenance IRI."""
    iri: str = str(res)
    pattern: str = prov_regex if "/prov/" in iri else entity_regex
    return _get_match(pattern, 1, iri)
def get_short_name(res: URIRef) -> str:
    """Return the two-letter short name of the entity itself (for provenance IRIs, the part after '/prov/')."""
    iri: str = str(res)
    if "/prov/" in iri:
        return _get_match(prov_regex, 5, iri)
    return _get_match(entity_regex, 2, iri)
def get_prov_subject_short_name(prov_res: URIRef) -> str:
    """Return the short name of the entity the provenance is about, or "" for non-provenance IRIs."""
    iri: str = str(prov_res)
    if "/prov/" not in iri:
        return ""  # non-provenance entities do not have a prov_subject!
    return _get_match(prov_regex, 2, iri)
def get_prefix(res: URIRef) -> str:
    """Return the supplier prefix of the entity IRI, or "" for provenance entities."""
    iri: str = str(res)
    if "/prov/" in iri:
        return ""  # provenance entities cannot have a supplier prefix
    # NOTE: may be None (not "") when the IRI has no prefix, since the
    # regex group is optional (pre-existing _get_match behavior).
    return _get_match(entity_regex, 3, iri)
def get_prov_subject_prefix(prov_res: URIRef) -> str:
    """Return the supplier prefix of the entity the provenance is about, or "" for non-provenance IRIs."""
    iri: str = str(prov_res)
    if "/prov/" not in iri:
        return ""  # non-provenance entities do not have a prov_subject!
    return _get_match(prov_regex, 3, iri)
def get_count(res: URIRef) -> str:
    """Return the numeric counter (as a string) at the end of the IRI."""
    iri: str = str(res)
    if "/prov/" in iri:
        return _get_match(prov_regex, 6, iri)
    return _get_match(entity_regex, 4, iri)
def get_prov_subject_count(prov_res: URIRef) -> str:
    """Return the counter (as a string) of the entity the provenance is about, or "" for non-provenance IRIs."""
    iri: str = str(prov_res)
    if "/prov/" not in iri:
        return ""  # non-provenance entities do not have a prov_subject!
    return _get_match(prov_regex, 4, iri)
def get_resource_number(res: URIRef) -> int:
    """Return the IRI counter as an int (for provenance IRIs, the counter of the subject entity — group 4 in both regexes)."""
    iri: str = str(res)
    pattern: str = prov_regex if "/prov/" in iri else entity_regex
    return int(_get_match(pattern, 4, iri))
def find_local_line_id(res: URIRef, n_file_item: int = 1) -> int:
    """
    Return the 1-based position of the resource inside its own file,
    given that each file stores at most 'n_file_item' resources.
    """
    cur_number: int = get_resource_number(res)

    # Largest multiple of n_file_item strictly below cur_number; Python's
    # floor division keeps this exact also when cur_number <= 0.
    lower_bound: int = ((cur_number - 1) // n_file_item) * n_file_item
    return cur_number - lower_bound
def _ceil_to_multiple(number: int, step: int) -> int:
    """Return the smallest multiple of 'step' that is >= 'number' (0 when number <= 0)."""
    split: int = 0
    while number > split:
        split += step
    return split


def _resolve_sub_folder(prefix: str, default_dir: str) -> str:
    """Return the directory name for a supplier prefix, falling back to 'default_dir' and then to '_'."""
    sub_folder: str = prefix if prefix != "" else default_dir
    if sub_folder == "":
        sub_folder = "_"  # enforce default value
    return sub_folder


def find_paths(res: URIRef, base_dir: str, base_iri: str, default_dir: str, dir_split: int,
               n_file_item: int, is_json: bool = True, process_id: int|str = None) -> Tuple[str, str]:
    """
    This function is responsible for looking for the correct JSON file that contains the data related to the
    resource identified by the variable 'string_iri'. This search takes into account the organisation in
    directories and files, as well as the particular supplier prefix for bibliographic entities, if specified.
    In case no supplier prefix is specified, the 'default_dir' (usually set to "_") is used instead.

    Refactor: the four copy-pasted sub_folder-default blocks and the two
    copy-pasted split-computation loops were factored into the
    _resolve_sub_folder and _ceil_to_multiple helpers; branch outcomes are
    unchanged.

    :return: a (directory path, file path) tuple
    """
    string_iri: str = str(res)
    # NOTE(review): a process_id equal to 0 is falsy and yields no suffix — confirm intended.
    process_id_str: str = f"_{process_id}" if process_id else ""

    if is_dataset(res):
        cur_dir_path: str = (base_dir + re.sub(r"^%s(.*)$" % base_iri, r"\1", string_iri))[:-1]
        # In case of dataset, the file path is different from regular files, e.g.
        # /corpus/br/index.json
        return cur_dir_path, cur_dir_path + os.sep + "index" + process_id_str + ".json"

    # Data about a provenance agent, e.g. /corpus/prov/: no directory/file
    # splitting applies (only reached when dir_split is non-zero).
    if dir_split and string_iri.startswith(base_iri + "prov/"):
        file_extension: str = '.json' if is_json else '.nq'
        cur_dir_path: str = base_dir + get_short_name(res)
        cur_file_path: str = cur_dir_path + os.sep + get_prefix(res) + get_count(res) \
            + process_id_str + file_extension
        return cur_dir_path, cur_file_path

    cur_number: int = get_resource_number(res)
    # Find the correct file number where to save the resources
    cur_file_split: int = _ceil_to_multiple(cur_number, n_file_item)

    # When the data have been split in multiple directories, one extra
    # numbered directory level is inserted in the path:
    dir_levels: List[str] = []
    if dir_split:
        dir_levels.append(str(_ceil_to_multiple(cur_number, dir_split)))

    if "/prov/" in string_iri:  # provenance file of a bibliographic entity
        sub_folder: str = _resolve_sub_folder(get_prov_subject_prefix(res), default_dir)
        file_extension: str = '.json' if is_json else '.nq'
        cur_dir_path: str = os.sep.join(
            [base_dir + get_prov_subject_short_name(res), sub_folder]
            + dir_levels + [str(cur_file_split), "prov"])
        cur_file_path: str = cur_dir_path + os.sep + get_short_name(res) \
            + process_id_str + file_extension
    else:  # regular bibliographic entity
        sub_folder: str = _resolve_sub_folder(get_prefix(res), default_dir)
        file_extension: str = '.json' if is_json else '.nt'
        cur_dir_path: str = os.sep.join([base_dir + get_short_name(res), sub_folder] + dir_levels)
        cur_file_path: str = cur_dir_path + os.sep + str(cur_file_split) \
            + process_id_str + file_extension

    return cur_dir_path, cur_file_path
def has_supplier_prefix(res: URIRef, base_iri: str) -> bool:
    """Check whether the IRI of 'res' carries a supplier prefix (two-letter short name followed by '/0')."""
    pattern: str = r"^%s[a-z][a-z]/0" % base_iri
    return re.search(pattern, str(res)) is not None
def build_graph_from_results(results: List[Dict]) -> Graph:
    """
    Build an rdflib Graph from SPARQL SELECT JSON bindings where each row
    exposes 's', 'p' and 'o' variables describing one triple.

    Fix: language-tagged literals previously lost their 'xml:lang' tag
    (the SPARQL 1.1 JSON results format carries it alongside 'value');
    it is now propagated to the Literal. A literal cannot carry both a
    language tag and a datatype, so 'datatype' is only read when no tag
    is present.

    NOTE(review): bindings of type 'bnode' are still turned into plain
    Literals (pre-existing behavior) — confirm whether blank-node support
    is needed.

    :param results: the list of bindings ('value'/'type' dictionaries)
    :return: the populated Graph
    """
    graph: Graph = Graph()
    for triple in results:
        s = URIRef(triple['s']['value'])
        p = URIRef(triple['p']['value'])
        obj_binding: Dict = triple['o']
        if obj_binding['type'] == 'uri':
            o = URIRef(obj_binding['value'])
        else:
            lang = obj_binding.get('xml:lang', None)
            if lang is not None:
                o = Literal(obj_binding['value'], lang=lang)
            else:
                datatype = obj_binding.get('datatype', None)
                datatype = URIRef(datatype) if datatype is not None else None
                o = Literal(obj_binding['value'], datatype=datatype)
        graph.add((s, p, o))
    return graph
def is_dataset(res: URIRef) -> bool:
    """Return True when the IRI lacks the trailing numeric suffix of a regular entity, i.e. it identifies a dataset."""
    entity_suffix = re.search(r"^.+/[0-9]+(-[0-9]+)?(/[0-9]+)?$", str(res))
    return entity_suffix is None