Coverage for meta_prov_fixer / dry_run_utils.py: 0%
45 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 15:12 +0000
1"""
2Utilities for dry-run mode, including callbacks for writing issues to JSON-Lines files.
3"""
4import os
5import time
6import json
7from pathlib import Path
8from datetime import datetime
9from typing import Callable, Optional
10from meta_prov_fixer.utils import make_json_safe
def create_dry_run_issues_callback(
    output_dir: str,
    max_lines_per_file: int = 100,
    process_id: Optional[str] = None
) -> Callable:
    """
    Creates a callback function for dry_run mode that writes issues to JSON-Lines files.

    Each line in the JSON-Lines file has the structure:
    {filepath:str, ff:list, dt:list, mps:list, pa:list, mo:list}

    This implementation is safe for parallel execution when used with run_parallel_fix.py:
    every callback instance writes only to files whose names embed a session id built
    from a microsecond timestamp, the PID, and (optionally) process_id, so no two
    processes ever touch the same file.

    Args:
        output_dir: Directory where issues files will be written.
        max_lines_per_file: Maximum number of lines per file. Default is 100.
        process_id: Optional identifier for parallel execution (e.g., directory name
            like 'br', 'ar'). If provided, will be included in the output filename
            for uniqueness.

    Returns:
        A callback function compatible with fix_provenance_process() that accepts
        (file_path, (ff_issues, dt_issues, mps_issues, pa_issues, mo_issues))
    """
    # Create output directory
    output_dir = Path(output_dir).resolve()
    output_dir.mkdir(parents=True, exist_ok=True)

    # Generate a unique session ID when the callback is created (not per file),
    # so all files from this callback instance share one session identifier.
    # Microseconds keep it unique even when callbacks are created rapidly.
    session_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')
    session_pid = os.getpid()

    # Combine timestamp, PID, and optionally process_id: this prevents filename
    # collisions even with parallel execution across directories/processes.
    if process_id:
        session_id = f"{process_id}_{session_timestamp}_{session_pid}"
    else:
        session_id = f"{session_timestamp}_{session_pid}"

    # Mutable state shared by the closures below (counts and current chunk file).
    state = {
        'lines_written': 0,
        'current_file_number': 0,
        'current_file_path': None
    }

    def _get_new_filename() -> Path:
        """
        Generate a unique filename for the next chunk.

        The filename embeds the session_id (fixed at callback creation), grouping
        all files of one session together and ruling out overwrites of files from
        other sessions, even under parallel execution.
        """
        filename = f"dry_run_issues_{session_id}_chunk{state['current_file_number']}.jsonl"
        return output_dir / filename

    def _append_line(filepath: Path, line: str):
        """
        Append a single line to the current chunk file.

        Because the chunk filename is unique to this session (timestamp + PID
        [+ process_id]), no other process or callback instance writes to it, so a
        plain buffered append is safe. This replaces an earlier read-whole-file /
        rewrite / os.replace() scheme that cost O(n) per line (O(n^2) per chunk)
        and was not actually safe against genuinely concurrent writers anyway.
        """
        with open(filepath, 'a', encoding='utf-8') as f:
            f.write(line + '\n')

    def dry_run_callback(
        file_path: str,
        issues: tuple
    ):
        """
        Callback function that writes issues to JSON-Lines files.

        Args:
            file_path: Path to the processed file.
            issues: Tuple of (ff_issues, dt_issues, mps_issues, pa_issues, mo_issues)
        """
        ff_issues, dt_issues, mps_issues, pa_issues, mo_issues = issues

        # Create JSON-safe versions of the issues using make_json_safe.
        # This converts rdflib.URIRef and other objects to strings.
        record = {
            'filepath': file_path,
            'ff': make_json_safe(ff_issues),
            'dt': make_json_safe(dt_issues),
            'mps': make_json_safe(mps_issues),
            'pa': make_json_safe(pa_issues),
            'mo': make_json_safe(mo_issues)
        }

        json_line = json.dumps(record, ensure_ascii=False)

        # Roll over to a fresh chunk file when the current one is full (or on
        # first use). Chunk numbering starts at 1.
        if (state['current_file_path'] is None or
            state['lines_written'] >= max_lines_per_file):
            state['current_file_number'] += 1
            state['current_file_path'] = _get_new_filename()
            state['lines_written'] = 0

        _append_line(state['current_file_path'], json_line)
        state['lines_written'] += 1

    return dry_run_callback