Coverage for heritrace / services / resource_lock_manager.py: 100%
106 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
1# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import json
6import logging
7from dataclasses import dataclass
8from datetime import datetime, timezone
9from enum import Enum
10from typing import cast
12from flask_login import current_user
13from redis import Redis, RedisError
15from heritrace.models import User
# Module-level logger, named after this module so records can be filtered by it
logger: logging.Logger = logging.getLogger(__name__)
class LockStatus(Enum):
    """Outcome of a lock check on a resource."""

    # The resource can be locked or edited by the current user
    AVAILABLE = "available"
    # A lock held by another user blocks the resource
    LOCKED = "locked"
    # The lock state could not be determined
    ERROR = "error"
29@dataclass
30class LockInfo:
31 """Information about a resource lock."""
33 user_id: str
34 user_name: str
35 timestamp: str
36 resource_uri: str
37 linked_resources: list[str] | None = None
39 def __post_init__(self) -> None:
40 if self.linked_resources is None:
41 self.linked_resources = []
class ResourceLockManager:
    """Manage per-user locks on resources, stored in Redis.

    Two Redis key patterns are used:

    - ``resource_lock:{resource_uri}``: JSON-encoded lock info for a resource,
      written with a time-to-live of ``lock_duration`` seconds.
    - ``reverse_links:{resource_uri}``: set of URIs of the resources that link
      to this resource, filled in when one of those resources is locked.

    Lock ownership is decided by comparing the stored ``user_id`` with
    ``str(current_user.orcid)``.
    """

    def __init__(self, redis_client: Redis) -> None:  # type: ignore[type-arg]
        """Store the Redis client and the key/TTL configuration.

        Args:
            redis_client: Redis client used for every lock operation
        """
        self.redis: Redis[str] = redis_client  # type: ignore[assignment]
        self.lock_duration = 300  # 5 minutes in seconds
        self.lock_prefix = "resource_lock:"
        # Reverse links: resources that link to this resource
        self.reverse_links_prefix = "reverse_links:"

    def generate_lock_key(self, resource_uri: str) -> str:
        """Generate a Redis key for a resource lock."""
        return f"{self.lock_prefix}{resource_uri}"

    def generate_reverse_links_key(self, resource_uri: str) -> str:
        """Generate a Redis key for storing resources that link to this resource."""
        return f"{self.reverse_links_prefix}{resource_uri}"

    def get_lock_info(self, resource_uri: str) -> LockInfo | None:
        """
        Get information about the current lock on a resource.

        Args:
            resource_uri: URI of the resource to check

        Returns:
            LockInfo if resource is locked, None otherwise
        """
        lock_key = self.generate_lock_key(resource_uri)
        lock_data = self.redis.get(lock_key)

        # A missing (or expired) key means nobody holds the lock
        if not lock_data:
            return None

        data = json.loads(lock_data)  # type: ignore[arg-type]
        linked_resources = data.get("linked_resources", [])
        return LockInfo(
            user_id=data["user_id"],
            user_name=data["user_name"],
            timestamp=data["timestamp"],
            resource_uri=data["resource_uri"],
            linked_resources=linked_resources,
        )

    def check_lock_status(
        self, resource_uri: str
    ) -> tuple[LockStatus, LockInfo | None]:
        """
        Check if a resource is locked and return its status.

        Two checks are performed, in this order:
        1. The resource itself is directly locked
        2. Any resource that links to this resource is locked by another user
           (reverse links)

        Resources that this resource links to are not inspected here.

        Args:
            resource_uri: URI of the resource to check

        Returns:
            Tuple of (LockStatus, LockInfo | None). The status is AVAILABLE when
            no blocking lock exists or the direct lock belongs to the current
            user, LOCKED when another user holds a blocking lock, and ERROR
            (with None) when Redis raised an error.
        """
        try:
            # 1. Check direct lock on the resource
            lock_info = self.get_lock_info(resource_uri)
            if lock_info:
                # If locked by current user, consider it available
                if lock_info.user_id == str(current_user.orcid):
                    return LockStatus.AVAILABLE, lock_info
                return LockStatus.LOCKED, lock_info

            # 2. Check if any resource that links to this resource is locked by
            # another user
            reverse_links_key = self.generate_reverse_links_key(resource_uri)
            reverse_links: set[str] = self.redis.smembers(reverse_links_key)  # type: ignore[assignment]

            for linking_uri_item in reverse_links:
                # Set members may come back as bytes or str
                linking_uri = self.decode_redis_item(linking_uri_item)
                linking_lock_info = self.get_lock_info(linking_uri)
                if linking_lock_info and linking_lock_info.user_id != str(
                    current_user.orcid
                ):
                    # Resource that links to this resource is locked by another user
                    return LockStatus.LOCKED, linking_lock_info

        except RedisError:
            logger.exception("Error checking lock status for %s", resource_uri)
            return LockStatus.ERROR, None
        else:
            return LockStatus.AVAILABLE, None

    def decode_redis_item(self, item: bytes | str) -> str:
        """
        Helper method to decode Redis items that might be bytes or strings.

        Args:
            item: The item to decode (bytes or string)

        Returns:
            String representation of the item
        """
        if isinstance(item, bytes):
            return item.decode("utf-8")
        return str(item)

    def acquire_lock(self, resource_uri: str, linked_resources: list[str]) -> bool:
        """
        Try to acquire a lock on a resource for the current user.

        The lock status is checked first; the reverse links of every linked
        resource are then recorded before the lock itself is written.

        Args:
            resource_uri: URI of the resource to lock
            linked_resources: List of linked resources already known

        Returns:
            bool: True if lock was acquired, False if the resource is locked by
            another user, its status could not be verified, or Redis failed
        """
        try:
            status, _lock_info = self.check_lock_status(resource_uri)
            # Fail closed: LOCKED means another user blocks the resource, and
            # ERROR means ownership could not be verified, so writing the lock
            # could overwrite somebody else's
            if status != LockStatus.AVAILABLE:
                return False

            if linked_resources:
                # One pipeline so all reverse-link updates are sent together
                pipe = self.redis.pipeline()

                for linked_uri in linked_resources:
                    # Record that this resource links to the linked resource
                    reverse_links_key = self.generate_reverse_links_key(linked_uri)
                    pipe.sadd(reverse_links_key, str(resource_uri))
                    # Reverse links expire after the same duration as a lock
                    pipe.expire(reverse_links_key, self.lock_duration)

                pipe.execute()

            result = self.create_resource_lock(
                resource_uri, cast("User", current_user), linked_resources
            )

        except RedisError:
            logger.exception("Error acquiring lock for %s", resource_uri)
            return False
        else:
            return result

    def create_resource_lock(
        self, resource_uri: str, current_user: User, linked_resources: list[str]
    ) -> bool:
        """
        Helper method to create a lock for a resource.

        Any existing lock stored under the same key is overwritten and the
        expiration is reset to ``lock_duration`` seconds.

        Args:
            resource_uri: URI of the resource to lock
            current_user: The current user object
            linked_resources: List of linked resource URIs

        Returns:
            bool: True if lock was created successfully, False on a Redis error
        """
        try:
            lock_key = self.generate_lock_key(resource_uri)
            lock_data = {
                "user_id": str(current_user.orcid),
                "user_name": current_user.name,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "resource_uri": resource_uri,
                "linked_resources": linked_resources,
            }

            # Set the lock with expiration
            self.redis.setex(lock_key, self.lock_duration, json.dumps(lock_data))
        except RedisError:
            logger.exception("Error creating lock for %s", resource_uri)
            return False
        else:
            return True

    def release_lock(self, resource_uri: str) -> bool:
        """
        Release a lock on a resource if owned by the current user.
        Also cleans up the reverse links.

        Args:
            resource_uri: URI of the resource to unlock

        Returns:
            bool: True if lock was released, False if the resource is not
            locked, is locked by another user, or Redis failed
        """
        try:
            lock_info = self.get_lock_info(resource_uri)

            # If not locked or locked by another user
            if not lock_info or lock_info.user_id != str(current_user.orcid):
                return False

            # The lock records which resources received a reverse link
            linked_resources = lock_info.linked_resources or []

            self.redis.delete(self.generate_lock_key(resource_uri))

            # Clean up reverse links
            for linked_uri in linked_resources:
                # Ensure we're working with string URIs
                reverse_links_key = self.generate_reverse_links_key(str(linked_uri))
                self.redis.srem(reverse_links_key, str(resource_uri))

        except RedisError:
            logger.exception("Error releasing lock for %s", resource_uri)
            return False
        else:
            return True