Coverage for meta_prov_fixer / dry_run_utils.py: 0%

45 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 15:12 +0000

1""" 

2Utilities for dry-run mode, including callbacks for writing issues to JSON-Lines files. 

3""" 

4import os 

5import time 

6import json 

7from pathlib import Path 

8from datetime import datetime 

9from typing import Callable, Optional 

10from meta_prov_fixer.utils import make_json_safe 

11 

12 

def create_dry_run_issues_callback(
    output_dir: str,
    max_lines_per_file: int = 100,
    process_id: Optional[str] = None
) -> Callable:
    """
    Create a callback for dry_run mode that writes issues to JSON-Lines files.

    Each line in a JSON-Lines file has the structure:
        {filepath:str, ff:list, dt:list, mps:list, pa:list, mo:list}

    Safe for parallel execution when used with run_parallel_fix.py: output
    filenames embed a session id built from a microsecond timestamp, the PID,
    and (optionally) process_id, so concurrent processes never write to the
    same file and plain appends suffice.

    Args:
        output_dir: Directory where issues files will be written.
        max_lines_per_file: Maximum number of lines per file. Default is 100.
        process_id: Optional identifier for parallel execution (e.g., directory
            name like 'br', 'ar'). If provided, it is included in the output
            filename for uniqueness.

    Returns:
        A callback function compatible with fix_provenance_process() that
        accepts (file_path, (ff_issues, dt_issues, mps_issues, pa_issues,
        mo_issues)).
    """
    # Resolve and create the output directory up front.
    out_dir = Path(output_dir).resolve()
    out_dir.mkdir(parents=True, exist_ok=True)

    # Generate a unique session ID once, when the callback is created (not per
    # file), so all files from this callback instance share one identifier.
    # Microsecond timestamp + PID (+ optional process_id) prevents filename
    # collisions even when callbacks are created rapidly in parallel workers.
    session_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')
    session_pid = os.getpid()
    if process_id:
        session_id = f"{process_id}_{session_timestamp}_{session_pid}"
    else:
        session_id = f"{session_timestamp}_{session_pid}"

    # Mutable state shared by the closure across calls.
    state = {
        'lines_written': 0,
        'current_file_number': 0,
        'current_file_path': None
    }

    def _get_new_filename() -> Path:
        """
        Generate a unique path for the next chunk file.

        The filename includes session_id (fixed at callback creation), so all
        files from one session are grouped together and cannot collide with
        files from other sessions, even under parallel execution.
        """
        filename = f"dry_run_issues_{session_id}_chunk{state['current_file_number']}.jsonl"
        return out_dir / filename

    def _append_line(filepath: Path, line: str) -> None:
        """
        Append a single line to `filepath`.

        Because every session writes only to its own uniquely named files
        (see session_id above), a plain O(1) append is safe here. This
        replaces a previous read-whole-file/rewrite/os.replace scheme, which
        was O(n^2) per chunk and — despite its claims — not safe for
        concurrent appenders (read-modify-replace loses updates).
        """
        with open(filepath, 'a', encoding='utf-8') as f:
            f.write(line + '\n')

    def dry_run_callback(
        file_path: str,
        issues: tuple
    ) -> None:
        """
        Write one issues record for `file_path` to the current chunk file.

        Args:
            file_path: Path to the processed file.
            issues: Tuple of (ff_issues, dt_issues, mps_issues, pa_issues, mo_issues)
        """
        ff_issues, dt_issues, mps_issues, pa_issues, mo_issues = issues

        # make_json_safe converts rdflib.URIRef and other non-serializable
        # objects to strings so the record can be JSON-encoded.
        record = {
            'filepath': file_path,
            'ff': make_json_safe(ff_issues),
            'dt': make_json_safe(dt_issues),
            'mps': make_json_safe(mps_issues),
            'pa': make_json_safe(pa_issues),
            'mo': make_json_safe(mo_issues)
        }
        json_line = json.dumps(record, ensure_ascii=False)

        # Rotate to a fresh chunk file on the first write or once the current
        # chunk has reached max_lines_per_file lines.
        if (state['current_file_path'] is None or
                state['lines_written'] >= max_lines_per_file):
            state['current_file_number'] += 1
            state['current_file_path'] = _get_new_filename()
            state['lines_written'] = 0

        _append_line(state['current_file_path'], json_line)
        state['lines_written'] += 1

    return dry_run_callback