Coverage for src / time_agnostic_library / support.py: 100%

94 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-03-21 11:54 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2021-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7 

8import json 

9import re 

10from datetime import datetime, timezone 

11from functools import lru_cache 

12 

# Default path used by generate_config_file when no config_path is given.
CONFIG_PATH = './config.json'

# Matches a single RDF term written in N-Triples/N3 syntax. The alternatives
# are tried in the order listed and each one owns its capture groups:
#   group 1      IRI text between angle brackets
#   groups 2, 3  literal body and datatype IRI of a typed literal
#   groups 4, 5  literal body and language tag of a tagged literal
#   group 6      body of a plain literal
#   group 7      blank node label, including the "_:" prefix
# DOTALL lets an escaped character inside a literal body be a newline.
_NT_TERM_RE = re.compile(
    '|'.join((
        r'<([^>]+)>',
        r'"((?:[^"\\]|\\.)*)"\^\^<([^>]+)>',
        r'"((?:[^"\\]|\\.)*)"@([a-zA-Z][\w-]*)',
        r'"((?:[^"\\]|\\.)*)"',
        r'(_:\S+)',
    )),
    re.DOTALL,
)

23 

24 

def _nt_match_to_n3(match: re.Match) -> str:
    """Rebuild the N3 text of the term captured by *match*.

    The groups are inspected in order and the first non-None one decides
    the output shape:

    * group 1 -> ``<iri>``
    * group 2 (with group 3) -> ``"value"^^<datatype>``
    * group 4 (with group 5) -> ``"value"@lang``
    * group 6 -> ``"value"``
    * otherwise group 7 is returned unchanged.

    Groups are compared against None rather than tested for truthiness so
    that an empty literal body still selects its branch.
    """
    if (iri := match[1]) is not None:
        return f"<{iri}>"
    if (typed_body := match[2]) is not None:
        return f'"{typed_body}"^^<{match[3]}>'
    if (tagged_body := match[4]) is not None:
        return f'"{tagged_body}"@{match[5]}'
    if (plain_body := match[6]) is not None:
        return f'"{plain_body}"'
    return match[7]

35 

36 

def generate_config_file(
    config_path: str = CONFIG_PATH,
    dataset_urls: list | None = None,
    dataset_dirs: list | None = None,
    dataset_is_quadstore: bool = True,
    provenance_urls: list | None = None,
    provenance_dirs: list | None = None,
    provenance_is_quadstore: bool = True,
    blazegraph_full_text_search: bool = False,
    fuseki_full_text_search: bool = False,
    virtuoso_full_text_search: bool = False,
    graphdb_connector_name: str = '',
) -> dict:
    """Build the configuration dictionary and write it to *config_path* as JSON.

    Any of the four list arguments left as None is replaced by a new empty
    list. The three ``*_full_text_search`` flags are stored as the lowercase
    string form of the value passed (``'true'`` / ``'false'`` for booleans).

    The target file is opened in write mode with UTF-8 encoding, so existing
    content is replaced.

    Returns:
        The same dictionary that was serialised to the file.
    """
    settings = {
        'dataset': {
            'triplestore_urls': [] if dataset_urls is None else dataset_urls,
            'file_paths': [] if dataset_dirs is None else dataset_dirs,
            'is_quadstore': dataset_is_quadstore,
        },
        'provenance': {
            'triplestore_urls': [] if provenance_urls is None else provenance_urls,
            'file_paths': [] if provenance_dirs is None else provenance_dirs,
            'is_quadstore': provenance_is_quadstore,
        },
        'blazegraph_full_text_search': str(blazegraph_full_text_search).lower(),
        'fuseki_full_text_search': str(fuseki_full_text_search).lower(),
        'virtuoso_full_text_search': str(virtuoso_full_text_search).lower(),
        'graphdb_connector_name': graphdb_connector_name,
    }
    with open(config_path, 'w', encoding='utf-8') as handle:
        json.dump(settings, handle)
    return settings

69 

@lru_cache(maxsize=4096)
def _cached_parse(time_string: str) -> datetime:
    """Parse an ISO-format timestamp into a timezone-aware UTC datetime.

    Results are memoised for up to 4096 distinct input strings.

    A trailing ``Z`` is rewritten as ``+00:00`` before parsing (presumably
    so that ``datetime.fromisoformat`` accepts it on interpreters that do
    not understand the ``Z`` suffix). A value parsed without an offset is
    labelled as UTC without shifting the clock time; a value with an offset
    is converted to UTC.
    """
    normalized = (
        f"{time_string[:-1]}+00:00" if time_string.endswith("Z") else time_string
    )
    parsed = datetime.fromisoformat(normalized)
    return (
        parsed.replace(tzinfo=timezone.utc)
        if parsed.tzinfo is None
        else parsed.astimezone(timezone.utc)
    )

78 

79def convert_to_datetime(time_string: str | None, stringify: bool = False) -> datetime | str | None: 

80 if time_string and time_string != 'None': 

81 time = _cached_parse(time_string) 

82 if stringify: 

83 return time.isoformat() 

84 return time 

85 return None 

86 

def _strip_literal_datatype(n3: str) -> str:
    """Drop whatever follows the closing quote of a literal, unless it is a language tag.

    Terms that do not start with a double quote are returned untouched.
    For a quoted term the text is scanned for the first unescaped closing
    quote: if the next character is ``@`` the whole term is kept, otherwise
    the term is cut right after that quote (which removes a ``^^<datatype>``
    suffix). A term with no closing quote is returned unchanged.
    """
    if not n3.startswith('"'):
        return n3
    pos, end = 1, len(n3)
    while pos < end:
        char = n3[pos]
        if char == '\\':
            # A backslash escapes the following character: skip both.
            pos += 2
        elif char == '"':
            closing = pos + 1
            return n3 if n3.startswith('@', closing) else n3[:closing]
        else:
            pos += 1
    return n3

102 

103def _to_nt_sorted_list(quads) -> list | None: 

104 if quads is None: 

105 return None 

106 lines = set() 

107 for q in quads: 

108 parts = [_strip_literal_datatype(el) for el in q[:3]] 

109 lines.add(' '.join(parts)) 

110 return sorted(lines) 

111 

def _to_dict_of_nt_sorted_lists(dictionary: dict) -> dict:
    """Convert every quad set held in *dictionary* into a sorted list of lines.

    Two value shapes are handled:

    * a ``set`` is converted directly with ``_to_nt_sorted_list``;
    * anything else is treated as a mapping whose values are each converted
      with ``_to_nt_sorted_list``, keeping the inner keys.

    A new dictionary is returned; the input is not modified.
    """
    converted = {}
    for outer_key, content in dictionary.items():
        if isinstance(content, set):
            converted[outer_key] = _to_nt_sorted_list(content)
            continue
        converted[outer_key] = {
            snapshot: _to_nt_sorted_list(quad_set)
            for snapshot, quad_set in content.items()
        }
    return converted

122 

def _nt_list_to_quad_set(nt_list: list[str]) -> set[tuple[str, ...]]:
    """Parse text lines into a set of 3-tuples of N3 terms.

    Lines that are empty or whitespace-only are ignored. Every other line is
    scanned with ``_NT_TERM_RE``; when at least three terms are found, the
    first three are converted with ``_nt_match_to_n3`` and stored as a tuple.
    Lines yielding fewer than three terms contribute nothing.
    """
    parsed = set()
    for raw_line in nt_list:
        if raw_line.strip():
            terms = list(_NT_TERM_RE.finditer(raw_line))
            if len(terms) >= 3:
                parsed.add(tuple(map(_nt_match_to_n3, terms[:3])))
    return parsed

132 

def _to_dict_of_quad_sets(dictionary: dict) -> dict:
    """Convert every list of text lines held in *dictionary* into a set of term tuples.

    Two value shapes are handled:

    * a ``list`` is converted directly with ``_nt_list_to_quad_set``;
    * anything else is treated as a mapping whose values are each converted
      with ``_nt_list_to_quad_set``, keeping the inner keys.

    A new dictionary is returned; the input is not modified.
    """
    converted = {}
    for outer_key, content in dictionary.items():
        if isinstance(content, list):
            converted[outer_key] = _nt_list_to_quad_set(content)
            continue
        converted[outer_key] = {
            snapshot: _nt_list_to_quad_set(triples)
            for snapshot, triples in content.items()
        }
    return converted