Coverage for oc_meta/run/merge/group_entities.py: 79% (169 statements)
# SPDX-FileCopyrightText: 2024-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# SPDX-License-Identifier: ISC

import argparse
import csv
import os
import re

import pandas as pd
import yaml
from rich_argparse import RichHelpFormatter
from sparqlite import SPARQLClient
from tqdm import tqdm


class UnionFind:
    """Union-find (disjoint-set) structure with path compression."""

    def __init__(self):
        self.parent = {}

    def find(self, item):
        if item not in self.parent:
            self.parent[item] = item
            return item

        # Walk up to the root, collecting the path for compression
        path = []
        current = item
        visited = set()

        while current != self.parent[current]:
            if current in visited:
                raise ValueError(f"Cycle detected in union-find structure at {current}")
            visited.add(current)
            path.append(current)
            current = self.parent[current]

        # Path compression: point every visited node directly at the root
        for node in path:
            self.parent[node] = current

        return current

    def union(self, item1, item2):
        root1 = self.find(item1)
        root2 = self.find(item2)
        if root1 != root2:
            self.parent[root2] = root1
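

# A minimal usage sketch (illustrative, not part of the module's API): union()
# links items transitively, and find() returns one canonical root per group.
#
#     uf = UnionFind()
#     uf.union("a", "b")
#     uf.union("b", "c")
#     uf.find("c")  # == uf.find("a"): "a", "b" and "c" now share one root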


def load_csv(file_path):
    df = pd.read_csv(file_path)
    required_columns = ["surviving_entity", "merged_entities"]
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        raise ValueError(f"CSV file missing required columns: {missing_columns}")
    return df
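

# Expected input CSV shape (illustrative values). `merged_entities` holds one
# or more URIs separated by "; ", which is how group_entities() splits it below:
#
#     surviving_entity,merged_entities
#     https://w3id.org/oc/meta/br/0601,https://w3id.org/oc/meta/br/0602; https://w3id.org/oc/meta/br/0603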


def query_sparql_batch(endpoint, uris, batch_size=10):
    """
    Query SPARQL for related entities in batches.

    Args:
        endpoint: SPARQL endpoint URL
        uris: List of URIs to query
        batch_size: Number of URIs to process in a single query

    Returns:
        Set of all related entities
    """
    related_entities = set()

    with SPARQLClient(endpoint, max_retries=5, backoff_factor=5, timeout=3600) as client:
        for i in range(0, len(uris), batch_size):
            batch_uris = uris[i:i + batch_size]

            subject_clauses = []
            object_clauses = []

            for uri in batch_uris:
                subject_clauses.append(f"{{?entity ?p <{uri}>}}")
                object_clauses.append(f"{{<{uri}> ?p ?entity}}")

            query = f"""
                PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
                PREFIX datacite: <http://purl.org/spar/datacite/>
                PREFIX pro: <http://purl.org/spar/pro/>
                SELECT DISTINCT ?entity WHERE {{
                    {{
                        {' UNION '.join(subject_clauses + object_clauses)}
                    }}
                    ?entity ?p2 ?o2 .
                    FILTER (?p != rdf:type)
                    FILTER (?p != datacite:usesIdentifierScheme)
                    FILTER (?p != pro:withRole)
                }}
            """

            results = client.query(query)

            for result in results['results']['bindings']:
                if result['entity']['type'] == 'uri':
                    related_entities.add(result['entity']['value'])

    return related_entities
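

# For a batch with a single URI <U>, the f-string above expands to roughly this
# query (illustrative): entities linked to <U> in either direction, excluding
# typing, identifier-scheme, and role predicates:
#
#     SELECT DISTINCT ?entity WHERE {
#         { {?entity ?p <U>} UNION {<U> ?p ?entity} }
#         ?entity ?p2 ?o2 .
#         FILTER (?p != rdf:type)
#         FILTER (?p != datacite:usesIdentifierScheme)
#         FILTER (?p != pro:withRole)
#     }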


def get_all_related_entities(endpoint, uris, batch_size=10):
    """
    Get all related entities for a list of URIs using batch queries.

    Args:
        endpoint: SPARQL endpoint URL
        uris: List of URIs to query
        batch_size: Number of URIs to process in a single query

    Returns:
        Set of all related entities including input URIs
    """
    related_entities = set(uris)
    batch_results = query_sparql_batch(endpoint, uris, batch_size)
    related_entities.update(batch_results)
    return related_entities


def get_file_path(uri, dir_split, items_per_file, zip_output=True):
    """
    Calculate RDF file path for an entity URI (same logic as MetaEditor.find_file).

    Args:
        uri: Entity URI (e.g., https://w3id.org/oc/meta/br/060100)
        dir_split: Directory split number
        items_per_file: Items per file
        zip_output: Whether files are zipped (default: True)

    Returns:
        File path (e.g., br/060/10000/1000.zip) or None if invalid URI
    """
    entity_regex = r"^.+/([a-z][a-z])/(0[1-9]+0)([1-9][0-9]*)$"
    entity_match = re.match(entity_regex, uri)

    if not entity_match:
        return None

    short_name = entity_match.group(1)
    supplier_prefix = entity_match.group(2)
    cur_number = int(entity_match.group(3))

    # Round cur_number up to the next multiple of items_per_file (file bucket)
    cur_file_split = 0
    while True:
        if cur_number > cur_file_split:
            cur_file_split += items_per_file
        else:
            break

    # Round cur_number up to the next multiple of dir_split (directory bucket)
    cur_split = 0
    while True:
        if cur_number > cur_split:
            cur_split += dir_split
        else:
            break

    extension = ".zip" if zip_output else ".json"
    return f"{short_name}/{supplier_prefix}/{cur_split}/{cur_file_split}{extension}"
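

# Worked example of the bucket arithmetic above: for
# uri = "https://w3id.org/oc/meta/br/060100" with dir_split=10000 and
# items_per_file=1000, the regex yields short_name="br", supplier_prefix="060",
# cur_number=100; the loops round 100 up to the next multiples 1000 and 10000,
# giving "br/060/10000/1000.zip".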


def group_entities(df, endpoint, dir_split=10000, items_per_file=1000, zip_output=True):
    """
    Group entities based on RDF connections and file range conflicts.

    Args:
        df: DataFrame with columns 'surviving_entity' and 'merged_entities'
        endpoint: SPARQL endpoint URL
        dir_split: Directory split number (default: 10000)
        items_per_file: Items per file (default: 1000)
        zip_output: Whether files are zipped (default: True)

    Returns:
        Dict of group_id -> DataFrame with grouped rows
    """
    uf = UnionFind()
    rows_list = []

    for _, row in tqdm(df.iterrows(), total=df.shape[0], desc="Processing rows"):
        surviving_entity = row['surviving_entity']
        merged_entities = row['merged_entities'].split("; ")

        all_entities = [surviving_entity] + merged_entities

        # Union for RDF connections
        all_related_entities = get_all_related_entities(endpoint, all_entities)
        for entity in all_related_entities:
            uf.union(surviving_entity, entity)

        # Union for file range conflicts (only for IDs being merged, not related entities)
        for entity in all_entities:
            entity_file = get_file_path(entity, dir_split, items_per_file, zip_output)
            if entity_file:
                # Use file path as virtual entity in union-find
                uf.union(surviving_entity, f"FILE:{entity_file}")

        rows_list.append(row)

    grouped_data = {}
    for row in rows_list:
        surviving_entity = row['surviving_entity']
        group_id = uf.find(surviving_entity)

        if group_id not in grouped_data:
            grouped_data[group_id] = []

        grouped_data[group_id].append(row)

    for group_id in grouped_data:
        grouped_data[group_id] = pd.DataFrame(grouped_data[group_id])

    return grouped_data
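

# Grouping sketch (illustrative): if two CSV rows touch the same related entity,
# or would write into the same RDF file (the shared "FILE:..." virtual node),
# their surviving entities end up under one union-find root, so both rows land
# in the same group_id bucket of the returned dict.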


def optimize_groups(grouped_data, target_size=50):
    """
    Optimize the groups by combining singleton groups while keeping interconnected entities separate.

    Args:
        grouped_data (dict): Dictionary of grouped DataFrames
        target_size (int): Minimum target size for each group

    Returns:
        dict: Optimized dictionary of grouped DataFrames
    """
    # Split the groups into singletons and multi-row groups
    single_groups = {k: v for k, v in grouped_data.items() if len(v) == 1}
    multi_groups = {k: v for k, v in grouped_data.items() if len(v) > 1}

    # If there are no singleton groups, return the original groups
    if not single_groups:
        return grouped_data

    # Build new groups by combining the singletons
    combined_groups = {}
    single_items = list(single_groups.items())

    # Combine singleton groups into groups of the target size
    current_group = []
    current_key = None

    for key, df in single_items:
        if len(current_group) == 0:
            current_key = key

        current_group.append(df)

        if len(current_group) >= target_size:
            combined_groups[current_key] = pd.concat(current_group, ignore_index=True)
            current_group = []

    # Handle any leftover groups
    if current_group:
        if len(current_group) == 1 and multi_groups:
            # If a single group is left over and multi-row groups exist,
            # append it to the smallest multi-row group
            smallest_multi = min(multi_groups.items(), key=lambda x: len(x[1]))
            multi_groups[smallest_multi[0]] = pd.concat(
                [smallest_multi[1]] + current_group,
                ignore_index=True
            )
        else:
            # Otherwise keep it as a separate group
            combined_groups[current_key] = pd.concat(current_group, ignore_index=True)

    # Merge the original multi-row groups with the new combined groups
    return {**multi_groups, **combined_groups}
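

# Packing sketch (illustrative): with target_size=2 and four singleton groups
# keyed k1..k4, the result contains {k1: rows of k1+k2, k3: rows of k3+k4};
# multi-row groups pass through untouched unless a lone leftover singleton is
# folded into the smallest of them.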


def save_grouped_entities(grouped_data, output_dir):
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    for key, df in grouped_data.items():
        output_file = os.path.join(output_dir, f"{key.split('/')[-1]}.csv")
        print(f"Saving group with {len(df)} rows to {output_file}")

        try:
            df.to_csv(output_file, index=False)
        except AttributeError as e:
            print(f"Error saving file {output_file}: {str(e)}")
            try:
                df.to_csv(output_file, index=False, encoding='utf-8', quoting=csv.QUOTE_NONNUMERIC)
                print(f"Successfully saved using alternative method: {output_file}")
            except Exception as alt_e:
                print(f"Alternative method also failed: {str(alt_e)}")
        except Exception as e:
            print(f"Unexpected error saving file {output_file}: {str(e)}")
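

# Output naming sketch (illustrative): a group keyed by the union-find root
# "https://w3id.org/oc/meta/br/0601" is written to "<output_dir>/0601.csv",
# since only the last path segment of the key is kept.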


def main():
    parser = argparse.ArgumentParser(
        description='Process CSV and group entities based on SPARQL queries.',
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument('csv_file_path', type=str, help='Path to the input CSV file')
    parser.add_argument('output_dir', type=str, help='Directory to save the output files')
    parser.add_argument('meta_config', type=str, help='Path to meta configuration YAML file')
    parser.add_argument('--min_group_size', type=int, default=50,
                        help='Minimum target size for groups (default: 50)')

    args = parser.parse_args()

    with open(args.meta_config, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)

    sparql_endpoint = config['triplestore_url']
    dir_split = config['dir_split_number']
    items_per_file = config['items_per_file']
    zip_output = config['zip_output_rdf']
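
    # The YAML is expected to provide at least the four keys read above, e.g.
    # (illustrative values, not a real endpoint):
    #
    #     triplestore_url: http://localhost:8890/sparql
    #     dir_split_number: 10000
    #     items_per_file: 1000
    #     zip_output_rdf: true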

    df = load_csv(args.csv_file_path)
    print(f"Loaded CSV file with {len(df)} rows")
    print(f"Configuration: dir_split={dir_split}, items_per_file={items_per_file}, zip_output={zip_output}")

    grouped_entities = group_entities(df, sparql_endpoint, dir_split, items_per_file, zip_output)
    print(f"Initially grouped entities into {len(grouped_entities)} groups")

    optimized_groups = optimize_groups(grouped_entities, args.min_group_size)
    print(f"Optimized into {len(optimized_groups)} groups")

    save_grouped_entities(optimized_groups, args.output_dir)
    print("Finished saving grouped entities")


if __name__ == "__main__":
    main()
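

# Example invocation (illustrative paths; the module path is an assumption
# based on the file's location in the repository):
#
#     python -m oc_meta.run.merge.group_entities merges.csv grouped/ meta_config.yaml --min_group_size 50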