Coverage for oc_meta/run/merge/entities.py: 84% (205 statements)
coverage.py v7.13.4, created at 2026-03-03 17:25 +0000
1import argparse
2import concurrent.futures
3import csv
4import logging
5import os
6import traceback
7from typing import Dict, List, Set
9from oc_ocdm.graph import GraphSet
10from rdflib import URIRef
11from rich_argparse import RichHelpFormatter
12from sparqlite import SPARQLClient
13from tqdm import tqdm
15from oc_meta.core.editor import MetaEditor
# Configure module-wide logging: timestamped, INFO-level messages.
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

logging.basicConfig(
    level=logging.INFO,
    format=_LOG_FORMAT,
    datefmt=_LOG_DATE_FORMAT,
)

# Module logger shared by the classes and functions below.
logger = logging.getLogger(__name__)
class EntityMerger:
    """Merge duplicate OpenCitations Meta entities listed in CSV files.

    Each CSV row names a ``surviving_entity`` and a "; "-separated list of
    ``merged_entities``.  Files in a folder are processed by a pool of
    worker processes; a row is marked ``Done`` once its merges have been
    applied, and the presence of the stop file halts further processing.
    """

    def __init__(
        self,
        meta_config: str,
        resp_agent: str,
        entity_types: List[str],
        stop_file_path: str,
        workers: int,
    ):
        self.meta_config = meta_config  # configuration handed to MetaEditor
        self.resp_agent = resp_agent  # responsible agent for provenance
        self.entity_types = entity_types  # type codes to process, e.g. ["ra", "br", "id"]
        self.stop_file_path = stop_file_path  # existence of this file requests a halt
        self.workers = workers  # size of the process pool
        self.batch_size = 10  # entities per SPARQL/import batch

    @staticmethod
    def get_entity_type(entity_url: str) -> str | None:
        """Return the type code that follows 'meta' in an OC Meta URL.

        E.g. "https://w3id.org/oc/meta/br/06x" -> "br".  Returns None when
        the URL does not contain both 'oc' and 'meta' path segments, or when
        nothing follows the 'meta' segment.
        """
        parts = entity_url.split("/")
        if "oc" in parts and "meta" in parts:
            try:
                return parts[parts.index("meta") + 1]
            except IndexError:
                return None
        return None

    @staticmethod
    def read_csv(csv_file: str) -> List[Dict]:
        """Read csv_file into a list of dicts, defaulting 'Done' to 'False'."""
        data = []
        with open(csv_file, mode="r", newline="", encoding="utf-8") as file:
            for row in csv.DictReader(file):
                if "Done" not in row:
                    row["Done"] = "False"
                data.append(row)
        return data

    @staticmethod
    def write_csv(csv_file: str, data: List[Dict]):
        """Write rows back to csv_file, using the first row's keys as header.

        An empty row list is a no-op: there is no row to derive the header
        from (previously this raised IndexError on ``data[0]``).
        """
        if not data:
            return
        fieldnames = data[0].keys()
        with open(csv_file, mode="w", newline="", encoding="utf-8") as file:
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)

    @staticmethod
    def count_csv_rows(csv_file: str) -> int:
        """Count data rows as physical lines minus the header line.

        NOTE(review): multi-line quoted CSV fields would be over-counted;
        this is only used as a rough size filter in process_folder.
        """
        with open(csv_file, "r", encoding="utf-8") as f:
            return sum(1 for _ in f) - 1

    @staticmethod
    def _collect_related(
        client,
        clauses: List[str],
        batch_entities: List[str],
        meta_editor: MetaEditor,
        all_related_entities: Set[URIRef],
        batch_label: str,
    ) -> None:
        """Run one batched relatedness query and record its results.

        Adds every URI found to ``all_related_entities`` and to the editor's
        ``relationship_cache``.  Structural predicates (rdf:type, identifier
        scheme, role) are excluded from the relatedness search.

        NOTE(review): the cache is an over-approximation — each related URI
        is associated with *every* entity of the batch, not only the one it
        is actually linked to.
        """
        query = f"""
        PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        PREFIX datacite: <http://purl.org/spar/datacite/>
        PREFIX pro: <http://purl.org/spar/pro/>
        SELECT DISTINCT ?entity WHERE {{
            {{
                {' UNION '.join(clauses)}
            }}
            FILTER (?p != rdf:type)
            FILTER (?p != datacite:usesIdentifierScheme)
            FILTER (?p != pro:withRole)
        }}
        """
        try:
            results = client.query(query)
            for result in results["results"]["bindings"]:
                if result["entity"]["type"] == "uri":
                    related_uri = URIRef(result["entity"]["value"])
                    all_related_entities.add(related_uri)
                    for entity in batch_entities:
                        entity_uri = URIRef(entity)
                        meta_editor.relationship_cache.setdefault(
                            entity_uri, set()
                        ).add(related_uri)
        except Exception as e:
            # Consistency fix: errors were printed to stdout; report them
            # through the module logger like the rest of the file.
            logger.error(
                f"Error fetching related entities for {batch_label}: {e}"
            )

    def fetch_related_entities_batch(
        self,
        meta_editor: MetaEditor,
        merged_entities: List[str],
        surviving_entities: List[str],
        batch_size: int = 10,
    ) -> Set[URIRef]:
        """
        Fetch all related entities in batches and populate the relationship cache.

        Merged entities are queried in both directions (as subject and as
        object); surviving entities only for their outgoing triples.

        Args:
            meta_editor: MetaEditor instance
            merged_entities: List of entities to be merged
            surviving_entities: List of surviving entities
            batch_size: Maximum number of entities to process in a single SPARQL query

        Returns:
            Set of all related entities
        """
        all_related_entities: Set[URIRef] = set()

        with SPARQLClient(
            meta_editor.endpoint, max_retries=5, backoff_factor=0.3, timeout=3600
        ) as client:
            for i in range(0, len(merged_entities), batch_size):
                batch_merged = merged_entities[i : i + batch_size]
                merged_clauses: List[str] = []
                for entity in batch_merged:
                    # Both directions: triples pointing at and out of the entity.
                    merged_clauses.extend(
                        [f"{{?entity ?p <{entity}>}}", f"{{<{entity}> ?p ?entity}}"]
                    )
                if not merged_clauses:
                    continue
                self._collect_related(
                    client,
                    merged_clauses,
                    batch_merged,
                    meta_editor,
                    all_related_entities,
                    f"merged batch {i}-{i+batch_size}",
                )

            for i in range(0, len(surviving_entities), batch_size):
                batch_surviving = surviving_entities[i : i + batch_size]
                # Outgoing triples only for surviving entities.
                surviving_clauses = [
                    f"{{<{entity}> ?p ?entity}}" for entity in batch_surviving
                ]
                if not surviving_clauses:
                    continue
                self._collect_related(
                    client,
                    surviving_clauses,
                    batch_surviving,
                    meta_editor,
                    all_related_entities,
                    f"surviving batch {i}-{i+batch_size}",
                )

        return all_related_entities

    def should_stop_processing(self) -> bool:
        """Return True when the stop file exists, requesting a graceful halt."""
        return os.path.exists(self.stop_file_path)

    def process_file(self, csv_file: str) -> str:
        """Process a single CSV file with cross-row batch processing.

        Collects every pending row of a requested entity type, pre-fetches
        and imports all entities the merges will touch, performs the merges,
        then marks the rows done and saves.  Returns csv_file so the caller
        can map completed futures back to files.
        """
        logger.info(f"Starting to process file: {csv_file}")
        data = self.read_csv(csv_file)
        logger.info(f"Read {len(data)} rows from {csv_file}")
        meta_editor = MetaEditor(self.meta_config, self.resp_agent, save_queries=True)
        modified = False

        if self.should_stop_processing():
            logger.info("Stop file detected, halting processing")
            return csv_file

        g_set = GraphSet(
            meta_editor.base_iri, custom_counter_handler=meta_editor.counter_handler
        )

        # Gather all pending rows of the requested types before any merging,
        # so related entities can be fetched and imported in large batches.
        batch_merged_entities: List[str] = []
        batch_surviving_entities: List[str] = []
        rows_to_process = []

        for row in data:
            if row.get("Done") == "True":
                continue

            entity_type = self.get_entity_type(row["surviving_entity"])
            if entity_type in self.entity_types:
                surviving_entity = row["surviving_entity"]
                merged_entities = row["merged_entities"].split("; ")
                batch_surviving_entities.append(surviving_entity)
                batch_merged_entities.extend(merged_entities)
                rows_to_process.append((surviving_entity, merged_entities))

        if not rows_to_process:
            logger.info(f"No rows to process in {csv_file}")
            return csv_file

        logger.info(f"Found {len(rows_to_process)} rows to process in {csv_file}")
        logger.info(
            f"Fetching related entities for {len(batch_merged_entities)} merged entities and {len(batch_surviving_entities)} surviving entities"
        )

        all_related_entities = self.fetch_related_entities_batch(
            meta_editor,
            batch_merged_entities,
            batch_surviving_entities,
            self.batch_size,
        )
        logger.info(f"Found {len(all_related_entities)} related entities")

        # Import every involved entity that is not already cached.
        entities_to_import = all_related_entities.copy()
        entities_to_import.update(URIRef(e) for e in batch_surviving_entities)
        entities_to_import.update(URIRef(e) for e in batch_merged_entities)

        entities_to_import = {
            e for e in entities_to_import if not meta_editor.entity_cache.is_cached(e)
        }

        if entities_to_import:
            logger.info(f"Importing {len(entities_to_import)} new entities")
            try:
                meta_editor.reader.import_entities_from_triplestore(
                    g_set=g_set,
                    ts_url=meta_editor.endpoint,
                    entities=list(entities_to_import),
                    resp_agent=meta_editor.resp_agent,
                    enable_validation=False,
                    batch_size=self.batch_size,
                )

                for entity in entities_to_import:
                    meta_editor.entity_cache.add(entity)
                logger.info("Entity import completed successfully")

            except ValueError as e:
                logger.error(f"Error importing entities: {e}")
                # NOTE(review): modified is set even though the import
                # failed, so the rows are still marked done and saved —
                # confirm this is the intended failure semantics.
                modified = True

        processed_count = 0
        for surviving_entity, merged_entities in rows_to_process:
            logger.info(f"Processing row - surviving entity: {surviving_entity}")
            surviving_uri = URIRef(surviving_entity)
            for merged_entity in merged_entities:
                logger.info(
                    f"  Attempting to merge {merged_entity} into {surviving_entity}"
                )
                try:
                    meta_editor.merge(g_set, surviving_uri, URIRef(merged_entity))
                    modified = True
                    processed_count += 1
                    logger.info(f"  Successfully merged {merged_entity}")
                except ValueError as e:
                    # A failed merge is logged and skipped; the row is still
                    # marked done below if anything else modified the set.
                    logger.error(
                        f"Error merging {merged_entity} into {surviving_entity}: {e}"
                    )
                    continue
            logger.info(
                f"Completed processing row with surviving entity: {surviving_entity}"
            )

        logger.info(f"Successfully processed {processed_count} merges")

        if modified:
            # Mark every pending row of a requested type as done — including
            # rows whose individual merges failed (failures are only logged).
            marked_done = 0
            for row in data:
                if (
                    row.get("Done") != "True"
                    and self.get_entity_type(row["surviving_entity"])
                    in self.entity_types
                ):
                    row["Done"] = "True"
                    marked_done += 1

            logger.info(f"Marked {marked_done} rows as done")
            meta_editor.save(g_set)
            self.write_csv(csv_file, data)
            logger.info(f"Saved changes to {csv_file}")

        return csv_file

    def process_folder(self, csv_folder: str):
        """Process every CSV file in csv_folder with a pool of worker processes.

        A stale stop file is removed first.  With more than 4 workers, files
        larger than 10000 rows are skipped to bound per-worker memory use.
        """
        if os.path.exists(self.stop_file_path):
            os.remove(self.stop_file_path)

        csv_files = [
            os.path.join(csv_folder, file)
            for file in os.listdir(csv_folder)
            if file.endswith(".csv")
        ]

        if self.workers > 4:
            csv_files = [
                file for file in csv_files if self.count_csv_rows(file) <= 10000
            ]

        with concurrent.futures.ProcessPoolExecutor(
            max_workers=self.workers
        ) as executor:
            futures = {}
            for csv_file in csv_files:
                if self.should_stop_processing():
                    break
                futures[executor.submit(self.process_file, csv_file)] = csv_file

            for future in tqdm(
                concurrent.futures.as_completed(futures),
                total=len(futures),
                desc="Overall Progress",
            ):
                csv_file = futures[future]
                try:
                    future.result()
                except Exception as e:
                    error_trace = traceback.format_exc()
                    # Consistency fix: report through the module logger
                    # instead of print.
                    logger.error(
                        f"""
Error processing file {csv_file}:
Type: {type(e).__name__}
Details: {str(e)}
Full Traceback:
{error_trace}
Suggestion: This is an unexpected error. Please check the traceback for more details.
"""
                    )
def main():
    """Command-line entry point: build the argument parser, then run the merger."""
    parser = argparse.ArgumentParser(
        description="Merge entities from CSV files in a folder.",
        formatter_class=RichHelpFormatter,
    )
    # Positional arguments.
    parser.add_argument(
        "csv_folder", type=str, help="Path to the folder containing CSV files"
    )
    parser.add_argument("meta_config", type=str, help="Meta configuration string")
    parser.add_argument("resp_agent", type=str, help="Responsible agent string")
    # Optional arguments.
    parser.add_argument(
        "--entity_types",
        nargs="+",
        default=["ra", "br", "id"],
        help="Types of entities to merge (ra, br, id)",
    )
    parser.add_argument(
        "--stop_file", type=str, default="stop.out", help="Path to the stop file"
    )
    parser.add_argument(
        "--workers", type=int, default=4, help="Number of parallel workers"
    )

    opts = parser.parse_args()

    # Construct the merger and immediately hand it the folder to process.
    EntityMerger(
        meta_config=opts.meta_config,
        resp_agent=opts.resp_agent,
        entity_types=opts.entity_types,
        stop_file_path=opts.stop_file,
        workers=opts.workers,
    ).process_folder(opts.csv_folder)


if __name__ == "__main__":
    main()