import argparse
import csv
import os
import re

import pandas as pd
import yaml
from rich_argparse import RichHelpFormatter
from sparqlite import SPARQLClient
from tqdm import tqdm

class UnionFind:
    """Disjoint-set structure with path compression, used to group entities."""

    def __init__(self):
        self.parent = {}

    def find(self, item):
        if item not in self.parent:
            self.parent[item] = item
            return item

        # Walk up to the root, collecting the path for compression.
        path = []
        current = item
        visited = set()

        while current != self.parent[current]:
            if current in visited:
                raise ValueError(f"Cycle detected in union-find structure at {current}")
            visited.add(current)
            path.append(current)
            current = self.parent[current]

        # Path compression: point every visited node directly at the root.
        for node in path:
            self.parent[node] = current

        return current

    def union(self, item1, item2):
        root1 = self.find(item1)
        root2 = self.find(item2)
        if root1 != root2:
            self.parent[root2] = root1

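# A minimal usage sketch (hypothetical items): chained unions give every item
# the same root, and find() compresses paths as a side effect.
#
#     uf = UnionFind()
#     uf.union("A", "B")
#     uf.union("B", "C")
#     assert uf.find("C") == uf.find("A") == "A"
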
def load_csv(file_path):
    df = pd.read_csv(file_path)
    required_columns = ["surviving_entity", "merged_entities"]
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        raise ValueError(f"CSV file missing required columns: {missing_columns}")
    return df

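# The input CSV is expected to have this shape (hypothetical URIs); note that
# group_entities() below splits merged_entities on "; ":
#
#     surviving_entity,merged_entities
#     https://w3id.org/oc/meta/br/0601,https://w3id.org/oc/meta/br/0602; https://w3id.org/oc/meta/br/0603
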
def query_sparql_batch(endpoint, uris, batch_size=10):
    """
    Query SPARQL for related entities in batches.

    Args:
        endpoint: SPARQL endpoint URL
        uris: List of URIs to query
        batch_size: Number of URIs to process in a single query

    Returns:
        Set of all related entities
    """
    related_entities = set()

    with SPARQLClient(endpoint, max_retries=5, backoff_factor=5, timeout=3600) as client:
        for i in range(0, len(uris), batch_size):
            batch_uris = uris[i:i + batch_size]

            subject_clauses = []
            object_clauses = []

            for uri in batch_uris:
                subject_clauses.append(f"{{?entity ?p <{uri}>}}")
                object_clauses.append(f"{{<{uri}> ?p ?entity}}")

            query = f"""
            PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
            PREFIX datacite: <http://purl.org/spar/datacite/>
            PREFIX pro: <http://purl.org/spar/pro/>
            SELECT DISTINCT ?entity WHERE {{
                {{
                    {' UNION '.join(subject_clauses + object_clauses)}
                }}
                ?entity ?p2 ?o2 .
                FILTER (?p != rdf:type)
                FILTER (?p != datacite:usesIdentifierScheme)
                FILTER (?p != pro:withRole)
            }}
            """

            results = client.query(query)

            for result in results['results']['bindings']:
                if result['entity']['type'] == 'uri':
                    related_entities.add(result['entity']['value'])

    return related_entities

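# For a batch of two hypothetical URIs <u1> and <u2>, the generated query has
# roughly this shape: anything linked to a batch member in either direction is
# returned, except via the three filtered-out classificatory predicates.
#
#     SELECT DISTINCT ?entity WHERE {
#         { {?entity ?p <u1>} UNION {?entity ?p <u2>}
#           UNION {<u1> ?p ?entity} UNION {<u2> ?p ?entity} }
#         ?entity ?p2 ?o2 .
#         FILTER (?p != rdf:type) ...
#     }
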
def get_all_related_entities(endpoint, uris, batch_size=10):
    """
    Get all related entities for a list of URIs using batch queries.

    Args:
        endpoint: SPARQL endpoint URL
        uris: List of URIs to query
        batch_size: Number of URIs to process in a single query

    Returns:
        Set of all related entities including input URIs
    """
    related_entities = set(uris)
    batch_results = query_sparql_batch(endpoint, uris, batch_size)
    related_entities.update(batch_results)
    return related_entities

def get_file_path(uri, dir_split, items_per_file, zip_output=True):
    """
    Calculate RDF file path for an entity URI (same logic as MetaEditor.find_file).

    Args:
        uri: Entity URI (e.g., https://w3id.org/oc/meta/br/060100)
        dir_split: Directory split number
        items_per_file: Items per file
        zip_output: Whether files are zipped (default: True)

    Returns:
        File path (e.g., br/060/10000/1000.zip) or None if invalid URI
    """
    entity_regex = r"^.+/([a-z][a-z])/(0[1-9]+0)([1-9][0-9]*)$"
    entity_match = re.match(entity_regex, uri)

    if not entity_match:
        return None

    short_name = entity_match.group(1)
    supplier_prefix = entity_match.group(2)
    cur_number = int(entity_match.group(3))

    # Round cur_number up to the next multiple of items_per_file ...
    cur_file_split = 0
    while True:
        if cur_number > cur_file_split:
            cur_file_split += items_per_file
        else:
            break

    # ... and to the next multiple of dir_split.
    cur_split = 0
    while True:
        if cur_number > cur_split:
            cur_split += dir_split
        else:
            break

    extension = ".zip" if zip_output else ".json"
    return f"{short_name}/{supplier_prefix}/{cur_split}/{cur_file_split}{extension}"

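# Worked example, using the URI from the docstring: for
# https://w3id.org/oc/meta/br/060100 with dir_split=10000 and
# items_per_file=1000, the regex yields short_name="br",
# supplier_prefix="060" and cur_number=100; each loop rounds 100 up to the
# next multiple of its step (1000 and 10000), giving "br/060/10000/1000.zip".
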
def group_entities(df, endpoint, dir_split=10000, items_per_file=1000, zip_output=True):
    """
    Group entities based on RDF connections and file range conflicts.

    Args:
        df: DataFrame with columns 'surviving_entity' and 'merged_entities'
        endpoint: SPARQL endpoint URL
        dir_split: Directory split number (default: 10000)
        items_per_file: Items per file (default: 1000)
        zip_output: Whether files are zipped (default: True)

    Returns:
        Dict of group_id -> DataFrame with grouped rows
    """
    uf = UnionFind()
    rows_list = []

    for _, row in tqdm(df.iterrows(), total=df.shape[0], desc="Processing rows"):
        surviving_entity = row['surviving_entity']
        merged_entities = row['merged_entities'].split("; ")

        all_entities = [surviving_entity] + merged_entities

        # Union for RDF connections
        all_related_entities = get_all_related_entities(endpoint, all_entities)
        for entity in all_related_entities:
            uf.union(surviving_entity, entity)

        # Union for file range conflicts (only for IDs being merged, not related entities)
        for entity in all_entities:
            entity_file = get_file_path(entity, dir_split, items_per_file, zip_output)
            if entity_file:
                # Use file path as virtual entity in union-find
                uf.union(surviving_entity, f"FILE:{entity_file}")

        rows_list.append(row)

    grouped_data = {}
    for row in rows_list:
        surviving_entity = row['surviving_entity']
        group_id = uf.find(surviving_entity)

        if group_id not in grouped_data:
            grouped_data[group_id] = []

        grouped_data[group_id].append(row)

    for group_id in grouped_data:
        grouped_data[group_id] = pd.DataFrame(grouped_data[group_id])

    return grouped_data

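# Sketch of the effect (hypothetical URIs): if one row merges into br/0602 and
# another into br/0604, and the endpoint reports a triple linking br/0602 to
# br/0604, both rows share a union-find root and land in the same group. Rows
# whose entities resolve to the same RDF file are likewise forced together
# through the shared "FILE:<path>" virtual node.
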
def optimize_groups(grouped_data, target_size=50):
    """
    Optimize groups by combining singleton groups while keeping interconnected entities separate.

    Args:
        grouped_data (dict): Dictionary of grouped DataFrames
        target_size (int): Minimum target size for each group

    Returns:
        dict: Optimized dictionary of grouped DataFrames
    """
    # Split groups into singletons and multi-row groups
    single_groups = {k: v for k, v in grouped_data.items() if len(v) == 1}
    multi_groups = {k: v for k, v in grouped_data.items() if len(v) > 1}

    # If there are no singleton groups, return the original groups unchanged
    if not single_groups:
        return grouped_data

    # Build new groups by combining the singletons
    combined_groups = {}
    single_items = list(single_groups.items())

    # Combine singleton groups into groups of the target size
    current_group = []
    current_key = None

    for key, df in single_items:
        if len(current_group) == 0:
            current_key = key

        current_group.append(df)

        if len(current_group) >= target_size:
            combined_groups[current_key] = pd.concat(current_group, ignore_index=True)
            current_group = []

    # Handle any leftover groups
    if current_group:
        if len(current_group) == 1 and multi_groups:
            # If exactly one singleton is left over and multi-row groups exist,
            # append it to the smallest multi-row group
            smallest_multi = min(multi_groups.items(), key=lambda x: len(x[1]))
            multi_groups[smallest_multi[0]] = pd.concat(
                [smallest_multi[1]] + current_group,
                ignore_index=True
            )
        else:
            # Otherwise keep the leftovers as a separate group
            combined_groups[current_key] = pd.concat(current_group, ignore_index=True)

    # Merge the original multi-row groups with the new combined groups
    return {**multi_groups, **combined_groups}

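# Example of the packing (hypothetical counts): 120 singleton groups with
# target_size=50 yield two combined groups of 50 plus one of 20; the
# smallest-multi fallback only fires when exactly one singleton is left over.
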
def save_grouped_entities(grouped_data, output_dir):
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    for key, df in grouped_data.items():
        output_file = os.path.join(output_dir, f"{key.split('/')[-1]}.csv")
        print(f"Saving group with {len(df)} rows to {output_file}")

        try:
            df.to_csv(output_file, index=False)
        except AttributeError as e:
            print(f"Error saving file {output_file}: {str(e)}")
            try:
                df.to_csv(output_file, index=False, encoding='utf-8', quoting=csv.QUOTE_NONNUMERIC)
                print(f"Successfully saved using alternative method: {output_file}")
            except Exception as alt_e:
                print(f"Alternative method also failed: {str(alt_e)}")
        except Exception as e:
            print(f"Unexpected error saving file {output_file}: {str(e)}")

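# Output file names come from the last "/"-separated segment of each group
# key, so a group keyed by the (hypothetical) root entity
# https://w3id.org/oc/meta/br/0601 is written to <output_dir>/0601.csv.
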
def main():
    parser = argparse.ArgumentParser(
        description='Process CSV and group entities based on SPARQL queries.',
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument('csv_file_path', type=str, help='Path to the input CSV file')
    parser.add_argument('output_dir', type=str, help='Directory to save the output files')
    parser.add_argument('meta_config', type=str, help='Path to meta configuration YAML file')
    parser.add_argument('--min_group_size', type=int, default=50,
                        help='Minimum target size for groups (default: 50)')

    args = parser.parse_args()

    with open(args.meta_config, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)

    sparql_endpoint = config['triplestore_url']
    dir_split = config['dir_split_number']
    items_per_file = config['items_per_file']
    zip_output = config['zip_output_rdf']

    df = load_csv(args.csv_file_path)
    print(f"Loaded CSV file with {len(df)} rows")
    print(f"Configuration: dir_split={dir_split}, items_per_file={items_per_file}, zip_output={zip_output}")

    grouped_entities = group_entities(df, sparql_endpoint, dir_split, items_per_file, zip_output)
    print(f"Initially grouped entities into {len(grouped_entities)} groups")

    optimized_groups = optimize_groups(grouped_entities, args.min_group_size)
    print(f"Optimized into {len(optimized_groups)} groups")

    save_grouped_entities(optimized_groups, args.output_dir)
    print("Finished saving grouped entities")


if __name__ == "__main__":
    main()
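
# Hypothetical invocation (file names are placeholders):
#
#     python group_entities.py merges.csv grouped_output/ meta_config.yaml --min_group_size 50
#
# The YAML file must provide triplestore_url, dir_split_number, items_per_file
# and zip_output_rdf, which main() reads via yaml.safe_load.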