# Coverage for heritrace/services/resource_lock_manager.py: 100% (105 statements)
# Report generated by coverage.py v7.6.12, created at 2025-04-18 11:10 +0000
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import List, Optional, Tuple

from flask_login import current_user
from redis import Redis
# Module-level logger, named after this module so that log records can be
# filtered per module by the application's logging configuration.
logger = logging.getLogger(__name__)
class LockStatus(Enum):
    """Possible states of a resource lock."""

    # The resource can be locked by the requesting user.
    AVAILABLE = "available"
    # The resource (or a related one) is locked by someone else.
    LOCKED = "locked"
    # The lock state could not be determined.
    ERROR = "error"
@dataclass
class LockInfo:
    """Information about a resource lock.

    Attributes:
        user_id: Identifier of the user holding the lock.
        user_name: Display name of the user holding the lock.
        timestamp: When the lock was taken, as a string.
        resource_uri: URI of the locked resource.
        linked_resources: URIs of resources linked to the locked resource.
            Defaults to a fresh empty list for every instance.
    """

    user_id: str
    user_name: str
    timestamp: str
    resource_uri: str
    linked_resources: List[str] = field(default_factory=list)

    def __post_init__(self):
        # Callers may still pass ``None`` explicitly (for example when the
        # stored lock data carries a null value); normalise it to an empty list.
        if self.linked_resources is None:
            self.linked_resources = []
class ResourceLockManager:
    """Manages resource locking using Redis.

    Locks are stored as JSON strings with an expiration of ``lock_duration``
    seconds, so a lock that is never released disappears on its own.

    It uses the following Redis key patterns:
    - resource_lock:{resource_uri} - Stores lock info for a resource
    - reverse_links:{resource_uri} - Stores a set of resources that link to this resource
    """

    def __init__(self, redis_client: Redis):
        """
        Store the Redis client and the key/expiry settings.

        Args:
            redis_client: Redis client used for every lock operation
        """
        self.redis = redis_client
        self.lock_duration = 300  # 5 minutes in seconds
        self.lock_prefix = "resource_lock:"
        # Reverse links: resources that link to this resource
        self.reverse_links_prefix = "reverse_links:"

    def _generate_lock_key(self, resource_uri: str) -> str:
        """Generate a Redis key for a resource lock."""
        return f"{self.lock_prefix}{resource_uri}"

    def _generate_reverse_links_key(self, resource_uri: str) -> str:
        """Generate a Redis key for storing resources that link to this resource."""
        return f"{self.reverse_links_prefix}{resource_uri}"

    def get_lock_info(self, resource_uri: str) -> Optional[LockInfo]:
        """
        Get information about the current lock on a resource.

        Args:
            resource_uri: URI of the resource to check

        Returns:
            LockInfo if resource is locked, None otherwise

        Raises:
            Whatever the Redis client or ``json.loads`` raise, and KeyError if
            the stored lock data lacks a required field; nothing is caught here.
        """
        lock_key = self._generate_lock_key(resource_uri)
        lock_data = self.redis.get(lock_key)

        if not lock_data:
            return None

        data = json.loads(lock_data)
        linked_resources = data.get("linked_resources", [])
        return LockInfo(
            user_id=data["user_id"],
            user_name=data["user_name"],
            timestamp=data["timestamp"],
            resource_uri=data["resource_uri"],
            linked_resources=linked_resources,
        )

    def check_lock_status(
        self, resource_uri: str
    ) -> Tuple[LockStatus, Optional[LockInfo]]:
        """
        Check if a resource is locked and return its status.

        This method checks, in order:
        1. Whether the resource itself is directly locked
        2. Whether any resource recorded as linking to this resource
           (reverse links) is locked by another user

        Resources that this resource links to (forward links) are not
        inspected here.

        Args:
            resource_uri: URI of the resource to check

        Returns:
            Tuple of (LockStatus, Optional[LockInfo]). A lock held by the
            current user is reported as AVAILABLE together with its LockInfo.
            ERROR with None is returned if anything raises.
        """
        try:
            # 1. Check direct lock on the resource
            lock_info = self.get_lock_info(resource_uri)
            if lock_info:
                # If locked by current user, consider it available
                if lock_info.user_id == str(current_user.orcid):
                    return LockStatus.AVAILABLE, lock_info
                return LockStatus.LOCKED, lock_info

            # 2. Check if any resource that links to this resource is locked
            #    by another user (reverse links)
            reverse_links_key = self._generate_reverse_links_key(resource_uri)
            reverse_links = self.redis.smembers(reverse_links_key)

            for linking_uri_item in reverse_links:
                # Set members may come back as bytes or str
                linking_uri = self._decode_redis_item(linking_uri_item)
                linking_lock_info = self.get_lock_info(linking_uri)
                if linking_lock_info and linking_lock_info.user_id != str(current_user.orcid):
                    # Resource that links to this resource is locked by another user
                    return LockStatus.LOCKED, linking_lock_info

            # If we get here, the resource is available
            return LockStatus.AVAILABLE, None

        except Exception as e:
            # exc_info attaches the traceback to the same log record
            logger.error(
                f"Error checking lock status for {resource_uri}: {str(e)}",
                exc_info=True,
            )
            return LockStatus.ERROR, None

    def _decode_redis_item(self, item):
        """
        Helper method to decode Redis items that might be bytes or strings.

        Args:
            item: The item to decode (bytes or string)

        Returns:
            String representation of the item
        """
        if isinstance(item, bytes):
            return item.decode('utf-8')
        return str(item)

    def acquire_lock(self, resource_uri: str, linked_resources: List[str]) -> bool:
        """
        Try to acquire a lock on a resource for the current user.

        The lock status is checked first. If the resource is free, this
        resource is recorded in the reverse-links set of every linked
        resource and the lock itself is then written.

        Args:
            resource_uri: URI of the resource to lock
            linked_resources: List of linked resources already known

        Returns:
            bool: True if lock was acquired, False otherwise (resource locked
            by another user, lock status could not be determined, or an
            error occurred)
        """
        try:
            # Refuse unless the resource is known to be free. An ERROR status
            # means the lock state is unknown, and writing a lock then could
            # overwrite one held by another user.
            status, _ = self.check_lock_status(resource_uri)
            if status != LockStatus.AVAILABLE:
                return False

            # Update reverse links in Redis
            if linked_resources:
                # Use a pipeline so all updates are sent in one batch
                pipe = self.redis.pipeline()

                for linked_uri in linked_resources:
                    # Record that this resource links to the linked resource
                    reverse_links_key = self._generate_reverse_links_key(linked_uri)
                    pipe.sadd(reverse_links_key, str(resource_uri))
                    # Reverse-link sets expire on the same schedule as locks
                    pipe.expire(reverse_links_key, self.lock_duration)

                pipe.execute()

            return self._create_resource_lock(resource_uri, current_user, linked_resources)

        except Exception as e:
            logger.error(f"Error acquiring lock for {resource_uri}: {str(e)}")
            return False

    def _create_resource_lock(self, resource_uri: str, current_user, linked_resources: List[str]) -> bool:
        """
        Helper method to create a lock for a resource.

        Args:
            resource_uri: URI of the resource to lock
            current_user: The current user object (its ``orcid`` and ``name``
                attributes are stored in the lock)
            linked_resources: List of linked resource URIs

        Returns:
            bool: True if lock was created successfully, False if an error occurred
        """
        try:
            # Create or update lock for the main resource
            lock_key = self._generate_lock_key(resource_uri)
            lock_data = {
                "user_id": str(current_user.orcid),
                "user_name": current_user.name,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "resource_uri": resource_uri,
                "linked_resources": linked_resources,
            }

            # Set the lock with expiration
            self.redis.setex(lock_key, self.lock_duration, json.dumps(lock_data))
            return True
        except Exception as e:
            logger.error(f"Error creating lock for {resource_uri}: {str(e)}")
            return False

    def release_lock(self, resource_uri: str) -> bool:
        """
        Release a lock on a resource if owned by the current user.
        Also cleans up the reverse links.

        Args:
            resource_uri: URI of the resource to unlock

        Returns:
            bool: True if lock was released, False otherwise (not locked,
            locked by another user, or an error occurred)
        """
        try:
            lock_info = self.get_lock_info(resource_uri)

            # If not locked or locked by another user
            if not lock_info or lock_info.user_id != str(current_user.orcid):
                return False

            # Get the linked resources from the lock info
            linked_resources = lock_info.linked_resources if lock_info.linked_resources else []

            # Delete the lock itself first
            self.redis.delete(self._generate_lock_key(resource_uri))

            # Clean up reverse links
            for linked_uri in linked_resources:
                # Ensure we're working with string URIs
                linked_uri_str = str(linked_uri)
                reverse_links_key = self._generate_reverse_links_key(linked_uri_str)
                self.redis.srem(reverse_links_key, str(resource_uri))

            return True

        except Exception as e:
            logger.error(f"Error releasing lock for {resource_uri}: {str(e)}")
            return False