Coverage for src / time_agnostic_library / support.py: 100%
94 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-03-21 11:54 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-03-21 11:54 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2021-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
8import json
9import re
10from datetime import datetime, timezone
11from functools import lru_cache
# Default path used by generate_config_file when no config_path is given.
CONFIG_PATH = './config.json'

# Matches a single RDF term inside an N-Triples-style line. The alternatives
# are tried in order and each one fills its own capture groups, which
# _nt_match_to_n3 relies on to rebuild the term:
#   group 1      -> IRI between angle brackets
#   groups 2, 3  -> literal lexical form and its datatype IRI ("..."^^<...>)
#   groups 4, 5  -> literal lexical form and its language tag ("..."@tag)
#   group 6      -> plain literal without datatype or language tag
#   group 7      -> blank node label (_:label)
# Inside a literal, a backslash followed by any character is consumed as a
# unit, so an escaped quote does not terminate the literal. re.DOTALL makes
# that "any character" include a newline as well.
_NT_TERM_RE = re.compile(
    r'<([^>]+)>'
    r'|"((?:[^"\\]|\\.)*)"\^\^<([^>]+)>'
    r'|"((?:[^"\\]|\\.)*)"@([a-zA-Z][\w-]*)'
    r'|"((?:[^"\\]|\\.)*)"'
    r'|(_:\S+)',
    re.DOTALL,
)
def _nt_match_to_n3(match: re.Match) -> str:
    """Rebuild the N3 text of the term captured by a seven-group term match.

    The match is expected to expose the group layout of ``_NT_TERM_RE``:
    group 1 is an IRI, groups 2/3 a datatyped literal, groups 4/5 a
    language-tagged literal, group 6 a plain literal and group 7 a blank
    node. The first group that participated in the match decides the shape
    of the returned string.
    """
    (
        iri,
        typed_value, datatype,
        tagged_value, language,
        plain_value,
        blank_node,
    ) = match.group(1, 2, 3, 4, 5, 6, 7)

    if iri is not None:
        return f"<{iri}>"
    if typed_value is not None:
        return f'"{typed_value}"^^<{datatype}>'
    if tagged_value is not None:
        return f'"{tagged_value}"@{language}'
    if plain_value is not None:
        return f'"{plain_value}"'
    # Only the blank-node alternative is left.
    return blank_node
def generate_config_file(
    config_path: str = CONFIG_PATH,
    dataset_urls: list | None = None,
    dataset_dirs: list | None = None,
    dataset_is_quadstore: bool = True,
    provenance_urls: list | None = None,
    provenance_dirs: list | None = None,
    provenance_is_quadstore: bool = True,
    blazegraph_full_text_search: bool = False,
    fuseki_full_text_search: bool = False,
    virtuoso_full_text_search: bool = False,
    graphdb_connector_name: str = '',
) -> dict:
    """Build the configuration dictionary and write it to *config_path* as JSON.

    :param config_path: destination of the JSON file (opened in write mode,
        UTF-8 encoded).
    :param dataset_urls: stored under ``dataset.triplestore_urls``; ``None``
        becomes an empty list.
    :param dataset_dirs: stored under ``dataset.file_paths``; ``None``
        becomes an empty list.
    :param dataset_is_quadstore: stored under ``dataset.is_quadstore``.
    :param provenance_urls: stored under ``provenance.triplestore_urls``;
        ``None`` becomes an empty list.
    :param provenance_dirs: stored under ``provenance.file_paths``; ``None``
        becomes an empty list.
    :param provenance_is_quadstore: stored under ``provenance.is_quadstore``.
    :param blazegraph_full_text_search: saved as the lowercase string of the
        flag (``'true'`` / ``'false'`` for booleans).
    :param fuseki_full_text_search: saved like the Blazegraph flag.
    :param virtuoso_full_text_search: saved like the Blazegraph flag.
    :param graphdb_connector_name: stored unchanged.
    :returns: the dictionary that was serialised.
    """
    def _or_empty(values):
        # Only None is replaced; any other value is passed through untouched.
        return [] if values is None else values

    dataset_section = {
        'triplestore_urls': _or_empty(dataset_urls),
        'file_paths': _or_empty(dataset_dirs),
        'is_quadstore': dataset_is_quadstore,
    }
    provenance_section = {
        'triplestore_urls': _or_empty(provenance_urls),
        'file_paths': _or_empty(provenance_dirs),
        'is_quadstore': provenance_is_quadstore,
    }
    # The full-text-search switches are persisted as lowercase strings,
    # not as JSON booleans.
    text_search_flags = {
        'blazegraph_full_text_search': str(blazegraph_full_text_search).lower(),
        'fuseki_full_text_search': str(fuseki_full_text_search).lower(),
        'virtuoso_full_text_search': str(virtuoso_full_text_search).lower(),
    }
    config = {
        'dataset': dataset_section,
        'provenance': provenance_section,
        **text_search_flags,
        'graphdb_connector_name': graphdb_connector_name,
    }

    with open(config_path, 'w', encoding='utf-8') as config_file:
        config_file.write(json.dumps(config))
    return config
@lru_cache(maxsize=4096)
def _cached_parse(time_string: str) -> datetime:
    """Parse an ISO 8601 timestamp into a timezone-aware UTC ``datetime``.

    A trailing ``Z`` is rewritten as ``+00:00`` before parsing. A timestamp
    without offset is labelled as UTC as-is; a timestamp with an offset is
    converted to UTC. Results are memoised per input string (up to 4096
    distinct strings).
    """
    iso_text = f"{time_string[:-1]}+00:00" if time_string.endswith("Z") else time_string
    parsed = datetime.fromisoformat(iso_text)
    return (
        parsed.replace(tzinfo=timezone.utc)
        if parsed.tzinfo is None
        else parsed.astimezone(timezone.utc)
    )
79def convert_to_datetime(time_string: str | None, stringify: bool = False) -> datetime | str | None:
80 if time_string and time_string != 'None':
81 time = _cached_parse(time_string)
82 if stringify:
83 return time.isoformat()
84 return time
85 return None
def _strip_literal_datatype(n3: str) -> str:
    """Drop whatever follows the closing quote of a literal, unless it is a language tag.

    Terms that do not start with a double quote are returned unchanged. For
    a literal, the closing quote is the first unescaped ``"`` after the
    opening one (a backslash always protects the character after it). If the
    text right after that quote starts with ``@`` the term is returned
    unchanged; otherwise only the quoted part is returned. A literal with no
    closing quote is returned unchanged.
    """
    if n3[:1] != '"':
        return n3

    skip_next = False
    for pos in range(1, len(n3)):
        if skip_next:
            # This character was escaped by the preceding backslash.
            skip_next = False
            continue
        char = n3[pos]
        if char == '\\':
            skip_next = True
        elif char == '"':
            if n3.startswith('@', pos + 1):
                return n3
            return n3[:pos + 1]
    return n3
103def _to_nt_sorted_list(quads) -> list | None:
104 if quads is None:
105 return None
106 lines = set()
107 for q in quads:
108 parts = [_strip_literal_datatype(el) for el in q[:3]]
109 lines.add(' '.join(parts))
110 return sorted(lines)
def _to_dict_of_nt_sorted_lists(dictionary: dict) -> dict:
    """Convert every quad collection in *dictionary* with ``_to_nt_sorted_list``.

    A value that is a ``set`` is converted directly. Any other value is
    treated as a mapping from snapshot to quad collection, and each of its
    values is converted, preserving the snapshot keys.
    """
    def _convert(value):
        if isinstance(value, set):
            return _to_nt_sorted_list(value)
        return {
            snapshot: _to_nt_sorted_list(quad_set)
            for snapshot, quad_set in value.items()
        }

    return {key: _convert(value) for key, value in dictionary.items()}
def _nt_list_to_quad_set(nt_list: list[str]) -> set[tuple[str, ...]]:
    """Parse serialised lines back into a set of three-term tuples.

    Blank lines are skipped. For every other line the terms are located
    with ``_NT_TERM_RE``; when at least three are found, the first three are
    rebuilt with ``_nt_match_to_n3`` and added as a tuple. Lines with fewer
    than three terms contribute nothing.
    """
    quads = set()
    for raw_line in nt_list:
        if raw_line.strip():
            leading_terms = list(_NT_TERM_RE.finditer(raw_line))[:3]
            # Exactly three after slicing means the line had three or more.
            if len(leading_terms) == 3:
                quads.add(tuple(map(_nt_match_to_n3, leading_terms)))
    return quads
def _to_dict_of_quad_sets(dictionary: dict) -> dict:
    """Convert every list of serialised lines in *dictionary* with ``_nt_list_to_quad_set``.

    A value that is a ``list`` is converted directly. Any other value is
    treated as a mapping from snapshot to list of lines, and each of its
    values is converted, preserving the snapshot keys.
    """
    def _convert(value):
        if isinstance(value, list):
            return _nt_list_to_quad_set(value)
        return {
            snapshot: _nt_list_to_quad_set(triples)
            for snapshot, triples in value.items()
        }

    return {key: _convert(value) for key, value in dictionary.items()}