Coverage for test/entity_merger_test.py: 98%
680 statements
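"""Integration tests for EntityMerger (oc_meta.run.merge.entities).

The suite builds small OpenCitations Meta graphs (authors, identifiers,
bibliographic resources, agent roles), uploads them to local test SPARQL
endpoints, runs merges driven by CSV files, and then inspects the exported
JSON-LD files and the generated provenance snapshots.
"""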
import csv
import json
import os
import re
import unittest
from shutil import rmtree

import redis
import yaml
from oc_meta.run.merge.entities import EntityMerger
from oc_meta.run.meta_editor import MetaEditor
from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler
from oc_ocdm.graph import GraphSet
from oc_ocdm.prov.prov_set import ProvSet
from oc_ocdm.storer import Storer
from rdflib import URIRef
from sparqlite import SPARQLClient

BASE = os.path.join("test", "merger")
OUTPUT = os.path.join(BASE, "output/")
META_CONFIG = os.path.join("test", "merger", "meta_config.yaml")
SERVER = "http://127.0.0.1:8805/sparql"
PROV_SERVER = "http://127.0.0.1:8806/sparql"

REDIS_HOST = "localhost"
REDIS_PORT = 6381
REDIS_DB = 5
REDIS_CACHE_DB = 2
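# NOTE: the endpoints above are assumed to be disposable local test services
# (data triplestore on :8805, provenance triplestore on :8806, Redis on :6381
# with separate DBs for counters and cache). setUp() rewrites META_CONFIG in
# place, so test/merger/meta_config.yaml must already exist and provide the
# remaining settings expected by EntityMerger and MetaEditor.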


def reset_triplestore():
    """Reset the test triplestore graphs"""
    with SPARQLClient(SERVER, timeout=60) as client:
        for graph in [
            "https://w3id.org/oc/meta/br/",
            "https://w3id.org/oc/meta/ra/",
            "https://w3id.org/oc/meta/re/",
            "https://w3id.org/oc/meta/id/",
            "https://w3id.org/oc/meta/ar/",
        ]:
            client.update(f"CLEAR GRAPH <{graph}>")

    with SPARQLClient(PROV_SERVER, timeout=60) as prov_client:
        for graph in [
            "https://w3id.org/oc/meta/br/",
            "https://w3id.org/oc/meta/ra/",
            "https://w3id.org/oc/meta/re/",
            "https://w3id.org/oc/meta/id/",
            "https://w3id.org/oc/meta/ar/",
        ]:
            prov_client.update(f"CLEAR GRAPH <{graph}>")


def reset_redis_counters():
    redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
    redis_cache_client = redis.Redis(
        host=REDIS_HOST, port=REDIS_PORT, db=REDIS_CACHE_DB
    )
    redis_client.flushdb()
    redis_cache_client.flushdb()


def get_counter_handler():
    return RedisCounterHandler(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
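# The same RedisCounterHandler instance (created once in setUpClass) is passed to
# every GraphSet and ProvSet below, so entity and snapshot numbering under the
# "060"/"065" supplier prefixes stays consistent across the test fixtures.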


class TestEntityMerger(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.counter_handler = get_counter_handler()

    def setUp(self):
        # Reset environment
        if os.path.exists(OUTPUT):
            rmtree(OUTPUT)
        os.makedirs(os.path.join(BASE, "csv"), exist_ok=True)
        reset_triplestore()
        reset_redis_counters()

        # Create temporary directory for cache files
        self.temp_dir = os.path.join("test", "temp_entity_merger_test")
        if os.path.exists(self.temp_dir):
            rmtree(self.temp_dir)
        os.makedirs(self.temp_dir)

        # Setup cache files
        self.cache_file = os.path.join(self.temp_dir, "ts_upload_cache.json")
        self.failed_file = os.path.join(self.temp_dir, "failed_queries.txt")
        self.stop_file = os.path.join(self.temp_dir, ".stop_upload")

        # Create separate directories for data and provenance update queries
        self.data_update_dir = os.path.join(self.temp_dir, "to_be_uploaded_data")
        self.prov_update_dir = os.path.join(self.temp_dir, "to_be_uploaded_prov")
        os.makedirs(self.data_update_dir, exist_ok=True)
        os.makedirs(self.prov_update_dir, exist_ok=True)

        # Update config with Redis and cache settings
        with open(META_CONFIG, encoding="utf-8") as file:
            settings = yaml.full_load(file)
        settings.update(
            {
                "redis_host": REDIS_HOST,
                "redis_port": REDIS_PORT,
                "redis_db": REDIS_DB,
                "redis_cache_db": REDIS_CACHE_DB,
                "ts_upload_cache": self.cache_file,
                "ts_failed_queries": self.failed_file,
                "ts_stop_file": self.stop_file,
                "provenance_triplestore_url": PROV_SERVER,
                "data_update_dir": self.data_update_dir,
                "prov_update_dir": self.prov_update_dir,
            }
        )
        with open(META_CONFIG, "w", encoding="utf-8") as file:
            yaml.dump(settings, file)

        # Initialize test data
        self.setup_test_data()

        # Create merger instance
        self.merger = EntityMerger(
            meta_config=META_CONFIG,
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            entity_types=["ra", "br", "id"],
            stop_file_path="stop.out",
            workers=4,
        )

    def tearDown(self):
        if os.path.exists(os.path.join(BASE, "csv")):
            rmtree(os.path.join(BASE, "csv"))
        if os.path.exists(OUTPUT):
            rmtree(OUTPUT)
        if os.path.exists("stop.out"):
            os.remove("stop.out")
        if os.path.exists(self.temp_dir):
            rmtree(self.temp_dir)
        reset_triplestore()
        reset_redis_counters()

    def setup_test_data(self):
        """Create initial test data in triplestore"""
        # Create a GraphSet for test data
        g_set = GraphSet(
            "https://w3id.org/oc/meta/",
            supplier_prefix="060",
            custom_counter_handler=self.counter_handler,
        )

        # Create first author entity with specific ID
        author1 = g_set.add_ra(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ra/0601"),
        )
        author1.has_name("John Smith")

        # Create ORCID identifier for author1 with specific ID
        orcid_id = g_set.add_id(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/id/0601"),
        )
        orcid_id.create_orcid("0000-0001-1234-5678")
        author1.has_identifier(orcid_id)

        # Create second author entity with specific ID
        author2 = g_set.add_ra(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ra/0602"),
        )
        author2.has_name("J. Smith")

        # Create VIAF identifier for author2 with specific ID
        viaf_id = g_set.add_id(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/id/0602"),
        )
        viaf_id.create_viaf("12345678")
        author2.has_identifier(viaf_id)

        # Create a publication with specific ID
        pub = g_set.add_br(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/br/0601"),
        )
        pub.has_title("Test Publication")
        pub.has_pub_date("2024-01-01")

        # Create role for first author with specific ID
        role1 = g_set.add_ar(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ar/0601"),
        )
        role1.create_author()
        role1.is_held_by(author1)
        pub.has_contributor(role1)

        # Create role for second author with specific ID
        role2 = g_set.add_ar(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ar/0602"),
        )
        role2.create_author()
        role2.is_held_by(author2)
        pub.has_contributor(role2)

        prov = ProvSet(
            g_set,
            "https://w3id.org/oc/meta/",
            wanted_label=False,
            custom_counter_handler=self.counter_handler,
        )
        prov.generate_provenance()

        rdf_output = os.path.join(OUTPUT, "rdf") + os.sep

        res_storer = Storer(
            abstract_set=g_set,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )
        prov_storer = Storer(
            abstract_set=prov,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )

        res_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        prov_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        res_storer.upload_all(
            triplestore_url=SERVER,
            base_dir=rdf_output,
            batch_size=10,
            save_queries=False,
        )

        # Create CSV file for merger
        merge_data = [
            {
                "surviving_entity": str(author1.res),
                "merged_entities": str(author2.res),
                "Done": "False",
            }
        ]
        self.write_csv("merge_test.csv", merge_data)

    def write_csv(self, filename: str, data: list):
        filepath = os.path.join(BASE, "csv", filename)
        with open(filepath, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=data[0].keys())
            writer.writeheader()
            writer.writerows(data)
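    # The merge CSVs produced by write_csv have three columns: "surviving_entity"
    # (a single IRI), "merged_entities" (one or more IRIs separated by "; ", as in
    # test_merge_multiple_entities below), and "Done", which EntityMerger flips to
    # "True" once the row has been processed.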

    def check_sparql_query_content(self, query: str, expected_triples: dict):
        """
        Check if a SPARQL query contains expected triples in DELETE and INSERT sections.

        Args:
            query (str): The SPARQL update query string
            expected_triples (dict): Dictionary with 'delete' and 'insert' keys containing
                lists of triple patterns to check for
        """
        # Split query into DELETE and INSERT sections
        delete_match = re.search(
            r"DELETE DATA \{ GRAPH.*?\{(.*?)\}.*?\}", query, re.DOTALL
        )
        insert_match = re.search(
            r"INSERT DATA \{ GRAPH.*?\{(.*?)\}.*?\}", query, re.DOTALL
        )

        delete_section = delete_match.group(1) if delete_match else ""
        insert_section = insert_match.group(1) if insert_match else ""

        # Check DELETE patterns
        if "delete" in expected_triples:
            for triple in expected_triples["delete"]:
                self.assertIn(
                    triple,
                    delete_section.strip(),
                    f"Expected triple not found in DELETE section: {triple}",
                )

        # Check INSERT patterns
        if "insert" in expected_triples:
            for triple in expected_triples["insert"]:
                self.assertIn(
                    triple,
                    insert_section.strip(),
                    f"Expected triple not found in INSERT section: {triple}",
                )
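    # Illustrative sketch (an assumption about the query shape, not executed): the
    # regexes above expect update queries roughly of the form
    #   DELETE DATA { GRAPH <g> { <s> <p> "old" . } }
    #   INSERT DATA { GRAPH <g> { <s> <p> "new" . } }
    # and an expected_triples dict such as
    #   {"delete": ['<s> <p> "old"'], "insert": ['<s> <p> "new"']}
    # where each listed triple only needs to appear as a substring of the
    # corresponding section.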

    def test_get_entity_type(self):
        """Test the static method get_entity_type"""
        test_cases = [
            ("https://w3id.org/oc/meta/ra/06107", "ra"),
            ("https://w3id.org/oc/meta/br/06101", "br"),
            ("https://w3id.org/oc/meta/id/06105", "id"),
            ("https://example.com/invalid/url", None),
            ("", None),
        ]

        for url, expected in test_cases:
            with self.subTest(url=url):
                self.assertEqual(EntityMerger.get_entity_type(url), expected)

    def test_read_write_csv(self):
        """Test CSV read and write operations"""
        test_data = [
            {
                "surviving_entity": "https://w3id.org/oc/meta/ra/06107",
                "merged_entities": "https://w3id.org/oc/meta/ra/06205",
                "Done": "False",
            }
        ]

        # Write test data
        test_file = os.path.join(BASE, "csv", "test.csv")
        EntityMerger.write_csv(test_file, test_data)

        # Read back and verify
        read_data = EntityMerger.read_csv(test_file)
        self.assertEqual(test_data, read_data)

    def test_count_csv_rows(self):
        """Test CSV row counting"""
        # Test with an empty file (header only)
        empty_file = os.path.join(BASE, "csv", "empty.csv")
        with open(empty_file, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=["col1", "col2"])
            writer.writeheader()
        self.assertEqual(EntityMerger.count_csv_rows(empty_file), 0)

        # Test with a pre-existing input file containing one row
        test_file = os.path.join(BASE, "input", "0.csv")
        row_count = EntityMerger.count_csv_rows(test_file)
        self.assertEqual(row_count, 1)

    def test_process_file_with_stop_file(self):
        """Test that processing stops when stop file is present"""
        # Create stop file
        with open(self.merger.stop_file_path, "w") as f:
            f.write("")

        # Process test file
        test_file = os.path.join(BASE, "csv", "merge_test.csv")
        self.merger.process_file(test_file)

        # Verify the file wasn't processed (Done should still be False)
        data = EntityMerger.read_csv(test_file)
        self.assertEqual(data[0]["Done"], "False")

    def test_process_folder(self):
        """Test processing multiple files in a folder"""
        csv_folder = os.path.join(BASE, "csv")
        self.merger.process_folder(csv_folder)

        # Verify all files were processed
        for filename in ["merge_test.csv"]:
            with self.subTest(file=filename):
                data = EntityMerger.read_csv(os.path.join(csv_folder, filename))
                self.assertEqual(data[0]["Done"], "True")

    def test_process_folder_with_worker_limit(self):
        """Test processing folder with worker count > 4"""
        self.merger.workers = 5
        csv_folder = os.path.join(BASE, "csv")

        # Create a file with more than 10000 rows
        large_data = [
            {
                "surviving_entity": f"https://w3id.org/oc/meta/ra/0610{i}",
                "merged_entities": f"https://w3id.org/oc/meta/ra/0620{i}",
                "Done": "False",
            }
            for i in range(15000)
        ]
        self.write_csv("large.csv", large_data)

        self.merger.process_folder(csv_folder)

        # Verify only small files were processed
        large_file_data = EntityMerger.read_csv(os.path.join(csv_folder, "large.csv"))
        self.assertEqual(
            large_file_data[0]["Done"], "False"
        )  # Large file should be skipped

        small_file_data = EntityMerger.read_csv(
            os.path.join(csv_folder, "merge_test.csv")
        )
        self.assertEqual(
            small_file_data[0]["Done"], "True"
        )  # Small file should be processed

    def test_merge_authors_with_real_data(self):
        """Test merging two author entities with real data"""
        # Process the merge
        csv_folder = os.path.join(BASE, "csv")
        self.merger.process_folder(csv_folder)

        # Verify files structure
        rdf_path = os.path.join(OUTPUT)
        self.assertTrue(
            os.path.exists(os.path.join(rdf_path, "rdf", "ra", "060", "10000", "1000"))
        )
        self.assertTrue(
            os.path.exists(
                os.path.join(rdf_path, "rdf", "ra", "060", "10000", "1000", "prov")
            )
        )

        # Load and verify data files
        ra_file = os.path.join(rdf_path, "rdf", "ra", "060", "10000", "1000.json")
        with open(ra_file) as f:
            data = json.load(f)
        for graph in data:
            for entity in graph.get("@graph", []):
                if entity["@id"] == "https://w3id.org/oc/meta/ra/0601":
                    # Check that the surviving entity has both identifiers
                    identifiers = {
                        id_obj["@id"]
                        for id_obj in entity[
                            "http://purl.org/spar/datacite/hasIdentifier"
                        ]
                    }
                    self.assertEqual(len(identifiers), 2)
                    self.assertIn("https://w3id.org/oc/meta/id/0601", identifiers)
                    self.assertIn("https://w3id.org/oc/meta/id/0602", identifiers)

                    # Check name
                    self.assertEqual(
                        entity["http://xmlns.com/foaf/0.1/name"][0]["@value"],
                        "J. Smith",
                    )

                # Check merged entity no longer exists
                if entity["@id"] == "https://w3id.org/oc/meta/ra/0602":
                    self.fail("Merged entity should not exist")

        # Check role reassignment
        ar_file = os.path.join(rdf_path, "rdf", "ar", "060", "10000", "1000.json")
        with open(ar_file) as f:
            data = json.load(f)
        for graph in data:
            for entity in graph.get("@graph", []):
                agent = entity["http://purl.org/spar/pro/isHeldBy"][0]["@id"]
                self.assertEqual(
                    agent,
                    "https://w3id.org/oc/meta/ra/0601",
                    "All roles should point to surviving entity",
                )

        # Check provenance
        prov_file = os.path.join(
            rdf_path, "rdf", "ra", "060", "10000", "1000", "prov", "se.json"
        )
        found_merge_prov = False
        expected_triples = {
            "delete": [
                '<https://w3id.org/oc/meta/ra/0601> <http://xmlns.com/foaf/0.1/name> "John Smith"'
            ],
            "insert": [
                "<https://w3id.org/oc/meta/ra/0601> <http://purl.org/spar/datacite/hasIdentifier> <https://w3id.org/oc/meta/id/0602>",
                '<https://w3id.org/oc/meta/ra/0601> <http://xmlns.com/foaf/0.1/name> "J. Smith"',
            ],
        }
        with open(prov_file) as f:
            data = json.load(f)
        for graph in data:
            for entity in graph.get("@graph", []):
                if (
                    "/prov/se/" in entity["@id"]
                    and "merge"
                    in entity.get("http://purl.org/dc/terms/description", [{}])[0]
                    .get("@value", "")
                    .lower()
                ):
                    found_merge_prov = True

                    # Check provenance fields
                    self.assertIn(
                        "http://www.w3.org/ns/prov#generatedAtTime", entity
                    )
                    self.assertIn(
                        "http://www.w3.org/ns/prov#wasAttributedTo", entity
                    )
                    self.assertIn(
                        "https://w3id.org/oc/ontology/hasUpdateQuery", entity
                    )

                    # Check the recorded update query against the expected triples
                    actual_query = entity[
                        "https://w3id.org/oc/ontology/hasUpdateQuery"
                    ][0]["@value"]
                    self.check_sparql_query_content(actual_query, expected_triples)

        self.assertTrue(found_merge_prov, "No merge provenance found")

    def test_merge_with_invalid_entity_type(self):
        """Test merging with an invalid entity type"""
        # Create test data with invalid entity type
        invalid_data = [
            {
                "surviving_entity": "https://w3id.org/oc/meta/invalid/0601",
                "merged_entities": "https://w3id.org/oc/meta/invalid/0602",
                "Done": "False",
            }
        ]
        test_file = os.path.join(BASE, "csv", "invalid_type.csv")
        self.write_csv("invalid_type.csv", invalid_data)
        self.merger.process_file(test_file)
        data = EntityMerger.read_csv(test_file)
        self.assertEqual(data[0]["Done"], "False")

    def test_merge_with_nonexistent_entities(self):
        """Test merging when one or both entities don't exist"""
        # Create test data with nonexistent entities
        nonexistent_data = [
            {
                "surviving_entity": "https://w3id.org/oc/meta/ra/9999",
                "merged_entities": "https://w3id.org/oc/meta/ra/9998",
                "Done": "False",
            }
        ]
        test_file = os.path.join(BASE, "csv", "nonexistent.csv")
        self.write_csv("nonexistent.csv", nonexistent_data)
        self.merger.process_file(test_file)
        data = EntityMerger.read_csv(test_file)
        self.assertEqual(data[0]["Done"], "True")

    def test_merge_multiple_entities(self):
        """Test merging multiple entities into one surviving entity"""
        # Create additional test entities
        g_set = GraphSet(
            "https://w3id.org/oc/meta/",
            supplier_prefix="060",
            custom_counter_handler=self.counter_handler,
        )

        # Create additional authors
        author3 = g_set.add_ra(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ra/0603"),
        )
        author3.has_name("John A. Smith")

        author4 = g_set.add_ra(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ra/0604"),
        )
        author4.has_name("J A Smith")

        # Add identifiers
        viaf_id = g_set.add_id(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/id/0603"),
        )
        viaf_id.create_viaf("123456789")
        author3.has_identifier(viaf_id)

        researcher_id = g_set.add_id(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/id/0604"),
        )
        researcher_id.create_wikidata("Q12345")
        author4.has_identifier(researcher_id)

        # Create publications and roles
        pub2 = g_set.add_br(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/br/0602"),
        )
        pub2.has_title("Another Test Publication")

        role3 = g_set.add_ar(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ar/0603"),
        )
        role3.create_author()
        role3.is_held_by(author3)
        pub2.has_contributor(role3)

        role4 = g_set.add_ar(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ar/0604"),
        )
        role4.create_author()
        role4.is_held_by(author4)
        pub2.has_contributor(role4)

        # Store and upload
        prov = ProvSet(
            g_set,
            "https://w3id.org/oc/meta/",
            wanted_label=False,
            custom_counter_handler=self.counter_handler,
        )
        prov.generate_provenance()

        rdf_output = os.path.join(OUTPUT, "rdf") + os.sep

        res_storer = Storer(
            abstract_set=g_set,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )
        prov_storer = Storer(
            abstract_set=prov,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )

        res_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        prov_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        res_storer.upload_all(
            triplestore_url=SERVER,
            base_dir=rdf_output,
            batch_size=10,
            save_queries=False,
        )

        # Create merge data
        merge_data = [
            {
                "surviving_entity": "https://w3id.org/oc/meta/ra/0601",
                "merged_entities": "https://w3id.org/oc/meta/ra/0602; https://w3id.org/oc/meta/ra/0603; https://w3id.org/oc/meta/ra/0604",
                "Done": "False",
            }
        ]
        test_file = os.path.join(BASE, "csv", "multiple_merge.csv")
        self.write_csv("multiple_merge.csv", merge_data)

        # Process the merge
        csv_folder = os.path.join(BASE, "csv")
        self.merger.process_folder(csv_folder)

        # Verify the results by checking the output files
        rdf_path = os.path.join(OUTPUT, "rdf")

        # 1. Check researcher file for surviving entity and merged data
        ra_file = os.path.join(rdf_path, "ra", "060", "10000", "1000.json")
        with open(ra_file) as f:
            data = json.load(f)
        for graph in data:
            for entity in graph.get("@graph", []):
                if entity["@id"] == "https://w3id.org/oc/meta/ra/0601":
                    # Check that it has all identifiers
                    identifiers = {
                        id_obj["@id"]
                        for id_obj in entity[
                            "http://purl.org/spar/datacite/hasIdentifier"
                        ]
                    }
                    self.assertEqual(len(identifiers), 4)
                    expected_ids = {
                        "https://w3id.org/oc/meta/id/0601",
                        "https://w3id.org/oc/meta/id/0602",
                        "https://w3id.org/oc/meta/id/0603",
                        "https://w3id.org/oc/meta/id/0604",
                    }
                    self.assertEqual(identifiers, expected_ids)

                    # Check name (should take the last merged name)
                    self.assertEqual(
                        entity["http://xmlns.com/foaf/0.1/name"][0]["@value"],
                        "J A Smith",
                    )

                # Check merged entities no longer exist
                self.assertNotIn(
                    entity["@id"],
                    [
                        "https://w3id.org/oc/meta/ra/0602",
                        "https://w3id.org/oc/meta/ra/0603",
                        "https://w3id.org/oc/meta/ra/0604",
                    ],
                )

        # 2. Check role assignments in agent role file
        ar_file = os.path.join(rdf_path, "ar", "060", "10000", "1000.json")
        with open(ar_file) as f:
            data = json.load(f)
        for graph in data:
            for entity in graph.get("@graph", []):
                if "http://purl.org/spar/pro/isHeldBy" in entity:
                    agent = entity["http://purl.org/spar/pro/isHeldBy"][0]["@id"]
                    self.assertEqual(
                        agent,
                        "https://w3id.org/oc/meta/ra/0601",
                        "All roles should point to surviving entity",
                    )

        # 3. Check provenance
        prov_file = os.path.join(
            rdf_path, "ra", "060", "10000", "1000", "prov", "se.json"
        )
        with open(prov_file) as f:
            data = json.load(f)

        # Get all provenance snapshots for surviving entity
        surviving_snapshots = []
        for graph in data:
            if graph["@id"] == "https://w3id.org/oc/meta/ra/0601/prov/":
                for entity in graph.get("@graph", []):
                    # Skip creation snapshot
                    if (
                        "created"
                        not in entity.get(
                            "http://purl.org/dc/terms/description", [{}]
                        )[0]
                        .get("@value", "")
                        .lower()
                    ):
                        surviving_snapshots.append(entity)

        # Should have 2 merge snapshots (one partial, one final)
        self.assertEqual(
            len(surviving_snapshots), 2, "Should have exactly 2 merge snapshots"
        )

        # Verify partial merge (0601 with 0602)
        partial_merge = next(
            s
            for s in surviving_snapshots
            if "0602" in s["http://purl.org/dc/terms/description"][0]["@value"]
            and "0603" not in s["http://purl.org/dc/terms/description"][0]["@value"]
        )

        # Check partial merge metadata
        self.assertIn("http://www.w3.org/ns/prov#generatedAtTime", partial_merge)
        self.assertIn("http://www.w3.org/ns/prov#wasAttributedTo", partial_merge)
        self.assertEqual(
            partial_merge["http://www.w3.org/ns/prov#wasAttributedTo"][0]["@id"],
            "https://orcid.org/0000-0002-8420-0696",
        )

        # Check partial merge query content
        partial_query = partial_merge[
            "https://w3id.org/oc/ontology/hasUpdateQuery"
        ][0]["@value"]
        expected_partial = {
            "delete": [
                '<https://w3id.org/oc/meta/ra/0601> <http://xmlns.com/foaf/0.1/name> "John Smith"'
            ],
            "insert": [
                "<https://w3id.org/oc/meta/ra/0601> <http://purl.org/spar/datacite/hasIdentifier> <https://w3id.org/oc/meta/id/0602>",
                '<https://w3id.org/oc/meta/ra/0601> <http://xmlns.com/foaf/0.1/name> "J. Smith"',
            ],
        }
        self.check_sparql_query_content(partial_query, expected_partial)

        # Verify final merge (0601 with 0602, 0603, 0604)
        final_merge = next(
            s
            for s in surviving_snapshots
            if "0602" in s["http://purl.org/dc/terms/description"][0]["@value"]
            and "0603" in s["http://purl.org/dc/terms/description"][0]["@value"]
            and "0604" in s["http://purl.org/dc/terms/description"][0]["@value"]
        )

        # Check final merge metadata
        self.assertIn("http://www.w3.org/ns/prov#generatedAtTime", final_merge)
        self.assertIn("http://www.w3.org/ns/prov#wasAttributedTo", final_merge)
        self.assertEqual(
            final_merge["http://www.w3.org/ns/prov#wasAttributedTo"][0]["@id"],
            "https://orcid.org/0000-0002-8420-0696",
        )

        # Check final merge query content
        final_query = final_merge["https://w3id.org/oc/ontology/hasUpdateQuery"][0][
            "@value"
        ]
        expected_final = {
            "delete": [
                '<https://w3id.org/oc/meta/ra/0601> <http://xmlns.com/foaf/0.1/name> "John Smith"'
            ],
            "insert": [
                "<https://w3id.org/oc/meta/ra/0601> <http://purl.org/spar/datacite/hasIdentifier> <https://w3id.org/oc/meta/id/0603>",
                "<https://w3id.org/oc/meta/ra/0601> <http://purl.org/spar/datacite/hasIdentifier> <https://w3id.org/oc/meta/id/0604>",
                '<https://w3id.org/oc/meta/ra/0601> <http://xmlns.com/foaf/0.1/name> "J A Smith"',
                "<https://w3id.org/oc/meta/ra/0601> <http://purl.org/spar/datacite/hasIdentifier> <https://w3id.org/oc/meta/id/0602>",
            ],
        }
        self.check_sparql_query_content(final_query, expected_final)

        # Verify deletion snapshots exist for merged entities
        merged_ids = ["0602", "0603", "0604"]
        for merged_id in merged_ids:
            merged_snapshots = []
            for graph in data:
                if graph["@id"] == f"https://w3id.org/oc/meta/ra/{merged_id}/prov/":
                    for entity in graph.get("@graph", []):
                        if (
                            "deleted"
                            in entity.get(
                                "http://purl.org/dc/terms/description", [{}]
                            )[0]
                            .get("@value", "")
                            .lower()
                        ):
                            merged_snapshots.append(entity)

            self.assertGreater(
                len(merged_snapshots),
                0,
                f"No deletion snapshot found for ra/{merged_id}",
            )

            # Verify deletion queries
            for snapshot in merged_snapshots:
                self.assertIn(
                    "https://w3id.org/oc/ontology/hasUpdateQuery", snapshot
                )
                delete_query = snapshot[
                    "https://w3id.org/oc/ontology/hasUpdateQuery"
                ][0]["@value"]
                self.assertIn(
                    f"<https://w3id.org/oc/meta/ra/{merged_id}>", delete_query
                )
                self.assertIn("DELETE DATA", delete_query)

    def test_merge_with_conflicting_data(self):
        """Test merging entities with conflicting information"""
        # Create test entities with conflicting data
        g_set = GraphSet(
            "https://w3id.org/oc/meta/",
            supplier_prefix="060",
            custom_counter_handler=self.counter_handler,
        )

        # Create conflicting authors
        author5 = g_set.add_ra(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ra/0605"),
        )
        author5.has_name("John Smith")
        author5.has_given_name("John")
        author5.has_family_name("Smith")

        author6 = g_set.add_ra(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ra/0606"),
        )
        author6.has_name("Johnny Smith")
        author6.has_given_name("Johnny")
        author6.has_family_name("Smith")

        # Add the same ORCID to both (which should be impossible in real data)
        same_orcid = g_set.add_id(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/id/0605"),
        )
        same_orcid.create_orcid("0000-0001-9999-9999")
        author5.has_identifier(same_orcid)

        same_orcid2 = g_set.add_id(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/id/0606"),
        )
        same_orcid2.create_orcid("0000-0001-9999-9999")
        author6.has_identifier(same_orcid2)

        # Store and upload
        prov = ProvSet(
            g_set,
            "https://w3id.org/oc/meta/",
            wanted_label=False,
            custom_counter_handler=self.counter_handler,
        )
        prov.generate_provenance()

        rdf_output = os.path.join(OUTPUT, "rdf") + os.sep

        res_storer = Storer(
            abstract_set=g_set,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )
        prov_storer = Storer(
            abstract_set=prov,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )

        res_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        prov_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        res_storer.upload_all(
            triplestore_url=SERVER,
            base_dir=rdf_output,
            batch_size=10,
            save_queries=False,
        )

        # Create merge data
        merge_data = [
            {
                "surviving_entity": "https://w3id.org/oc/meta/ra/0605",
                "merged_entities": "https://w3id.org/oc/meta/ra/0606",
                "Done": "False",
            }
        ]
        test_file = os.path.join(BASE, "csv", "conflicting_merge.csv")
        self.write_csv("conflicting_merge.csv", merge_data)

        # Process the merge
        csv_folder = os.path.join(BASE, "csv")
        self.merger.process_folder(csv_folder)

        # Verify the results by checking the output files
        rdf_path = os.path.join(OUTPUT, "rdf")

        # 1. Check researcher file for surviving entity and merged data
        ra_file = os.path.join(rdf_path, "ra", "060", "10000", "1000.json")
        with open(ra_file) as f:
            data = json.load(f)
        for graph in data:
            for entity in graph.get("@graph", []):
                if entity["@id"] == "https://w3id.org/oc/meta/ra/0605":
                    # Check identifiers - should only keep one instance
                    identifiers = {
                        id_obj["@id"]
                        for id_obj in entity.get(
                            "http://purl.org/spar/datacite/hasIdentifier", []
                        )
                    }
                    self.assertEqual(len(identifiers), 1)
                    self.assertEqual(
                        identifiers, {"https://w3id.org/oc/meta/id/0605"}
                    )

                    # Check that the name coming from the merged entity was adopted
                    self.assertEqual(
                        entity["http://xmlns.com/foaf/0.1/name"][0]["@value"],
                        "Johnny Smith",
                    )
                    self.assertEqual(
                        entity["http://xmlns.com/foaf/0.1/givenName"][0]["@value"],
                        "Johnny",
                    )
                    self.assertEqual(
                        entity["http://xmlns.com/foaf/0.1/familyName"][0]["@value"],
                        "Smith",
                    )

                # Check merged entity does not exist in output
                self.assertNotEqual(
                    entity["@id"], "https://w3id.org/oc/meta/ra/0606"
                )

        # 2. Check provenance
        prov_file = os.path.join(
            rdf_path, "ra", "060", "10000", "1000", "prov", "se.json"
        )
        with open(prov_file) as f:
            data = json.load(f)

        # Find merge snapshot
        merge_snapshot = None
        for graph in data:
            if graph["@id"] == "https://w3id.org/oc/meta/ra/0605/prov/":
                for entity in graph.get("@graph", []):
                    if (
                        "merge"
                        in entity.get("http://purl.org/dc/terms/description", [{}])[0]
                        .get("@value", "")
                        .lower()
                    ):
                        merge_snapshot = entity
                        break

        self.assertIsNotNone(merge_snapshot, "No merge snapshot found")

        # Verify merge metadata
        self.assertIn("http://www.w3.org/ns/prov#generatedAtTime", merge_snapshot)
        self.assertIn("http://www.w3.org/ns/prov#wasAttributedTo", merge_snapshot)

        # Check the merge query - should not duplicate the conflicting ORCID
        merge_query = merge_snapshot["https://w3id.org/oc/ontology/hasUpdateQuery"][
            0
        ]["@value"]
        expected_triples = {
            "delete": [
                '<https://w3id.org/oc/meta/ra/0605> <http://xmlns.com/foaf/0.1/name> "John Smith"',
                '<https://w3id.org/oc/meta/ra/0605> <http://xmlns.com/foaf/0.1/givenName> "John"',
            ],
            "insert": [
                '<https://w3id.org/oc/meta/ra/0605> <http://xmlns.com/foaf/0.1/name> "Johnny Smith"',
                '<https://w3id.org/oc/meta/ra/0605> <http://xmlns.com/foaf/0.1/givenName> "Johnny"',
            ],
        }
        self.check_sparql_query_content(merge_query, expected_triples)

        # Verify deletion snapshot exists for merged entity
        delete_snapshot = None
        for graph in data:
            if graph["@id"] == "https://w3id.org/oc/meta/ra/0606/prov/":
                for entity in graph.get("@graph", []):
                    if (
                        "deleted"
                        in entity.get("http://purl.org/dc/terms/description", [{}])[0]
                        .get("@value", "")
                        .lower()
                    ):
                        delete_snapshot = entity
                        break

        self.assertIsNotNone(
            delete_snapshot, "No deletion snapshot found for merged entity"
        )

        # Verify deletion query
        delete_query = delete_snapshot[
            "https://w3id.org/oc/ontology/hasUpdateQuery"
        ][0]["@value"]
        self.assertIn("DELETE DATA", delete_query)
        self.assertIn("<https://w3id.org/oc/meta/ra/0606>", delete_query)

    def test_merge_bibliographic_resources(self):
        """Test merging two bibliographic resource entities"""
        g_set = GraphSet(
            "https://w3id.org/oc/meta/",
            supplier_prefix="060",
            custom_counter_handler=self.counter_handler,
        )

        # Create first publication with some metadata
        pub1 = g_set.add_br(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/br/0603"),
        )
        pub1.has_title("Data Integration Methods")
        pub1.has_subtitle("A Comprehensive Review")
        pub1.has_pub_date("2023")

        # Create issue for pub1
        issue = g_set.add_br(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/br/0605"),
        )
        issue.create_issue()
        issue.has_number("4")
        pub1.is_part_of(issue)

        # Create resource embodiment for pub1
        re1 = g_set.add_re(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/re/0603"),
        )
        re1.has_starting_page("1")
        re1.has_ending_page("20")
        pub1.has_format(re1)

        # Add DOI identifier for pub1
        doi_id = g_set.add_id(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/id/0603"),
        )
        doi_id.create_doi("10.1000/example.doi.1")
        pub1.has_identifier(doi_id)

        # Create second publication with complementary metadata
        pub2 = g_set.add_br(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/br/0604"),
        )
        pub2.has_title("Data Integration Methods")  # Same title
        pub2.has_pub_date("2023")  # Same year

        # Create volume for pub2
        volume = g_set.add_br(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/br/0606"),
        )
        volume.create_volume()
        volume.has_number("15")
        pub2.is_part_of(volume)

        # Create resource embodiment for pub2
        re2 = g_set.add_re(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/re/0604"),
        )
        re2.has_starting_page("100")
        re2.has_ending_page("120")
        pub2.has_format(re2)

        # Add ISBN identifier for pub2
        isbn_id = g_set.add_id(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/id/0604"),
        )
        isbn_id.create_isbn("978-0-123456-47-2")
        pub2.has_identifier(isbn_id)

        # Create authors and roles
        author1 = g_set.add_ra(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ra/0605"),
        )
        author1.has_name("Jane Doe")

        author2 = g_set.add_ra(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ra/0606"),
        )
        author2.has_name("John Smith")

        # Add roles for pub1
        role1 = g_set.add_ar(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ar/0605"),
        )
        role1.create_author()
        role1.is_held_by(author1)
        pub1.has_contributor(role1)

        # Add roles for pub2
        role2 = g_set.add_ar(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ar/0606"),
        )
        role2.create_author()
        role2.is_held_by(author2)
        pub2.has_contributor(role2)

        # Store and upload
        prov = ProvSet(
            g_set,
            "https://w3id.org/oc/meta/",
            wanted_label=False,
            custom_counter_handler=self.counter_handler,
        )
        prov.generate_provenance()

        rdf_output = os.path.join(OUTPUT, "rdf") + os.sep

        res_storer = Storer(
            abstract_set=g_set,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )
        prov_storer = Storer(
            abstract_set=prov,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )

        res_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        prov_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        res_storer.upload_all(
            triplestore_url=SERVER,
            base_dir=rdf_output,
            batch_size=10,
            save_queries=False,
        )

        # Create merge data
        merge_data = [
            {
                "surviving_entity": "https://w3id.org/oc/meta/br/0603",
                "merged_entities": "https://w3id.org/oc/meta/br/0604",
                "Done": "False",
            }
        ]
        test_file = os.path.join(BASE, "csv", "br_merge.csv")
        self.write_csv("br_merge.csv", merge_data)

        # Process the merge
        csv_folder = os.path.join(BASE, "csv")
        self.merger.process_folder(csv_folder)

        # Verify the results by checking the output files
        rdf_path = os.path.join(OUTPUT, "rdf")

        # 1. Check bibliographic resource file
        br_file = os.path.join(rdf_path, "br", "060", "10000", "1000.json")
        with open(br_file) as f:
            data = json.load(f)
        for graph in data:
            for entity in graph.get("@graph", []):
                if entity["@id"] == "https://w3id.org/oc/meta/br/0603":
                    # Check basic metadata
                    self.assertEqual(
                        entity["http://purl.org/dc/terms/title"][0]["@value"],
                        "Data Integration Methods",
                    )
                    self.assertEqual(
                        entity["http://purl.org/spar/fabio/hasSubtitle"][0]["@value"],
                        "A Comprehensive Review",
                    )
                    self.assertEqual(
                        entity[
                            "http://prismstandard.org/namespaces/basic/2.0/publicationDate"
                        ][0]["@value"],
                        "2023",
                    )

                    # Check part relationships
                    parts = {
                        part["@id"]
                        for part in entity["http://purl.org/vocab/frbr/core#partOf"]
                    }
                    self.assertEqual(len(parts), 1)
                    self.assertIn(
                        "https://w3id.org/oc/meta/br/0606", parts
                    )  # Volume

                    # Check formats (resource embodiments)
                    formats = {
                        fmt["@id"]
                        for fmt in entity[
                            "http://purl.org/vocab/frbr/core#embodiment"
                        ]
                    }
                    self.assertEqual(len(formats), 1)
                    self.assertIn("https://w3id.org/oc/meta/re/0603", formats)

                    # Check identifiers
                    identifiers = {
                        id_obj["@id"]
                        for id_obj in entity[
                            "http://purl.org/spar/datacite/hasIdentifier"
                        ]
                    }
                    self.assertEqual(len(identifiers), 2)
                    self.assertIn("https://w3id.org/oc/meta/id/0603", identifiers)
                    self.assertIn("https://w3id.org/oc/meta/id/0604", identifiers)

                # Check issue metadata
                elif entity["@id"] == "https://w3id.org/oc/meta/br/0605":
                    self.assertIn(
                        "http://purl.org/spar/fabio/JournalIssue", entity["@type"]
                    )
                    self.assertEqual(
                        entity["http://purl.org/spar/fabio/hasSequenceIdentifier"][0][
                            "@value"
                        ],
                        "4",
                    )

                # Check volume metadata
                elif entity["@id"] == "https://w3id.org/oc/meta/br/0606":
                    self.assertIn(
                        "http://purl.org/spar/fabio/JournalVolume", entity["@type"]
                    )
                    self.assertEqual(
                        entity["http://purl.org/spar/fabio/hasSequenceIdentifier"][0][
                            "@value"
                        ],
                        "15",
                    )

                # Check merged entity no longer exists
                self.assertNotEqual(
                    entity["@id"], "https://w3id.org/oc/meta/br/0604"
                )

        # 2. Check resource embodiments
        re_file = os.path.join(rdf_path, "re", "060", "10000", "1000.json")
        with open(re_file) as f:
            data = json.load(f)
        res_embodiments = {}
        for graph in data:
            for entity in graph.get("@graph", []):
                if entity["@id"] in [
                    "https://w3id.org/oc/meta/re/0603",
                    "https://w3id.org/oc/meta/re/0604",
                ]:
                    res_embodiments[entity["@id"]] = {
                        "start": entity[
                            "http://prismstandard.org/namespaces/basic/2.0/startingPage"
                        ][0]["@value"],
                        "end": entity[
                            "http://prismstandard.org/namespaces/basic/2.0/endingPage"
                        ][0]["@value"],
                    }

        self.assertEqual(len(res_embodiments), 2)
        self.assertEqual(
            res_embodiments["https://w3id.org/oc/meta/re/0603"]["start"], "1"
        )
        self.assertEqual(
            res_embodiments["https://w3id.org/oc/meta/re/0603"]["end"], "20"
        )
        self.assertEqual(
            res_embodiments["https://w3id.org/oc/meta/re/0604"]["start"], "100"
        )
        self.assertEqual(
            res_embodiments["https://w3id.org/oc/meta/re/0604"]["end"], "120"
        )

        # 3. Check role assignments
        ar_file = os.path.join(rdf_path, "ar", "060", "10000", "1000.json")
        with open(ar_file) as f:
            data = json.load(f)
        for graph in data:
            for entity in graph.get("@graph", []):
                if entity["@id"] == "https://w3id.org/oc/meta/ar/0605":
                    self.assertIn("http://purl.org/spar/pro/withRole", entity)
                    self.assertEqual(
                        entity["http://purl.org/spar/pro/withRole"][0]["@id"],
                        "http://purl.org/spar/pro/author",
                    )
                    holder = entity["http://purl.org/spar/pro/isHeldBy"][0]["@id"]
                    self.assertEqual(holder, "https://w3id.org/oc/meta/ra/0605")

        # 4. Check provenance
        prov_file = os.path.join(
            rdf_path, "br", "060", "10000", "1000", "prov", "se.json"
        )
        with open(prov_file) as f:
            data = json.load(f)

        # Find merge snapshot
        merge_snapshot = None
        for graph in data:
            if graph["@id"] == "https://w3id.org/oc/meta/br/0603/prov/":
                for entity in graph.get("@graph", []):
                    if (
                        "merge"
                        in entity.get("http://purl.org/dc/terms/description", [{}])[0]
                        .get("@value", "")
                        .lower()
                    ):
                        merge_snapshot = entity
                        break

        self.assertIsNotNone(merge_snapshot, "No merge snapshot found")

        # Check merge query content
        merge_query = merge_snapshot["https://w3id.org/oc/ontology/hasUpdateQuery"][
            0
        ]["@value"]
        expected_triples = {
            "delete": [
                "<https://w3id.org/oc/meta/br/0603> <http://purl.org/vocab/frbr/core#partOf> <https://w3id.org/oc/meta/br/0605>"
            ],
            "insert": [
                "<https://w3id.org/oc/meta/br/0603> <http://purl.org/spar/datacite/hasIdentifier> <https://w3id.org/oc/meta/id/0604>",
                "<https://w3id.org/oc/meta/br/0603> <http://purl.org/vocab/frbr/core#partOf> <https://w3id.org/oc/meta/br/0606>",
            ],
        }
        self.check_sparql_query_content(merge_query, expected_triples)

        # Verify deletion snapshot exists for merged entity
        delete_snapshot = None
        for graph in data:
            if graph["@id"] == "https://w3id.org/oc/meta/br/0604/prov/":
                for entity in graph.get("@graph", []):
                    if (
                        "deleted"
                        in entity.get("http://purl.org/dc/terms/description", [{}])[0]
                        .get("@value", "")
                        .lower()
                    ):
                        delete_snapshot = entity
                        break

        self.assertIsNotNone(
            delete_snapshot, "No deletion snapshot found for merged entity"
        )

        # Verify deletion query
        delete_query = delete_snapshot[
            "https://w3id.org/oc/ontology/hasUpdateQuery"
        ][0]["@value"]
        expected_delete_triples = {
            "delete": [
                '<https://w3id.org/oc/meta/br/0604> <http://purl.org/dc/terms/title> "Data Integration Methods"',
                '<https://w3id.org/oc/meta/br/0604> <http://prismstandard.org/namespaces/basic/2.0/publicationDate> "2023"',
                "<https://w3id.org/oc/meta/br/0604> <http://purl.org/vocab/frbr/core#partOf> <https://w3id.org/oc/meta/br/0606>",
                "<https://w3id.org/oc/meta/br/0604> <http://purl.org/vocab/frbr/core#embodiment> <https://w3id.org/oc/meta/re/0604>",
                "<https://w3id.org/oc/meta/br/0604> <http://purl.org/spar/datacite/hasIdentifier> <https://w3id.org/oc/meta/id/0604>",
                "<https://w3id.org/oc/meta/br/0604> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://purl.org/spar/fabio/Expression>",
                "<https://w3id.org/oc/meta/br/0604> <http://purl.org/spar/pro/isDocumentContextFor> <https://w3id.org/oc/meta/ar/0606>",
            ]
        }
        self.check_sparql_query_content(delete_query, expected_delete_triples)

        # Check that all related entities have appropriate provenance
        for graph in data:
            # Check volume provenance
            if graph["@id"] == "https://w3id.org/oc/meta/br/0606/prov/":
                found_volume_creation = False
                for entity in graph.get("@graph", []):
                    if (
                        "created"
                        in entity.get("http://purl.org/dc/terms/description", [{}])[0]
                        .get("@value", "")
                        .lower()
                    ):
                        found_volume_creation = True
                        self.assertIn(
                            "http://www.w3.org/ns/prov#generatedAtTime", entity
                        )
                        self.assertIn(
                            "http://www.w3.org/ns/prov#wasAttributedTo", entity
                        )
                self.assertTrue(
                    found_volume_creation,
                    "No creation provenance found for volume",
                )

            # Check resource embodiment provenance
            if graph["@id"] == "https://w3id.org/oc/meta/re/0604/prov/":
                found_re_creation = False
                for entity in graph.get("@graph", []):
                    if (
                        "created"
                        in entity.get("http://purl.org/dc/terms/description", [{}])[0]
                        .get("@value", "")
                        .lower()
                    ):
                        found_re_creation = True
                        self.assertIn(
                            "http://www.w3.org/ns/prov#generatedAtTime", entity
                        )
                        self.assertIn(
                            "http://www.w3.org/ns/prov#wasAttributedTo", entity
                        )
                self.assertTrue(
                    found_re_creation,
                    "No creation provenance found for resource embodiment",
                )

        # Verify metadata inheritance: the surviving entity should inherit all
        # identifiers while keeping its original metadata (title, subtitle,
        # resource embodiment, issue, contributors)

        # Check that provenance records the expected sequence of operations
        merge_timestamps = []
        for graph in data:
            if graph["@id"] == "https://w3id.org/oc/meta/br/0603/prov/":
                for entity in graph.get("@graph", []):
                    if (
                        "merge"
                        in entity.get("http://purl.org/dc/terms/description", [{}])[0]
                        .get("@value", "")
                        .lower()
                    ):
                        timestamp = entity[
                            "http://www.w3.org/ns/prov#generatedAtTime"
                        ][0]["@value"]
                        merge_timestamps.append(timestamp)

        # Exactly one merge operation should have been recorded
        self.assertEqual(
            len(merge_timestamps),
            1,
            "Should have exactly one merge operation",
        )

        br_file = os.path.join(rdf_path, "br", "060", "10000", "1000.json")
        with open(br_file) as f:
            data = json.load(f)
        volume_found = False
        for graph in data:
            for entity in graph.get("@graph", []):
                if entity["@id"] == "https://w3id.org/oc/meta/br/0606":  # Volume
                    volume_found = True
                    self.assertIn(
                        "http://purl.org/spar/fabio/JournalVolume",
                        entity["@type"],
                    )
        self.assertTrue(volume_found, "Volume should still exist after merge")

        re_file = os.path.join(rdf_path, "re", "060", "10000", "1000.json")
        with open(re_file) as f:
            data = json.load(f)
        re_found = False
        for graph in data:
            for entity in graph.get("@graph", []):
                if (
                    entity["@id"] == "https://w3id.org/oc/meta/re/0604"
                ):  # RE from merged entity
                    re_found = True
                    self.assertEqual(
                        entity[
                            "http://prismstandard.org/namespaces/basic/2.0/startingPage"
                        ][0]["@value"],
                        "100",
                    )
        self.assertTrue(
            re_found, "Resource embodiment should still exist after merge"
        )

    def test_fetch_related_entities_batch(self):
        """Test batch fetching of related entities"""
        meta_editor = MetaEditor(
            META_CONFIG, "https://orcid.org/0000-0002-8420-0696", save_queries=False
        )

        g_set = GraphSet(
            "https://w3id.org/oc/meta/",
            supplier_prefix="060",
            custom_counter_handler=self.counter_handler,
        )

        # Use a small set of valid numbers for the test
        valid_numbers = [11, 12, 13, 14, 15]
        entities = {}

        # Create the authors and keep them in a dictionary for easy access
        for i in valid_numbers:
            ra = g_set.add_ra(
                resp_agent="https://orcid.org/0000-0002-8420-0696",
                res=URIRef(f"https://w3id.org/oc/meta/ra/060{i}"),
            )
            ra.has_name(f"Author {i}")
            entities[i] = ra

        # Create the related entities for each author
        for i in valid_numbers:
            # Create the identifier
            identifier = g_set.add_id(
                resp_agent="https://orcid.org/0000-0002-8420-0696",
                res=URIRef(f"https://w3id.org/oc/meta/id/060{i}"),
            )
            identifier.create_orcid(f"0000-0001-{i:04d}-1111")
            entities[i].has_identifier(identifier)

            # Create the role
            role = g_set.add_ar(
                resp_agent="https://orcid.org/0000-0002-8420-0696",
                res=URIRef(f"https://w3id.org/oc/meta/ar/060{i}"),
            )
            role.create_author()
            role.is_held_by(entities[i])

            # Create the publication
            pub = g_set.add_br(
                resp_agent="https://orcid.org/0000-0002-8420-0696",
                res=URIRef(f"https://w3id.org/oc/meta/br/060{i}"),
            )
            pub.has_title(f"Publication {i}")
            pub.has_contributor(role)

        prov = ProvSet(
            g_set,
            "https://w3id.org/oc/meta/",
            wanted_label=False,
            custom_counter_handler=self.counter_handler,
        )
        prov.generate_provenance()

        rdf_output = os.path.join(OUTPUT, "rdf") + os.sep

        res_storer = Storer(
            abstract_set=g_set,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )
        prov_storer = Storer(
            abstract_set=prov,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )

        res_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        prov_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        res_storer.upload_all(
            triplestore_url=SERVER,
            base_dir=rdf_output,
            batch_size=10,
            save_queries=False,
        )

        batch_sizes = [1, 5, 11, 25]
        for batch_size in batch_sizes:
            with self.subTest(batch_size=batch_size):
                # Test with a single entity
                merged_entities = [f"https://w3id.org/oc/meta/ra/060{valid_numbers[0]}"]
                surviving_entities = [
                    f"https://w3id.org/oc/meta/ra/060{valid_numbers[1]}"
                ]

                related = self.merger.fetch_related_entities_batch(
                    meta_editor=meta_editor,
                    merged_entities=merged_entities,
                    surviving_entities=surviving_entities,
                    batch_size=batch_size,
                )

                expected_related = {
                    URIRef(
                        f"https://w3id.org/oc/meta/id/060{valid_numbers[0]}"
                    ),  # ID of the merged entity
                    URIRef(
                        f"https://w3id.org/oc/meta/ar/060{valid_numbers[0]}"
                    ),  # AR of the merged entity
                    URIRef(
                        f"https://w3id.org/oc/meta/id/060{valid_numbers[1]}"
                    ),  # ID of the surviving entity
                }

                self.assertEqual(related, expected_related)

                # Test with multiple entities
                merged_entities = [
                    f"https://w3id.org/oc/meta/ra/060{i}" for i in valid_numbers[:3]
                ]
                surviving_entities = [
                    f"https://w3id.org/oc/meta/ra/060{valid_numbers[3]}"
                ]

                related = self.merger.fetch_related_entities_batch(
                    meta_editor=meta_editor,
                    merged_entities=merged_entities,
                    surviving_entities=surviving_entities,
                    batch_size=batch_size,
                )

                expected_related = set()
                for i in valid_numbers[:3]:  # Merged entities
                    expected_related.add(URIRef(f"https://w3id.org/oc/meta/id/060{i}"))
                    expected_related.add(URIRef(f"https://w3id.org/oc/meta/ar/060{i}"))
                expected_related.add(
                    URIRef(f"https://w3id.org/oc/meta/id/060{valid_numbers[3]}")
                )

                self.assertEqual(related, expected_related)

    def test_merge_bibliographic_resources_with_multiple_identifiers(self):
        """Test merging two bibliographic resources with different identifiers"""
        g_set = GraphSet(
            "https://w3id.org/oc/meta/",
            supplier_prefix="065",
            custom_counter_handler=self.counter_handler,
        )

        # Create first publication (surviving entity)
        pub1 = g_set.add_br(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/br/06501844005"),
        )
        pub1.has_title("Higgsing The Stringy Higher Spin Symmetry")
        pub1.has_pub_date("2015-10")

        # Add first identifier (DOI)
        doi_id1 = g_set.add_id(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/id/0680503588"),
        )
        doi_id1.create_doi("10.1007/jhep10(2015)101")
        pub1.has_identifier(doi_id1)

        # Add part of relationship
        journal = g_set.add_br(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/br/06501844297"),
        )
        pub1.is_part_of(journal)

        # Add contributors for first publication
        for i in range(4):
            role = g_set.add_ar(
                resp_agent="https://orcid.org/0000-0002-8420-0696",
                res=URIRef(f"https://w3id.org/oc/meta/ar/0650842286{7+i}"),
            )
            role.create_author()
            pub1.has_contributor(role)

        # Create second publication (to be merged)
        pub2 = g_set.add_br(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/br/06804303923"),
        )
        pub2.has_title("Higgsing The Stringy Higher Spin Symmetry")
        pub2.has_pub_date("2015-10-01")

        # Add second identifier (additional DOI)
        doi_id2 = g_set.add_id(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/id/0680503589"),
        )
        doi_id2.create_doi("10.3929/ethz-b-000105964")
        pub2.has_identifier(doi_id2)

        # Add contributors for second publication
        for i in range(4):
            role = g_set.add_ar(
                resp_agent="https://orcid.org/0000-0002-8420-0696",
                res=URIRef(f"https://w3id.org/oc/meta/ar/0680174860{1+i}"),
            )
            role.create_author()
            pub2.has_contributor(role)

        # Store and upload test data
        prov = ProvSet(
            g_set,
            "https://w3id.org/oc/meta/",
            wanted_label=False,
            custom_counter_handler=self.counter_handler,
        )
        prov.generate_provenance()

        rdf_output = os.path.join(OUTPUT, "rdf") + os.sep

        res_storer = Storer(
            abstract_set=g_set,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )
        prov_storer = Storer(
            abstract_set=prov,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )

        res_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        prov_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        res_storer.upload_all(
            triplestore_url=SERVER,
            base_dir=rdf_output,
            batch_size=10,
            save_queries=False,
        )

        # Create merge data
        merge_data = [
            {
                "surviving_entity": "https://w3id.org/oc/meta/br/06501844005",
                "merged_entities": "https://w3id.org/oc/meta/br/06804303923",
                "Done": "False",
            }
        ]
        test_file = os.path.join(BASE, "csv", "br_dois_merge.csv")
        self.write_csv("br_dois_merge.csv", merge_data)

        # Process the merge
        self.merger.process_folder(os.path.join(BASE, "csv"))

        # Verify the results
        rdf_path = os.path.join(OUTPUT, "rdf")
        br_file = os.path.join(rdf_path, "br", "0650", "1850000", "1845000.json")

        with open(br_file) as f:
            data = json.load(f)
        surviving_entity_found = False

        for graph in data:
            for entity in graph.get("@graph", []):
                if entity["@id"] == "https://w3id.org/oc/meta/br/06501844005":
                    surviving_entity_found = True

                    # Check identifiers - should have both DOIs
                    identifiers = {
                        id_obj["@id"]
                        for id_obj in entity[
                            "http://purl.org/spar/datacite/hasIdentifier"
                        ]
                    }
                    self.assertEqual(len(identifiers), 2)
                    expected_ids = {
                        "https://w3id.org/oc/meta/id/0680503588",
                        "https://w3id.org/oc/meta/id/0680503589",
                    }
                    self.assertEqual(identifiers, expected_ids)

                    # Check other metadata preserved
                    self.assertEqual(
                        entity["http://purl.org/dc/terms/title"][0]["@value"],
                        "Higgsing The Stringy Higher Spin Symmetry",
                    )
                    self.assertEqual(
                        entity[
                            "http://prismstandard.org/namespaces/basic/2.0/publicationDate"
                        ][0]["@value"],
                        "2015-10-01",  # The more precise date from the merged entity is kept
                    )

                    # Check part of relationship preserved
                    self.assertEqual(
                        entity["http://purl.org/vocab/frbr/core#partOf"][0]["@id"],
                        "https://w3id.org/oc/meta/br/06501844297",
                    )

                # Verify merged entity doesn't exist
                self.assertNotEqual(
                    entity["@id"], "https://w3id.org/oc/meta/br/06804303923"
                )

        self.assertTrue(
            surviving_entity_found, "Surviving entity not found in output"
        )

        # Verify provenance
        prov_file = os.path.join(
            rdf_path, "br", "0650", "1850000", "1845000", "prov", "se.json"
        )
        with open(prov_file) as f:
            data = json.load(f)
        merge_snapshot_found = False

        for graph in data:
            if graph["@id"] == "https://w3id.org/oc/meta/br/06501844005/prov/":
                for entity in graph.get("@graph", []):
                    if (
                        "merge"
                        in entity.get("http://purl.org/dc/terms/description", [{}])[0]
                        .get("@value", "")
                        .lower()
                    ):
                        merge_snapshot_found = True

                        # Check merge query content
                        merge_query = entity[
                            "https://w3id.org/oc/ontology/hasUpdateQuery"
                        ][0]["@value"]
                        expected_triples = {
                            "insert": [
                                "<https://w3id.org/oc/meta/br/06501844005> <http://purl.org/spar/datacite/hasIdentifier> <https://w3id.org/oc/meta/id/0680503589>"
                            ]
                        }
                        self.check_sparql_query_content(merge_query, expected_triples)

        self.assertTrue(
            merge_snapshot_found, "No merge snapshot found in provenance"
        )


if __name__ == "__main__":
    unittest.main()