Coverage for oc_meta/run/find/duplicated_ids.py: 82%

107 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-21 09:24 +0000

1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import argparse 

6import csv 

7import multiprocessing as mp 

8import os 

9import shutil 

10import tempfile 

11import zipfile 

12from collections import defaultdict 

13from typing import Dict, List, Set 

14 

15from rdflib import Dataset, URIRef 

16from rich_argparse import RichHelpFormatter 

17from tqdm import tqdm 

18 

19from oc_meta.lib.file_manager import collect_files 

20 

21 

def process_zip_file(zip_path: str) -> Dict[tuple, Set[str]]:
    """Collect entity IRIs grouped by identifier from one ZIP of JSON-LD files.

    Every archived file is parsed as JSON-LD. For each triple whose predicate
    is datacite:usesIdentifierScheme, the subject's literal value (looked up
    via literalreification:hasLiteralValue) is paired with the scheme to form
    the grouping key (scheme, value) -> set of entity IRIs.

    Processing is best effort: a file or archive that cannot be read is
    reported on stdout and skipped rather than aborting the scan.
    """
    uses_scheme = URIRef("http://purl.org/spar/datacite/usesIdentifierScheme")
    has_literal = URIRef("http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue")
    grouped: Dict[tuple, Set[str]] = defaultdict(set)

    try:
        with zipfile.ZipFile(zip_path, 'r') as archive:
            for member in archive.namelist():
                try:
                    with archive.open(member) as handle:
                        graph = Dataset(default_union=True)
                        graph.parse(data=handle.read(), format="json-ld")
                        for subject, _, scheme in graph.triples((None, uses_scheme, None)):
                            value = graph.value(subject, has_literal)
                            if str(scheme) and value:
                                grouped[(str(scheme), str(value))].add(str(subject))
                except Exception as e:
                    print(f"Error processing file {member} in {zip_path}: {str(e)}")
    except zipfile.BadZipFile:
        print(f"Corrupted or invalid ZIP file: {zip_path}")
    except Exception as e:
        print(f"Error opening ZIP file {zip_path}: {str(e)}")

    return grouped

50 

def save_chunk_to_temp_csv(entity_info: Dict[tuple, Set[str]], temp_file_path: str):
    """Persist one chunk's identifier mapping to a CSV file.

    Columns: identifier_scheme, literal_value, entity_ids (';'-separated).
    Entity ids are sorted before joining so that the file content is
    deterministic for a given mapping — iterating a set directly would
    produce a different column order on every run.

    :param entity_info: mapping (identifier_scheme, literal_value) -> entity IRIs
    :param temp_file_path: destination CSV path (overwritten if it exists)
    """
    with open(temp_file_path, mode='w', newline='', encoding='utf-8') as csv_file:
        csv_writer = csv.writer(csv_file)
        csv_writer.writerow(['identifier_scheme', 'literal_value', 'entity_ids'])
        for (scheme, value), ids in entity_info.items():
            # sorted() makes chunk files reproducible across runs
            csv_writer.writerow([scheme, value, ';'.join(sorted(ids))])

57 

def load_and_merge_temp_csv(temp_file_path: str, entity_info: Dict[tuple, Set[str]]):
    """Read a chunk CSV written by save_chunk_to_temp_csv and merge it into entity_info.

    The 'entity_ids' column is split on ';' and unioned into the set stored
    under the (identifier_scheme, literal_value) key. entity_info is expected
    to be a defaultdict(set) and is mutated in place; nothing is returned.
    """
    with open(temp_file_path, mode='r', encoding='utf-8') as csv_file:
        for record in csv.DictReader(csv_file):
            key = (record['identifier_scheme'], record['literal_value'])
            entity_info[key].update(record['entity_ids'].split(';'))

65 

def process_chunk(zip_files_chunk: List[str], temp_dir: str, chunk_index: int) -> str:
    """Process one chunk of ZIP files in parallel and spill the result to a CSV.

    Each ZIP is handled by a worker process; the per-file mappings are merged
    into a single (scheme, value) -> entity-ids dict and written to
    ``chunk_<chunk_index>.csv`` inside temp_dir.

    :returns: path of the CSV file written for this chunk
    """
    # forkserver avoids deadlocks when forking from a multi-threaded parent
    ctx = mp.get_context('forkserver')
    with ctx.Pool(processes=mp.cpu_count()) as pool:
        partial_maps = pool.map(process_zip_file, zip_files_chunk)

    merged: Dict[tuple, Set[str]] = defaultdict(set)
    for partial in partial_maps:
        for key, ids in partial.items():
            merged[key].update(ids)

    chunk_csv = os.path.join(temp_dir, f'chunk_{chunk_index}.csv')
    save_chunk_to_temp_csv(merged, chunk_csv)
    return chunk_csv

82 

def read_and_analyze_zip_files(folder_path: str, csv_path: str, chunk_size: int = 5000, temp_dir: str | None = None):
    """Scan ZIP archives under <folder_path>/id and write duplicated identifiers to csv_path.

    ZIP files (except ``se.zip``) are processed in fixed-size chunks; each
    chunk's partial results are spilled to a temporary CSV to bound memory,
    then all chunks are merged and the duplicate groups saved.

    :param folder_path: folder expected to contain an 'id' subfolder
    :param csv_path: output CSV listing duplicate identifier groups
    :param chunk_size: number of ZIP files processed per chunk (default 5000)
    :param temp_dir: directory for intermediate chunk files; when None a fresh
        directory is created and removed afterwards. A caller-supplied
        directory is never deleted.
    """
    id_folder_path = os.path.join(folder_path, 'id')

    if not os.path.exists(id_folder_path):
        print(f"Error: The 'id' subfolder does not exist in path: {folder_path}")
        return

    zip_files = sorted(collect_files(
        id_folder_path,
        pattern="*.zip",
        path_filter=lambda p: os.path.basename(p) != "se.zip",
    ))

    # Track ownership explicitly: the previous prefix test against
    # tempfile.gettempdir() would also delete a *user-supplied* directory
    # whenever it happened to live under the system temp root.
    owns_temp_dir = temp_dir is None
    if owns_temp_dir:
        temp_dir = tempfile.mkdtemp(prefix='oc_meta_duplicates_')
    else:
        os.makedirs(temp_dir, exist_ok=True)

    try:
        chunks = [zip_files[i:i + chunk_size] for i in range(0, len(zip_files), chunk_size)]
        temp_files = []

        print(f"Processing {len(zip_files)} ZIP files in {len(chunks)} chunks of max {chunk_size} files each")
        print(f"Temporary files will be stored in: {temp_dir}")

        for chunk_index, chunk in enumerate(tqdm(chunks, desc="Processing chunks")):
            temp_file = process_chunk(chunk, temp_dir, chunk_index)
            temp_files.append(temp_file)

        print("Merging chunk results...")
        entity_info = defaultdict(set)
        for temp_file in tqdm(temp_files, desc="Merging chunks"):
            load_and_merge_temp_csv(temp_file, entity_info)

        save_duplicates_to_csv(entity_info, csv_path)

    finally:
        # Clean up only what this function created.
        if owns_temp_dir:
            shutil.rmtree(temp_dir, ignore_errors=True)

122 

def save_duplicates_to_csv(entity_info: Dict[tuple, Set[str]], csv_path: str):
    """Write groups of entities that share the same identifier to csv_path.

    Only keys mapped to more than one entity are written. Ids are sorted so
    that the choice of surviving entity and the row content are deterministic
    — iterating a set directly would pick an arbitrary survivor on each run.
    Columns: surviving_entity (first id), merged_entities ('; '-separated rest).

    Failures are reported on stdout rather than raised (best-effort output).
    """
    try:
        with open(csv_path, mode='w', newline='', encoding='utf-8') as csv_file:
            csv_writer = csv.writer(csv_file)
            csv_writer.writerow(['surviving_entity', 'merged_entities'])

            for ids in tqdm(entity_info.values(), desc="Writing CSV"):
                if len(ids) > 1:
                    # sorted() fixes which entity survives across runs
                    ids_list = sorted(ids)
                    csv_writer.writerow([ids_list[0], '; '.join(ids_list[1:])])
    except Exception as e:
        print(f"Error saving CSV file {csv_path}: {str(e)}")

135 

def main():
    """CLI entry point: parse command-line arguments and launch the duplicate search."""
    parser = argparse.ArgumentParser(
        description="Find duplicate identifiers by reading RDF files inside ZIP archives in an 'id' subfolder.",
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument("folder_path", type=str, help="Path to the folder containing the 'id' subfolder")
    parser.add_argument("csv_path", type=str, help="Path to the CSV file to save duplicates")
    parser.add_argument(
        "--chunk-size", type=int, default=5000,
        help="Number of ZIP files to process per chunk (default: 5000)",
    )
    parser.add_argument(
        "--temp-dir", type=str, default=None,
        help="Directory for temporary files (default: system temp directory)",
    )
    arguments = parser.parse_args()

    read_and_analyze_zip_files(arguments.folder_path, arguments.csv_path, arguments.chunk_size, arguments.temp_dir)


if __name__ == "__main__":
    main()