Coverage for heritrace/services/resource_lock_manager.py: 100%

105 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-18 11:10 +0000

1import json 

2import logging 

3from dataclasses import dataclass 

4from datetime import datetime, timezone 

5from enum import Enum 

6from typing import Optional, Tuple, List 

7 

8from flask_login import current_user 

9from redis import Redis 

10 

# Module-level logger named after this module, so the host application's
# logging configuration decides where lock-manager messages go.
logger = logging.getLogger(__name__)

13 

14 

class LockStatus(Enum):
    """Possible outcomes of a lock check on a resource.

    AVAILABLE: the resource is unlocked, or its lock belongs to the
        current user.
    LOCKED: another user holds a lock on the resource, or on a resource
        that links to it.
    ERROR: the lock state could not be determined because the check
        itself failed.
    """

    AVAILABLE = "available"
    LOCKED = "locked"
    ERROR = "error"

21 

22 

@dataclass
class LockInfo:
    """Information about a resource lock.

    Attributes:
        user_id: Identifier of the user holding the lock (the lock manager
            stores the user's ORCID converted to a string).
        user_name: Display name of the user holding the lock.
        timestamp: Moment the lock was written, as an ISO 8601 string.
        resource_uri: URI of the locked resource.
        linked_resources: URIs of the resources the locked resource links
            to. ``None`` is accepted and normalised to an empty list.
    """

    user_id: str
    user_name: str
    timestamp: str
    resource_uri: str
    # Optional because ``None`` is the declared default; a plain ``[]``
    # default would be shared between instances, so the empty list is
    # created per instance in ``__post_init__`` instead.
    linked_resources: Optional[List[str]] = None

    def __post_init__(self):
        """Replace a missing ``linked_resources`` with a fresh empty list."""
        if self.linked_resources is None:
            self.linked_resources = []

36 

37 

class ResourceLockManager:
    """Manages resource locking using Redis.

    Locks are stored as JSON strings under expiring Redis keys, so a lock
    that is never released disappears after ``lock_duration`` seconds.

    It uses the following Redis key patterns:
    - resource_lock:{resource_uri} - Stores lock info for a resource
    - reverse_links:{resource_uri} - Stores a set of resources that link to this resource
    """

    def __init__(self, redis_client: Redis, lock_duration: int = 300):
        """
        Create a lock manager backed by the given Redis client.

        Args:
            redis_client: Redis client used for every lock operation
            lock_duration: Lifetime in seconds of a lock and of the
                reverse-link sets written with it (default: 5 minutes)
        """
        self.redis = redis_client
        self.lock_duration = lock_duration
        self.lock_prefix = "resource_lock:"
        # Reverse links: resources that link to this resource
        self.reverse_links_prefix = "reverse_links:"

    def _generate_lock_key(self, resource_uri: str) -> str:
        """Generate a Redis key for a resource lock."""
        return f"{self.lock_prefix}{resource_uri}"

    def _generate_reverse_links_key(self, resource_uri: str) -> str:
        """Generate a Redis key for storing resources that link to this resource."""
        return f"{self.reverse_links_prefix}{resource_uri}"

    def get_lock_info(self, resource_uri: str) -> Optional[LockInfo]:
        """
        Get information about the current lock on a resource.

        Errors are not handled here: a Redis failure, undecodable JSON or a
        missing field propagates to the caller.

        Args:
            resource_uri: URI of the resource to check

        Returns:
            LockInfo if resource is locked, None otherwise
        """
        lock_key = self._generate_lock_key(resource_uri)
        lock_data = self.redis.get(lock_key)

        if not lock_data:
            return None

        data = json.loads(lock_data)
        return LockInfo(
            user_id=data["user_id"],
            user_name=data["user_name"],
            timestamp=data["timestamp"],
            resource_uri=data["resource_uri"],
            linked_resources=data.get("linked_resources", []),
        )

    def check_lock_status(
        self, resource_uri: str
    ) -> Tuple[LockStatus, Optional[LockInfo]]:
        """
        Check if a resource is locked and return its status.

        Two checks are performed, in this order:
        1. The resource itself is directly locked
        2. Any resource that links to this resource (reverse links) is
           locked by another user

        A lock held by the current user never blocks: a direct lock owned
        by the current user yields AVAILABLE together with its LockInfo.

        Args:
            resource_uri: URI of the resource to check

        Returns:
            Tuple of (LockStatus, Optional[LockInfo]). The LockInfo is the
            lock that determined the result, or None when no lock was
            found or the check failed (LockStatus.ERROR).
        """
        try:
            # 1. Direct lock on the resource
            lock_info = self.get_lock_info(resource_uri)
            if lock_info:
                # If locked by current user, consider it available
                if lock_info.user_id == str(current_user.orcid):
                    return LockStatus.AVAILABLE, lock_info
                return LockStatus.LOCKED, lock_info

            # 2. Locks on resources that link to this one (reverse links)
            reverse_links_key = self._generate_reverse_links_key(resource_uri)
            reverse_links = self.redis.smembers(reverse_links_key)

            for linking_uri_item in reverse_links:
                # Set members may come back as bytes or str
                linking_uri = self._decode_redis_item(linking_uri_item)
                linking_lock_info = self.get_lock_info(linking_uri)
                if linking_lock_info and linking_lock_info.user_id != str(current_user.orcid):
                    # Resource that links to this resource is locked by another user
                    return LockStatus.LOCKED, linking_lock_info

            # If we get here, the resource is available
            return LockStatus.AVAILABLE, None

        except Exception as e:
            # logger.exception records the traceback along with the message
            logger.exception("Error checking lock status for %s: %s", resource_uri, e)
            return LockStatus.ERROR, None

    def _decode_redis_item(self, item):
        """
        Helper method to decode Redis items that might be bytes or strings.

        Args:
            item: The item to decode (bytes or string)

        Returns:
            String representation of the item
        """
        if isinstance(item, bytes):
            return item.decode('utf-8')
        return str(item)

    def acquire_lock(self, resource_uri: str, linked_resources: List[str]) -> bool:
        """
        Try to acquire a lock on a resource for the current user.

        The resource is registered in the reverse-link set of each linked
        resource, then the lock itself is written. Both expire after
        ``lock_duration`` seconds.

        Args:
            resource_uri: URI of the resource to lock
            linked_resources: List of linked resources already known

        Returns:
            bool: True if lock was acquired, False if the resource is locked
            by another user, its status could not be determined, or an
            error occurred
        """
        try:
            # Proceed only when the check positively reports AVAILABLE: on
            # ERROR the lock state is unknown, and writing a lock could
            # overwrite one held by another user.
            status, _ = self.check_lock_status(resource_uri)
            if status != LockStatus.AVAILABLE:
                return False

            # Update reverse links in Redis
            if linked_resources:
                # Batch the writes in one pipeline instead of two round
                # trips per linked resource
                pipe = self.redis.pipeline()

                for linked_uri in linked_resources:
                    # Record that this resource links to the linked resource
                    reverse_links_key = self._generate_reverse_links_key(linked_uri)
                    pipe.sadd(reverse_links_key, str(resource_uri))
                    pipe.expire(reverse_links_key, self.lock_duration)

                pipe.execute()

            return self._create_resource_lock(resource_uri, current_user, linked_resources)

        except Exception as e:
            logger.exception("Error acquiring lock for %s: %s", resource_uri, e)
            return False

    def _create_resource_lock(self, resource_uri: str, current_user, linked_resources: List[str]) -> bool:
        """
        Helper method to create a lock for a resource.

        Args:
            resource_uri: URI of the resource to lock
            current_user: The current user object (its ``orcid`` and
                ``name`` attributes are stored in the lock)
            linked_resources: List of linked resource URIs

        Returns:
            bool: True if lock was created successfully, False on error
        """
        try:
            # Create or update lock for the main resource
            lock_key = self._generate_lock_key(resource_uri)
            lock_data = {
                "user_id": str(current_user.orcid),
                "user_name": current_user.name,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "resource_uri": resource_uri,
                "linked_resources": linked_resources,
            }

            # Set the lock with expiration
            self.redis.setex(lock_key, self.lock_duration, json.dumps(lock_data))
            return True
        except Exception as e:
            logger.exception("Error creating lock for %s: %s", resource_uri, e)
            return False

    def release_lock(self, resource_uri: str) -> bool:
        """
        Release a lock on a resource if owned by the current user.
        Also cleans up the reverse links.

        Args:
            resource_uri: URI of the resource to unlock

        Returns:
            bool: True if lock was released, False if the resource is not
            locked, is locked by another user, or an error occurred
        """
        try:
            lock_info = self.get_lock_info(resource_uri)

            # If not locked or locked by another user
            if not lock_info or lock_info.user_id != str(current_user.orcid):
                return False

            # Delete the lock itself
            self.redis.delete(self._generate_lock_key(resource_uri))

            # Clean up reverse links, batched in one pipeline as in acquire_lock
            linked_resources = lock_info.linked_resources or []
            if linked_resources:
                pipe = self.redis.pipeline()
                for linked_uri in linked_resources:
                    # Ensure we're working with string URIs
                    reverse_links_key = self._generate_reverse_links_key(str(linked_uri))
                    pipe.srem(reverse_links_key, str(resource_uri))
                pipe.execute()

            return True

        except Exception as e:
            logger.exception("Error releasing lock for %s: %s", resource_uri, e)
            return False