Coverage for oc_validator / lmdb_cache.py: 97%

238 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 15:46 +0000

1# ISC License 

2# 

3# Copyright (c) 2023-2026, Elia Rizzetto, Silvio Peroni 

4# 

5# Permission to use, copy, modify, and/or distribute this software for any 

6# purpose with or without fee is hereby granted, provided that the above 

7# copyright notice and this permission notice appear in all copies. 

8# 

9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, 

12# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM 

13# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR 

14# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR 

15# PERFORMANCE OF THIS SOFTWARE. 

16 

17import lmdb 

18import pickle 

19import os 

20import shutil 

21import tempfile 

22from typing import Optional, Any, Iterator, Union 

23from contextlib import contextmanager 

24 

25 

class LmdbCache:
    """
    A disk-backed, dictionary-like cache built on LMDB
    (Lightning Memory-Mapped Database).

    Keys are strings (stored UTF-8 encoded); values are any picklable
    objects (stored pickled). Each instance owns a freshly created temporary
    directory which is deleted again by :meth:`close`.

    Usage::

        with LmdbCache('my_cache') as cache:
            cache['key1'] = 'value1'
            value = cache.get('key2')

        # The cache is closed (and its directory removed) on context exit.
    """

    # Private sentinel used to tell "key absent" apart from "stored value is None".
    _MISSING = object()

    # Single source of truth for the error raised by every operation that
    # needs an open environment.
    _NOT_OPEN_MESSAGE = "LMDB cache is not open. Use context manager or call open() first."

    def __init__(self, name: str, base_dir: str = '.', map_size: int = 1 * 1024**3):
        """
        Initialize LMDB cache.

        The environment itself is not opened here; call :meth:`open` or use
        the instance as a context manager.

        :param name: Unique name for this cache (used in the directory name prefix).
        :param base_dir: Base directory under which a new dedicated temporary
            directory will be created (the base directory is created if
            missing). Defaults to the current working directory.
        :param map_size: Maximum database size in bytes. The environment
            variable ``LMDB_MAP_SIZE``, when set, overrides this argument.
            On Windows LMDB pre-allocates a file of exactly ``map_size`` bytes;
            on Linux/macOS it uses sparse files so actual disk usage equals only
            the data written. Increase this value when processing files with
            millions of unique IDs (e.g. ``map_size=20*1024**3`` for 20GB).
        :raises ValueError: if ``LMDB_MAP_SIZE`` is set but is not an integer.
        """
        self.name = name
        # Lifecycle attributes are assigned before anything that can raise, so
        # that close() (also called from __del__) works even when construction
        # fails half-way.
        self._env: Optional[lmdb.Environment] = None
        self._is_open = False
        self._temp_dir: Optional[str] = None

        # Use the environment variable if specified, else the argument.
        self.map_size = int(os.getenv('LMDB_MAP_SIZE', str(map_size)))

        # Always create a new dedicated temporary directory for this environment.
        os.makedirs(base_dir, exist_ok=True)
        self._temp_dir = tempfile.mkdtemp(prefix=f'lmdb_{name}_', dir=base_dir)
        self.path = os.path.join(self._temp_dir, 'cache')

    def _require_open(self) -> None:
        """Raise :class:`RuntimeError` unless the environment is currently open."""
        if not self._is_open:
            raise RuntimeError(self._NOT_OPEN_MESSAGE)

    def open(self) -> None:
        """
        Open the LMDB environment.

        No-op if the environment is already open.

        NOTE(review): :meth:`close` deletes the directory that contains
        ``self.path``, so re-opening a closed cache is presumably unsupported
        - confirm against callers.

        :rtype: None
        """
        if self._is_open:
            return

        self._env = lmdb.open(
            self.path,
            map_size=self.map_size,
            max_dbs=1,        # Allow one sub-database if needed
            writemap=False,   # Do not use a writeable memory map
            metasync=False,   # Skip the metadata flush on commit (speed over durability)
            sync=False,       # Don't sync to disk on every commit (speed over durability)
            readahead=True,   # Keep OS readahead enabled for sequential reads
        )
        self._is_open = True

    def close(self) -> None:
        """
        Close the LMDB environment and remove its dedicated temporary directory.

        Safe to call repeatedly, and safe on an instance whose ``__init__``
        did not run to completion.

        :rtype: None
        """
        env = getattr(self, '_env', None)
        if env is not None:
            env.close()
        self._env = None
        self._is_open = False

        # Clean up the dedicated temporary directory (best effort).
        temp_dir = getattr(self, '_temp_dir', None)
        if temp_dir is not None and os.path.isdir(temp_dir):
            shutil.rmtree(temp_dir, ignore_errors=True)
        self._temp_dir = None

    def __enter__(self) -> "LmdbCache":
        """
        Open the environment and return this instance for use in a ``with`` block.

        :return: The :class:`LmdbCache` instance.
        :rtype: LmdbCache
        """
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Close the environment on context exit.

        :return: ``False``, so an exception raised in the block propagates.
        :rtype: bool
        """
        self.close()
        return False

    def __del__(self):
        """Ensure cleanup on garbage collection."""
        self.close()

    def put(self, key: str, value: Any) -> None:
        """
        Store a key-value pair.

        :param key: String key.
        :type key: str
        :param value: Any picklable value.
        :type value: Any
        :raises RuntimeError: if the cache is not open.
        :rtype: None
        """
        self._require_open()

        serialized_value = pickle.dumps(value)
        with self._env.begin(write=True) as txn:
            txn.put(key.encode('utf-8'), serialized_value)

    def get(self, key: str, default: Any = None) -> Any:
        """
        Retrieve a value by key.

        :param key: String key.
        :type key: str
        :param default: Default value if the key is not found. Defaults to ``None``.
        :type default: Any
        :return: The stored value, or *default* if the key is absent.
        :rtype: Any
        :raises RuntimeError: if the cache is not open.
        """
        self._require_open()

        with self._env.begin(write=False) as txn:
            raw = txn.get(key.encode('utf-8'))
            if raw is None:
                return default
            # The bytes were produced by put() in this cache's own directory;
            # never point this class at a database from an untrusted source.
            return pickle.loads(raw)

    def __setitem__(self, key: str, value: Any) -> None:
        """
        Allow dict-like assignment: ``cache[key] = value``.

        :param key: String key.
        :type key: str
        :param value: Any picklable value.
        :type value: Any
        :rtype: None
        """
        self.put(key, value)

    def __getitem__(self, key: str) -> Any:
        """
        Allow dict-like access: ``value = cache[key]``.

        A stored ``None`` is returned as ``None``; only a genuinely absent key
        raises.

        :param key: String key.
        :type key: str
        :return: The stored value.
        :rtype: Any
        :raises KeyError: if the key is not found.
        :raises RuntimeError: if the cache is not open.
        """
        # A sentinel default distinguishes "absent" from "value is None".
        value = self.get(key, self._MISSING)
        if value is self._MISSING:
            raise KeyError(f"Key '{key}' not found in cache")
        return value

    def __contains__(self, key: str) -> bool:
        """
        Allow the ``in`` operator: ``if key in cache``.

        Returns ``False`` for empty strings (LMDB does not allow zero-length
        keys); this check happens before the open-check.

        :raises RuntimeError: if the cache is not open (non-empty keys only).
        """
        if not key:
            return False
        self._require_open()

        with self._env.begin(write=False) as txn:
            return txn.get(key.encode('utf-8')) is not None

    def delete(self, key: str) -> None:
        """
        Delete a key-value pair.

        :param key: String key to delete.
        :type key: str
        :raises RuntimeError: if the cache is not open.
        :rtype: None
        """
        self._require_open()

        with self._env.begin(write=True) as txn:
            txn.delete(key.encode('utf-8'))

    def __delitem__(self, key: str) -> None:
        """
        Allow dict-like deletion: ``del cache[key]``.

        :param key: String key to delete.
        :type key: str
        :rtype: None
        """
        self.delete(key)

    def keys(self) -> Iterator[str]:
        """
        Iterate over all keys in the cache.

        This is a generator: the open-check and the read transaction start
        when iteration begins, and the transaction stays open while the
        generator is being consumed.

        :return: Iterator of key strings.
        :rtype: Iterator[str]
        :raises RuntimeError: (on first iteration) if the cache is not open.
        """
        self._require_open()

        with self._env.begin(write=False) as txn:
            cursor = txn.cursor()
            for key_bytes in cursor.iternext(keys=True, values=False):
                yield key_bytes.decode('utf-8')

    def values(self) -> Iterator[Any]:
        """
        Iterate over all values in the cache.

        This is a generator; see :meth:`keys` for the transaction lifetime.

        :return: Iterator of deserialized values.
        :rtype: Iterator[Any]
        :raises RuntimeError: (on first iteration) if the cache is not open.
        """
        self._require_open()

        with self._env.begin(write=False) as txn:
            cursor = txn.cursor()
            for value_bytes in cursor.iternext(keys=False, values=True):
                yield pickle.loads(value_bytes)

    def items(self) -> Iterator[tuple[str, Any]]:
        """
        Iterate over all key-value pairs in the cache.

        This is a generator; see :meth:`keys` for the transaction lifetime.

        :return: Iterator of ``(key, value)`` tuples.
        :rtype: Iterator[tuple[str, Any]]
        :raises RuntimeError: (on first iteration) if the cache is not open.
        """
        self._require_open()

        with self._env.begin(write=False) as txn:
            cursor = txn.cursor()
            for key_bytes, value_bytes in cursor:
                yield (key_bytes.decode('utf-8'), pickle.loads(value_bytes))

    def __len__(self) -> int:
        """
        Return the number of items in the cache.

        :return: Entry count.
        :rtype: int
        :raises RuntimeError: if the cache is not open.
        """
        self._require_open()

        with self._env.begin(write=False) as txn:
            return txn.stat()['entries']

    def clear(self) -> None:
        """
        Remove all items from the cache.

        All deletions happen inside a single write transaction.

        :raises RuntimeError: if the cache is not open.
        :rtype: None
        """
        self._require_open()

        with self._env.begin(write=True) as txn:
            cursor = txn.cursor()
            for key in cursor.iternext(keys=True, values=False):
                txn.delete(key)

    def __bool__(self) -> bool:
        """
        Return ``True`` if the cache contains at least one item.

        :return: Boolean indicating whether the cache is non-empty.
        :rtype: bool
        :raises RuntimeError: if the cache is not open.
        """
        return len(self) > 0

313 

314 

class InMemoryCache:
    """
    A simple in-memory cache that mimics the LmdbCache interface.

    Everything is stored in a plain ``dict`` held in RAM. Intended for small
    datasets where LMDB overhead is unnecessary.

    Unlike the LMDB-backed cache, the open/closed flag is only tracked, not
    enforced: operations work regardless of its value, and :meth:`close` does
    not discard the stored data.
    """

    def __init__(
        self,
        name: str,
        base_dir: str = '.',
        max_size: int = 10**10,
        map_size: Optional[int] = None,
    ):
        """
        Initialize in-memory cache.

        :param name: Name of the cache (stored on ``self.name``, otherwise unused).
        :param base_dir: Unused, for API compatibility.
        :param max_size: Unused, for API compatibility.
        :param map_size: Unused. Accepted so that code constructing an
            ``LmdbCache`` with ``map_size=...`` can construct this class with
            the very same keyword arguments.
        """
        self.name = name
        self._data: dict = {}
        self._is_open = True

    def open(self) -> None:
        """
        No-op for in-memory cache. Sets the internal open flag.

        :rtype: None
        """
        self._is_open = True

    def close(self) -> None:
        """
        No-op for in-memory cache. Clears the internal open flag.

        :rtype: None
        """
        self._is_open = False

    def __enter__(self) -> "InMemoryCache":
        """
        Open the cache and return this instance for use in a ``with`` block.

        :return: The :class:`InMemoryCache` instance.
        :rtype: InMemoryCache
        """
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Close the cache on context exit.

        :return: ``False``, so an exception raised in the block propagates.
        :rtype: bool
        """
        self.close()
        return False

    def put(self, key: str, value: Any) -> None:
        """
        Store a key-value pair.

        :param key: String key.
        :type key: str
        :param value: Any value.
        :type value: Any
        :rtype: None
        """
        self._data[key] = value

    def get(self, key: str, default: Any = None) -> Any:
        """
        Retrieve a value by key.

        :param key: String key.
        :type key: str
        :param default: Default value if the key is not found. Defaults to ``None``.
        :type default: Any
        :return: The stored value, or *default* if the key is absent.
        :rtype: Any
        """
        return self._data.get(key, default)

    def __setitem__(self, key: str, value: Any) -> None:
        """Allow dict-like assignment: ``cache[key] = value``."""
        self.put(key, value)

    def __getitem__(self, key: str) -> Any:
        """
        Allow dict-like access: ``value = cache[key]``.

        :raises KeyError: if the key is not found.
        """
        try:
            return self._data[key]
        except KeyError:
            # Re-raise with the cache's own message; the dict's KeyError adds nothing.
            raise KeyError(f"Key '{key}' not found in cache") from None

    def __contains__(self, key: str) -> bool:
        """Return ``True`` if *key* is in the cache."""
        return key in self._data

    def delete(self, key: str) -> None:
        """
        Delete a key-value pair. Deleting an absent key is a silent no-op.

        :param key: String key to delete.
        :type key: str
        :rtype: None
        """
        self._data.pop(key, None)

    def __delitem__(self, key: str) -> None:
        """Allow dict-like deletion: ``del cache[key]``."""
        self.delete(key)

    def keys(self) -> Iterator[str]:
        """
        Iterate over all keys.

        :return: Iterator of key strings.
        :rtype: Iterator[str]
        """
        return iter(self._data.keys())

    def values(self) -> Iterator[Any]:
        """
        Iterate over all values.

        :return: Iterator of values.
        :rtype: Iterator[Any]
        """
        return iter(self._data.values())

    def items(self) -> Iterator[tuple[str, Any]]:
        """
        Iterate over all key-value pairs.

        :return: Iterator of ``(key, value)`` tuples.
        :rtype: Iterator[tuple[str, Any]]
        """
        return iter(self._data.items())

    def __len__(self) -> int:
        """
        Return the number of items in the cache.

        :return: Entry count.
        :rtype: int
        """
        return len(self._data)

    def clear(self) -> None:
        """
        Remove all items from the cache.

        :rtype: None
        """
        self._data.clear()

    def __bool__(self) -> bool:
        """Return ``True`` if the cache contains at least one item."""
        return bool(self._data)

475 

476 

class LmdbUnionFind:
    """
    A Union-Find (Disjoint Set Union) data structure persisted entirely in LMDB.

    Each element maps to its parent. Roots map to themselves.
    Supports path-compression on ``find``. Union is by arbitrary root
    (i.e. root of x becomes child of root of y).

    This class does NOT manage the lifecycle of the LMDB environment it
    receives; the caller is responsible for opening and closing ``env``.

    Typical usage inside a ``try/finally`` block::

        env = lmdb.open(tmp_path, map_size=10 * 1024**3, sync=False)
        try:
            uf = LmdbUnionFind(env)
            uf.union('a', 'b')
            uf.union('b', 'c')
            for root, members in uf.iter_components():
                print(root, members)
        finally:
            env.close()
            shutil.rmtree(tmp_path)
    """

    def __init__(self, env: "lmdb.Environment"):
        """
        :param env: An already-opened ``lmdb.Environment``.
        """
        self._env = env

    # ------------------------------------------------------------------
    # Core operations
    # ------------------------------------------------------------------

    def find(self, x: str) -> str:
        """
        Return the root of the component containing *x*.

        If *x* has never been seen before it is registered as its own root.
        Path compression is applied: all nodes along the path to the root are
        updated to point directly to the root.

        The lookup runs in a read-only transaction; a separate write
        transaction is opened only when there is something to store.

        :param x: Element identifier (arbitrary non-empty string).
        :return: Root identifier of the component.
        :raises ValueError: if *x* is an empty string (LMDB does not allow
            zero-length keys).
        """
        if not x:
            raise ValueError("LmdbUnionFind: element identifier must be a non-empty string.")

        path: list[str] = []
        current = x

        # --- read-only traversal to locate root ---
        with self._env.begin(write=False) as txn:
            while True:
                raw = txn.get(current.encode('utf-8'))
                if raw is None:
                    # 'current' has no entry yet: it becomes its own root.
                    is_new = True
                    break
                parent = raw.decode('utf-8')
                if parent == current:
                    # Self-parent marks an existing root.
                    is_new = False
                    break
                path.append(current)
                current = parent
        root = current

        # --- write phase: register new node and/or apply path compression ---
        if is_new or path:
            root_bytes = root.encode('utf-8')
            with self._env.begin(write=True) as txn:
                if is_new:
                    txn.put(root_bytes, root_bytes)
                # Point every node visited on the way directly at the root.
                for node in path:
                    txn.put(node.encode('utf-8'), root_bytes)

        return root

    def __contains__(self, x: str) -> bool:
        """
        Return ``True`` if *x* has been registered in the Union-Find.

        Unlike ``find``, this method does **not** register *x* as a new node
        if it is absent. Returns ``False`` immediately for empty strings
        (LMDB does not allow zero-length keys).

        :param x: Element identifier to test.
        :return: ``True`` if *x* is a known element, ``False`` otherwise.
        """
        if not x:
            return False
        with self._env.begin(write=False) as txn:
            return txn.get(x.encode('utf-8')) is not None

    def union(self, x: str, y: str) -> None:
        """
        Merge the components containing *x* and *y*.

        After the call, ``find(x) == find(y)``. Specifically the root of *x*
        is made a child of the root of *y*. Elements not seen before are
        registered by the underlying ``find`` calls.

        :param x: First element.
        :param y: Second element.
        :raises ValueError: if *x* or *y* is an empty string.
        """
        rx = self.find(x)
        ry = self.find(y)
        if rx != ry:
            with self._env.begin(write=True) as txn:
                txn.put(rx.encode('utf-8'), ry.encode('utf-8'))

    # ------------------------------------------------------------------
    # Component enumeration
    # ------------------------------------------------------------------

    def iter_components(self) -> Iterator[tuple[str, set]]:
        """
        Iterate over all components, yielding ``(root, members_set)`` pairs.

        This performs one full key-scan of the LMDB database followed by one
        ``find`` call per element (with path-compression side-effects).

        Memory note: the list of all keys and the member sets of every
        component are built in memory before the first pair is yielded, so
        peak RAM usage grows with the total number of elements.

        :return: Iterator of ``(root_str, set_of_member_strings)`` pairs.
        """
        # Collect all keys in a single read transaction.
        with self._env.begin(write=False) as txn:
            all_keys: list[str] = [
                k.decode('utf-8')
                for k in txn.cursor().iternext(keys=True, values=False)
            ]

        # Group by root. find() may write back path-compressed parents, which
        # is why it is only called after the read transaction above has ended.
        components: dict[str, set] = {}
        for key in all_keys:
            components.setdefault(self.find(key), set()).add(key)

        yield from components.items()

624 

625 

class InMemoryUnionFind:
    """
    An in-memory Union-Find (Disjoint Set Union) data structure.

    Each element maps to its parent. Roots map to themselves.
    Supports path-compression on ``find``. Union is by arbitrary root
    (i.e. root of x becomes child of root of y).

    This is a memory-based alternative to LmdbUnionFind, suitable for
    small datasets where LMDB overhead is unnecessary.

    Usage::

        uf = InMemoryUnionFind()
        uf.union('a', 'b')
        uf.union('b', 'c')
        for root, members in uf.iter_components():
            print(root, members)
    """

    def __init__(self):
        """
        Initialize an empty Union-Find structure.
        """
        # Maps every registered element to its parent; a root maps to itself.
        self._data: dict[str, str] = {}

    # ------------------------------------------------------------------
    # Core operations
    # ------------------------------------------------------------------

    def find(self, x: str) -> str:
        """
        Return the root of the component containing *x*.

        If *x* has never been seen before it is registered as its own root.
        Path compression is applied: all nodes along the path to the root are
        updated to point directly to the root.

        :param x: Element identifier (arbitrary non-empty string).
        :return: Root identifier of the component.
        :raises ValueError: if *x* is an empty string.
        """
        if not x:
            raise ValueError("InMemoryUnionFind: element identifier must be a non-empty string.")

        parents = self._data

        # First sighting: x becomes the root of its own singleton component.
        if x not in parents:
            parents[x] = x
            return x

        # Walk up to the root, remembering every node passed on the way.
        path: list[str] = []
        current = x
        while current != parents[current]:
            path.append(current)
            current = parents[current]
        root = current

        # Path compression: re-point the visited nodes directly at the root.
        for node in path:
            parents[node] = root

        return root

    def __contains__(self, x: str) -> bool:
        """
        Return ``True`` if *x* has been registered in the Union-Find.

        Unlike ``find``, this method does **not** register *x* as a new node
        if it is absent. Empty strings are never reported as present.

        :param x: Element identifier to test.
        :return: ``True`` if *x* is a known element, ``False`` otherwise.
        """
        if not x:
            return False
        return x in self._data

    def union(self, x: str, y: str) -> None:
        """
        Merge the components containing *x* and *y*.

        After the call, ``find(x) == find(y)``. Specifically the root of *x*
        is made a child of the root of *y*. Elements not seen before are
        registered by the underlying ``find`` calls.

        :param x: First element.
        :param y: Second element.
        :raises ValueError: if *x* or *y* is an empty string.
        """
        rx = self.find(x)
        ry = self.find(y)
        if rx != ry:
            self._data[rx] = ry

    # ------------------------------------------------------------------
    # Component enumeration
    # ------------------------------------------------------------------

    def iter_components(self) -> Iterator[tuple[str, set]]:
        """
        Iterate over all components, yielding ``(root, members_set)`` pairs.

        This performs one full iteration over all elements with one ``find``
        call per element (with path-compression side-effects).

        Memory note: the member sets of every component are built before the
        first pair is yielded, so the extra memory used grows with the total
        number of elements.

        :return: Iterator of ``(root_str, set_of_member_strings)`` pairs.
        """
        components: dict[str, set] = {}
        # find() is called on keys that already exist, so it only reassigns
        # values (path compression) and never adds or removes keys; iterating
        # the dict while calling it is therefore safe.
        for key in self._data:
            components.setdefault(self.find(key), set()).add(key)

        yield from components.items()

743 

744 

# Type alias covering both Union-Find implementations, so that annotations can
# accept either the LMDB-backed or the in-memory variant.
UnionFind = Union[LmdbUnionFind, InMemoryUnionFind]