Coverage for heritrace / routes / entity / _operations.py: 98%

91 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-07-02 10:16 +0000

1# SPDX-FileCopyrightText: 2024-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from rdflib import XSD, Literal, URIRef 

6from SPARQLWrapper import JSON 

7 

8from heritrace.editor import Editor 

9from heritrace.extensions import get_form_fields, get_sparql 

10from heritrace.sparql import get_sparql_bindings 

11from heritrace.utils.display_rules_utils import get_highest_priority_class 

12from heritrace.utils.shacl_utils import find_matching_form_field 

13from heritrace.utils.sparql_utils import get_entity_types 

14from heritrace.utils.uri_utils import is_valid_url 

15 

16 

def process_modification_data(data: dict) -> tuple[str, list[dict]]:
    """Extract the subject URI and the modification list from a request payload.

    Args:
        data: Payload read through its ``"subject"`` and ``"modifications"``
            keys.

    Returns:
        A ``(subject, modifications)`` pair, both taken unchanged from *data*.

    Raises:
        ValueError: If ``"subject"`` is missing or falsy, or if
            ``"modifications"`` is missing or empty. The subject is checked
            first.
    """
    if not (subject := data.get("subject")):
        msg = "No subject URI provided in modification data"
        raise ValueError(msg)

    if not (changes := data.get("modifications", [])):
        msg = "No modifications provided in data"
        raise ValueError(msg)

    return subject, changes

29 

30 

31def _validate_removal( 

32 predicate_fields: list[dict], 

33 predicate: str, 

34) -> tuple[bool, str]: 

35 for field in predicate_fields: 

36 if field.get("minCount", 0) > 0: 

37 return False, f"Cannot remove required predicate: {predicate}" 

38 return True, "" 

39 

40 

41def _validate_addition( 

42 predicate_fields: list[dict], 

43 predicate: str, 

44 subject_uri: URIRef, 

45) -> tuple[bool, str]: 

46 for field in predicate_fields: 

47 max_count = field.get("maxCount") 

48 if max_count: 

49 current_count = get_predicate_count(subject_uri, URIRef(predicate)) 

50 if current_count >= max_count: 

51 return ( 

52 False, 

53 f"Maximum count exceeded for predicate: {predicate}", 

54 ) 

55 return True, "" 

56 

57 

58def _resolve_entity_type( 

59 modification: dict, 

60 subject_uri: URIRef, 

61) -> str | None: 

62 entity_type = modification.get("entity_type") 

63 if not entity_type: 

64 entity_types = get_entity_types(subject_uri) 

65 if entity_types: 

66 entity_type = get_highest_priority_class(entity_types) 

67 return entity_type 

68 

69 

def validate_modification(modification: dict, subject_uri: URIRef) -> tuple[bool, str]:
    """Validate a single modification against its structure and the form fields.

    Structural checks come first: an operation and a predicate must be present
    and the operation must be ``"add"``, ``"remove"`` or ``"update"``. When form
    fields are configured and one matches the entity type and shape,
    ``"remove"`` is checked by ``_validate_removal`` and ``"add"`` by
    ``_validate_addition``. ``"update"`` gets no form-field check.

    Args:
        modification: Modification dict, read through its ``"operation"``,
            ``"predicate"``, ``"entity_type"`` and ``"entity_shape"`` keys.
        subject_uri: Subject the modification applies to.

    Returns:
        ``(True, "")`` when the modification is acceptable, otherwise
        ``(False, reason)``.
    """
    form_fields = get_form_fields()

    operation = modification.get("operation")
    if not operation:
        return False, "No operation specified in modification"

    predicate = modification.get("predicate")
    if not predicate:
        return False, "No predicate specified in modification"

    if operation not in ("add", "remove", "update"):
        return False, f"Invalid operation: {operation}"

    # Without configured form fields there are no constraints to enforce.
    if not form_fields:
        return True, ""

    entity_type = _resolve_entity_type(modification, subject_uri)
    entity_shape = modification.get("entity_shape")
    matching_key = find_matching_form_field(entity_type, entity_shape, form_fields)
    if not matching_key:
        return True, ""

    predicate_fields = form_fields[matching_key].get(predicate, [])
    if operation == "remove":
        return _validate_removal(predicate_fields, predicate)
    if operation == "add":
        return _validate_addition(predicate_fields, predicate, subject_uri)

    return True, ""

96 

97 

def get_predicate_count(subject_uri: URIRef, predicate: URIRef) -> int:
    """Count the objects matching ``<subject_uri> <predicate> ?o``.

    The query runs on the endpoint returned by ``get_sparql()`` with JSON as
    the return format.

    Args:
        subject_uri: Subject of the triple pattern.
        predicate: Predicate of the triple pattern.

    Returns:
        The ``count`` value of the first result binding, as an ``int``.
    """
    endpoint = get_sparql()

    count_query = f"""
    SELECT (COUNT(?o) as ?count) WHERE {{
        <{subject_uri}> <{predicate}> ?o .
    }}
    """

    endpoint.setQuery(count_query)
    endpoint.setReturnFormat(JSON)
    response = endpoint.query().convert()

    first_row = get_sparql_bindings(response)[0]
    return int(first_row["count"]["value"])

112 

113 

def _to_rdf_object(raw_value, datatype) -> URIRef | Literal:
    """Return *raw_value* as a ``URIRef`` if ``is_valid_url`` accepts it, else as a typed ``Literal``."""
    if is_valid_url(raw_value):
        return URIRef(raw_value)
    return Literal(raw_value, datatype=URIRef(datatype))


def apply_modifications(
    editor: Editor,
    modifications: list[dict],
    subject_uri: URIRef,
    graph_uri: URIRef | None = None,
) -> None:
    """Apply each modification to *subject_uri* through *editor*, in order.

    ``"remove"`` calls ``editor.delete`` for the predicate, ``"add"`` calls
    ``editor.create`` with the converted ``"value"``, and ``"update"`` calls
    ``editor.update`` with the converted ``"oldValue"`` and ``"newValue"``.
    Any other operation is skipped. Values accepted by ``is_valid_url`` become
    ``URIRef`` objects; all others become ``Literal`` objects typed with the
    modification's ``"datatype"`` (``XSD.string`` when absent).

    Args:
        editor: Editor that receives the delete/create/update calls.
        modifications: Modification dicts; each must have ``"operation"`` and
            ``"predicate"`` plus the value keys its operation needs.
        subject_uri: Subject of every affected triple.
        graph_uri: Graph passed through to the editor calls; may be ``None``.

    Raises:
        KeyError: If a modification lacks a key required by its operation.
    """
    for change in modifications:
        operation = change["operation"]
        predicate = URIRef(change["predicate"])

        if operation == "remove":
            editor.delete(subject_uri, predicate, graph=graph_uri)

        elif operation == "add":
            raw_value = change["value"]
            datatype = change.get("datatype", XSD.string)
            editor.create(
                subject_uri,
                predicate,
                _to_rdf_object(raw_value, datatype),
                graph_uri,
            )

        elif operation == "update":
            # Read both keys before converting so that a missing key fails
            # before any conversion is attempted.
            raw_old = change["oldValue"]
            raw_new = change["newValue"]
            datatype = change.get("datatype", XSD.string)

            previous_object = _to_rdf_object(raw_old, datatype)
            replacement_object = _to_rdf_object(raw_new, datatype)

            editor.update(
                subject_uri,
                predicate,
                previous_object,
                replacement_object,
                graph_uri,
            )