Coverage for oc_meta/run/find/duplicated_ids.py: 82%
105 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 17:25 +0000
1import argparse
2import csv
3import multiprocessing as mp
4import os
5import shutil
6import tempfile
7import zipfile
8from collections import defaultdict
9from typing import Dict, List, Set
11from rdflib import Dataset, URIRef
12from rich_argparse import RichHelpFormatter
13from tqdm import tqdm
def process_zip_file(zip_path: str) -> Dict[tuple, Set[str]]:
    """Collect identifier usage from one ZIP of JSON-LD files.

    Returns a mapping from (identifier scheme IRI, literal value) to the set
    of entity IRIs that carry that identifier. Parse or I/O problems are
    reported on stdout and never raised, so a single bad archive or member
    cannot abort a multi-hour scan.
    """
    USES_SCHEME = URIRef("http://purl.org/spar/datacite/usesIdentifierScheme")
    HAS_LITERAL = URIRef("http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue")
    collected: Dict[tuple, Set[str]] = defaultdict(set)

    try:
        with zipfile.ZipFile(zip_path, 'r') as archive:
            for member in archive.namelist():
                try:
                    with archive.open(member) as rdf_file:
                        graph = Dataset(default_union=True)
                        graph.parse(data=rdf_file.read(), format="json-ld")
                        for subject, _, scheme_node in graph.triples((None, USES_SCHEME, None)):
                            scheme = str(scheme_node)
                            literal_value = graph.value(subject, HAS_LITERAL)
                            # Only index entities that have both a scheme and a value.
                            if scheme and literal_value:
                                collected[(scheme, str(literal_value))].add(str(subject))
                except Exception as e:
                    print(f"Error processing file {member} in {zip_path}: {str(e)}")
    except zipfile.BadZipFile:
        print(f"Corrupted or invalid ZIP file: {zip_path}")
    except Exception as e:
        print(f"Error opening ZIP file {zip_path}: {str(e)}")

    return collected
def save_chunk_to_temp_csv(entity_info: Dict[tuple, Set[str]], temp_file_path: str):
    """Spill one chunk's (scheme, value) -> entity-id mapping to a CSV file.

    Each row holds the scheme, the literal value, and the entity IDs joined
    with ';' so the set survives a round-trip through load_and_merge_temp_csv.
    """
    with open(temp_file_path, mode='w', newline='', encoding='utf-8') as out:
        writer = csv.writer(out)
        writer.writerow(['identifier_scheme', 'literal_value', 'entity_ids'])
        writer.writerows(
            [scheme, value, ';'.join(ids)]
            for (scheme, value), ids in entity_info.items()
        )
def load_and_merge_temp_csv(temp_file_path: str, entity_info: Dict[tuple, Set[str]]):
    """Read a chunk CSV written by save_chunk_to_temp_csv and merge it into entity_info.

    entity_info is mutated in place; entity IDs from the file are unioned
    with any IDs already recorded for the same (scheme, value) key.
    """
    with open(temp_file_path, mode='r', encoding='utf-8') as src:
        for record in csv.DictReader(src):
            key = (record['identifier_scheme'], record['literal_value'])
            entity_info[key].update(record['entity_ids'].split(';'))
def process_chunk(zip_files_chunk: List[str], temp_dir: str, chunk_index: int) -> str:
    """Process one chunk of ZIP paths in parallel and spill the result to disk.

    Fans the chunk out over a process pool (one worker per CPU), merges the
    per-archive mappings, writes them to chunk_<index>.csv in temp_dir, and
    returns the path of that temporary CSV.
    """
    merged: Dict[tuple, Set[str]] = defaultdict(set)

    with mp.Pool(processes=mp.cpu_count()) as pool:
        # pool.map is eager, so all results are complete before the pool closes.
        for partial in pool.map(process_zip_file, zip_files_chunk):
            for key, ids in partial.items():
                merged[key].update(ids)

    out_path = os.path.join(temp_dir, f'chunk_{chunk_index}.csv')
    save_chunk_to_temp_csv(merged, out_path)
    return out_path
75def read_and_analyze_zip_files(folder_path: str, csv_path: str, chunk_size: int = 5000, temp_dir: str | None = None):
76 id_folder_path = os.path.join(folder_path, 'id')
78 if not os.path.exists(id_folder_path):
79 print(f"Error: The 'id' subfolder does not exist in path: {folder_path}")
80 return
82 zip_files = [os.path.join(root, file) for root, _, files in os.walk(id_folder_path)
83 for file in files if file.endswith('.zip') and file != 'se.zip']
85 if temp_dir is None:
86 temp_dir = tempfile.mkdtemp(prefix='oc_meta_duplicates_')
87 else:
88 os.makedirs(temp_dir, exist_ok=True)
90 try:
91 chunks = [zip_files[i:i + chunk_size] for i in range(0, len(zip_files), chunk_size)]
92 temp_files = []
94 print(f"Processing {len(zip_files)} ZIP files in {len(chunks)} chunks of max {chunk_size} files each")
95 print(f"Temporary files will be stored in: {temp_dir}")
97 for chunk_index, chunk in enumerate(tqdm(chunks, desc="Processing chunks")):
98 temp_file = process_chunk(chunk, temp_dir, chunk_index)
99 temp_files.append(temp_file)
101 print("Merging chunk results...")
102 entity_info = defaultdict(set)
103 for temp_file in tqdm(temp_files, desc="Merging chunks"):
104 load_and_merge_temp_csv(temp_file, entity_info)
106 save_duplicates_to_csv(entity_info, csv_path)
108 finally:
109 if temp_dir.startswith(tempfile.gettempdir()):
110 shutil.rmtree(temp_dir, ignore_errors=True)
def save_duplicates_to_csv(entity_info: Dict[tuple, Set[str]], csv_path: str):
    """Write every identifier shared by multiple entities to csv_path.

    Only keys with two or more entity IDs are emitted; an arbitrary member of
    each set becomes the surviving entity and the rest are listed as merged
    entities. Write failures are reported on stdout rather than raised.
    """
    try:
        with open(csv_path, mode='w', newline='', encoding='utf-8') as out:
            writer = csv.writer(out)
            writer.writerow(['surviving_entity', 'merged_entities'])

            for ids in tqdm(entity_info.values(), desc="Writing CSV"):
                if len(ids) < 2:
                    continue
                survivor, *merged = list(ids)
                writer.writerow([survivor, '; '.join(merged)])
    except Exception as e:
        print(f"Error saving CSV file {csv_path}: {str(e)}")
def main():
    """Command-line entry point: parse arguments and start the duplicate scan."""
    arg_parser = argparse.ArgumentParser(
        description=("Find duplicate identifiers by reading RDF files inside "
                     "ZIP archives in an 'id' subfolder."),
        formatter_class=RichHelpFormatter,
    )
    arg_parser.add_argument("folder_path", type=str,
                            help="Path to the folder containing the 'id' subfolder")
    arg_parser.add_argument("csv_path", type=str,
                            help="Path to the CSV file to save duplicates")
    arg_parser.add_argument("--chunk-size", type=int, default=5000,
                            help="Number of ZIP files to process per chunk (default: 5000)")
    arg_parser.add_argument("--temp-dir", type=str, default=None,
                            help="Directory for temporary files (default: system temp directory)")
    opts = arg_parser.parse_args()

    read_and_analyze_zip_files(opts.folder_path, opts.csv_path, opts.chunk_size, opts.temp_dir)
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()