Coverage for oc_meta / run / merge / entities.py: 84%
205 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
1# SPDX-FileCopyrightText: 2024-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import argparse
6import concurrent.futures
7import csv
8import logging
9import multiprocessing
10import os
11import traceback
12from typing import Dict, List, Set
14from oc_ocdm.graph import GraphSet
15from rdflib import URIRef
16from rich_argparse import RichHelpFormatter
17from sparqlite import SPARQLClient
18from tqdm import tqdm
20from oc_meta.core.editor import MetaEditor
# Module-wide logging setup: timestamped INFO-level messages on the root logger.
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, datefmt=_DATE_FORMAT)
logger = logging.getLogger(__name__)
30class EntityMerger:
31 def __init__(
32 self,
33 meta_config: str,
34 resp_agent: str,
35 entity_types: List[str],
36 stop_file_path: str,
37 workers: int,
38 ):
39 self.meta_config = meta_config
40 self.resp_agent = resp_agent
41 self.entity_types = entity_types
42 self.stop_file_path = stop_file_path
43 self.workers = workers
44 self.batch_size = 10
46 @staticmethod
47 def get_entity_type(entity_url: str) -> str | None:
48 parts = entity_url.split("/")
49 if "oc" in parts and "meta" in parts:
50 try:
51 return parts[parts.index("meta") + 1]
52 except IndexError:
53 return None
54 return None
56 @staticmethod
57 def read_csv(csv_file: str) -> List[Dict]:
58 data = []
59 with open(csv_file, mode="r", newline="", encoding="utf-8") as file:
60 csv_reader = csv.DictReader(file)
61 for row in csv_reader:
62 if "Done" not in row:
63 row["Done"] = "False"
64 data.append(row)
65 return data
67 @staticmethod
68 def write_csv(csv_file: str, data: List[Dict]):
69 fieldnames = data[0].keys()
70 with open(csv_file, mode="w", newline="", encoding="utf-8") as file:
71 writer = csv.DictWriter(file, fieldnames=fieldnames)
72 writer.writeheader()
73 for row in data:
74 writer.writerow(row)
76 @staticmethod
77 def count_csv_rows(csv_file: str) -> int:
78 with open(csv_file, "r", encoding="utf-8") as f:
79 return sum(1 for _ in f) - 1
81 def fetch_related_entities_batch(
82 self,
83 meta_editor: MetaEditor,
84 merged_entities: List[str],
85 surviving_entities: List[str],
86 batch_size: int = 10,
87 ) -> Set[URIRef]:
88 """
89 Fetch all related entities in batches and populate the relationship cache.
91 Args:
92 meta_editor: MetaEditor instance
93 merged_entities: List of entities to be merged
94 surviving_entities: List of surviving entities
95 batch_size: Maximum number of entities to process in a single SPARQL query
97 Returns:
98 Set of all related entities
99 """
100 all_related_entities = set()
102 with SPARQLClient(meta_editor.endpoint, max_retries=5, backoff_factor=0.3, timeout=3600) as client:
103 for i in range(0, len(merged_entities), batch_size):
104 batch_merged = merged_entities[i : i + batch_size]
105 merged_clauses = []
106 for entity in batch_merged:
107 merged_clauses.extend(
108 [f"{{?entity ?p <{entity}>}}", f"{{<{entity}> ?p ?entity}}"]
109 )
111 if not merged_clauses:
112 continue
114 query = f"""
115 PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
116 PREFIX datacite: <http://purl.org/spar/datacite/>
117 PREFIX pro: <http://purl.org/spar/pro/>
118 SELECT DISTINCT ?entity WHERE {{
119 {{
120 {' UNION '.join(merged_clauses)}
121 }}
122 FILTER (?p != rdf:type)
123 FILTER (?p != datacite:usesIdentifierScheme)
124 FILTER (?p != pro:withRole)
125 }}
126 """
128 try:
129 results = client.query(query)
130 for result in results["results"]["bindings"]:
131 if result["entity"]["type"] == "uri":
132 related_uri = URIRef(result["entity"]["value"])
133 all_related_entities.add(related_uri)
135 for entity in batch_merged:
136 entity_uri = URIRef(entity)
137 if entity_uri not in meta_editor.relationship_cache:
138 meta_editor.relationship_cache[entity_uri] = set()
139 meta_editor.relationship_cache[entity_uri].add(related_uri)
141 except Exception as e:
142 print(
143 f"Error fetching related entities for merged batch {i}-{i+batch_size}: {e}"
144 )
146 for i in range(0, len(surviving_entities), batch_size):
147 batch_surviving = surviving_entities[i : i + batch_size]
148 surviving_clauses = []
149 for entity in batch_surviving:
150 surviving_clauses.append(f"{{<{entity}> ?p ?entity}}")
152 if not surviving_clauses:
153 continue
155 query = f"""
156 PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
157 PREFIX datacite: <http://purl.org/spar/datacite/>
158 PREFIX pro: <http://purl.org/spar/pro/>
159 SELECT DISTINCT ?entity WHERE {{
160 {{
161 {' UNION '.join(surviving_clauses)}
162 }}
163 FILTER (?p != rdf:type)
164 FILTER (?p != datacite:usesIdentifierScheme)
165 FILTER (?p != pro:withRole)
166 }}
167 """
169 try:
170 results = client.query(query)
171 for result in results["results"]["bindings"]:
172 if result["entity"]["type"] == "uri":
173 related_uri = URIRef(result["entity"]["value"])
174 all_related_entities.add(related_uri)
176 for entity in batch_surviving:
177 entity_uri = URIRef(entity)
178 if entity_uri not in meta_editor.relationship_cache:
179 meta_editor.relationship_cache[entity_uri] = set()
180 meta_editor.relationship_cache[entity_uri].add(related_uri)
182 except Exception as e:
183 print(
184 f"Error fetching related entities for surviving batch {i}-{i+batch_size}: {e}"
185 )
187 return all_related_entities
189 def should_stop_processing(self) -> bool:
190 return os.path.exists(self.stop_file_path)
192 def process_file(self, csv_file: str) -> str:
193 """Process a single CSV file with cross-row batch processing"""
194 logger.info(f"Starting to process file: {csv_file}")
195 data = self.read_csv(csv_file)
196 logger.info(f"Read {len(data)} rows from {csv_file}")
197 meta_editor = MetaEditor(self.meta_config, self.resp_agent, save_queries=True)
198 modified = False
200 if self.should_stop_processing():
201 logger.info("Stop file detected, halting processing")
202 return csv_file
204 g_set = GraphSet(
205 meta_editor.base_iri, custom_counter_handler=meta_editor.counter_handler
206 )
208 batch_merged_entities = []
209 batch_surviving_entities = []
210 rows_to_process = []
212 for row in data:
213 if row.get("Done") == "True":
214 continue
216 entity_type = self.get_entity_type(row["surviving_entity"])
217 if entity_type in self.entity_types:
218 surviving_entity = row["surviving_entity"]
219 merged_entities = row["merged_entities"].split("; ")
220 batch_surviving_entities.append(surviving_entity)
221 batch_merged_entities.extend(merged_entities)
222 rows_to_process.append((surviving_entity, merged_entities))
224 if not rows_to_process:
225 logger.info(f"No rows to process in {csv_file}")
226 return csv_file
228 logger.info(f"Found {len(rows_to_process)} rows to process in {csv_file}")
229 logger.info(
230 f"Fetching related entities for {len(batch_merged_entities)} merged entities and {len(batch_surviving_entities)} surviving entities"
231 )
233 all_related_entities = self.fetch_related_entities_batch(
234 meta_editor,
235 batch_merged_entities,
236 batch_surviving_entities,
237 self.batch_size,
238 )
239 logger.info(f"Found {len(all_related_entities)} related entities")
241 entities_to_import = all_related_entities.copy()
242 entities_to_import.update(URIRef(e) for e in batch_surviving_entities)
243 entities_to_import.update(URIRef(e) for e in batch_merged_entities)
245 entities_to_import = {
246 e for e in entities_to_import if not meta_editor.entity_cache.is_cached(e)
247 }
249 if entities_to_import:
250 logger.info(f"Importing {len(entities_to_import)} new entities")
251 try:
252 meta_editor.reader.import_entities_from_triplestore(
253 g_set=g_set,
254 ts_url=meta_editor.endpoint,
255 entities=list(entities_to_import),
256 resp_agent=meta_editor.resp_agent,
257 enable_validation=False,
258 batch_size=self.batch_size,
259 )
261 for entity in entities_to_import:
262 meta_editor.entity_cache.add(entity)
263 logger.info("Entity import completed successfully")
265 except ValueError as e:
266 logger.error(f"Error importing entities: {e}")
267 modified = True
269 processed_count = 0
270 for surviving_entity, merged_entities in rows_to_process:
271 logger.info(f"Processing row - surviving entity: {surviving_entity}")
272 for merged_entity in merged_entities:
273 logger.info(
274 f" Attempting to merge {merged_entity} into {surviving_entity}"
275 )
276 try:
277 meta_editor.merge(g_set, surviving_entity, merged_entity)
278 modified = True
279 processed_count += 1
280 logger.info(f" Successfully merged {merged_entity}")
281 except ValueError as e:
282 logger.error(
283 f"Error merging {merged_entity} into {surviving_entity}: {e}"
284 )
285 continue
286 logger.info(
287 f"Completed processing row with surviving entity: {surviving_entity}"
288 )
290 logger.info(f"Successfully processed {processed_count} merges")
292 if modified:
293 marked_done = 0
294 for row in data:
295 if (
296 row.get("Done") != "True"
297 and self.get_entity_type(row["surviving_entity"])
298 in self.entity_types
299 ):
300 row["Done"] = "True"
301 marked_done += 1
303 logger.info(f"Marked {marked_done} rows as done")
304 meta_editor.save(g_set)
305 self.write_csv(csv_file, data)
306 logger.info(f"Saved changes to {csv_file}")
308 return csv_file
310 def process_folder(self, csv_folder: str):
311 if os.path.exists(self.stop_file_path):
312 os.remove(self.stop_file_path)
314 csv_files = [
315 os.path.join(csv_folder, file)
316 for file in os.listdir(csv_folder)
317 if file.endswith(".csv")
318 ]
320 if self.workers > 4:
321 csv_files = [
322 file for file in csv_files if self.count_csv_rows(file) <= 10000
323 ]
325 # Use forkserver to avoid deadlocks when forking in a multi-threaded environment
326 with concurrent.futures.ProcessPoolExecutor(
327 max_workers=self.workers,
328 mp_context=multiprocessing.get_context('forkserver')
329 ) as executor:
330 futures = {}
331 for csv_file in csv_files:
332 if self.should_stop_processing():
333 break
334 futures[executor.submit(self.process_file, csv_file)] = csv_file
336 for future in tqdm(
337 concurrent.futures.as_completed(futures),
338 total=len(futures),
339 desc="Overall Progress",
340 ):
341 csv_file = futures[future]
342 try:
343 future.result()
344 except Exception as e:
345 error_trace = traceback.format_exc()
346 print(
347 f"""
348 Error processing file {csv_file}:
349 Type: {type(e).__name__}
350 Details: {str(e)}
351 Full Traceback:
352 {error_trace}
353 Suggestion: This is an unexpected error. Please check the traceback for more details.
354 """
355 )
def main():
    """CLI entry point: merge entities described by the CSV files in a folder."""
    parser = argparse.ArgumentParser(
        description="Merge entities from CSV files in a folder.",
        formatter_class=RichHelpFormatter,
    )
    # Required positional arguments, in CLI order.
    for arg_name, arg_help in (
        ("csv_folder", "Path to the folder containing CSV files"),
        ("meta_config", "Meta configuration string"),
        ("resp_agent", "Responsible agent string"),
    ):
        parser.add_argument(arg_name, type=str, help=arg_help)
    parser.add_argument(
        "--entity_types",
        nargs="+",
        default=["ra", "br", "id"],
        help="Types of entities to merge (ra, br, id)",
    )
    parser.add_argument(
        "--stop_file", type=str, default="stop.out", help="Path to the stop file"
    )
    parser.add_argument(
        "--workers", type=int, default=4, help="Number of parallel workers"
    )
    args = parser.parse_args()

    merger = EntityMerger(
        meta_config=args.meta_config,
        resp_agent=args.resp_agent,
        entity_types=args.entity_types,
        stop_file_path=args.stop_file,
        workers=args.workers,
    )
    merger.process_folder(args.csv_folder)


if __name__ == "__main__":
    main()