Coverage for ramose / auth.py: 100%

35 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-01 13:49 +0000

1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from __future__ import annotations 

6 

7import hashlib 

8import secrets 

9import sqlite3 

10import time 

11from pathlib import Path 

12 

13 

class TokenStore:
    """SQLite-backed store for bearer tokens.

    Tokens are generated with ``secrets.token_urlsafe`` and only their
    SHA-256 hex digest is written to the database, together with a label,
    a creation timestamp, an optional expiry timestamp and a revocation
    flag. Timestamps are ``time.time()`` values (seconds since the epoch).

    The connection is opened with ``check_same_thread=False``, so it may be
    used from threads other than the one that created the store; this class
    does no locking of its own.

    The store can be used as a context manager, which closes the underlying
    connection on exit.
    """

    def __init__(self, directory: str | Path) -> None:
        """Open (creating if needed) ``auth.db`` inside *directory*.

        Args:
            directory: Folder holding the database file. It is created,
                including missing parents, when it does not exist.

        Raises:
            sqlite3.Error: If the schema cannot be created. The connection
                is closed before the error propagates.
        """
        db_dir = Path(directory)
        db_dir.mkdir(parents=True, exist_ok=True)
        self._conn = sqlite3.connect(str(db_dir / "auth.db"), check_same_thread=False)
        try:
            # The connection context manager commits on success and rolls
            # back on failure.
            with self._conn:
                self._conn.execute(
                    "CREATE TABLE IF NOT EXISTS tokens "
                    "(token_hash TEXT PRIMARY KEY, label TEXT, created_at REAL NOT NULL, "
                    "expires_at REAL, revoked INTEGER NOT NULL DEFAULT 0)",
                )
        except sqlite3.Error:
            # Do not leak the freshly opened connection when setup fails.
            self._conn.close()
            raise

    def __enter__(self) -> TokenStore:
        """Return the store itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, *exc_info: object) -> None:
        """Close the database connection when leaving the ``with`` block."""
        self.close()

    def close(self) -> None:
        """Close the underlying SQLite connection."""
        self._conn.close()

    @staticmethod
    def _hash(token: str) -> str:
        """Return the SHA-256 hex digest of the UTF-8 encoded *token*."""
        return hashlib.sha256(token.encode("utf-8")).hexdigest()

    def create(self, label: str, ttl: int | None = None) -> str:
        """Generate a new token, persist its hash and return the plain token.

        Args:
            label: Free-form description stored alongside the token hash.
            ttl: Lifetime in seconds, added to the current time to obtain
                the expiry timestamp. ``None`` stores no expiry.

        Returns:
            The plain-text token. Only its hash is stored, so it cannot be
            read back from the database later.
        """
        token = secrets.token_urlsafe(32)
        now = time.time()
        expires_at = None if ttl is None else now + ttl
        # Commit on success; roll back if the INSERT fails so no transaction
        # is left open on the shared connection.
        with self._conn:
            self._conn.execute(
                "INSERT INTO tokens (token_hash, label, created_at, expires_at) VALUES (?, ?, ?, ?)",
                (self._hash(token), label, now, expires_at),
            )
        return token

    def validate(self, token: str) -> bool:
        """Return whether *token* is known, not revoked and not expired.

        A token with no stored expiry is valid for as long as it is not
        revoked; otherwise its expiry must be strictly later than the
        current time.
        """
        row = self._conn.execute(
            "SELECT expires_at FROM tokens WHERE token_hash = ? AND revoked = 0",
            (self._hash(token),),
        ).fetchone()
        if row is None:
            return False
        expires_at = row[0]
        return expires_at is None or expires_at > time.time()

    def revoke(self, token: str) -> bool:
        """Mark *token* as revoked.

        Returns:
            ``True`` when a row with the token's hash exists (including one
            that was already revoked), ``False`` when the token is unknown.
        """
        # Commit on success; roll back if the UPDATE fails.
        with self._conn:
            cursor = self._conn.execute(
                "UPDATE tokens SET revoked = 1 WHERE token_hash = ?",
                (self._hash(token),),
            )
        return cursor.rowcount > 0

    def list_tokens(self) -> list[tuple[str, float, float | None, int]]:
        """Return ``(label, created_at, expires_at, revoked)`` for every token.

        Rows are ordered by creation timestamp. Token hashes are not part of
        the result.
        """
        return self._conn.execute(
            "SELECT label, created_at, expires_at, revoked FROM tokens ORDER BY created_at",
        ).fetchall()