Coverage for heritrace / services / resource_lock_manager.py: 100%

106 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-07-02 10:16 +0000

1# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import json 

6import logging 

7from dataclasses import dataclass 

8from datetime import datetime, timezone 

9from enum import Enum 

10from typing import cast 

11 

12from flask_login import current_user 

13from redis import Redis, RedisError 

14 

15from heritrace.models import User 

16 

# Module-level logger, named after this module so log records can be filtered
# per module.
logger = logging.getLogger(__name__)

19 

20 

class LockStatus(Enum):
    """Outcome of a lock check on a resource, as seen by the current user."""

    # No blocking lock was found (the resource is unlocked, or the lock is
    # held by the current user).
    AVAILABLE = "available"
    # The resource, or a resource linking to it, is locked by another user.
    LOCKED = "locked"
    # The lock state could not be determined because Redis raised an error.
    ERROR = "error"

27 

28 

29@dataclass 

30class LockInfo: 

31 """Information about a resource lock.""" 

32 

33 user_id: str 

34 user_name: str 

35 timestamp: str 

36 resource_uri: str 

37 linked_resources: list[str] | None = None 

38 

39 def __post_init__(self) -> None: 

40 if self.linked_resources is None: 

41 self.linked_resources = [] 

42 

43 

class ResourceLockManager:
    """Manages resource locking using Redis.

    This class uses Redis to manage locks on resources and their linked
    resources. It uses the following Redis key patterns:

    - ``resource_lock:{resource_uri}`` - stores the JSON lock info for a
      resource
    - ``reverse_links:{resource_uri}`` - stores the set of resources that
      link to this resource
    """

    def __init__(self, redis_client: Redis) -> None:  # type: ignore[type-arg]
        """Store the Redis client and the key/expiry configuration.

        Args:
            redis_client: Redis client used for every lock operation
        """
        self.redis: Redis[str] = redis_client  # type: ignore[assignment]
        self.lock_duration = 300  # 5 minutes in seconds
        self.lock_prefix = "resource_lock:"
        # Reverse links: resources that link to this resource
        self.reverse_links_prefix = "reverse_links:"

    def generate_lock_key(self, resource_uri: str) -> str:
        """Generate a Redis key for a resource lock."""
        return f"{self.lock_prefix}{resource_uri}"

    def generate_reverse_links_key(self, resource_uri: str) -> str:
        """Generate a Redis key for storing resources that link to this resource."""
        return f"{self.reverse_links_prefix}{resource_uri}"

    def get_lock_info(self, resource_uri: str) -> LockInfo | None:
        """
        Get information about the current lock on a resource.

        Args:
            resource_uri: URI of the resource to check

        Returns:
            LockInfo if resource is locked, None otherwise
        """
        lock_key = self.generate_lock_key(resource_uri)
        lock_data = self.redis.get(lock_key)

        # A missing (or empty) value means no lock is currently stored.
        if not lock_data:
            return None

        data = json.loads(lock_data)  # type: ignore[arg-type]
        return LockInfo(
            user_id=data["user_id"],
            user_name=data["user_name"],
            timestamp=data["timestamp"],
            resource_uri=data["resource_uri"],
            linked_resources=data.get("linked_resources", []),
        )

    def check_lock_status(
        self, resource_uri: str
    ) -> tuple[LockStatus, LockInfo | None]:
        """
        Check if a resource is locked and return its status.

        Two checks are performed, in order:

        1. The resource itself is directly locked.
        2. Any resource that links to this resource (its reverse links) is
           locked by another user.

        A lock held by the current user never blocks: a direct lock owned by
        the current user is reported as AVAILABLE together with its LockInfo.

        NOTE(review): locks on the resources that *this* resource links to
        (forward links) are not inspected here, since this method does not
        receive the list of linked resources.

        Args:
            resource_uri: URI of the resource to check

        Returns:
            Tuple of (LockStatus, LockInfo | None). The status is ERROR (with
            None) when Redis raises an error.
        """
        try:
            # 1. Check direct lock on the resource
            lock_info = self.get_lock_info(resource_uri)
            if lock_info:
                # If locked by current user, consider it available
                if lock_info.user_id == str(current_user.orcid):
                    return LockStatus.AVAILABLE, lock_info
                return LockStatus.LOCKED, lock_info

            # 2. Check if any resource that links to this resource is locked
            # by another user (reverse links)
            reverse_links_key = self.generate_reverse_links_key(resource_uri)
            reverse_links: set[str] = self.redis.smembers(reverse_links_key)  # type: ignore[assignment]

            for linking_uri_item in reverse_links:
                # Set members may come back as bytes or str depending on the
                # client configuration; normalise before building the key.
                linking_uri = self.decode_redis_item(linking_uri_item)
                linking_lock_info = self.get_lock_info(linking_uri)
                if linking_lock_info and linking_lock_info.user_id != str(
                    current_user.orcid
                ):
                    # Resource that links to this resource is locked by
                    # another user
                    return LockStatus.LOCKED, linking_lock_info

        except RedisError:
            logger.exception("Error checking lock status for %s", resource_uri)
            return LockStatus.ERROR, None
        else:
            return LockStatus.AVAILABLE, None

    def decode_redis_item(self, item: bytes | str) -> str:
        """
        Helper method to decode Redis items that might be bytes or strings.

        Args:
            item: The item to decode (bytes or string)

        Returns:
            String representation of the item (bytes are decoded as UTF-8)
        """
        if isinstance(item, bytes):
            return item.decode("utf-8")
        return str(item)

    def acquire_lock(self, resource_uri: str, linked_resources: list[str]) -> bool:
        """
        Try to acquire a lock on a resource.

        The lock status is checked first; the lock is only written when the
        resource is reported as AVAILABLE. Reverse links for the given linked
        resources are registered in Redis sets before the lock is stored.

        Args:
            resource_uri: URI of the resource to lock
            linked_resources: List of linked resources already known

        Returns:
            bool: True if lock was acquired, False otherwise
        """
        try:
            # Fail closed: refuse when another user holds a blocking lock
            # (LOCKED) and also when the state could not be verified (ERROR),
            # so a lock that could not be read is never overwritten.
            status, _lock_info = self.check_lock_status(resource_uri)
            if status != LockStatus.AVAILABLE:
                return False

            # Update reverse links in Redis
            if linked_resources:
                # Use a pipeline so all updates go out in one round trip
                pipe = self.redis.pipeline()

                for linked_uri in linked_resources:
                    # Record that this resource links to the linked resource
                    reverse_links_key = self.generate_reverse_links_key(linked_uri)
                    pipe.sadd(reverse_links_key, str(resource_uri))
                    # Reverse links expire together with the lock
                    pipe.expire(reverse_links_key, self.lock_duration)

                pipe.execute()

            # Proceed with lock creation for the main resource
            result = self.create_resource_lock(
                resource_uri, cast("User", current_user), linked_resources
            )

        except RedisError:
            logger.exception("Error acquiring lock for %s", resource_uri)
            return False
        else:
            return result

    def create_resource_lock(
        self, resource_uri: str, current_user: User, linked_resources: list[str]
    ) -> bool:
        """
        Helper method to create a lock for a resource.

        The lock is stored as a JSON document under the resource's lock key
        and expires after ``self.lock_duration`` seconds.

        Args:
            resource_uri: URI of the resource to lock
            current_user: The current user object
            linked_resources: List of linked resource URIs

        Returns:
            bool: True if lock was created successfully, False if Redis
            raised an error
        """
        try:
            # Create or update lock for the main resource
            lock_key = self.generate_lock_key(resource_uri)
            lock_data = {
                "user_id": str(current_user.orcid),
                "user_name": current_user.name,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "resource_uri": resource_uri,
                "linked_resources": linked_resources,
            }

            # Set the lock with expiration
            self.redis.setex(lock_key, self.lock_duration, json.dumps(lock_data))
        except RedisError:
            logger.exception("Error creating lock for %s", resource_uri)
            return False
        else:
            return True

    def release_lock(self, resource_uri: str) -> bool:
        """
        Release a lock on a resource if owned by the current user.
        Also cleans up the reverse links.

        Args:
            resource_uri: URI of the resource to unlock

        Returns:
            bool: True if lock was released, False otherwise (not locked,
            locked by another user, or Redis raised an error)
        """
        try:
            lock_info = self.get_lock_info(resource_uri)

            # If not locked or locked by another user
            if not lock_info or lock_info.user_id != str(current_user.orcid):
                return False

            # Queue the lock deletion and the reverse-link cleanup in a single
            # pipeline so they are sent together, instead of one round trip
            # per linked resource with the lock already gone if one fails.
            pipe = self.redis.pipeline()
            pipe.delete(self.generate_lock_key(resource_uri))

            for linked_uri in lock_info.linked_resources or []:
                # Ensure we're working with string URIs
                reverse_links_key = self.generate_reverse_links_key(str(linked_uri))
                pipe.srem(reverse_links_key, str(resource_uri))

            pipe.execute()

        except RedisError:
            logger.exception("Error releasing lock for %s", resource_uri)
            return False
        else:
            return True