Coverage for heritrace / services / resource_lock_manager.py: 100%
105 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-21 12:56 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-21 12:56 +0000
1# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import json
6import logging
7from dataclasses import dataclass
8from datetime import datetime, timezone
9from enum import Enum
10from typing import Optional, Tuple, List
12from flask_login import current_user
13from redis import Redis
15# Set up logger
16logger = logging.getLogger(__name__)
19class LockStatus(Enum):
20 """Possible states of a resource lock."""
22 AVAILABLE = "available"
23 LOCKED = "locked"
24 ERROR = "error"
27@dataclass
28class LockInfo:
29 """Information about a resource lock."""
31 user_id: str
32 user_name: str
33 timestamp: str
34 resource_uri: str
35 linked_resources: List[str] = None
37 def __post_init__(self):
38 if self.linked_resources is None:
39 self.linked_resources = []
42class ResourceLockManager:
43 """Manages resource locking using Redis.
45 This class uses Redis to manage locks on resources and their linked resources.
46 It uses the following Redis key patterns:
47 - resource_lock:{resource_uri} - Stores lock info for a resource
48 - reverse_links:{resource_uri} - Stores a set of resources that link to this resource
49 """
51 def __init__(self, redis_client: Redis):
52 self.redis = redis_client
53 self.lock_duration = 300 # 5 minutes in seconds
54 self.lock_prefix = "resource_lock:"
55 self.reverse_links_prefix = "reverse_links:" # Reverse links: resources that link to this resource
57 def _generate_lock_key(self, resource_uri: str) -> str:
58 """Generate a Redis key for a resource lock."""
59 return f"{self.lock_prefix}{resource_uri}"
61 def _generate_reverse_links_key(self, resource_uri: str) -> str:
62 """Generate a Redis key for storing resources that link to this resource."""
63 return f"{self.reverse_links_prefix}{resource_uri}"
65 def get_lock_info(self, resource_uri: str) -> Optional[LockInfo]:
66 """
67 Get information about the current lock on a resource.
69 Args:
70 resource_uri: URI of the resource to check
72 Returns:
73 LockInfo if resource is locked, None otherwise
74 """
75 lock_key = self._generate_lock_key(resource_uri)
76 lock_data = self.redis.get(lock_key)
78 if not lock_data:
79 return None
81 data = json.loads(lock_data)
82 linked_resources = data.get("linked_resources", [])
83 return LockInfo(
84 user_id=data["user_id"],
85 user_name=data["user_name"],
86 timestamp=data["timestamp"],
87 resource_uri=data["resource_uri"],
88 linked_resources=linked_resources
89 )
91 def check_lock_status(
92 self, resource_uri: str
93 ) -> Tuple[LockStatus, Optional[LockInfo]]:
94 """
95 Check if a resource is locked and return its status.
96 This method efficiently checks if:
97 1. The resource itself is directly locked
98 2. Any resource that this resource links to is locked (forward links)
99 3. Any resource that links to this resource is locked (reverse links)
101 Args:
102 resource_uri: URI of the resource to check
104 Returns:
105 Tuple of (LockStatus, Optional[LockInfo])
106 """
107 try:
108 # 1. Check direct lock on the resource
109 lock_info = self.get_lock_info(resource_uri)
110 if lock_info:
111 # If locked by current user, consider it available
112 if lock_info.user_id == str(current_user.orcid):
113 return LockStatus.AVAILABLE, lock_info
114 return LockStatus.LOCKED, lock_info
116 # 3. Check if any resource that links to this resource is locked by another user
117 # Get the resources that link to this resource (reverse links)
118 reverse_links_key = self._generate_reverse_links_key(resource_uri)
119 reverse_links = self.redis.smembers(reverse_links_key)
121 # Check if any of these resources is locked
122 for linking_uri_item in reverse_links:
123 # Use helper method to standardize format
124 linking_uri = self._decode_redis_item(linking_uri_item)
125 linking_lock_info = self.get_lock_info(linking_uri)
126 if linking_lock_info and linking_lock_info.user_id != str(current_user.orcid):
127 # Resource that links to this resource is locked by another user
128 return LockStatus.LOCKED, linking_lock_info
130 # If we get here, the resource is available
131 return LockStatus.AVAILABLE, None
133 except Exception as e:
134 logger.error(f"Error checking lock status for {resource_uri}: {str(e)}")
135 import traceback
136 logger.error(traceback.format_exc())
137 return LockStatus.ERROR, None
139 def _decode_redis_item(self, item):
140 """
141 Helper method to decode Redis items that might be bytes or strings.
143 Args:
144 item: The item to decode (bytes or string)
146 Returns:
147 String representation of the item
148 """
149 if isinstance(item, bytes):
150 return item.decode('utf-8')
151 return str(item)
153 def acquire_lock(self, resource_uri: str, linked_resources: List[str]) -> bool:
154 """
155 Try to acquire a lock on a resource.
156 This method efficiently checks and acquires locks using Redis sets
157 to track relationships between resources.
159 Args:
160 resource_uri: URI of the resource to lock
161 linked_resources: List of linked resources already known
163 Returns:
164 bool: True if lock was acquired, False otherwise
165 """
166 try:
167 # First check if the resource or any related resource is locked by another user
168 status, lock_info = self.check_lock_status(resource_uri)
169 if status == LockStatus.LOCKED:
170 return False
172 # Update reverse links in Redis
173 if linked_resources:
174 # Use a pipeline for better performance
175 pipe = self.redis.pipeline()
177 # Store reverse links (resources that are linked to by this resource)
178 for linked_uri in linked_resources:
179 # Add to reverse links (this resource links to the linked resource)
180 reverse_links_key = self._generate_reverse_links_key(linked_uri)
181 pipe.sadd(reverse_links_key, str(resource_uri))
182 pipe.expire(reverse_links_key, self.lock_duration)
184 pipe.execute()
186 # After checking all linked resources, proceed with lock creation
187 return self._create_resource_lock(resource_uri, current_user, linked_resources)
189 except Exception as e:
190 logger.error(f"Error acquiring lock for {resource_uri}: {str(e)}")
191 return False
193 def _create_resource_lock(self, resource_uri: str, current_user, linked_resources: List[str]) -> bool:
194 """
195 Helper method to create a lock for a resource.
197 Args:
198 resource_uri: URI of the resource to lock
199 current_user: The current user object
200 linked_resources: List of linked resource URIs
202 Returns:
203 bool: True if lock was created successfully
204 """
205 try:
206 # Create or update lock for the main resource
207 lock_key = self._generate_lock_key(resource_uri)
208 lock_data = {
209 "user_id": str(current_user.orcid),
210 "user_name": current_user.name,
211 "timestamp": datetime.now(timezone.utc).isoformat(),
212 "resource_uri": resource_uri,
213 "linked_resources": linked_resources
214 }
216 # Set the lock with expiration
217 self.redis.setex(lock_key, self.lock_duration, json.dumps(lock_data))
218 return True
219 except Exception as e:
220 logger.error(f"Error creating lock for {resource_uri}: {str(e)}")
221 return False
223 def release_lock(self, resource_uri: str) -> bool:
224 """
225 Release a lock on a resource if owned by the current user.
226 Also cleans up the reverse links.
228 Args:
229 resource_uri: URI of the resource to unlock
231 Returns:
232 bool: True if lock was released, False otherwise
233 """
234 try:
235 lock_info = self.get_lock_info(resource_uri)
237 # If not locked or locked by another user
238 if not lock_info or lock_info.user_id != str(current_user.orcid):
239 return False
241 # Get the linked resources from the lock info
242 linked_resources = lock_info.linked_resources if lock_info.linked_resources else []
244 # Delete the lock directly (this will raise an exception in the test)
245 self.redis.delete(self._generate_lock_key(resource_uri))
247 # Clean up reverse links
248 for linked_uri in linked_resources:
249 # Ensure we're working with string URIs
250 linked_uri_str = str(linked_uri)
251 reverse_links_key = self._generate_reverse_links_key(linked_uri_str)
252 self.redis.srem(reverse_links_key, str(resource_uri))
254 return True
256 except Exception as e:
257 logger.error(f"Error releasing lock for {resource_uri}: {str(e)}")
258 return False