Coverage for heritrace / services / resource_lock_manager.py: 100%

105 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-21 12:56 +0000

1# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import json 

6import logging 

7from dataclasses import dataclass 

8from datetime import datetime, timezone 

9from enum import Enum 

10from typing import Optional, Tuple, List 

11 

12from flask_login import current_user 

13from redis import Redis 

14 

15# Set up logger 

16logger = logging.getLogger(__name__) 

17 

18 

19class LockStatus(Enum): 

20 """Possible states of a resource lock.""" 

21 

22 AVAILABLE = "available" 

23 LOCKED = "locked" 

24 ERROR = "error" 

25 

26 

27@dataclass 

28class LockInfo: 

29 """Information about a resource lock.""" 

30 

31 user_id: str 

32 user_name: str 

33 timestamp: str 

34 resource_uri: str 

35 linked_resources: List[str] = None 

36 

37 def __post_init__(self): 

38 if self.linked_resources is None: 

39 self.linked_resources = [] 

40 

41 

42class ResourceLockManager: 

43 """Manages resource locking using Redis. 

44  

45 This class uses Redis to manage locks on resources and their linked resources. 

46 It uses the following Redis key patterns: 

47 - resource_lock:{resource_uri} - Stores lock info for a resource 

48 - reverse_links:{resource_uri} - Stores a set of resources that link to this resource 

49 """ 

50 

51 def __init__(self, redis_client: Redis): 

52 self.redis = redis_client 

53 self.lock_duration = 300 # 5 minutes in seconds 

54 self.lock_prefix = "resource_lock:" 

55 self.reverse_links_prefix = "reverse_links:" # Reverse links: resources that link to this resource 

56 

57 def _generate_lock_key(self, resource_uri: str) -> str: 

58 """Generate a Redis key for a resource lock.""" 

59 return f"{self.lock_prefix}{resource_uri}" 

60 

61 def _generate_reverse_links_key(self, resource_uri: str) -> str: 

62 """Generate a Redis key for storing resources that link to this resource.""" 

63 return f"{self.reverse_links_prefix}{resource_uri}" 

64 

65 def get_lock_info(self, resource_uri: str) -> Optional[LockInfo]: 

66 """ 

67 Get information about the current lock on a resource. 

68 

69 Args: 

70 resource_uri: URI of the resource to check 

71 

72 Returns: 

73 LockInfo if resource is locked, None otherwise 

74 """ 

75 lock_key = self._generate_lock_key(resource_uri) 

76 lock_data = self.redis.get(lock_key) 

77 

78 if not lock_data: 

79 return None 

80 

81 data = json.loads(lock_data) 

82 linked_resources = data.get("linked_resources", []) 

83 return LockInfo( 

84 user_id=data["user_id"], 

85 user_name=data["user_name"], 

86 timestamp=data["timestamp"], 

87 resource_uri=data["resource_uri"], 

88 linked_resources=linked_resources 

89 ) 

90 

91 def check_lock_status( 

92 self, resource_uri: str 

93 ) -> Tuple[LockStatus, Optional[LockInfo]]: 

94 """ 

95 Check if a resource is locked and return its status. 

96 This method efficiently checks if: 

97 1. The resource itself is directly locked 

98 2. Any resource that this resource links to is locked (forward links) 

99 3. Any resource that links to this resource is locked (reverse links) 

100 

101 Args: 

102 resource_uri: URI of the resource to check 

103 

104 Returns: 

105 Tuple of (LockStatus, Optional[LockInfo]) 

106 """ 

107 try: 

108 # 1. Check direct lock on the resource 

109 lock_info = self.get_lock_info(resource_uri) 

110 if lock_info: 

111 # If locked by current user, consider it available 

112 if lock_info.user_id == str(current_user.orcid): 

113 return LockStatus.AVAILABLE, lock_info 

114 return LockStatus.LOCKED, lock_info 

115 

116 # 3. Check if any resource that links to this resource is locked by another user 

117 # Get the resources that link to this resource (reverse links) 

118 reverse_links_key = self._generate_reverse_links_key(resource_uri) 

119 reverse_links = self.redis.smembers(reverse_links_key) 

120 

121 # Check if any of these resources is locked 

122 for linking_uri_item in reverse_links: 

123 # Use helper method to standardize format 

124 linking_uri = self._decode_redis_item(linking_uri_item) 

125 linking_lock_info = self.get_lock_info(linking_uri) 

126 if linking_lock_info and linking_lock_info.user_id != str(current_user.orcid): 

127 # Resource that links to this resource is locked by another user 

128 return LockStatus.LOCKED, linking_lock_info 

129 

130 # If we get here, the resource is available 

131 return LockStatus.AVAILABLE, None 

132 

133 except Exception as e: 

134 logger.error(f"Error checking lock status for {resource_uri}: {str(e)}") 

135 import traceback 

136 logger.error(traceback.format_exc()) 

137 return LockStatus.ERROR, None 

138 

139 def _decode_redis_item(self, item): 

140 """ 

141 Helper method to decode Redis items that might be bytes or strings. 

142  

143 Args: 

144 item: The item to decode (bytes or string) 

145  

146 Returns: 

147 String representation of the item 

148 """ 

149 if isinstance(item, bytes): 

150 return item.decode('utf-8') 

151 return str(item) 

152 

153 def acquire_lock(self, resource_uri: str, linked_resources: List[str]) -> bool: 

154 """ 

155 Try to acquire a lock on a resource. 

156 This method efficiently checks and acquires locks using Redis sets 

157 to track relationships between resources. 

158 

159 Args: 

160 resource_uri: URI of the resource to lock 

161 linked_resources: List of linked resources already known 

162 

163 Returns: 

164 bool: True if lock was acquired, False otherwise 

165 """ 

166 try: 

167 # First check if the resource or any related resource is locked by another user 

168 status, lock_info = self.check_lock_status(resource_uri) 

169 if status == LockStatus.LOCKED: 

170 return False 

171 

172 # Update reverse links in Redis 

173 if linked_resources: 

174 # Use a pipeline for better performance 

175 pipe = self.redis.pipeline() 

176 

177 # Store reverse links (resources that are linked to by this resource) 

178 for linked_uri in linked_resources: 

179 # Add to reverse links (this resource links to the linked resource) 

180 reverse_links_key = self._generate_reverse_links_key(linked_uri) 

181 pipe.sadd(reverse_links_key, str(resource_uri)) 

182 pipe.expire(reverse_links_key, self.lock_duration) 

183 

184 pipe.execute() 

185 

186 # After checking all linked resources, proceed with lock creation 

187 return self._create_resource_lock(resource_uri, current_user, linked_resources) 

188 

189 except Exception as e: 

190 logger.error(f"Error acquiring lock for {resource_uri}: {str(e)}") 

191 return False 

192 

193 def _create_resource_lock(self, resource_uri: str, current_user, linked_resources: List[str]) -> bool: 

194 """ 

195 Helper method to create a lock for a resource. 

196  

197 Args: 

198 resource_uri: URI of the resource to lock 

199 current_user: The current user object 

200 linked_resources: List of linked resource URIs 

201  

202 Returns: 

203 bool: True if lock was created successfully 

204 """ 

205 try: 

206 # Create or update lock for the main resource 

207 lock_key = self._generate_lock_key(resource_uri) 

208 lock_data = { 

209 "user_id": str(current_user.orcid), 

210 "user_name": current_user.name, 

211 "timestamp": datetime.now(timezone.utc).isoformat(), 

212 "resource_uri": resource_uri, 

213 "linked_resources": linked_resources 

214 } 

215 

216 # Set the lock with expiration 

217 self.redis.setex(lock_key, self.lock_duration, json.dumps(lock_data)) 

218 return True 

219 except Exception as e: 

220 logger.error(f"Error creating lock for {resource_uri}: {str(e)}") 

221 return False 

222 

223 def release_lock(self, resource_uri: str) -> bool: 

224 """ 

225 Release a lock on a resource if owned by the current user. 

226 Also cleans up the reverse links. 

227 

228 Args: 

229 resource_uri: URI of the resource to unlock 

230 

231 Returns: 

232 bool: True if lock was released, False otherwise 

233 """ 

234 try: 

235 lock_info = self.get_lock_info(resource_uri) 

236 

237 # If not locked or locked by another user 

238 if not lock_info or lock_info.user_id != str(current_user.orcid): 

239 return False 

240 

241 # Get the linked resources from the lock info 

242 linked_resources = lock_info.linked_resources if lock_info.linked_resources else [] 

243 

244 # Delete the lock directly (this will raise an exception in the test) 

245 self.redis.delete(self._generate_lock_key(resource_uri)) 

246 

247 # Clean up reverse links 

248 for linked_uri in linked_resources: 

249 # Ensure we're working with string URIs 

250 linked_uri_str = str(linked_uri) 

251 reverse_links_key = self._generate_reverse_links_key(linked_uri_str) 

252 self.redis.srem(reverse_links_key, str(resource_uri)) 

253 

254 return True 

255 

256 except Exception as e: 

257 logger.error(f"Error releasing lock for {resource_uri}: {str(e)}") 

258 return False