Coverage for oc_meta/plugins/csv_generator_lite/csv_generator_lite.py: 36%
356 statements
coverage.py v6.5.0, created at 2026-01-15 10:29 +0000
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2024 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

from __future__ import annotations

import csv
import json
import multiprocessing
import os
import re
from typing import Dict, List, Optional
from zipfile import ZipFile

import redis
from pebble import ProcessPool
from tqdm import tqdm

csv.field_size_limit(2**31 - 1)
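
# Columns of the output CSV, written in this order for every bibliographic resource.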
FIELDNAMES = [
    "id",
    "title",
    "author",
    "issue",
    "volume",
    "venue",
    "page",
    "pub_date",
    "type",
    "publisher",
    "editor",
]
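
# Mapping from FaBiO/DoCO/FR class URIs to the human-readable labels used in the "type" column.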
URI_TYPE_DICT = {
    "http://purl.org/spar/doco/Abstract": "abstract",
    "http://purl.org/spar/fabio/ArchivalDocument": "archival document",
    "http://purl.org/spar/fabio/AudioDocument": "audio document",
    "http://purl.org/spar/fabio/Book": "book",
    "http://purl.org/spar/fabio/BookChapter": "book chapter",
    "http://purl.org/spar/fabio/ExpressionCollection": "book section",
    "http://purl.org/spar/fabio/BookSeries": "book series",
    "http://purl.org/spar/fabio/BookSet": "book set",
    "http://purl.org/spar/fabio/ComputerProgram": "computer program",
    "http://purl.org/spar/doco/Part": "book part",
    "http://purl.org/spar/fabio/Expression": "",
    "http://purl.org/spar/fabio/DataFile": "dataset",
    "http://purl.org/spar/fabio/DataManagementPlan": "data management plan",
    "http://purl.org/spar/fabio/Thesis": "dissertation",
    "http://purl.org/spar/fabio/Editorial": "editorial",
    "http://purl.org/spar/fabio/Journal": "journal",
    "http://purl.org/spar/fabio/JournalArticle": "journal article",
    "http://purl.org/spar/fabio/JournalEditorial": "journal editorial",
    "http://purl.org/spar/fabio/JournalIssue": "journal issue",
    "http://purl.org/spar/fabio/JournalVolume": "journal volume",
    "http://purl.org/spar/fabio/Newspaper": "newspaper",
    "http://purl.org/spar/fabio/NewspaperArticle": "newspaper article",
    "http://purl.org/spar/fabio/NewspaperIssue": "newspaper issue",
    "http://purl.org/spar/fr/ReviewVersion": "peer review",
    "http://purl.org/spar/fabio/AcademicProceedings": "proceedings",
    "http://purl.org/spar/fabio/Preprint": "preprint",
    "http://purl.org/spar/fabio/Presentation": "presentation",
    "http://purl.org/spar/fabio/ProceedingsPaper": "proceedings article",
    "http://purl.org/spar/fabio/ReferenceBook": "reference book",
    "http://purl.org/spar/fabio/ReferenceEntry": "reference entry",
    "http://purl.org/spar/fabio/ReportDocument": "report",
    "http://purl.org/spar/fabio/RetractionNotice": "retraction notice",
    "http://purl.org/spar/fabio/Series": "series",
    "http://purl.org/spar/fabio/SpecificationDocument": "standard",
    "http://purl.org/spar/fabio/WebContent": "web content",
}
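
# Open a Redis connection and verify it with a ping before returning it.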
def init_redis_connection(
    host: str = "localhost", port: int = 6379, db: int = 2
) -> redis.Redis:
    client = redis.Redis(host=host, port=port, db=db, decode_responses=True)
    client.ping()
    return client
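
# Rebuild the "processed_omids" Redis set from any CSV files already present in
# output_dir, so that an interrupted run can resume without reprocessing entities.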
def load_processed_omids_to_redis(output_dir: str, redis_client: redis.Redis) -> int:
    if not os.path.exists(output_dir):
        return 0

    redis_client.delete("processed_omids")

    count = 0
    BATCH_SIZE = 1000
    csv_files = [f for f in os.listdir(output_dir) if f.endswith(".csv")]

    for filename in tqdm(csv_files, desc="Loading existing identifiers"):
        filepath = os.path.join(output_dir, filename)
        with open(filepath, "r", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            batch_pipe = redis_client.pipeline()
            batch_count = 0

            for row in reader:
                omids = [
                    id_part.strip()
                    for id_part in row["id"].split()
                    if id_part.startswith("omid:br/")
                ]
                for omid in omids:
                    batch_pipe.sadd("processed_omids", omid)
                    batch_count += 1
                    count += 1

                if batch_count >= BATCH_SIZE:
                    batch_pipe.execute()
                    batch_pipe = redis_client.pipeline()
                    batch_count = 0

            if batch_count > 0:
                batch_pipe.execute()

    return count
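
# Check whether a bibliographic resource has already been written to a CSV file.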
def is_omid_processed(omid: str, redis_client: redis.Redis) -> bool:
    return redis_client.sismember("processed_omids", omid)
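
# Resolve an OpenCitations Meta entity URI to the ZIP file that should contain it,
# based on the directory and file split sizes used when the RDF dump was serialized.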
def find_file(
    rdf_dir: str, dir_split_number: int, items_per_file: int, uri: str
) -> Optional[str]:
    entity_regex: str = (
        r"^(https:\/\/w3id\.org\/oc\/meta)\/([a-z][a-z])\/(0[1-9]+0)?([1-9][0-9]*)$"
    )
    entity_match = re.match(entity_regex, uri)
    if entity_match:
        cur_number = int(entity_match.group(4))
        cur_file_split = (
            (cur_number - 1) // items_per_file
        ) * items_per_file + items_per_file
        cur_split = (
            (cur_number - 1) // dir_split_number
        ) * dir_split_number + dir_split_number

        short_name = entity_match.group(2)
        sub_folder = entity_match.group(3) or ""
        cur_dir_path = os.path.join(rdf_dir, short_name, sub_folder, str(cur_split))
        cur_file_path = os.path.join(cur_dir_path, str(cur_file_split)) + ".zip"

        return cur_file_path if os.path.exists(cur_file_path) else None
    return None
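
# Read the single JSON-LD document stored inside a ZIP archive.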
def load_json_from_file(filepath: str) -> dict:
    with ZipFile(filepath, "r") as zip_file:
        json_filename = zip_file.namelist()[0]
        with zip_file.open(json_filename) as json_file:
            json_content = json_file.read().decode("utf-8")
            return json.loads(json_content)
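
# Turn an identifier entity into a "scheme:value" string (for example a DOI or ISSN).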
def process_identifier(id_data: dict) -> Optional[str]:
    try:
        id_schema = id_data["http://purl.org/spar/datacite/usesIdentifierScheme"][0][
            "@id"
        ].split("/datacite/")[1]
        literal_value = id_data[
            "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue"
        ][0]["@value"]
        return f"{id_schema}:{literal_value}"
    except (KeyError, IndexError):
        return None
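
# Format a responsible agent as "Family, Given [omid:ra/... other ids ...]",
# falling back to the foaf:name literal when no structured name is available.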
def process_responsible_agent(
    ra_data: dict, ra_uri: str, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[str]:
    try:
        family_name = ra_data.get("http://xmlns.com/foaf/0.1/familyName", [{}])[0].get(
            "@value", ""
        )
        given_name = ra_data.get("http://xmlns.com/foaf/0.1/givenName", [{}])[0].get(
            "@value", ""
        )
        foaf_name = ra_data.get("http://xmlns.com/foaf/0.1/name", [{}])[0].get(
            "@value", ""
        )

        if family_name or given_name:
            if family_name and given_name:
                name = f"{family_name}, {given_name}"
            elif family_name:
                name = f"{family_name},"
            else:
                name = f", {given_name}"
        elif foaf_name:
            name = foaf_name
        else:
            return None

        omid = ra_uri.split("/")[-1]
        identifiers = [f"omid:ra/{omid}"]

        if "http://purl.org/spar/datacite/hasIdentifier" in ra_data:
            for identifier in ra_data["http://purl.org/spar/datacite/hasIdentifier"]:
                id_uri = identifier["@id"]
                id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
                if id_file:
                    id_data = load_json_from_file(id_file)
                    for graph in id_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == id_uri:
                                id_value = process_identifier(entity)
                                if id_value:
                                    identifiers.append(id_value)

        if identifiers:
            return f"{name} [{' '.join(identifiers)}]"
        return name
    except (KeyError, IndexError):
        return None
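
# Format a venue title followed by its identifiers in square brackets.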
def process_venue_title(
    venue_data: dict,
    venue_uri: str,
    rdf_dir: str,
    dir_split_number: int,
    items_per_file: int,
) -> str:
    venue_title = venue_data.get("http://purl.org/dc/terms/title", [{}])[0].get(
        "@value", ""
    )
    if not venue_title:
        return ""

    omid = venue_uri.split("/")[-1]
    identifiers = [f"omid:br/{omid}"]

    if "http://purl.org/spar/datacite/hasIdentifier" in venue_data:
        for identifier in venue_data["http://purl.org/spar/datacite/hasIdentifier"]:
            id_uri = identifier["@id"]
            id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
            if id_file:
                id_data = load_json_from_file(id_file)
                for graph in id_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == id_uri:
                            id_value = process_identifier(entity)
                            if id_value:
                                identifiers.append(id_value)

    return f"{venue_title} [{' '.join(identifiers)}]" if identifiers else venue_title
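
# Fill the volume/issue/venue fields for a container entity, walking up frbr:partOf
# (issue -> volume -> venue) until the enclosing venue is reached.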
def process_hierarchical_venue(
    entity: dict, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Dict[str, str]:
    result = {"volume": "", "issue": "", "venue": ""}
    entity_types = entity.get("@type", [])

    if "http://purl.org/spar/fabio/JournalIssue" in entity_types:
        result["issue"] = entity.get(
            "http://purl.org/spar/fabio/hasSequenceIdentifier", [{}]
        )[0].get("@value", "")
    elif "http://purl.org/spar/fabio/JournalVolume" in entity_types:
        result["volume"] = entity.get(
            "http://purl.org/spar/fabio/hasSequenceIdentifier", [{}]
        )[0].get("@value", "")
    else:
        result["venue"] = process_venue_title(
            entity, entity["@id"], rdf_dir, dir_split_number, items_per_file
        )
        return result

    if "http://purl.org/vocab/frbr/core#partOf" in entity:
        parent_uri = entity["http://purl.org/vocab/frbr/core#partOf"][0]["@id"]
        parent_file = find_file(rdf_dir, dir_split_number, items_per_file, parent_uri)
        if parent_file:
            parent_data = load_json_from_file(parent_file)
            for graph in parent_data:
                for parent_entity in graph.get("@graph", []):
                    if parent_entity["@id"] == parent_uri:
                        parent_info = process_hierarchical_venue(
                            parent_entity, rdf_dir, dir_split_number, items_per_file
                        )
                        for key, value in parent_info.items():
                            if not result[key]:
                                result[key] = value

    return result
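
# Among the agent roles of a given type, return the one that no other role points to
# via oco:hasNext, i.e. the head of the linked list that preserves the original order.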
def find_first_ar_by_role(
    agent_roles: Dict, next_relations: Dict, role_type: str
) -> Optional[str]:
    role_ars = {
        ar_uri: ar_data
        for ar_uri, ar_data in agent_roles.items()
        if role_type
        in ar_data.get("http://purl.org/spar/pro/withRole", [{}])[0].get("@id", "")
    }

    role_next_relations = {
        ar_uri: next_ar
        for ar_uri, next_ar in next_relations.items()
        if ar_uri in role_ars and next_ar in role_ars
    }

    referenced_ars = set(role_next_relations.values())
    for ar_uri in role_ars:
        if ar_uri not in referenced_ars:
            return ar_uri

    return next(iter(role_ars)) if role_ars else None
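
# Build one CSV row for a bibliographic resource, resolving identifiers, agent roles,
# the venue hierarchy and page information from the related entity files.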
def process_bibliographic_resource(
    br_data: dict, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[Dict[str, str]]:
    br_types = br_data.get("@type", [])
    if (
        "http://purl.org/spar/fabio/JournalVolume" in br_types
        or "http://purl.org/spar/fabio/JournalIssue" in br_types
    ):
        return None

    output = {field: "" for field in FIELDNAMES}

    try:
        entity_id = br_data.get("@id", "")
        identifiers = [f'omid:br/{entity_id.split("/")[-1]}'] if entity_id else []

        output["title"] = br_data.get("http://purl.org/dc/terms/title", [{}])[0].get(
            "@value", ""
        )
        output["pub_date"] = br_data.get(
            "http://prismstandard.org/namespaces/basic/2.0/publicationDate", [{}]
        )[0].get("@value", "")

        br_types = [
            t
            for t in br_data.get("@type", [])
            if t != "http://purl.org/spar/fabio/Expression"
        ]
        output["type"] = URI_TYPE_DICT.get(br_types[0], "") if br_types else ""

        if "http://purl.org/spar/datacite/hasIdentifier" in br_data:
            for identifier in br_data["http://purl.org/spar/datacite/hasIdentifier"]:
                id_uri = identifier["@id"]
                id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
                if id_file:
                    id_data = load_json_from_file(id_file)
                    for graph in id_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == id_uri:
                                id_value = process_identifier(entity)
                                if id_value:
                                    identifiers.append(id_value)
        output["id"] = " ".join(identifiers)

        authors = []
        editors = []
        publishers = []
        agent_roles = {}
        next_relations = {}

        if "http://purl.org/spar/pro/isDocumentContextFor" in br_data:
            for ar_data in br_data["http://purl.org/spar/pro/isDocumentContextFor"]:
                ar_uri = ar_data["@id"]
                ar_file = find_file(rdf_dir, dir_split_number, items_per_file, ar_uri)
                if ar_file:
                    ar_data = load_json_from_file(ar_file)
                    for graph in ar_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == ar_uri:
                                agent_roles[ar_uri] = entity
                                if "https://w3id.org/oc/ontology/hasNext" in entity:
                                    next_ar = entity[
                                        "https://w3id.org/oc/ontology/hasNext"
                                    ][0]["@id"]
                                    next_relations[ar_uri] = next_ar

        for role_type, role_list in [
            ("author", authors),
            ("editor", editors),
            ("publisher", publishers),
        ]:
            first_ar = find_first_ar_by_role(agent_roles, next_relations, role_type)
            if not first_ar:
                continue

            current_ar = first_ar
            processed_ars = set()
            max_iterations = len(agent_roles)
            iterations = 0

            while current_ar and current_ar in agent_roles:
                if current_ar in processed_ars or iterations >= max_iterations:
                    print(
                        f"Warning: Detected cycle in hasNext relations or exceeded maximum iterations at AR: {current_ar}"
                    )
                    break

                processed_ars.add(current_ar)
                iterations += 1

                entity = agent_roles[current_ar]
                role = entity.get("http://purl.org/spar/pro/withRole", [{}])[0].get(
                    "@id", ""
                )

                if role_type in role:
                    if "http://purl.org/spar/pro/isHeldBy" in entity:
                        ra_uri = entity["http://purl.org/spar/pro/isHeldBy"][0][
                            "@id"
                        ]
                        ra_file = find_file(
                            rdf_dir, dir_split_number, items_per_file, ra_uri
                        )
                        if ra_file:
                            ra_data = load_json_from_file(ra_file)
                            for ra_graph in ra_data:
                                for ra_entity in ra_graph.get("@graph", []):
                                    if ra_entity["@id"] == ra_uri:
                                        agent_name = process_responsible_agent(
                                            ra_entity,
                                            ra_uri,
                                            rdf_dir,
                                            dir_split_number,
                                            items_per_file,
                                        )
                                        if agent_name:
                                            role_list.append(agent_name)

                current_ar = next_relations.get(current_ar)

        output["author"] = "; ".join(authors)
        output["editor"] = "; ".join(editors)
        output["publisher"] = "; ".join(publishers)

        if "http://purl.org/vocab/frbr/core#partOf" in br_data:
            venue_uri = br_data["http://purl.org/vocab/frbr/core#partOf"][0]["@id"]
            venue_file = find_file(rdf_dir, dir_split_number, items_per_file, venue_uri)
            if venue_file:
                venue_data = load_json_from_file(venue_file)
                for graph in venue_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == venue_uri:
                            venue_info = process_hierarchical_venue(
                                entity, rdf_dir, dir_split_number, items_per_file
                            )
                            output.update(venue_info)

        if "http://purl.org/vocab/frbr/core#embodiment" in br_data:
            page_uri = br_data["http://purl.org/vocab/frbr/core#embodiment"][0]["@id"]
            page_file = find_file(rdf_dir, dir_split_number, items_per_file, page_uri)
            if page_file:
                page_data = load_json_from_file(page_file)
                for graph in page_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == page_uri:
                            start_page = entity.get(
                                "http://prismstandard.org/namespaces/basic/2.0/startingPage",
                                [{}],
                            )[0].get("@value", "")
                            end_page = entity.get(
                                "http://prismstandard.org/namespaces/basic/2.0/endingPage",
                                [{}],
                            )[0].get("@value", "")
                            if start_page or end_page:
                                output["page"] = f"{start_page}-{end_page}"

    except (KeyError, IndexError) as e:
        print(f"Error processing bibliographic resource: {e}")

    return output
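
# Worker function: extract CSV rows from one ZIP file, skipping volumes, issues and
# entities already recorded in Redis. Each worker opens its own Redis connection.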
def process_single_file(args):
    filepath, input_dir, dir_split_number, items_per_file, redis_params = args
    results = []

    redis_client = redis.Redis(
        host=redis_params["host"],
        port=redis_params["port"],
        db=redis_params["db"],
        decode_responses=True,
    )

    data = load_json_from_file(filepath)
    for graph in data:
        for entity in graph.get("@graph", []):
            entity_types = entity.get("@type", [])
            if (
                "http://purl.org/spar/fabio/JournalVolume" in entity_types
                or "http://purl.org/spar/fabio/JournalIssue" in entity_types
            ):
                continue

            entity_id = entity.get("@id", "")
            if entity_id:
                omid = f"omid:br/{entity_id.split('/')[-1]}"
                if is_omid_processed(omid, redis_client):
                    continue

            br_data = process_bibliographic_resource(
                entity, input_dir, dir_split_number, items_per_file
            )
            if br_data:
                results.append(br_data)

    return results
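
# Accumulate rows and write them to numbered output_N.csv files in chunks of max_rows,
# continuing the numbering from any files already present in the output directory.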
class ResultBuffer:
    def __init__(self, output_dir: str, max_rows: int = 3000):
        self.buffer = []
        self.output_dir = output_dir
        self.max_rows = max_rows
        self.file_counter = self._get_last_file_number() + 1
        self.pbar = None

    def _get_last_file_number(self) -> int:
        if not os.path.exists(self.output_dir):
            return -1

        max_number = -1
        for filename in os.listdir(self.output_dir):
            if filename.startswith("output_") and filename.endswith(".csv"):
                try:
                    number = int(filename[7:-4])
                    max_number = max(max_number, number)
                except ValueError:
                    continue
        return max_number

    def set_progress_bar(self, total: int) -> None:
        self.pbar = tqdm(total=total, desc="Processing files")

    def update_progress(self) -> None:
        if self.pbar:
            self.pbar.update(1)

    def close_progress_bar(self) -> None:
        if self.pbar:
            self.pbar.close()

    def add_results(self, results: List[Dict[str, str]]) -> None:
        self.buffer.extend(results)
        while len(self.buffer) >= self.max_rows:
            self._write_buffer_chunk()

    def _write_buffer_chunk(self) -> None:
        chunk = self.buffer[: self.max_rows]
        output_file = os.path.join(self.output_dir, f"output_{self.file_counter}.csv")
        write_csv(output_file, chunk)
        self.buffer = self.buffer[self.max_rows :]
        self.file_counter += 1

    def flush(self) -> None:
        if self.buffer:
            output_file = os.path.join(
                self.output_dir, f"output_{self.file_counter}.csv"
            )
            write_csv(output_file, self.buffer)
            self.buffer = []
            self.file_counter += 1
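
# Entry point: walk the "br" directory of the RDF dump (skipping provenance), process
# each ZIP file in a separate process, and stream the resulting rows into CSV files.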
def generate_csv(
    input_dir: str,
    output_dir: str,
    dir_split_number: int,
    items_per_file: int,
    redis_host: str = "localhost",
    redis_port: int = 6379,
    redis_db: int = 2,
) -> None:
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    redis_client = init_redis_connection(redis_host, redis_port, redis_db)
    load_processed_omids_to_redis(output_dir, redis_client)

    br_dir = os.path.join(input_dir, "br")
    if not os.path.exists(br_dir):
        print(f"Error: bibliographic resources directory not found at {br_dir}")
        return

    all_files = []
    for root, _, files in os.walk(br_dir):
        if "prov" in root:
            continue
        all_files.extend(os.path.join(root, f) for f in files if f.endswith(".zip"))

    if not all_files:
        print("No files found to process")
        return

    print(f"Processing {len(all_files)} files...")

    redis_params = {"host": redis_host, "port": redis_port, "db": redis_db}
    result_buffer = ResultBuffer(output_dir)
    result_buffer.set_progress_bar(len(all_files))

    with ProcessPool(
        max_workers=os.cpu_count(),
        max_tasks=1,
        context=multiprocessing.get_context('spawn'),
    ) as executor:
        for filepath in all_files:
            future = executor.schedule(
                function=process_single_file,
                args=(
                    (
                        filepath,
                        input_dir,
                        dir_split_number,
                        items_per_file,
                        redis_params,
                    ),
                ),
            )
            try:
                results = future.result()
                if results:
                    result_buffer.add_results(results)
            except Exception as e:
                print(f"Error processing file {filepath}: {e}")
            result_buffer.update_progress()

    result_buffer.flush()
    result_buffer.close_progress_bar()
    redis_client.delete("processed_omids")
    print("Processing complete. Redis cache cleared.")
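
# Write a list of row dictionaries to a CSV file with the standard column set.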
def write_csv(filepath: str, data: List[Dict[str, str]]) -> None:
    with open(filepath, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDNAMES)
        writer.writeheader()
        writer.writerows(data)
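
# Example invocation (illustrative sketch only; the paths and split sizes below are
# assumptions, not values taken from this module):
#
#     generate_csv(
#         input_dir="meta_rdf",      # hypothetical dump root containing a "br" subfolder
#         output_dir="meta_csv",     # hypothetical destination for output_N.csv files
#         dir_split_number=10000,    # assumed directory split used when the dump was produced
#         items_per_file=1000,       # assumed number of entities per file in the dump
#         redis_host="localhost",
#         redis_port=6379,
#         redis_db=2,
#     )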