Coverage for test/entity_merger_test.py: 98%
684 statements
coverage.py v6.5.0, created at 2025-07-14 14:06 +0000
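"""Integration tests for EntityMerger.

These tests drive CSV-based merges of OpenCitations Meta entities (ra, br, id)
against a local test triplestore and a Redis counter handler, then verify the
resulting JSON-LD output files and the generated provenance.
"""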
import csv
import json
import os
import re
import unittest
from shutil import rmtree

import redis
import yaml
from oc_meta.run.merge.entities import EntityMerger
from oc_meta.run.meta_editor import MetaEditor
from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler
from oc_ocdm.graph import GraphSet
from oc_ocdm.prov.prov_set import ProvSet
from oc_ocdm.storer import Storer
from rdflib import URIRef
from SPARQLWrapper import POST, SPARQLWrapper

BASE = os.path.join("test", "merger")
OUTPUT = os.path.join(BASE, "output/")
META_CONFIG = os.path.join("test", "merger", "meta_config.yaml")
SERVER = "http://127.0.0.1:8805/sparql"
PROV_SERVER = "http://127.0.0.1:8806/sparql"

# Redis configuration
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_DB = 5  # For counters
REDIS_CACHE_DB = 2  # For cache, using same test DB as on_triplestore_test.py
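# Note: these tests expect a running local environment: the SPARQL endpoints at
# the URLs above (ports 8805 for data and 8806 for provenance) and a Redis
# instance on localhost:6379.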


def reset_triplestore():
    """Reset the test triplestore graphs"""
    # Reset main triplestore
    endpoint = SPARQLWrapper(SERVER)
    for graph in [
        "https://w3id.org/oc/meta/br/",
        "https://w3id.org/oc/meta/ra/",
        "https://w3id.org/oc/meta/re/",
        "https://w3id.org/oc/meta/id/",
        "https://w3id.org/oc/meta/ar/",
    ]:
        endpoint.setQuery(f"CLEAR GRAPH <{graph}>")
        endpoint.setMethod(POST)
        endpoint.query()

    # Reset provenance triplestore
    prov_endpoint = SPARQLWrapper(PROV_SERVER)
    for graph in [
        "https://w3id.org/oc/meta/br/",
        "https://w3id.org/oc/meta/ra/",
        "https://w3id.org/oc/meta/re/",
        "https://w3id.org/oc/meta/id/",
        "https://w3id.org/oc/meta/ar/",
    ]:
        prov_endpoint.setQuery(f"CLEAR GRAPH <{graph}>")
        prov_endpoint.setMethod(POST)
        prov_endpoint.query()


def reset_redis_counters():
    redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
    redis_cache_client = redis.Redis(
        host=REDIS_HOST, port=REDIS_PORT, db=REDIS_CACHE_DB
    )
    redis_client.flushdb()
    redis_cache_client.flushdb()


def get_counter_handler():
    return RedisCounterHandler(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)


class TestEntityMerger(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.counter_handler = get_counter_handler()

    def setUp(self):
        # Reset environment
        if os.path.exists(OUTPUT):
            rmtree(OUTPUT)
        os.makedirs(os.path.join(BASE, "csv"), exist_ok=True)
        reset_triplestore()
        reset_redis_counters()

        # Create temporary directory for cache files
        self.temp_dir = os.path.join("test", "temp_entity_merger_test")
        if os.path.exists(self.temp_dir):
            rmtree(self.temp_dir)
        os.makedirs(self.temp_dir)

        # Setup cache files
        self.cache_file = os.path.join(self.temp_dir, "ts_upload_cache.json")
        self.failed_file = os.path.join(self.temp_dir, "failed_queries.txt")
        self.stop_file = os.path.join(self.temp_dir, ".stop_upload")

        # Create separate directories for data and provenance update queries
        self.data_update_dir = os.path.join(self.temp_dir, "to_be_uploaded_data")
        self.prov_update_dir = os.path.join(self.temp_dir, "to_be_uploaded_prov")
        os.makedirs(self.data_update_dir, exist_ok=True)
        os.makedirs(self.prov_update_dir, exist_ok=True)

        # Update config with Redis and cache settings
        with open(META_CONFIG, encoding="utf-8") as file:
            settings = yaml.full_load(file)
        settings.update(
            {
                "redis_host": REDIS_HOST,
                "redis_port": REDIS_PORT,
                "redis_db": REDIS_DB,
                "redis_cache_db": REDIS_CACHE_DB,
                "ts_upload_cache": self.cache_file,
                "ts_failed_queries": self.failed_file,
                "ts_stop_file": self.stop_file,
                "provenance_triplestore_url": PROV_SERVER,
                "data_update_dir": self.data_update_dir,
                "prov_update_dir": self.prov_update_dir,
            }
        )
        with open(META_CONFIG, "w", encoding="utf-8") as file:
            yaml.dump(settings, file)

        # Initialize test data
        self.setup_test_data()

        # Create merger instance
        self.merger = EntityMerger(
            meta_config=META_CONFIG,
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            entity_types=["ra", "br", "id"],
            stop_file_path="stop.out",
            workers=4,
        )

    def tearDown(self):
        if os.path.exists(os.path.join(BASE, "csv")):
            rmtree(os.path.join(BASE, "csv"))
        if os.path.exists(OUTPUT):
            rmtree(OUTPUT)
        if os.path.exists("stop.out"):
            os.remove("stop.out")
        if os.path.exists(self.temp_dir):
            rmtree(self.temp_dir)
        reset_triplestore()
        reset_redis_counters()

    def setup_test_data(self):
        """Create initial test data in triplestore"""
        # Create a GraphSet for test data
        g_set = GraphSet(
            "https://w3id.org/oc/meta/",
            supplier_prefix="060",
            custom_counter_handler=self.counter_handler,
        )

        # Create first author entity with specific ID
        author1 = g_set.add_ra(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ra/0601"),
        )
        author1.has_name("John Smith")

        # Create ORCID identifier for author1 with specific ID
        orcid_id = g_set.add_id(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/id/0601"),
        )
        orcid_id.create_orcid("0000-0001-1234-5678")
        author1.has_identifier(orcid_id)

        # Create second author entity with specific ID
        author2 = g_set.add_ra(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ra/0602"),
        )
        author2.has_name("J. Smith")

        # Create VIAF identifier for author2 with specific ID
        viaf_id = g_set.add_id(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/id/0602"),
        )
        viaf_id.create_viaf("12345678")
        author2.has_identifier(viaf_id)

        # Create a publication with specific ID
        pub = g_set.add_br(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/br/0601"),
        )
        pub.has_title("Test Publication")
        pub.has_pub_date("2024-01-01")

        # Create role for first author with specific ID
        role1 = g_set.add_ar(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ar/0601"),
        )
        role1.create_author()
        role1.is_held_by(author1)
        pub.has_contributor(role1)

        # Create role for second author with specific ID
        role2 = g_set.add_ar(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ar/0602"),
        )
        role2.create_author()
        role2.is_held_by(author2)
        pub.has_contributor(role2)

        prov = ProvSet(
            g_set,
            "https://w3id.org/oc/meta/",
            wanted_label=False,
            custom_counter_handler=self.counter_handler,
        )
        prov.generate_provenance()

        rdf_output = os.path.join(OUTPUT, "rdf") + os.sep

        res_storer = Storer(
            abstract_set=g_set,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )
        prov_storer = Storer(
            abstract_set=prov,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )

        res_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        prov_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        res_storer.upload_all(
            triplestore_url=SERVER,
            base_dir=rdf_output,
            batch_size=10,
            save_queries=False,
        )

        # Create CSV file for merger
        merge_data = [
            {
                "surviving_entity": str(author1.res),
                "merged_entities": str(author2.res),
                "Done": "False",
            }
        ]
        self.write_csv("merge_test.csv", merge_data)

    def write_csv(self, filename: str, data: list):
        filepath = os.path.join(BASE, "csv", filename)
        with open(filepath, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=data[0].keys())
            writer.writeheader()
            writer.writerows(data)

    def check_sparql_query_content(self, query: str, expected_triples: dict):
        """
        Check if a SPARQL query contains expected triples in DELETE and INSERT sections.

        Args:
            query (str): The SPARQL update query string
            expected_triples (dict): Dictionary with 'delete' and 'insert' keys containing
                lists of triple patterns to check for
        """
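        # Note: the regexes below assume the update query wraps each DELETE DATA /
        # INSERT DATA payload in a single "DATA { GRAPH <...> { ... } }" block;
        # only the first block of each kind is inspected.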
        # Split query into DELETE and INSERT sections
        delete_match = re.search(
            r"DELETE DATA \{ GRAPH.*?\{(.*?)\}.*?\}", query, re.DOTALL
        )
        insert_match = re.search(
            r"INSERT DATA \{ GRAPH.*?\{(.*?)\}.*?\}", query, re.DOTALL
        )

        delete_section = delete_match.group(1) if delete_match else ""
        insert_section = insert_match.group(1) if insert_match else ""

        # Check DELETE patterns
        if "delete" in expected_triples:
            for triple in expected_triples["delete"]:
                self.assertIn(
                    triple,
                    delete_section.strip(),
                    f"Expected triple not found in DELETE section: {triple}",
                )

        # Check INSERT patterns
        if "insert" in expected_triples:
            for triple in expected_triples["insert"]:
                self.assertIn(
                    triple,
                    insert_section.strip(),
                    f"Expected triple not found in INSERT section: {triple}",
                )

    def test_get_entity_type(self):
        """Test the static method get_entity_type"""
        test_cases = [
            ("https://w3id.org/oc/meta/ra/06107", "ra"),
            ("https://w3id.org/oc/meta/br/06101", "br"),
            ("https://w3id.org/oc/meta/id/06105", "id"),
            ("https://example.com/invalid/url", None),
            ("", None),
        ]

        for url, expected in test_cases:
            with self.subTest(url=url):
                self.assertEqual(EntityMerger.get_entity_type(url), expected)

    def test_read_write_csv(self):
        """Test CSV read and write operations"""
        test_data = [
            {
                "surviving_entity": "https://w3id.org/oc/meta/ra/06107",
                "merged_entities": "https://w3id.org/oc/meta/ra/06205",
                "Done": "False",
            }
        ]

        # Write test data
        test_file = os.path.join(BASE, "csv", "test.csv")
        EntityMerger.write_csv(test_file, test_data)

        # Read back and verify
        read_data = EntityMerger.read_csv(test_file)
        self.assertEqual(test_data, read_data)

    def test_count_csv_rows(self):
        """Test CSV row counting"""
        # Test with empty file
        empty_file = os.path.join(BASE, "csv", "empty.csv")
        with open(empty_file, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=["col1", "col2"])
            writer.writeheader()
        self.assertEqual(EntityMerger.count_csv_rows(empty_file), 0)
        # Test with a non-empty input fixture
        test_file = os.path.join(BASE, "input", "0.csv")
        row_count = EntityMerger.count_csv_rows(test_file)
        self.assertEqual(row_count, 1)

    def test_process_file_with_stop_file(self):
        """Test that processing stops when stop file is present"""
        # Create stop file
        with open(self.merger.stop_file_path, "w") as f:
            f.write("")

        # Process test file
        test_file = os.path.join(BASE, "csv", "merge_test.csv")
        result = self.merger.process_file(test_file)

        # Verify the file wasn't processed (Done should still be False)
        data = EntityMerger.read_csv(test_file)
        self.assertEqual(data[0]["Done"], "False")

    def test_process_folder(self):
        """Test processing multiple files in a folder"""
        csv_folder = os.path.join(BASE, "csv")
        self.merger.process_folder(csv_folder)

        # Verify all files were processed
        for filename in ["merge_test.csv"]:
            with self.subTest(file=filename):
                data = EntityMerger.read_csv(os.path.join(csv_folder, filename))
                self.assertEqual(data[0]["Done"], "True")

    def test_process_folder_with_worker_limit(self):
        """Test processing folder with worker count > 4"""
        self.merger.workers = 5
        csv_folder = os.path.join(BASE, "csv")

        # Create a large file
        large_data = [
            {
                "surviving_entity": f"https://w3id.org/oc/meta/ra/0610{i}",
                "merged_entities": f"https://w3id.org/oc/meta/ra/0620{i}",
                "Done": "False",
            }
            for i in range(15000)
        ]  # Create more than 10000 rows
        self.write_csv("large.csv", large_data)

        self.merger.process_folder(csv_folder)

        # Verify only small files were processed
        large_file_data = EntityMerger.read_csv(os.path.join(csv_folder, "large.csv"))
        self.assertEqual(
            large_file_data[0]["Done"], "False"
        )  # Large file should be skipped

        small_file_data = EntityMerger.read_csv(
            os.path.join(csv_folder, "merge_test.csv")
        )
        self.assertEqual(
            small_file_data[0]["Done"], "True"
        )  # Small file should be processed

    def test_merge_authors_with_real_data(self):
        """Test merging two author entities with real data"""
        # Process the merge
        csv_folder = os.path.join(BASE, "csv")
        self.merger.process_folder(csv_folder)

        # Verify files structure
        rdf_path = os.path.join(OUTPUT)
        self.assertTrue(
            os.path.exists(os.path.join(rdf_path, "rdf", "ra", "060", "10000", "1000"))
        )
        self.assertTrue(
            os.path.exists(
                os.path.join(rdf_path, "rdf", "ra", "060", "10000", "1000", "prov")
            )
        )

        # Load and verify data files
        ra_file = os.path.join(rdf_path, "rdf", "ra", "060", "10000", "1000.json")
        with open(ra_file) as f:
            data = json.load(f)
        for graph in data:
            for entity in graph.get("@graph", []):
                if entity["@id"] == "https://w3id.org/oc/meta/ra/0601":
                    # Check has both identifiers
                    identifiers = {
                        id_obj["@id"]
                        for id_obj in entity[
                            "http://purl.org/spar/datacite/hasIdentifier"
                        ]
                    }
                    self.assertEqual(len(identifiers), 2)
                    self.assertIn("https://w3id.org/oc/meta/id/0601", identifiers)
                    self.assertIn("https://w3id.org/oc/meta/id/0602", identifiers)

                    # Check name
                    self.assertEqual(
                        entity["http://xmlns.com/foaf/0.1/name"][0]["@value"],
                        "J. Smith",
                    )

                # Check merged entity no longer exists
                if entity["@id"] == "https://w3id.org/oc/meta/ra/0602":
                    self.fail("Merged entity should not exist")

        # Check role reassignment
        ar_file = os.path.join(rdf_path, "rdf", "ar", "060", "10000", "1000.json")
        with open(ar_file) as f:
            data = json.load(f)
        for graph in data:
            for entity in graph.get("@graph", []):
                agent = entity["http://purl.org/spar/pro/isHeldBy"][0]["@id"]
                self.assertEqual(
                    agent,
                    "https://w3id.org/oc/meta/ra/0601",
                    "All roles should point to surviving entity",
                )

        # Check provenance
        prov_file = os.path.join(
            rdf_path, "rdf", "ra", "060", "10000", "1000", "prov", "se.json"
        )
        found_merge_prov = False
        expected_triples = {
            "delete": [
                '<https://w3id.org/oc/meta/ra/0601> <http://xmlns.com/foaf/0.1/name> "John Smith"'
            ],
            "insert": [
                "<https://w3id.org/oc/meta/ra/0601> <http://purl.org/spar/datacite/hasIdentifier> <https://w3id.org/oc/meta/id/0602>",
                '<https://w3id.org/oc/meta/ra/0601> <http://xmlns.com/foaf/0.1/name> "J. Smith"',
            ],
        }
        with open(prov_file) as f:
            data = json.load(f)
        for graph in data:
            for entity in graph.get("@graph", []):
                if (
                    "/prov/se/" in entity["@id"]
                    and "merge"
                    in entity.get("http://purl.org/dc/terms/description", [{}])[0]
                    .get("@value", "")
                    .lower()
                ):
                    found_merge_prov = True

                    # Check provenance fields
                    self.assertIn(
                        "http://www.w3.org/ns/prov#generatedAtTime", entity
                    )
                    self.assertIn(
                        "http://www.w3.org/ns/prov#wasAttributedTo", entity
                    )
                    self.assertIn(
                        "https://w3id.org/oc/ontology/hasUpdateQuery", entity
                    )

                    # Get actual query and normalize both expected and actual
                    actual_query = entity[
                        "https://w3id.org/oc/ontology/hasUpdateQuery"
                    ][0]["@value"]
                    self.check_sparql_query_content(actual_query, expected_triples)

        self.assertTrue(found_merge_prov, "No merge provenance found")

    def test_merge_with_invalid_entity_type(self):
        """Test merging with an invalid entity type"""
        # Create test data with invalid entity type
        invalid_data = [
            {
                "surviving_entity": "https://w3id.org/oc/meta/invalid/0601",
                "merged_entities": "https://w3id.org/oc/meta/invalid/0602",
                "Done": "False",
            }
        ]
        test_file = os.path.join(BASE, "csv", "invalid_type.csv")
        self.write_csv("invalid_type.csv", invalid_data)
        self.merger.process_file(test_file)
        data = EntityMerger.read_csv(test_file)
        self.assertEqual(data[0]["Done"], "False")

    def test_merge_with_nonexistent_entities(self):
        """Test merging when one or both entities don't exist"""
        # Create test data with nonexistent entities
        nonexistent_data = [
            {
                "surviving_entity": "https://w3id.org/oc/meta/ra/9999",
                "merged_entities": "https://w3id.org/oc/meta/ra/9998",
                "Done": "False",
            }
        ]
        test_file = os.path.join(BASE, "csv", "nonexistent.csv")
        self.write_csv("nonexistent.csv", nonexistent_data)
        self.merger.process_file(test_file)
        data = EntityMerger.read_csv(test_file)
        self.assertEqual(data[0]["Done"], "True")

    def test_merge_multiple_entities(self):
        """Test merging multiple entities into one surviving entity"""
        # Create additional test entities
        g_set = GraphSet(
            "https://w3id.org/oc/meta/",
            supplier_prefix="060",
            custom_counter_handler=self.counter_handler,
        )

        # Create additional authors
        author3 = g_set.add_ra(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ra/0603"),
        )
        author3.has_name("John A. Smith")

        author4 = g_set.add_ra(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ra/0604"),
        )
        author4.has_name("J A Smith")

        # Add identifiers
        viaf_id = g_set.add_id(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/id/0603"),
        )
        viaf_id.create_viaf("123456789")
        author3.has_identifier(viaf_id)

        researcher_id = g_set.add_id(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/id/0604"),
        )
        researcher_id.create_wikidata("Q12345")
        author4.has_identifier(researcher_id)

        # Create publications and roles
        pub2 = g_set.add_br(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/br/0602"),
        )
        pub2.has_title("Another Test Publication")

        role3 = g_set.add_ar(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ar/0603"),
        )
        role3.create_author()
        role3.is_held_by(author3)
        pub2.has_contributor(role3)

        role4 = g_set.add_ar(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ar/0604"),
        )
        role4.create_author()
        role4.is_held_by(author4)
        pub2.has_contributor(role4)

        # Store and upload
        prov = ProvSet(
            g_set,
            "https://w3id.org/oc/meta/",
            wanted_label=False,
            custom_counter_handler=self.counter_handler,
        )
        prov.generate_provenance()

        rdf_output = os.path.join(OUTPUT, "rdf") + os.sep

        res_storer = Storer(
            abstract_set=g_set,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )
        prov_storer = Storer(
            abstract_set=prov,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )

        res_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        prov_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        res_storer.upload_all(
            triplestore_url=SERVER,
            base_dir=rdf_output,
            batch_size=10,
            save_queries=False,
        )

        # Create merge data
        merge_data = [
            {
                "surviving_entity": "https://w3id.org/oc/meta/ra/0601",
                "merged_entities": "https://w3id.org/oc/meta/ra/0602; https://w3id.org/oc/meta/ra/0603; https://w3id.org/oc/meta/ra/0604",
                "Done": "False",
            }
        ]
        test_file = os.path.join(BASE, "csv", "multiple_merge.csv")
        self.write_csv("multiple_merge.csv", merge_data)

        # Process the merge
        csv_folder = os.path.join(BASE, "csv")
        self.merger.process_folder(csv_folder)

        # Verify the results by checking the output files
        rdf_path = os.path.join(OUTPUT, "rdf")

        # 1. Check researcher file for surviving entity and merged data
        ra_file = os.path.join(rdf_path, "ra", "060", "10000", "1000.json")
        with open(ra_file) as f:
            data = json.load(f)
        for graph in data:
            for entity in graph.get("@graph", []):
                if entity["@id"] == "https://w3id.org/oc/meta/ra/0601":
                    # Check has all identifiers
                    identifiers = {
                        id_obj["@id"]
                        for id_obj in entity[
                            "http://purl.org/spar/datacite/hasIdentifier"
                        ]
                    }
                    self.assertEqual(len(identifiers), 4)
                    expected_ids = {
                        "https://w3id.org/oc/meta/id/0601",
                        "https://w3id.org/oc/meta/id/0602",
                        "https://w3id.org/oc/meta/id/0603",
                        "https://w3id.org/oc/meta/id/0604",
                    }
                    self.assertEqual(identifiers, expected_ids)

                    # Check name (should take the last merged name)
                    self.assertEqual(
                        entity["http://xmlns.com/foaf/0.1/name"][0]["@value"],
                        "J A Smith",
                    )

                # Check merged entities no longer exist
                self.assertNotIn(
                    entity["@id"],
                    [
                        "https://w3id.org/oc/meta/ra/0602",
                        "https://w3id.org/oc/meta/ra/0603",
                        "https://w3id.org/oc/meta/ra/0604",
                    ],
                )

        # 2. Check role assignments in agent role file
        ar_file = os.path.join(rdf_path, "ar", "060", "10000", "1000.json")
        with open(ar_file) as f:
            data = json.load(f)
        for graph in data:
            for entity in graph.get("@graph", []):
                if "http://purl.org/spar/pro/isHeldBy" in entity:
                    agent = entity["http://purl.org/spar/pro/isHeldBy"][0]["@id"]
                    self.assertEqual(
                        agent,
                        "https://w3id.org/oc/meta/ra/0601",
                        "All roles should point to surviving entity",
                    )

        # 3. Check provenance
        prov_file = os.path.join(
            rdf_path, "ra", "060", "10000", "1000", "prov", "se.json"
        )
        with open(prov_file) as f:
            data = json.load(f)

        # Get all provenance snapshots for surviving entity
        surviving_snapshots = []
        for graph in data:
            if graph["@id"] == "https://w3id.org/oc/meta/ra/0601/prov/":
                for entity in graph.get("@graph", []):
                    # Skip creation snapshot
                    if (
                        "created"
                        not in entity.get(
                            "http://purl.org/dc/terms/description", [{}]
                        )[0]
                        .get("@value", "")
                        .lower()
                    ):
                        surviving_snapshots.append(entity)

        # Should have 2 merge snapshots (one partial, one final)
        self.assertEqual(
            len(surviving_snapshots), 2, "Should have exactly 2 merge snapshots"
        )

        # Verify partial merge (0601 with 0602)
        partial_merge = next(
            s
            for s in surviving_snapshots
            if "0602" in s["http://purl.org/dc/terms/description"][0]["@value"]
            and "0603" not in s["http://purl.org/dc/terms/description"][0]["@value"]
        )

        # Check partial merge metadata
        self.assertIn("http://www.w3.org/ns/prov#generatedAtTime", partial_merge)
        self.assertIn("http://www.w3.org/ns/prov#wasAttributedTo", partial_merge)
        self.assertEqual(
            partial_merge["http://www.w3.org/ns/prov#wasAttributedTo"][0]["@id"],
            "https://orcid.org/0000-0002-8420-0696",
        )

        # Check partial merge query content
        partial_query = partial_merge[
            "https://w3id.org/oc/ontology/hasUpdateQuery"
        ][0]["@value"]
        expected_partial = {
            "delete": [
                '<https://w3id.org/oc/meta/ra/0601> <http://xmlns.com/foaf/0.1/name> "John Smith"'
            ],
            "insert": [
                "<https://w3id.org/oc/meta/ra/0601> <http://purl.org/spar/datacite/hasIdentifier> <https://w3id.org/oc/meta/id/0602>",
                '<https://w3id.org/oc/meta/ra/0601> <http://xmlns.com/foaf/0.1/name> "J. Smith"',
            ],
        }
        self.check_sparql_query_content(partial_query, expected_partial)

        # Verify final merge (0601 with 0602, 0603, 0604)
        final_merge = next(
            s
            for s in surviving_snapshots
            if "0602" in s["http://purl.org/dc/terms/description"][0]["@value"]
            and "0603" in s["http://purl.org/dc/terms/description"][0]["@value"]
            and "0604" in s["http://purl.org/dc/terms/description"][0]["@value"]
        )

        # Check final merge metadata
        self.assertIn("http://www.w3.org/ns/prov#generatedAtTime", final_merge)
        self.assertIn("http://www.w3.org/ns/prov#wasAttributedTo", final_merge)
        self.assertEqual(
            final_merge["http://www.w3.org/ns/prov#wasAttributedTo"][0]["@id"],
            "https://orcid.org/0000-0002-8420-0696",
        )

        # Check final merge query content
        final_query = final_merge["https://w3id.org/oc/ontology/hasUpdateQuery"][0][
            "@value"
        ]
        expected_final = {
            "delete": [
                '<https://w3id.org/oc/meta/ra/0601> <http://xmlns.com/foaf/0.1/name> "John Smith"'
            ],
            "insert": [
                "<https://w3id.org/oc/meta/ra/0601> <http://purl.org/spar/datacite/hasIdentifier> <https://w3id.org/oc/meta/id/0603>",
                "<https://w3id.org/oc/meta/ra/0601> <http://purl.org/spar/datacite/hasIdentifier> <https://w3id.org/oc/meta/id/0604>",
                '<https://w3id.org/oc/meta/ra/0601> <http://xmlns.com/foaf/0.1/name> "J A Smith"',
                "<https://w3id.org/oc/meta/ra/0601> <http://purl.org/spar/datacite/hasIdentifier> <https://w3id.org/oc/meta/id/0602>",
            ],
        }
        self.check_sparql_query_content(final_query, expected_final)

        # Verify deletion snapshots exist for merged entities
        merged_ids = ["0602", "0603", "0604"]
        for merged_id in merged_ids:
            merged_snapshots = []
            for graph in data:
                if graph["@id"] == f"https://w3id.org/oc/meta/ra/{merged_id}/prov/":
                    for entity in graph.get("@graph", []):
                        if (
                            "deleted"
                            in entity.get(
                                "http://purl.org/dc/terms/description", [{}]
                            )[0]
                            .get("@value", "")
                            .lower()
                        ):
                            merged_snapshots.append(entity)

            self.assertGreater(
                len(merged_snapshots),
                0,
                f"No deletion snapshot found for ra/{merged_id}",
            )

            # Verify deletion queries
            for snapshot in merged_snapshots:
                self.assertIn(
                    "https://w3id.org/oc/ontology/hasUpdateQuery", snapshot
                )
                delete_query = snapshot[
                    "https://w3id.org/oc/ontology/hasUpdateQuery"
                ][0]["@value"]
                self.assertIn(
                    f"<https://w3id.org/oc/meta/ra/{merged_id}>", delete_query
                )
                self.assertIn("DELETE DATA", delete_query)

    def test_merge_with_conflicting_data(self):
        """Test merging entities with conflicting information"""
        # Create test entities with conflicting data
        g_set = GraphSet(
            "https://w3id.org/oc/meta/",
            supplier_prefix="060",
            custom_counter_handler=self.counter_handler,
        )

        # Create conflicting authors
        author5 = g_set.add_ra(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ra/0605"),
        )
        author5.has_name("John Smith")
        author5.has_given_name("John")
        author5.has_family_name("Smith")

        author6 = g_set.add_ra(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ra/0606"),
        )
        author6.has_name("Johnny Smith")
        author6.has_given_name("Johnny")
        author6.has_family_name("Smith")

        # Add same identifier to both (which should be impossible in real data)
        same_orcid = g_set.add_id(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/id/0605"),
        )
        same_orcid.create_orcid("0000-0001-9999-9999")
        author5.has_identifier(same_orcid)

        same_orcid2 = g_set.add_id(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/id/0606"),
        )
        same_orcid2.create_orcid("0000-0001-9999-9999")
        author6.has_identifier(same_orcid2)

        # Store and upload
        prov = ProvSet(
            g_set,
            "https://w3id.org/oc/meta/",
            wanted_label=False,
            custom_counter_handler=self.counter_handler,
        )
        prov.generate_provenance()

        rdf_output = os.path.join(OUTPUT, "rdf") + os.sep

        res_storer = Storer(
            abstract_set=g_set,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )
        prov_storer = Storer(
            abstract_set=prov,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )

        res_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        prov_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        res_storer.upload_all(
            triplestore_url=SERVER,
            base_dir=rdf_output,
            batch_size=10,
            save_queries=False,
        )

        # Create merge data
        merge_data = [
            {
                "surviving_entity": "https://w3id.org/oc/meta/ra/0605",
                "merged_entities": "https://w3id.org/oc/meta/ra/0606",
                "Done": "False",
            }
        ]
        test_file = os.path.join(BASE, "csv", "conflicting_merge.csv")
        self.write_csv("conflicting_merge.csv", merge_data)

        # Process the merge
        csv_folder = os.path.join(BASE, "csv")
        self.merger.process_folder(csv_folder)

        # Verify the results by checking the output files
        rdf_path = os.path.join(OUTPUT, "rdf")

        # 1. Check researcher file for surviving entity and merged data
        ra_file = os.path.join(rdf_path, "ra", "060", "10000", "1000.json")
        with open(ra_file) as f:
            data = json.load(f)
        for graph in data:
            for entity in graph.get("@graph", []):
                if entity["@id"] == "https://w3id.org/oc/meta/ra/0605":
                    # Check identifiers - should only keep one instance
                    identifiers = {
                        id_obj["@id"]
                        for id_obj in entity.get(
                            "http://purl.org/spar/datacite/hasIdentifier", []
                        )
                    }
                    self.assertEqual(len(identifiers), 1)
                    self.assertEqual(
                        identifiers, {"https://w3id.org/oc/meta/id/0605"}
                    )
                    # Check that the name from the merged entity was applied
                    self.assertEqual(
                        entity["http://xmlns.com/foaf/0.1/name"][0]["@value"],
                        "Johnny Smith",
                    )
                    self.assertEqual(
                        entity["http://xmlns.com/foaf/0.1/givenName"][0]["@value"],
                        "Johnny",
                    )
                    self.assertEqual(
                        entity["http://xmlns.com/foaf/0.1/familyName"][0]["@value"],
                        "Smith",
                    )

                # Check merged entity does not exist in output
                self.assertNotEqual(
                    entity["@id"], "https://w3id.org/oc/meta/ra/0606"
                )

        # 2. Check provenance
        prov_file = os.path.join(
            rdf_path, "ra", "060", "10000", "1000", "prov", "se.json"
        )
        with open(prov_file) as f:
            data = json.load(f)

        # Find merge snapshot
        merge_snapshot = None
        for graph in data:
            if graph["@id"] == "https://w3id.org/oc/meta/ra/0605/prov/":
                for entity in graph.get("@graph", []):
                    if (
                        "merge"
                        in entity.get("http://purl.org/dc/terms/description", [{}])[0]
                        .get("@value", "")
                        .lower()
                    ):
                        merge_snapshot = entity
                        break

        self.assertIsNotNone(merge_snapshot, "No merge snapshot found")

        # Verify merge metadata
        self.assertIn("http://www.w3.org/ns/prov#generatedAtTime", merge_snapshot)
        self.assertIn("http://www.w3.org/ns/prov#wasAttributedTo", merge_snapshot)

        # Check the merge query - should not duplicate the conflicting ORCID
        merge_query = merge_snapshot["https://w3id.org/oc/ontology/hasUpdateQuery"][
            0
        ]["@value"]
        expected_triples = {
            "delete": [
                '<https://w3id.org/oc/meta/ra/0605> <http://xmlns.com/foaf/0.1/name> "John Smith"',
                '<https://w3id.org/oc/meta/ra/0605> <http://xmlns.com/foaf/0.1/givenName> "John"',
            ],
            "insert": [
                '<https://w3id.org/oc/meta/ra/0605> <http://xmlns.com/foaf/0.1/name> "Johnny Smith"',
                '<https://w3id.org/oc/meta/ra/0605> <http://xmlns.com/foaf/0.1/givenName> "Johnny"',
            ],
        }
        self.check_sparql_query_content(merge_query, expected_triples)

        # Verify deletion snapshot exists for merged entity
        delete_snapshot = None
        for graph in data:
            if graph["@id"] == "https://w3id.org/oc/meta/ra/0606/prov/":
                for entity in graph.get("@graph", []):
                    if (
                        "deleted"
                        in entity.get("http://purl.org/dc/terms/description", [{}])[0]
                        .get("@value", "")
                        .lower()
                    ):
                        delete_snapshot = entity
                        break

        self.assertIsNotNone(
            delete_snapshot, "No deletion snapshot found for merged entity"
        )

        # Verify deletion query
        delete_query = delete_snapshot[
            "https://w3id.org/oc/ontology/hasUpdateQuery"
        ][0]["@value"]
        self.assertIn("DELETE DATA", delete_query)
        self.assertIn("<https://w3id.org/oc/meta/ra/0606>", delete_query)

    def test_merge_bibliographic_resources(self):
        """Test merging two bibliographic resource entities"""
        g_set = GraphSet(
            "https://w3id.org/oc/meta/",
            supplier_prefix="060",
            custom_counter_handler=self.counter_handler,
        )

        # Create first publication with some metadata
        pub1 = g_set.add_br(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/br/0603"),
        )
        pub1.has_title("Data Integration Methods")
        pub1.has_subtitle("A Comprehensive Review")
        pub1.has_pub_date("2023")

        # Create issue for pub1
        issue = g_set.add_br(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/br/0605"),
        )
        issue.create_issue()
        issue.has_number("4")
        pub1.is_part_of(issue)

        # Create resource embodiment for pub1
        re1 = g_set.add_re(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/re/0603"),
        )
        re1.has_starting_page("1")
        re1.has_ending_page("20")
        pub1.has_format(re1)

        # Add DOI identifier for pub1
        doi_id = g_set.add_id(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/id/0603"),
        )
        doi_id.create_doi("10.1000/example.doi.1")
        pub1.has_identifier(doi_id)

        # Create second publication with complementary metadata
        pub2 = g_set.add_br(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/br/0604"),
        )
        pub2.has_title("Data Integration Methods")  # Same title
        pub2.has_pub_date("2023")  # Same year

        # Create volume for pub2
        volume = g_set.add_br(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/br/0606"),
        )
        volume.create_volume()
        volume.has_number("15")
        pub2.is_part_of(volume)

        # Create resource embodiment for pub2
        re2 = g_set.add_re(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/re/0604"),
        )
        re2.has_starting_page("100")
        re2.has_ending_page("120")
        pub2.has_format(re2)

        # Add ISBN identifier for pub2
        isbn_id = g_set.add_id(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/id/0604"),
        )
        isbn_id.create_isbn("978-0-123456-47-2")
        pub2.has_identifier(isbn_id)

        # Create authors and roles
        author1 = g_set.add_ra(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ra/0605"),
        )
        author1.has_name("Jane Doe")

        author2 = g_set.add_ra(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ra/0606"),
        )
        author2.has_name("John Smith")

        # Add roles for pub1
        role1 = g_set.add_ar(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ar/0605"),
        )
        role1.create_author()
        role1.is_held_by(author1)
        pub1.has_contributor(role1)

        # Add roles for pub2
        role2 = g_set.add_ar(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/ar/0606"),
        )
        role2.create_author()
        role2.is_held_by(author2)
        pub2.has_contributor(role2)

        # Store and upload
        prov = ProvSet(
            g_set,
            "https://w3id.org/oc/meta/",
            wanted_label=False,
            custom_counter_handler=self.counter_handler,
        )
        prov.generate_provenance()

        rdf_output = os.path.join(OUTPUT, "rdf") + os.sep

        res_storer = Storer(
            abstract_set=g_set,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )
        prov_storer = Storer(
            abstract_set=prov,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )

        res_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        prov_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        res_storer.upload_all(
            triplestore_url=SERVER,
            base_dir=rdf_output,
            batch_size=10,
            save_queries=False,
        )

        # Create merge data
        merge_data = [
            {
                "surviving_entity": "https://w3id.org/oc/meta/br/0603",
                "merged_entities": "https://w3id.org/oc/meta/br/0604",
                "Done": "False",
            }
        ]
        test_file = os.path.join(BASE, "csv", "br_merge.csv")
        self.write_csv("br_merge.csv", merge_data)

        # Process the merge
        csv_folder = os.path.join(BASE, "csv")
        self.merger.process_folder(csv_folder)

        # Verify the results by checking the output files
        rdf_path = os.path.join(OUTPUT, "rdf")

        # 1. Check bibliographic resource file
        br_file = os.path.join(rdf_path, "br", "060", "10000", "1000.json")
        with open(br_file) as f:
            data = json.load(f)
        for graph in data:
            for entity in graph.get("@graph", []):
                if entity["@id"] == "https://w3id.org/oc/meta/br/0603":
                    # Check basic metadata
                    self.assertEqual(
                        entity["http://purl.org/dc/terms/title"][0]["@value"],
                        "Data Integration Methods",
                    )
                    self.assertEqual(
                        entity["http://purl.org/spar/fabio/hasSubtitle"][0][
                            "@value"
                        ],
                        "A Comprehensive Review",
                    )
                    self.assertEqual(
                        entity[
                            "http://prismstandard.org/namespaces/basic/2.0/publicationDate"
                        ][0]["@value"],
                        "2023",
                    )

                    # Check part relationships
                    parts = {
                        part["@id"]
                        for part in entity["http://purl.org/vocab/frbr/core#partOf"]
                    }
                    self.assertEqual(len(parts), 1)
                    self.assertIn(
                        "https://w3id.org/oc/meta/br/0606", parts
                    )  # Volume

                    # Check formats (resource embodiments)
                    formats = {
                        fmt["@id"]
                        for fmt in entity[
                            "http://purl.org/vocab/frbr/core#embodiment"
                        ]
                    }
                    self.assertEqual(len(formats), 1)
                    self.assertIn("https://w3id.org/oc/meta/re/0603", formats)

                    # Check identifiers
                    identifiers = {
                        id_obj["@id"]
                        for id_obj in entity[
                            "http://purl.org/spar/datacite/hasIdentifier"
                        ]
                    }
                    self.assertEqual(len(identifiers), 2)
                    self.assertIn("https://w3id.org/oc/meta/id/0603", identifiers)
                    self.assertIn("https://w3id.org/oc/meta/id/0604", identifiers)

                # Check issue metadata
                elif entity["@id"] == "https://w3id.org/oc/meta/br/0605":
                    self.assertIn(
                        "http://purl.org/spar/fabio/JournalIssue", entity["@type"]
                    )
                    self.assertEqual(
                        entity["http://purl.org/spar/fabio/hasSequenceIdentifier"][
                            0
                        ]["@value"],
                        "4",
                    )

                # Check volume metadata
                elif entity["@id"] == "https://w3id.org/oc/meta/br/0606":
                    self.assertIn(
                        "http://purl.org/spar/fabio/JournalVolume", entity["@type"]
                    )
                    self.assertEqual(
                        entity["http://purl.org/spar/fabio/hasSequenceIdentifier"][
                            0
                        ]["@value"],
                        "15",
                    )

                # Check merged entity no longer exists
                self.assertNotEqual(
                    entity["@id"], "https://w3id.org/oc/meta/br/0604"
                )

        # 2. Check resource embodiments
        re_file = os.path.join(rdf_path, "re", "060", "10000", "1000.json")
        with open(re_file) as f:
            data = json.load(f)
        res_embodiments = {}
        for graph in data:
            for entity in graph.get("@graph", []):
                if entity["@id"] in [
                    "https://w3id.org/oc/meta/re/0603",
                    "https://w3id.org/oc/meta/re/0604",
                ]:
                    res_embodiments[entity["@id"]] = {
                        "start": entity[
                            "http://prismstandard.org/namespaces/basic/2.0/startingPage"
                        ][0]["@value"],
                        "end": entity[
                            "http://prismstandard.org/namespaces/basic/2.0/endingPage"
                        ][0]["@value"],
                    }

        self.assertEqual(len(res_embodiments), 2)
        self.assertEqual(
            res_embodiments["https://w3id.org/oc/meta/re/0603"]["start"], "1"
        )
        self.assertEqual(
            res_embodiments["https://w3id.org/oc/meta/re/0603"]["end"], "20"
        )
        self.assertEqual(
            res_embodiments["https://w3id.org/oc/meta/re/0604"]["start"], "100"
        )
        self.assertEqual(
            res_embodiments["https://w3id.org/oc/meta/re/0604"]["end"], "120"
        )

        # 3. Check role assignments
        ar_file = os.path.join(rdf_path, "ar", "060", "10000", "1000.json")
        with open(ar_file) as f:
            data = json.load(f)
        for graph in data:
            for entity in graph.get("@graph", []):
                if entity["@id"] == "https://w3id.org/oc/meta/ar/0605":
                    self.assertIn("http://purl.org/spar/pro/withRole", entity)
                    self.assertEqual(
                        entity["http://purl.org/spar/pro/withRole"][0]["@id"],
                        "http://purl.org/spar/pro/author",
                    )
                    holder = entity["http://purl.org/spar/pro/isHeldBy"][0]["@id"]
                    self.assertEqual(holder, "https://w3id.org/oc/meta/ra/0605")

        # 4. Check provenance
        prov_file = os.path.join(
            rdf_path, "br", "060", "10000", "1000", "prov", "se.json"
        )
        with open(prov_file) as f:
            data = json.load(f)

        # Find merge snapshot
        merge_snapshot = None
        for graph in data:
            if graph["@id"] == "https://w3id.org/oc/meta/br/0603/prov/":
                for entity in graph.get("@graph", []):
                    if (
                        "merge"
                        in entity.get("http://purl.org/dc/terms/description", [{}])[0]
                        .get("@value", "")
                        .lower()
                    ):
                        merge_snapshot = entity
                        break

        self.assertIsNotNone(merge_snapshot, "No merge snapshot found")

        # Check merge query content
        merge_query = merge_snapshot["https://w3id.org/oc/ontology/hasUpdateQuery"][
            0
        ]["@value"]
        expected_triples = {
            "delete": [
                "<https://w3id.org/oc/meta/br/0603> <http://purl.org/vocab/frbr/core#partOf> <https://w3id.org/oc/meta/br/0605>"
            ],
            "insert": [
                "<https://w3id.org/oc/meta/br/0603> <http://purl.org/spar/datacite/hasIdentifier> <https://w3id.org/oc/meta/id/0604>",
                "<https://w3id.org/oc/meta/br/0603> <http://purl.org/vocab/frbr/core#partOf> <https://w3id.org/oc/meta/br/0606>",
            ],
        }
        self.check_sparql_query_content(merge_query, expected_triples)

        # Verify deletion snapshot exists for merged entity
        delete_snapshot = None
        for graph in data:
            if graph["@id"] == "https://w3id.org/oc/meta/br/0604/prov/":
                for entity in graph.get("@graph", []):
                    if (
                        "deleted"
                        in entity.get("http://purl.org/dc/terms/description", [{}])[0]
                        .get("@value", "")
                        .lower()
                    ):
                        delete_snapshot = entity
                        break

        self.assertIsNotNone(
            delete_snapshot, "No deletion snapshot found for merged entity"
        )

        # Verify deletion query
        delete_query = delete_snapshot[
            "https://w3id.org/oc/ontology/hasUpdateQuery"
        ][0]["@value"]
        expected_delete_triples = {
            "delete": [
                '<https://w3id.org/oc/meta/br/0604> <http://purl.org/dc/terms/title> "Data Integration Methods"',
                '<https://w3id.org/oc/meta/br/0604> <http://prismstandard.org/namespaces/basic/2.0/publicationDate> "2023"',
                "<https://w3id.org/oc/meta/br/0604> <http://purl.org/vocab/frbr/core#partOf> <https://w3id.org/oc/meta/br/0606>",
                "<https://w3id.org/oc/meta/br/0604> <http://purl.org/vocab/frbr/core#embodiment> <https://w3id.org/oc/meta/re/0604>",
                "<https://w3id.org/oc/meta/br/0604> <http://purl.org/spar/datacite/hasIdentifier> <https://w3id.org/oc/meta/id/0604>",
                "<https://w3id.org/oc/meta/br/0604> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://purl.org/spar/fabio/Expression>",
                "<https://w3id.org/oc/meta/br/0604> <http://purl.org/spar/pro/isDocumentContextFor> <https://w3id.org/oc/meta/ar/0606>",
            ]
        }
        self.check_sparql_query_content(delete_query, expected_delete_triples)

        # Check that all related entities have appropriate provenance
        for graph in data:
            # Check volume provenance
            if graph["@id"] == "https://w3id.org/oc/meta/br/0606/prov/":
                found_volume_creation = False
                for entity in graph.get("@graph", []):
                    if (
                        "created"
                        in entity.get(
                            "http://purl.org/dc/terms/description", [{}]
                        )[0]
                        .get("@value", "")
                        .lower()
                    ):
                        found_volume_creation = True
                        self.assertIn(
                            "http://www.w3.org/ns/prov#generatedAtTime", entity
                        )
                        self.assertIn(
                            "http://www.w3.org/ns/prov#wasAttributedTo", entity
                        )
                self.assertTrue(
                    found_volume_creation,
                    "No creation provenance found for volume",
                )

            # Check resource embodiment provenance
            if graph["@id"] == "https://w3id.org/oc/meta/re/0604/prov/":
                found_re_creation = False
                for entity in graph.get("@graph", []):
                    if (
                        "created"
                        in entity.get(
                            "http://purl.org/dc/terms/description", [{}]
                        )[0]
                        .get("@value", "")
                        .lower()
                    ):
                        found_re_creation = True
                        self.assertIn(
                            "http://www.w3.org/ns/prov#generatedAtTime", entity
                        )
                        self.assertIn(
                            "http://www.w3.org/ns/prov#wasAttributedTo", entity
                        )
                self.assertTrue(
                    found_re_creation,
                    "No creation provenance found for resource embodiment",
                )

        # Verify all metadata inheritance
        # We expect the surviving entity to inherit all identifiers
        # while maintaining its original metadata (title, subtitle, resource embodiment, issue, contributors)

        # Check if provenance shows correct sequence of operations
        merge_timestamps = []
        for graph in data:
            if graph["@id"] == "https://w3id.org/oc/meta/br/0603/prov/":
                for entity in graph.get("@graph", []):
                    if (
                        "merge"
                        in entity.get(
                            "http://purl.org/dc/terms/description", [{}]
                        )[0]
                        .get("@value", "")
                        .lower()
                    ):
                        timestamp = entity[
                            "http://www.w3.org/ns/prov#generatedAtTime"
                        ][0]["@value"]
                        merge_timestamps.append(timestamp)

        # Check timestamps are in correct order
        self.assertEqual(
            len(merge_timestamps),
            1,
            "Should have exactly one merge operation",
        )

        br_file = os.path.join(rdf_path, "br", "060", "10000", "1000.json")
        with open(br_file) as f:
            data = json.load(f)
        volume_found = False
        for graph in data:
            for entity in graph.get("@graph", []):
                if entity["@id"] == "https://w3id.org/oc/meta/br/0606":  # Volume
                    volume_found = True
                    self.assertIn(
                        "http://purl.org/spar/fabio/JournalVolume",
                        entity["@type"],
                    )
        self.assertTrue(volume_found, "Volume should still exist after merge")

        re_file = os.path.join(rdf_path, "re", "060", "10000", "1000.json")
        with open(re_file) as f:
            data = json.load(f)
        re_found = False
        for graph in data:
            for entity in graph.get("@graph", []):
                # RE coming from the merged entity
                if entity["@id"] == "https://w3id.org/oc/meta/re/0604":
                    re_found = True
                    self.assertEqual(
                        entity[
                            "http://prismstandard.org/namespaces/basic/2.0/startingPage"
                        ][0]["@value"],
                        "100",
                    )
        self.assertTrue(
            re_found, "Resource embodiment should still exist after merge"
        )

    def test_fetch_related_entities_batch(self):
        """Test batch fetching of related entities"""
        meta_editor = MetaEditor(
            META_CONFIG, "https://orcid.org/0000-0002-8420-0696", save_queries=False
        )

        g_set = GraphSet(
            "https://w3id.org/oc/meta/",
            supplier_prefix="060",
            custom_counter_handler=self.counter_handler,
        )

        # Use a small set of valid numbers for the test
        valid_numbers = [11, 12, 13, 14, 15]
        entities = {}

        # Create the authors and store them in a dictionary for easy access
        for i in valid_numbers:
            ra = g_set.add_ra(
                resp_agent="https://orcid.org/0000-0002-8420-0696",
                res=URIRef(f"https://w3id.org/oc/meta/ra/060{i}"),
            )
            ra.has_name(f"Author {i}")
            entities[i] = ra

        # Create the related entities for each author
        for i in valid_numbers:
            # Create the identifier
            identifier = g_set.add_id(
                resp_agent="https://orcid.org/0000-0002-8420-0696",
                res=URIRef(f"https://w3id.org/oc/meta/id/060{i}"),
            )
            identifier.create_orcid(f"0000-0001-{i:04d}-1111")
            entities[i].has_identifier(identifier)

            # Create the role
            role = g_set.add_ar(
                resp_agent="https://orcid.org/0000-0002-8420-0696",
                res=URIRef(f"https://w3id.org/oc/meta/ar/060{i}"),
            )
            role.create_author()
            role.is_held_by(entities[i])

            # Create the publication
            pub = g_set.add_br(
                resp_agent="https://orcid.org/0000-0002-8420-0696",
                res=URIRef(f"https://w3id.org/oc/meta/br/060{i}"),
            )
            pub.has_title(f"Publication {i}")
            pub.has_contributor(role)

        prov = ProvSet(
            g_set,
            "https://w3id.org/oc/meta/",
            wanted_label=False,
            custom_counter_handler=self.counter_handler,
        )
        prov.generate_provenance()

        rdf_output = os.path.join(OUTPUT, "rdf") + os.sep

        res_storer = Storer(
            abstract_set=g_set,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )
        prov_storer = Storer(
            abstract_set=prov,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )

        res_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        prov_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        res_storer.upload_all(
            triplestore_url=SERVER,
            base_dir=rdf_output,
            batch_size=10,
            save_queries=False,
        )

        batch_sizes = [1, 5, 11, 25]
        for batch_size in batch_sizes:
            with self.subTest(batch_size=batch_size):
                # Test with a single entity
                merged_entities = [f"https://w3id.org/oc/meta/ra/060{valid_numbers[0]}"]
                surviving_entities = [
                    f"https://w3id.org/oc/meta/ra/060{valid_numbers[1]}"
                ]

                related = self.merger.fetch_related_entities_batch(
                    meta_editor=meta_editor,
                    merged_entities=merged_entities,
                    surviving_entities=surviving_entities,
                    batch_size=batch_size,
                )

                expected_related = {
                    URIRef(
                        f"https://w3id.org/oc/meta/id/060{valid_numbers[0]}"
                    ),  # ID of the merged entity
                    URIRef(
                        f"https://w3id.org/oc/meta/ar/060{valid_numbers[0]}"
                    ),  # AR of the merged entity
                    URIRef(
                        f"https://w3id.org/oc/meta/id/060{valid_numbers[1]}"
                    ),  # ID of the surviving entity
                }

                self.assertEqual(related, expected_related)

                # Test with multiple entities
                merged_entities = [
                    f"https://w3id.org/oc/meta/ra/060{i}" for i in valid_numbers[:3]
                ]
                surviving_entities = [
                    f"https://w3id.org/oc/meta/ra/060{valid_numbers[3]}"
                ]

                related = self.merger.fetch_related_entities_batch(
                    meta_editor=meta_editor,
                    merged_entities=merged_entities,
                    surviving_entities=surviving_entities,
                    batch_size=batch_size,
                )

                expected_related = set()
                for i in valid_numbers[:3]:  # Merged entities
                    expected_related.add(URIRef(f"https://w3id.org/oc/meta/id/060{i}"))
                    expected_related.add(URIRef(f"https://w3id.org/oc/meta/ar/060{i}"))
                expected_related.add(
                    URIRef(f"https://w3id.org/oc/meta/id/060{valid_numbers[3]}")
                )

                self.assertEqual(related, expected_related)

    def test_merge_bibliographic_resources_with_multiple_identifiers(self):
        """Test merging two bibliographic resources with different identifiers"""
        g_set = GraphSet(
            "https://w3id.org/oc/meta/",
            supplier_prefix="065",
            custom_counter_handler=self.counter_handler,
        )

        # Create first publication (surviving entity)
        pub1 = g_set.add_br(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/br/06501844005"),
        )
        pub1.has_title("Higgsing The Stringy Higher Spin Symmetry")
        pub1.has_pub_date("2015-10")

        # Add first identifier (DOI)
        doi_id1 = g_set.add_id(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/id/0680503588"),
        )
        doi_id1.create_doi("10.1007/jhep10(2015)101")
        pub1.has_identifier(doi_id1)

        # Add part of relationship
        journal = g_set.add_br(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/br/06501844297"),
        )
        pub1.is_part_of(journal)

        # Add contributors for first publication
        for i in range(4):
            role = g_set.add_ar(
                resp_agent="https://orcid.org/0000-0002-8420-0696",
                res=URIRef(f"https://w3id.org/oc/meta/ar/0650842286{7+i}"),
            )
            role.create_author()
            pub1.has_contributor(role)

        # Create second publication (to be merged)
        pub2 = g_set.add_br(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/br/06804303923"),
        )
        pub2.has_title("Higgsing The Stringy Higher Spin Symmetry")
        pub2.has_pub_date("2015-10-01")

        # Add second identifier (additional DOI)
        doi_id2 = g_set.add_id(
            resp_agent="https://orcid.org/0000-0002-8420-0696",
            res=URIRef("https://w3id.org/oc/meta/id/0680503589"),
        )
        doi_id2.create_doi("10.3929/ethz-b-000105964")
        pub2.has_identifier(doi_id2)

        # Add contributors for second publication
        for i in range(4):
            role = g_set.add_ar(
                resp_agent="https://orcid.org/0000-0002-8420-0696",
                res=URIRef(f"https://w3id.org/oc/meta/ar/0680174860{1+i}"),
            )
            role.create_author()
            pub2.has_contributor(role)

        # Store and upload test data
        prov = ProvSet(
            g_set,
            "https://w3id.org/oc/meta/",
            wanted_label=False,
            custom_counter_handler=self.counter_handler,
        )
        prov.generate_provenance()

        rdf_output = os.path.join(OUTPUT, "rdf") + os.sep

        res_storer = Storer(
            abstract_set=g_set,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )
        prov_storer = Storer(
            abstract_set=prov,
            dir_split=10000,
            n_file_item=1000,
            output_format="json-ld",
            zip_output=False,
        )

        res_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        prov_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
        res_storer.upload_all(
            triplestore_url=SERVER,
            base_dir=rdf_output,
            batch_size=10,
            save_queries=False,
        )
        # Create merge data
        merge_data = [
            {
                "surviving_entity": "https://w3id.org/oc/meta/br/06501844005",
                "merged_entities": "https://w3id.org/oc/meta/br/06804303923",
                "Done": "False",
            }
        ]
        test_file = os.path.join(BASE, "csv", "br_dois_merge.csv")
        self.write_csv("br_dois_merge.csv", merge_data)

        # Process the merge
        self.merger.process_folder(os.path.join(BASE, "csv"))

        # Verify the results
        rdf_path = os.path.join(OUTPUT, "rdf")
        br_file = os.path.join(rdf_path, "br", "0650", "1850000", "1845000.json")

        with open(br_file) as f:
            data = json.load(f)
        surviving_entity_found = False

        for graph in data:
            for entity in graph.get("@graph", []):
                if entity["@id"] == "https://w3id.org/oc/meta/br/06501844005":
                    surviving_entity_found = True

                    # Check identifiers - should have both DOIs
                    identifiers = {
                        id_obj["@id"]
                        for id_obj in entity[
                            "http://purl.org/spar/datacite/hasIdentifier"
                        ]
                    }
                    self.assertEqual(len(identifiers), 2)
                    expected_ids = {
                        "https://w3id.org/oc/meta/id/0680503588",
                        "https://w3id.org/oc/meta/id/0680503589",
                    }
                    self.assertEqual(identifiers, expected_ids)

                    # Check other metadata preserved
                    self.assertEqual(
                        entity["http://purl.org/dc/terms/title"][0]["@value"],
                        "Higgsing The Stringy Higher Spin Symmetry",
                    )
                    self.assertEqual(
                        entity[
                            "http://prismstandard.org/namespaces/basic/2.0/publicationDate"
                        ][0]["@value"],
                        "2015-10-01",  # The more precise date from the merged entity is kept
                    )

                    # Check part of relationship preserved
                    self.assertEqual(
                        entity["http://purl.org/vocab/frbr/core#partOf"][0]["@id"],
                        "https://w3id.org/oc/meta/br/06501844297",
                    )

                # Verify merged entity doesn't exist
                self.assertNotEqual(
                    entity["@id"], "https://w3id.org/oc/meta/br/06804303923"
                )

        self.assertTrue(
            surviving_entity_found, "Surviving entity not found in output"
        )
        # Verify provenance
        prov_file = os.path.join(
            rdf_path, "br", "0650", "1850000", "1845000", "prov", "se.json"
        )
        with open(prov_file) as f:
            data = json.load(f)
        merge_snapshot_found = False

        for graph in data:
            if graph["@id"] == "https://w3id.org/oc/meta/br/06501844005/prov/":
                for entity in graph.get("@graph", []):
                    if (
                        "merge"
                        in entity.get("http://purl.org/dc/terms/description", [{}])[0]
                        .get("@value", "")
                        .lower()
                    ):
                        merge_snapshot_found = True

                        # Check merge query content
                        merge_query = entity[
                            "https://w3id.org/oc/ontology/hasUpdateQuery"
                        ][0]["@value"]
                        expected_triples = {
                            "insert": [
                                "<https://w3id.org/oc/meta/br/06501844005> <http://purl.org/spar/datacite/hasIdentifier> <https://w3id.org/oc/meta/id/0680503589>"
                            ]
                        }
                        self.check_sparql_query_content(merge_query, expected_triples)

        self.assertTrue(
            merge_snapshot_found, "No merge snapshot found in provenance"
        )


if __name__ == "__main__":
    unittest.main()