Coverage for oc_meta/run/find/duplicated_ids.py: 82%
107 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import argparse
6import csv
7import multiprocessing as mp
8import os
9import shutil
10import tempfile
11import zipfile
12from collections import defaultdict
13from typing import Dict, List, Set
15from rdflib import Dataset, URIRef
16from rich_argparse import RichHelpFormatter
17from tqdm import tqdm
19from oc_meta.lib.file_manager import collect_files
def process_zip_file(zip_path: str) -> Dict[tuple, Set[str]]:
    """Scan one ZIP archive of JSON-LD files and group entity URIs by identifier.

    For every entity that has a datacite identifier scheme and a literal
    value, record its URI under the (scheme, value) key. Errors on a single
    member or on the archive itself are reported to stdout and skipped so a
    bad file never aborts the whole run.

    Returns a mapping from (identifier_scheme, literal_value) tuples to the
    set of entity URIs carrying that identifier.
    """
    scheme_predicate = URIRef("http://purl.org/spar/datacite/usesIdentifierScheme")
    value_predicate = URIRef("http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue")
    grouped: Dict[tuple, Set[str]] = defaultdict(set)

    try:
        with zipfile.ZipFile(zip_path, 'r') as archive:
            for zip_file in archive.namelist():
                try:
                    with archive.open(zip_file) as rdf_file:
                        graph = Dataset(default_union=True)
                        graph.parse(data=rdf_file.read(), format="json-ld")

                        for subject, _, scheme in graph.triples((None, scheme_predicate, None)):
                            literal = graph.value(subject, value_predicate)
                            if str(scheme) and literal:
                                grouped[(str(scheme), str(literal))].add(str(subject))
                except Exception as e:
                    print(f"Error processing file {zip_file} in {zip_path}: {str(e)}")
    except zipfile.BadZipFile:
        print(f"Corrupted or invalid ZIP file: {zip_path}")
    except Exception as e:
        print(f"Error opening ZIP file {zip_path}: {str(e)}")

    return grouped
def save_chunk_to_temp_csv(entity_info: Dict[tuple, Set[str]], temp_file_path: str):
    """Persist a chunk's (scheme, value) -> entity-id mapping to a CSV file.

    Writes a header row followed by one row per key; the set of entity ids
    is serialized as a single ';'-joined field.
    """
    with open(temp_file_path, mode='w', newline='', encoding='utf-8') as out:
        writer = csv.writer(out)
        writer.writerow(['identifier_scheme', 'literal_value', 'entity_ids'])
        writer.writerows(
            [scheme, value, ';'.join(ids)]
            for (scheme, value), ids in entity_info.items()
        )
def load_and_merge_temp_csv(temp_file_path: str, entity_info: Dict[tuple, Set[str]]):
    """Read a chunk CSV written by save_chunk_to_temp_csv and merge it in place.

    Each row's ';'-separated entity ids are added to the set stored under
    the (identifier_scheme, literal_value) key of *entity_info* (expected
    to be a defaultdict(set)).
    """
    with open(temp_file_path, mode='r', encoding='utf-8') as src:
        for record in csv.DictReader(src):
            key = (record['identifier_scheme'], record['literal_value'])
            entity_info[key].update(record['entity_ids'].split(';'))
def process_chunk(zip_files_chunk: List[str], temp_dir: str, chunk_index: int) -> str:
    """Process a batch of ZIP files in parallel and spill the result to disk.

    Fans the chunk out over a process pool, merges the per-file mappings,
    and writes them to 'chunk_<index>.csv' inside *temp_dir*.

    Returns the path of the CSV written for this chunk.
    """
    merged: Dict[tuple, Set[str]] = defaultdict(set)

    # Use forkserver to avoid deadlocks when forking in a multi-threaded environment
    ctx = mp.get_context('forkserver')
    with ctx.Pool(processes=mp.cpu_count()) as pool:
        partial_results = pool.map(process_zip_file, zip_files_chunk)

    for partial in partial_results:
        for key, ids in partial.items():
            merged[key].update(ids)

    chunk_csv = os.path.join(temp_dir, f'chunk_{chunk_index}.csv')
    save_chunk_to_temp_csv(merged, chunk_csv)

    return chunk_csv
def read_and_analyze_zip_files(folder_path: str, csv_path: str, chunk_size: int = 5000, temp_dir: str | None = None):
    """Find duplicate identifiers across all ZIP archives in *folder_path*/id.

    Processes the archives in chunks (each fanned out over a multiprocessing
    pool), spills intermediate results as CSVs into *temp_dir*, merges them,
    and writes the duplicate report to *csv_path*.

    Args:
        folder_path: Folder that must contain an 'id' subfolder of ZIPs.
        csv_path: Destination CSV for the duplicate report.
        chunk_size: Number of ZIP files to process per chunk.
        temp_dir: Directory for intermediate CSVs; when None an auto-created
            temporary directory is used and removed afterwards. A
            caller-supplied directory is never removed.
    """
    id_folder_path = os.path.join(folder_path, 'id')

    if not os.path.exists(id_folder_path):
        print(f"Error: The 'id' subfolder does not exist in path: {folder_path}")
        return

    # se.zip is excluded — presumably it holds non-identifier entities; confirm against data layout
    zip_files = sorted(collect_files(
        id_folder_path,
        pattern="*.zip",
        path_filter=lambda p: os.path.basename(p) != "se.zip",
    ))

    # Track explicitly whether we own the temp directory. The previous
    # prefix check (temp_dir.startswith(tempfile.gettempdir())) also
    # deleted caller-supplied directories that happened to live under the
    # system temp root; an explicit flag avoids that.
    created_temp_dir = temp_dir is None
    if created_temp_dir:
        temp_dir = tempfile.mkdtemp(prefix='oc_meta_duplicates_')
    else:
        os.makedirs(temp_dir, exist_ok=True)

    try:
        chunks = [zip_files[i:i + chunk_size] for i in range(0, len(zip_files), chunk_size)]
        temp_files = []

        print(f"Processing {len(zip_files)} ZIP files in {len(chunks)} chunks of max {chunk_size} files each")
        print(f"Temporary files will be stored in: {temp_dir}")

        for chunk_index, chunk in enumerate(tqdm(chunks, desc="Processing chunks")):
            temp_files.append(process_chunk(chunk, temp_dir, chunk_index))

        print("Merging chunk results...")
        entity_info = defaultdict(set)
        for temp_file in tqdm(temp_files, desc="Merging chunks"):
            load_and_merge_temp_csv(temp_file, entity_info)

        save_duplicates_to_csv(entity_info, csv_path)
    finally:
        # Only clean up a directory this function created itself.
        if created_temp_dir:
            shutil.rmtree(temp_dir, ignore_errors=True)
def save_duplicates_to_csv(entity_info: Dict[tuple, Set[str]], csv_path: str):
    """Write groups of duplicate entities to *csv_path*.

    Each output row pairs a surviving entity with the remaining entities to
    merge into it ('; '-separated). Groups with a single entity are skipped.
    I/O errors are reported to stdout rather than raised.

    The ids are sorted before writing: iterating a set directly made the
    choice of surviving entity nondeterministic across runs (set order
    depends on hash randomization), so the report could differ between
    identical inputs.
    """
    try:
        with open(csv_path, mode='w', newline='', encoding='utf-8') as csv_file:
            csv_writer = csv.writer(csv_file)
            csv_writer.writerow(['surviving_entity', 'merged_entities'])

            for ids in tqdm(entity_info.values(), desc="Writing CSV"):
                if len(ids) > 1:
                    # Sort for a deterministic surviving entity and row content.
                    ids_list = sorted(ids)
                    csv_writer.writerow([ids_list[0], '; '.join(ids_list[1:])])
    except Exception as e:
        print(f"Error saving CSV file {csv_path}: {str(e)}")
def main():
    """CLI entry point: parse the arguments and run the duplicate search."""
    parser = argparse.ArgumentParser(
        description="Find duplicate identifiers by reading RDF files inside ZIP archives in an 'id' subfolder.",
        formatter_class=RichHelpFormatter,
    )
    argument_specs = [
        (("folder_path",), dict(type=str, help="Path to the folder containing the 'id' subfolder")),
        (("csv_path",), dict(type=str, help="Path to the CSV file to save duplicates")),
        (("--chunk-size",), dict(type=int, default=5000,
                                 help="Number of ZIP files to process per chunk (default: 5000)")),
        (("--temp-dir",), dict(type=str, default=None,
                               help="Directory for temporary files (default: system temp directory)")),
    ]
    for flags, options in argument_specs:
        parser.add_argument(*flags, **options)

    parsed = parser.parse_args()
    read_and_analyze_zip_files(parsed.folder_path, parsed.csv_path, parsed.chunk_size, parsed.temp_dir)


if __name__ == "__main__":
    main()