Coverage for oc_validator / lmdb_cache.py: 97%
238 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 15:46 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 15:46 +0000
1# ISC License
2#
3# Copyright (c) 2023-2026, Elia Rizzetto, Silvio Peroni
4#
5# Permission to use, copy, modify, and/or distribute this software for any
6# purpose with or without fee is hereby granted, provided that the above
7# copyright notice and this permission notice appear in all copies.
8#
9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
12# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
13# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
14# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
15# PERFORMANCE OF THIS SOFTWARE.
17import lmdb
18import pickle
19import os
20import shutil
21import tempfile
22from typing import Optional, Any, Iterator, Union
23from contextlib import contextmanager
class LmdbCache:
    """
    A memory-efficient cache using LMDB (Lightning Memory-Mapped Database).

    This class provides a dictionary-like interface backed by LMDB, allowing
    for constant memory usage regardless of the number of stored items.
    Ideal for caching large amounts of data without risking OOM errors.

    Keys are strings (stored UTF-8 encoded); values are serialized with
    :mod:`pickle`. Every instance owns a freshly created temporary directory
    that is removed again by :meth:`close`.

    Usage:
        with LmdbCache('my_cache') as cache:
            cache['key1'] = 'value1'
            value = cache.get('key2')

        # Cache is automatically closed when exiting context
    """

    # Message shared by every operation that requires an open environment.
    _NOT_OPEN_MSG = "LMDB cache is not open. Use context manager or call open() first."

    # Private sentinel used to tell "key absent" apart from "stored value is None".
    _MISSING = object()

    def __init__(self, name: str, base_dir: str = '.', map_size: int = 1 * 1024**3):
        """
        Initialize LMDB cache.

        :param name: Unique name for this cache (used in directory name prefix)
        :param base_dir: Base directory under which a new dedicated temporary
            directory will be created. Defaults to the current working directory.
        :param map_size: Maximum database size in bytes.
            On Windows LMDB pre-allocates a file of exactly ``map_size`` bytes;
            on Linux/macOS it uses sparse files so actual disk usage equals only
            the data written. Increase this value when processing files with
            millions of unique IDs (e.g. ``map_size=20*1024**3`` for 20GB).
            The ``LMDB_MAP_SIZE`` environment variable, when set, overrides
            this argument.
        """
        # Lifecycle state is set first so that close()/__del__ stay safe even
        # if one of the steps below raises.
        self._env: Optional[lmdb.Environment] = None
        self._is_open = False
        self._temp_dir: Optional[str] = None

        self.name = name
        # Use the environment variable if specified, else the init value.
        self.map_size = int(os.getenv('LMDB_MAP_SIZE', str(map_size)))

        # Always create a new dedicated temporary directory for this environment
        os.makedirs(base_dir, exist_ok=True)
        self._temp_dir = tempfile.mkdtemp(prefix=f'lmdb_{name}_', dir=base_dir)
        self.path = os.path.join(self._temp_dir, 'cache')

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def open(self) -> None:
        """
        Open the LMDB environment.

        No-op if the environment is already open.

        NOTE(review): :meth:`close` deletes the temporary directory, so
        re-opening an instance after it was closed is presumably unsupported
        -- confirm against callers.

        :rtype: None
        """
        if self._is_open:
            return

        self._env = lmdb.open(
            self.path,
            map_size=self.map_size,
            max_dbs=1,       # Allow one sub-database if needed
            writemap=False,  # Use write-through for durability
            metasync=False,  # Flush metadata asynchronously for speed
            sync=False,      # Don't sync to disk on every commit (speed over durability)
            readahead=True,  # Enable readahead for better sequential read performance
        )
        self._is_open = True

    def close(self) -> None:
        """
        Close the LMDB environment and remove its dedicated temporary directory.

        Safe to call repeatedly, and safe on an instance whose construction
        did not complete (attributes are read defensively because
        :meth:`__del__` may run on such an object).

        :rtype: None
        """
        env = getattr(self, '_env', None)
        if env is not None:
            env.close()
        self._env = None
        self._is_open = False

        # Clean up the dedicated temporary directory
        temp_dir = getattr(self, '_temp_dir', None)
        if temp_dir is not None and os.path.isdir(temp_dir):
            shutil.rmtree(temp_dir, ignore_errors=True)
        self._temp_dir = None

    def __enter__(self):
        """
        Open the environment and return this instance for use in a ``with`` block.

        :return: The :class:`LmdbCache` instance.
        :rtype: LmdbCache
        """
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Close the environment on context exit.

        :return: ``False`` so that exceptions raised in the block propagate.
        :rtype: bool
        """
        self.close()
        return False

    def __del__(self):
        """Ensure cleanup on garbage collection."""
        self.close()

    def _require_open(self) -> "lmdb.Environment":
        """
        Return the open environment.

        :raises RuntimeError: if the cache is not open.
        """
        if not self._is_open:
            raise RuntimeError(self._NOT_OPEN_MSG)
        return self._env

    # ------------------------------------------------------------------
    # Single-key operations
    # ------------------------------------------------------------------

    def put(self, key: str, value: Any) -> None:
        """
        Store a key-value pair.

        :param key: String key.
        :type key: str
        :param value: Any picklable value.
        :type value: Any
        :raises RuntimeError: if the cache is not open.
        :rtype: None
        """
        env = self._require_open()

        # Serialize before opening the write transaction.
        serialized_value = pickle.dumps(value)

        with env.begin(write=True) as txn:
            txn.put(key.encode('utf-8'), serialized_value)

    def get(self, key: str, default: Any = None) -> Any:
        """
        Retrieve a value by key.

        :param key: String key.
        :type key: str
        :param default: Default value if the key is not found. Defaults to ``None``.
        :type default: Any
        :return: The stored value, or *default* if the key is absent.
        :rtype: Any
        :raises RuntimeError: if the cache is not open.
        """
        env = self._require_open()

        with env.begin(write=False) as txn:
            raw = txn.get(key.encode('utf-8'))
            if raw is None:
                return default
            # The payload was produced by put() via pickle.dumps; do not point
            # this class at data from an untrusted source.
            return pickle.loads(raw)

    def __setitem__(self, key: str, value: Any) -> None:
        """
        Allow dict-like assignment: ``cache[key] = value``.

        :param key: String key.
        :type key: str
        :param value: Any picklable value.
        :type value: Any
        :rtype: None
        """
        self.put(key, value)

    def __getitem__(self, key: str) -> Any:
        """
        Allow dict-like access: ``value = cache[key]``.

        A stored ``None`` is returned as ``None``; only a genuinely absent key
        raises.

        :param key: String key.
        :type key: str
        :return: The stored value.
        :rtype: Any
        :raises KeyError: if the key is not found.
        :raises RuntimeError: if the cache is not open.
        """
        value = self.get(key, self._MISSING)
        if value is self._MISSING:
            raise KeyError(f"Key '{key}' not found in cache")
        return value

    def __contains__(self, key: str) -> bool:
        """Allow 'in' operator: if key in cache.

        Returns ``False`` for empty strings (LMDB does not allow zero-length keys).

        :raises RuntimeError: if the cache is not open (and *key* is non-empty).
        """
        if not key:
            return False
        env = self._require_open()

        with env.begin(write=False) as txn:
            return txn.get(key.encode('utf-8')) is not None

    def delete(self, key: str) -> None:
        """
        Delete a key-value pair.

        :param key: String key to delete.
        :type key: str
        :raises RuntimeError: if the cache is not open.
        :rtype: None
        """
        env = self._require_open()

        with env.begin(write=True) as txn:
            txn.delete(key.encode('utf-8'))

    def __delitem__(self, key: str) -> None:
        """
        Allow dict-like deletion: ``del cache[key]``.

        :param key: String key to delete.
        :type key: str
        :rtype: None
        """
        self.delete(key)

    # ------------------------------------------------------------------
    # Iteration
    # ------------------------------------------------------------------

    @staticmethod
    def _scan(env, *, keys: bool, values: bool):
        """
        Lazily walk the database inside one read transaction.

        The transaction stays open until the generator is exhausted or
        closed.
        """
        with env.begin(write=False) as txn:
            yield from txn.cursor().iternext(keys=keys, values=values)

    def keys(self) -> Iterator[str]:
        """
        Iterate over all keys in the cache.

        The open-check happens when this method is called, not on the first
        ``next()``.

        :return: Iterator of key strings.
        :rtype: Iterator[str]
        :raises RuntimeError: if the cache is not open.
        """
        env = self._require_open()
        return (raw.decode('utf-8') for raw in self._scan(env, keys=True, values=False))

    def values(self) -> Iterator[Any]:
        """
        Iterate over all values in the cache.

        :return: Iterator of deserialized values.
        :rtype: Iterator[Any]
        :raises RuntimeError: if the cache is not open.
        """
        env = self._require_open()
        return (pickle.loads(raw) for raw in self._scan(env, keys=False, values=True))

    def items(self) -> Iterator[tuple[str, Any]]:
        """
        Iterate over all key-value pairs in the cache.

        :return: Iterator of ``(key, value)`` tuples.
        :rtype: Iterator[tuple[str, Any]]
        :raises RuntimeError: if the cache is not open.
        """
        env = self._require_open()
        return (
            (raw_key.decode('utf-8'), pickle.loads(raw_value))
            for raw_key, raw_value in self._scan(env, keys=True, values=True)
        )

    # ------------------------------------------------------------------
    # Whole-cache operations
    # ------------------------------------------------------------------

    def __len__(self) -> int:
        """
        Return the number of items in the cache.

        :return: Entry count.
        :rtype: int
        :raises RuntimeError: if the cache is not open.
        """
        env = self._require_open()

        with env.begin(write=False) as txn:
            return txn.stat()['entries']

    def clear(self) -> None:
        """
        Remove all items from the cache.

        :raises RuntimeError: if the cache is not open.
        :rtype: None
        """
        env = self._require_open()

        with env.begin(write=True) as txn:
            cursor = txn.cursor()
            for key in cursor.iternext(keys=True, values=False):
                txn.delete(key)

    def __bool__(self) -> bool:
        """
        Return ``True`` if the cache contains at least one item.

        :return: Boolean indicating whether the cache is non-empty.
        :rtype: bool
        :raises RuntimeError: if the cache is not open.
        """
        return len(self) > 0


class InMemoryCache:
    """
    A simple in-memory cache that mimics the LmdbCache interface.

    This is used for small datasets where LMDB overhead is unnecessary.
    It provides the same interface as LmdbCache but stores everything in RAM
    (a plain ``dict``).
    """

    def __init__(
        self,
        name: str,
        base_dir: str = '.',
        max_size: int = 10**10,
        map_size: Optional[int] = None,
    ):
        """
        Initialize in-memory cache.

        :param name: Name of the cache (unused, for API compatibility)
        :param base_dir: Unused, for API compatibility
        :param max_size: Unused, for API compatibility
        :param map_size: Unused; accepted so that code written against
            ``LmdbCache(name, base_dir, map_size=...)`` can construct this
            class with the same keyword.
        """
        self.name = name
        self._data: dict = {}
        self._is_open = True

    def open(self) -> None:
        """
        No-op for in-memory cache. Sets the internal open flag.

        :rtype: None
        """
        self._is_open = True

    def close(self) -> None:
        """
        No-op for in-memory cache. Clears the internal open flag.

        The stored data is left untouched.

        :rtype: None
        """
        self._is_open = False

    def __enter__(self):
        """
        Open the cache and return this instance for use in a ``with`` block.

        :return: The :class:`InMemoryCache` instance.
        :rtype: InMemoryCache
        """
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Close the cache on context exit.

        :return: ``False`` so that exceptions raised in the block propagate.
        :rtype: bool
        """
        self.close()
        return False

    def put(self, key: str, value: Any) -> None:
        """
        Store a key-value pair.

        :param key: String key.
        :type key: str
        :param value: Any value.
        :type value: Any
        :rtype: None
        """
        self._data[key] = value

    def get(self, key: str, default: Any = None) -> Any:
        """
        Retrieve a value by key.

        :param key: String key.
        :type key: str
        :param default: Default value if the key is not found. Defaults to ``None``.
        :type default: Any
        :return: The stored value, or *default* if the key is absent.
        :rtype: Any
        """
        return self._data.get(key, default)

    def __setitem__(self, key: str, value: Any) -> None:
        """Allow dict-like assignment: ``cache[key] = value``."""
        self.put(key, value)

    def __getitem__(self, key: str) -> Any:
        """
        Allow dict-like access: ``value = cache[key]``.

        :raises KeyError: if the key is not found.
        """
        try:
            return self._data[key]
        except KeyError:
            # Re-raise with the same message wording LmdbCache uses.
            raise KeyError(f"Key '{key}' not found in cache") from None

    def __contains__(self, key: str) -> bool:
        """Return ``True`` if *key* is in the cache."""
        return key in self._data

    def delete(self, key: str) -> None:
        """
        Delete a key-value pair. Absent keys are ignored.

        :param key: String key to delete.
        :type key: str
        :rtype: None
        """
        self._data.pop(key, None)

    def __delitem__(self, key: str) -> None:
        """Allow dict-like deletion: ``del cache[key]``."""
        self.delete(key)

    def keys(self) -> Iterator[str]:
        """
        Iterate over all keys.

        :return: Iterator of key strings.
        :rtype: Iterator[str]
        """
        return iter(self._data)

    def values(self) -> Iterator[Any]:
        """
        Iterate over all values.

        :return: Iterator of values.
        :rtype: Iterator[Any]
        """
        return iter(self._data.values())

    def items(self) -> Iterator[tuple[str, Any]]:
        """
        Iterate over all key-value pairs.

        :return: Iterator of ``(key, value)`` tuples.
        :rtype: Iterator[tuple[str, Any]]
        """
        return iter(self._data.items())

    def __len__(self) -> int:
        """
        Return the number of items in the cache.

        :return: Entry count.
        :rtype: int
        """
        return len(self._data)

    def clear(self) -> None:
        """
        Remove all items from the cache.

        :rtype: None
        """
        self._data.clear()

    def __bool__(self) -> bool:
        """Return ``True`` if the cache contains at least one item."""
        return bool(self._data)


class LmdbUnionFind:
    """
    A Union-Find (Disjoint Set Union) data structure persisted entirely in LMDB.

    Every element is stored as a key whose value is its parent; a root is an
    element stored as its own parent. ``find`` compresses paths, and ``union``
    attaches the root of its first argument beneath the root of its second
    (no rank or size heuristic).

    This class does NOT manage the lifecycle of the LMDB environment it
    receives; the caller is responsible for opening and closing ``env``.

    Typical usage inside a ``try/finally`` block::

        env = lmdb.open(tmp_path, map_size=10 * 1024**3, sync=False)
        try:
            uf = LmdbUnionFind(env)
            uf.union('a', 'b')
            uf.union('b', 'c')
            for root, members in uf.iter_components():
                print(root, members)
        finally:
            env.close()
            shutil.rmtree(tmp_path)
    """

    def __init__(self, env: lmdb.Environment):
        """
        :param env: An already-opened ``lmdb.Environment``.
        """
        self._env = env

    # ------------------------------------------------------------------
    # Core operations
    # ------------------------------------------------------------------

    def find(self, x: str) -> str:
        """
        Return the root of the component containing *x*.

        An element that is not stored yet is written as its own root. Every
        element visited on the way from *x* to the root is afterwards
        re-pointed directly at the root (path compression).

        :param x: Element identifier (arbitrary non-empty string).
        :return: Root identifier of the component.
        :raises ValueError: if *x* is an empty string (LMDB does not allow
            zero-length keys).
        """
        if not x:
            raise ValueError("LmdbUnionFind: element identifier must be a non-empty string.")

        visited: list[str] = []
        node = x
        unseen = False

        # Phase 1 (read-only): follow parent links until a root, or an
        # element with no stored entry, is reached.
        with self._env.begin(write=False) as reader:
            while True:
                stored = reader.get(node.encode('utf-8'))
                if stored is None:
                    unseen = True
                    break
                parent = stored.decode('utf-8')
                if parent == node:
                    break
                visited.append(node)
                node = parent
        root = node

        # Nothing to persist: x is an already-registered root.
        if not (unseen or visited):
            return root

        # Phase 2 (write): register the new root and/or compress the path.
        root_key = root.encode('utf-8')
        with self._env.begin(write=True) as writer:
            if unseen:
                writer.put(root_key, root_key)
            for member in visited:
                writer.put(member.encode('utf-8'), root_key)

        return root

    def __contains__(self, x: str) -> bool:
        """
        Return ``True`` if *x* has been registered in the Union-Find.

        Purely a lookup: unlike ``find`` it never writes. Empty strings yield
        ``False`` without touching the database (LMDB does not allow
        zero-length keys).

        :param x: Element identifier to test.
        :return: ``True`` if *x* is a known element, ``False`` otherwise.
        """
        if x:
            with self._env.begin(write=False) as reader:
                return reader.get(x.encode('utf-8')) is not None
        return False

    def union(self, x: str, y: str) -> None:
        """
        Merge the components containing *x* and *y*.

        After the call, ``find(x) == find(y)``. Specifically the root of *x*
        is made a child of the root of *y*.

        :param x: First element.
        :param y: Second element.
        """
        root_x = self.find(x)
        root_y = self.find(y)
        if root_x == root_y:
            return
        with self._env.begin(write=True) as writer:
            writer.put(root_x.encode('utf-8'), root_y.encode('utf-8'))

    # ------------------------------------------------------------------
    # Component enumeration
    # ------------------------------------------------------------------

    def iter_components(self) -> Iterator[tuple[str, set]]:
        """
        Iterate over all components, yielding ``(root, members_set)`` pairs.

        All keys are first read into a list within a single read transaction;
        ``find`` is then called once per key (with its path-compression
        writes) and the complete root-to-members mapping is built in memory
        before the first pair is yielded.

        :return: Iterator of ``(root_str, set_of_member_strings)`` pairs.
        """
        # Snapshot the keys first so that the writes issued by find() happen
        # outside of this read transaction.
        with self._env.begin(write=False) as reader:
            members: list[str] = [
                raw.decode('utf-8')
                for raw in reader.cursor().iternext(keys=True, values=False)
            ]

        grouped: dict[str, set] = {}
        for member in members:
            grouped.setdefault(self.find(member), set()).add(member)

        yield from grouped.items()


class InMemoryUnionFind:
    """
    An in-memory Union-Find (Disjoint Set Union) data structure.

    A plain ``dict`` maps every element to its parent; a root is an element
    mapped to itself. ``find`` compresses paths, and ``union`` attaches the
    root of its first argument beneath the root of its second (no rank or
    size heuristic).

    This is a memory-based alternative to LmdbUnionFind, suitable for
    small datasets where LMDB overhead is unnecessary.

    Usage::

        uf = InMemoryUnionFind()
        uf.union('a', 'b')
        uf.union('b', 'c')
        for root, members in uf.iter_components():
            print(root, members)
    """

    def __init__(self):
        """
        Initialize an empty Union-Find structure.
        """
        self._data: dict[str, str] = {}

    # ------------------------------------------------------------------
    # Core operations
    # ------------------------------------------------------------------

    def find(self, x: str) -> str:
        """
        Return the root of the component containing *x*.

        An element that is not stored yet is registered as its own root.
        Every element visited on the way from *x* to the root is afterwards
        re-pointed directly at the root (path compression).

        :param x: Element identifier (arbitrary non-empty string).
        :return: Root identifier of the component.
        :raises ValueError: if *x* is an empty string.
        """
        if not x:
            raise ValueError("InMemoryUnionFind: element identifier must be a non-empty string.")

        parents = self._data

        # First sighting: x becomes a singleton component.
        if x not in parents:
            parents[x] = x
            return x

        # Pass 1: climb to the root.
        root = x
        while parents[root] != root:
            root = parents[root]

        # Pass 2: walk the same path again, re-pointing each element at the root.
        node = x
        while node != root:
            parents[node], node = root, parents[node]

        return root

    def __contains__(self, x: str) -> bool:
        """
        Return ``True`` if *x* has been registered in the Union-Find.

        Purely a lookup: unlike ``find`` it never registers *x*. Empty
        strings always yield ``False``.

        :param x: Element identifier to test.
        :return: ``True`` if *x* is a known element, ``False`` otherwise.
        """
        return bool(x) and x in self._data

    def union(self, x: str, y: str) -> None:
        """
        Merge the components containing *x* and *y*.

        After the call, ``find(x) == find(y)``. Specifically the root of *x*
        is made a child of the root of *y*.

        :param x: First element.
        :param y: Second element.
        """
        root_x = self.find(x)
        root_y = self.find(y)
        if root_x == root_y:
            return
        self._data[root_x] = root_y

    # ------------------------------------------------------------------
    # Component enumeration
    # ------------------------------------------------------------------

    def iter_components(self) -> Iterator[tuple[str, set]]:
        """
        Iterate over all components, yielding ``(root, members_set)`` pairs.

        ``find`` is called once per stored element (with its path-compression
        side-effects) and the complete root-to-members mapping is built in
        memory before the first pair is yielded.

        :return: Iterator of ``(root_str, set_of_member_strings)`` pairs.
        """
        grouped: dict[str, set] = {}
        # find() only rewrites values of existing keys here, so the dict's
        # size does not change while it is being iterated.
        for member in self._data:
            grouped.setdefault(self.find(member), set()).add(member)

        yield from grouped.items()


# Type alias accepting either Union-Find implementation: the LMDB-backed
# LmdbUnionFind or the dict-backed InMemoryUnionFind.
UnionFind = Union[LmdbUnionFind, InMemoryUnionFind]