Coverage for oc_meta / run / find / duplicated_ids.py: 82%

105 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 17:25 +0000

1import argparse 

2import csv 

3import multiprocessing as mp 

4import os 

5import shutil 

6import tempfile 

7import zipfile 

8from collections import defaultdict 

9from typing import Dict, List, Set 

10 

11from rdflib import Dataset, URIRef 

12from rich_argparse import RichHelpFormatter 

13from tqdm import tqdm 

14 

15 

def process_zip_file(zip_path: str) -> Dict[tuple, Set[str]]:
    """Collect identifier usage from one ZIP archive of JSON-LD files.

    Every member of the archive is parsed as JSON-LD. For each entity that
    carries a datacite:usesIdentifierScheme triple, the corresponding
    literal-reification value is looked up, and the entity URI is grouped
    under the (identifier scheme, literal value) pair.

    Processing is best-effort: a member that fails to parse, or an archive
    that cannot be opened, is reported on stdout and skipped.

    :param zip_path: path to the ZIP archive to inspect
    :return: mapping (scheme URI, literal value) -> set of entity URIs
    """
    uses_scheme = URIRef("http://purl.org/spar/datacite/usesIdentifierScheme")
    has_literal_value = URIRef("http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue")
    grouped: Dict[tuple, Set[str]] = defaultdict(set)

    try:
        with zipfile.ZipFile(zip_path, 'r') as archive:
            for member_name in archive.namelist():
                try:
                    with archive.open(member_name) as member:
                        # default_union=True so triples are visible regardless
                        # of which named graph they were serialised into.
                        graph = Dataset(default_union=True)
                        graph.parse(data=member.read(), format="json-ld")

                        for subject, _, scheme in graph.triples((None, uses_scheme, None)):
                            literal = graph.value(subject, has_literal_value)
                            if str(scheme) and literal:
                                grouped[(str(scheme), str(literal))].add(str(subject))
                except Exception as e:
                    print(f"Error processing file {member_name} in {zip_path}: {str(e)}")
    except zipfile.BadZipFile:
        print(f"Corrupted or invalid ZIP file: {zip_path}")
    except Exception as e:
        print(f"Error opening ZIP file {zip_path}: {str(e)}")

    return grouped

44 

def save_chunk_to_temp_csv(entity_info: Dict[tuple, Set[str]], temp_file_path: str):
    """Spill one chunk's identifier mapping to a CSV file.

    Each row holds one (scheme, value) key; the entity ids for that key are
    serialised as a single ';'-joined field so the row can later be
    re-expanded by load_and_merge_temp_csv.

    :param entity_info: mapping (scheme, value) -> set of entity ids
    :param temp_file_path: destination CSV path (overwritten if present)
    """
    with open(temp_file_path, mode='w', newline='', encoding='utf-8') as out:
        writer = csv.writer(out)
        writer.writerow(['identifier_scheme', 'literal_value', 'entity_ids'])
        writer.writerows(
            [scheme, value, ';'.join(ids)]
            for (scheme, value), ids in entity_info.items()
        )

51 

def load_and_merge_temp_csv(temp_file_path: str, entity_info: Dict[tuple, Set[str]]):
    """Merge a chunk CSV written by save_chunk_to_temp_csv into entity_info.

    Mutates entity_info in place: the ';'-joined id field of each row is
    split back into individual ids and unioned into the set for that
    (scheme, value) key.

    :param temp_file_path: CSV file previously produced by save_chunk_to_temp_csv
    :param entity_info: accumulator mapping (scheme, value) -> set of entity ids
    """
    with open(temp_file_path, mode='r', encoding='utf-8') as src:
        for record in csv.DictReader(src):
            key = (record['identifier_scheme'], record['literal_value'])
            entity_info[key] |= set(record['entity_ids'].split(';'))

59 

def process_chunk(zip_files_chunk: List[str], temp_dir: str, chunk_index: int) -> str:
    """Process one chunk of ZIP archives in parallel and spill the result.

    The archives are fanned out across all CPU cores; the per-archive
    mappings are merged and written to a chunk-indexed temporary CSV.

    :param zip_files_chunk: paths of the ZIP archives in this chunk
    :param temp_dir: directory where the chunk CSV is written
    :param chunk_index: index used to name the chunk CSV
    :return: path of the temporary CSV written for this chunk
    """
    merged: Dict[tuple, Set[str]] = defaultdict(set)

    with mp.Pool(processes=mp.cpu_count()) as pool:
        # pool.map is synchronous, so merging inside the context is safe.
        for partial in pool.map(process_zip_file, zip_files_chunk):
            for key, ids in partial.items():
                merged[key].update(ids)

    chunk_csv_path = os.path.join(temp_dir, f'chunk_{chunk_index}.csv')
    save_chunk_to_temp_csv(merged, chunk_csv_path)
    return chunk_csv_path

74 

75def read_and_analyze_zip_files(folder_path: str, csv_path: str, chunk_size: int = 5000, temp_dir: str | None = None): 

76 id_folder_path = os.path.join(folder_path, 'id') 

77 

78 if not os.path.exists(id_folder_path): 

79 print(f"Error: The 'id' subfolder does not exist in path: {folder_path}") 

80 return 

81 

82 zip_files = [os.path.join(root, file) for root, _, files in os.walk(id_folder_path) 

83 for file in files if file.endswith('.zip') and file != 'se.zip'] 

84 

85 if temp_dir is None: 

86 temp_dir = tempfile.mkdtemp(prefix='oc_meta_duplicates_') 

87 else: 

88 os.makedirs(temp_dir, exist_ok=True) 

89 

90 try: 

91 chunks = [zip_files[i:i + chunk_size] for i in range(0, len(zip_files), chunk_size)] 

92 temp_files = [] 

93 

94 print(f"Processing {len(zip_files)} ZIP files in {len(chunks)} chunks of max {chunk_size} files each") 

95 print(f"Temporary files will be stored in: {temp_dir}") 

96 

97 for chunk_index, chunk in enumerate(tqdm(chunks, desc="Processing chunks")): 

98 temp_file = process_chunk(chunk, temp_dir, chunk_index) 

99 temp_files.append(temp_file) 

100 

101 print("Merging chunk results...") 

102 entity_info = defaultdict(set) 

103 for temp_file in tqdm(temp_files, desc="Merging chunks"): 

104 load_and_merge_temp_csv(temp_file, entity_info) 

105 

106 save_duplicates_to_csv(entity_info, csv_path) 

107 

108 finally: 

109 if temp_dir.startswith(tempfile.gettempdir()): 

110 shutil.rmtree(temp_dir, ignore_errors=True) 

111 

def save_duplicates_to_csv(entity_info: Dict[tuple, Set[str]], csv_path: str):
    """Write groups of entities sharing one identifier (duplicates) to csv_path.

    Only keys with two or more entities are reported. The ids in each group
    are sorted so output is deterministic across runs: the lexicographically
    smallest id becomes the surviving entity. Previously the surviving entity
    was whichever id happened to come first in set-iteration order, so
    repeated runs over the same data could produce different CSVs.

    Errors while writing are reported on stdout rather than raised
    (best-effort, consistent with the rest of this module).

    :param entity_info: mapping (scheme, value) -> set of entity ids
    :param csv_path: destination CSV path (overwritten if present)
    """
    try:
        with open(csv_path, mode='w', newline='', encoding='utf-8') as csv_file:
            csv_writer = csv.writer(csv_file)
            csv_writer.writerow(['surviving_entity', 'merged_entities'])

            for ids in tqdm(entity_info.values(), desc="Writing CSV"):
                if len(ids) > 1:
                    # Sorted for determinism: first id survives, rest merge.
                    ids_list = sorted(ids)
                    csv_writer.writerow([ids_list[0], '; '.join(ids_list[1:])])
    except Exception as e:
        print(f"Error saving CSV file {csv_path}: {str(e)}")

124 

def main():
    """Command-line entry point for the duplicate-identifier finder."""
    arg_parser = argparse.ArgumentParser(
        description=(
            "Find duplicate identifiers by reading RDF files inside "
            "ZIP archives in an 'id' subfolder."
        ),
        formatter_class=RichHelpFormatter,
    )
    arg_parser.add_argument(
        "folder_path", type=str,
        help="Path to the folder containing the 'id' subfolder")
    arg_parser.add_argument(
        "csv_path", type=str,
        help="Path to the CSV file to save duplicates")
    arg_parser.add_argument(
        "--chunk-size", type=int, default=5000,
        help="Number of ZIP files to process per chunk (default: 5000)")
    arg_parser.add_argument(
        "--temp-dir", type=str, default=None,
        help="Directory for temporary files (default: system temp directory)")
    opts = arg_parser.parse_args()

    read_and_analyze_zip_files(opts.folder_path, opts.csv_path, opts.chunk_size, opts.temp_dir)


if __name__ == "__main__":
    main()