Coverage for ramose / skg_if / _base.py: 74%
559 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-01 13:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-01 13:49 +0000
1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import csv
6import json
7from collections.abc import Callable
8from dataclasses import dataclass
9from dataclasses import field as dataclass_field
10from io import StringIO
11from math import ceil
12from re import sub
13from typing import NoReturn
14from urllib.parse import parse_qs, urlencode, urlsplit
16from ramose import HttpError
# Number of "-"-separated parts in a year-month date such as "2024-05".
_YEAR_MONTH_PART_COUNT: int = 2
# HTTP status code attached to errors raised for unusable request parameters.
_UNPROCESSABLE_CONTENT: int = 422
# Every column a product row may carry, in canonical order.
_PRODUCT_COLUMN_NAMES: tuple[str, ...] = (
    # Core product fields.
    "local_identifier",
    "identifier_scheme",
    "identifier_value",
    "title",
    "title_lang",
    "abstract",
    "abstract_lang",
    "product_type",
    # Topics.
    "topic_term",
    "topic_identifier_scheme",
    "topic_identifier_value",
    "topic_label",
    "topic_label_lang",
    "topic_provenance_associated_with",
    "topic_provenance_trust",
    # Contributions and the agents behind them.
    "contribution_by_family_name",
    "contribution_by_given_name",
    "contribution_by_name",
    "contribution_by_identifier_scheme",
    "contribution_by_identifier_value",
    "contribution_by_local_identifier",
    "contribution_role",
    "contribution_type",
    "_contribution_key",
    "_contribution_next_key",
    "contribution_declared_affiliation_name",
    "contribution_declared_affiliation_short_name",
    "contribution_declared_affiliation_country",
    "contribution_declared_affiliation_local_identifier",
    "contribution_declared_affiliation_identifier_scheme",
    "contribution_declared_affiliation_identifier_value",
    "contribution_declared_affiliation_type",
    "contribution_declared_affiliation_website",
    "contribution_declared_affiliation_other_name",
    # Manifestation.
    "manifestation_type_class",
    "manifestation_type_label",
    "manifestation_type_label_lang",
    "manifestation_identifier_scheme",
    "manifestation_identifier_value",
    "manifestation_dates_type",
    "manifestation_dates_value",
    "manifestation_peer_review_status",
    "manifestation_peer_review_description",
    "manifestation_access_rights_status",
    "manifestation_access_rights_description",
    "manifestation_licence",
    "manifestation_version",
    "manifestation_biblio_volume",
    "manifestation_biblio_issue",
    "manifestation_biblio_edition",
    "manifestation_biblio_number",
    "manifestation_biblio_pages_first",
    "manifestation_biblio_pages_last",
    "manifestation_biblio_in_name",
    "manifestation_biblio_in_local_identifier",
    "manifestation_biblio_in_identifier_scheme",
    "manifestation_biblio_in_identifier_value",
    "manifestation_biblio_in_acronym",
    "manifestation_biblio_hosting_data_source_local_identifier",
    "manifestation_biblio_hosting_data_source_name",
    "manifestation_biblio_hosting_data_source_identifier_scheme",
    "manifestation_biblio_hosting_data_source_identifier_value",
    # Related products.
    "related_products_cites",
    "related_products_is_supplemented_by",
    "related_products_is_documented_by",
    "related_products_is_new_version_of",
    "related_products_is_part_of",
    # Funding and funding agency.
    "funding_local_identifier",
    "funding_grant_number",
    "funding_title",
    "funding_title_lang",
    "funding_abstract",
    "funding_abstract_lang",
    "funding_acronym",
    "funding_identifier_scheme",
    "funding_identifier_value",
    "funding_stream",
    "funding_agency_name",
    "funding_agency_short_name",
    "funding_agency_country",
    "funding_agency_local_identifier",
    "funding_agency_identifier_scheme",
    "funding_agency_identifier_value",
    "funding_agency_type",
    "funding_agency_website",
    # Relevant organisations.
    "relevant_organisation_name",
    "relevant_organisation_short_name",
    "relevant_organisation_country",
    "relevant_organisation_local_identifier",
    "relevant_organisation_identifier_scheme",
    "relevant_organisation_identifier_value",
    "relevant_organisation_type",
    "relevant_organisation_website",
    "relevant_organisation_other_name",
)

# Column name -> empty-string default, used to pad rows that lack some columns.
_PRODUCT_COLUMNS: dict[str, str] = dict.fromkeys(_PRODUCT_COLUMN_NAMES, "")
121SKGIF_CONTEXT = [
122 "https://w3id.org/skg-if/context/1.1.0/skg-if.json",
123 "https://w3id.org/skg-if/context/1.0.0/skg-if-api.json",
124 {"@base": "https://w3id.org/skg-if/sandbox/oc/"},
125]
def _collect_identifiers(
    rows: list[dict],
    scheme_col: str = "identifier_scheme",
    value_col: str = "identifier_value",
) -> list[dict]:
    """Return the distinct ``{"value", "scheme"}`` identifiers found in *rows*.

    Args:
        rows: Row dicts; each must contain *scheme_col* and *value_col*.
        scheme_col: Column holding the identifier scheme.
        value_col: Column holding the identifier value.

    Returns:
        One dict per distinct (scheme, value) pair, in order of first
        appearance. Rows where either part is empty are ignored.
    """
    # A dict keeps first-insertion order, so it doubles as an ordered set.
    unique_pairs: dict[tuple[str, str], None] = {}
    for record in rows:
        pair = (record[scheme_col], record[value_col])
        if all(pair):
            unique_pairs.setdefault(pair, None)
    return [{"value": value, "scheme": scheme} for scheme, value in unique_pairs]
144def _order_linked_list(items: dict[str, dict], next_map: dict[str, str | None]) -> list[dict]:
145 if not items:
146 return []
148 next_values = set(next_map.values()) - {None}
149 start_candidates = [key for key in items if key not in next_values]
150 if not start_candidates:
151 return list(items.values())
153 ordered = []
154 current = start_candidates[0]
155 visited = set()
156 while current and current in items and current not in visited:
157 visited.add(current)
158 ordered.append(items[current])
159 current = next_map.get(current)
161 for key, contributor in items.items():
162 if key not in visited:
163 ordered.append(contributor)
165 return ordered
168def _build_agent(row: dict) -> dict | None:
169 family_name = row["contribution_by_family_name"]
170 given_name = row["contribution_by_given_name"]
171 full_name = row["contribution_by_name"]
172 id_scheme = row["contribution_by_identifier_scheme"]
173 id_value = row["contribution_by_identifier_value"]
174 agent_local_id = row["contribution_by_local_identifier"]
175 role = row["contribution_role"]
177 is_person = bool(family_name or given_name)
179 if is_person:
180 display_name = f"{family_name}, {given_name}" if family_name and given_name else (family_name or given_name)
181 entity_type = "person"
182 elif full_name:
183 display_name = full_name
184 entity_type = "organisation" if role == "publisher" else "agent"
185 else:
186 return None
188 agent: dict = {"name": display_name, "entity_type": entity_type}
189 if is_person:
190 if family_name:
191 agent["family_name"] = family_name
192 if given_name:
193 agent["given_name"] = given_name
194 if id_scheme and id_value:
195 agent["identifiers"] = [{"value": id_value, "scheme": id_scheme}]
196 if not agent_local_id:
197 msg = f"Missing required local_identifier for {entity_type} '{display_name}'"
198 raise ValueError(msg)
199 agent["local_identifier"] = agent_local_id
200 return agent
def _build_org(row: dict, prefix: str) -> dict:
    """Build an organisation object from the ``<prefix>_*`` columns of *row*.

    Args:
        row: Row dict holding ``<prefix>_name``, ``<prefix>_short_name``,
            ``<prefix>_country``, ``<prefix>_website`` and ``<prefix>_local_identifier``.
        prefix: Column prefix selecting which organisation to read.

    Returns:
        The organisation dict; empty optional fields are left out.

    Raises:
        ValueError: If ``<prefix>_local_identifier`` is empty.
    """
    name = row[f"{prefix}_name"]
    organisation: dict = {"entity_type": "organisation"}
    if name:
        organisation["name"] = name

    optional = {attr: row[f"{prefix}_{attr}"] for attr in ("short_name", "country", "website")}
    organisation.update({attr: text for attr, text in optional.items() if text})

    local_id = row[f"{prefix}_local_identifier"]
    if not local_id:
        raise ValueError(f"Missing required local_identifier for organisation '{name}'")
    organisation["local_identifier"] = local_id
    return organisation
def _merge_org_multivalued(entry: dict, row: dict, prefix: str) -> None:
    """Fold the multi-valued organisation columns of *row* into *entry*.

    Args:
        entry: Tracker dict with the organisation under ``"obj"`` and the
            de-duplication sets ``"seen_ids"``, ``"seen_types"`` and
            ``"seen_other_names"``. Mutated in place.
        row: Row dict holding the ``<prefix>_identifier_*`` and ``<prefix>_type``
            columns; ``<prefix>_other_name`` is optional.
        prefix: Column prefix selecting which organisation to read.
    """
    pair = (row[f"{prefix}_identifier_scheme"], row[f"{prefix}_identifier_value"])
    if all(pair) and pair not in entry["seen_ids"]:
        entry["seen_ids"].add(pair)
        scheme, value = pair
        entry["obj"].setdefault("identifiers", []).append({"value": value, "scheme": scheme})

    kind = row[f"{prefix}_type"]
    if kind and kind not in entry["seen_types"]:
        entry["seen_types"].add(kind)
        entry["obj"].setdefault("types", []).append(kind)

    # Not every organisation group has an "other_name" column.
    alias = row.get(f"{prefix}_other_name")
    if alias and alias not in entry["seen_other_names"]:
        entry["seen_other_names"].add(alias)
        entry["obj"].setdefault("other_names", []).append(alias)
def _collect_declared_affiliations(rows: list[dict], role: str, key: str, store: dict) -> None:
    """Collect the declared affiliations of one contributor into *store*.

    Only rows whose ``contribution_role`` and ``_contribution_key`` equal *role*
    and *key* are considered. Affiliations are keyed by local identifier under
    ``store[role][key]``; both levels are created even when nothing matches.

    Args:
        rows: All rows of the entity being built.
        role: Contribution role to match.
        key: Contribution key to match.
        store: role -> key -> affiliation local id -> tracker entry. Mutated in place.

    Raises:
        ValueError: Propagated from ``_build_org`` when an affiliation has a
            name but no local identifier.
    """
    prefix = "contribution_declared_affiliation"
    name_col = f"{prefix}_name"
    id_col = f"{prefix}_local_identifier"
    by_local_id = store.setdefault(role, {}).setdefault(key, {})

    for row in rows:
        belongs_here = row["contribution_role"] == role and row["_contribution_key"] == key
        if not belongs_here:
            continue
        name, local_id = row[name_col], row[id_col]
        if not (name or local_id):
            continue
        entry = by_local_id.get(local_id)
        if entry is None:
            entry = by_local_id[local_id] = {
                "obj": _build_org(row, prefix),
                "seen_ids": set(),
                "seen_types": set(),
                "seen_other_names": set(),
            }
        _merge_org_multivalued(entry, row, prefix)
def _enrich_contributor(
    contributor: dict,
    key: str,
    role_type: str,
    contribution_types: dict,
    affiliations: dict,
) -> None:
    """Attach contribution types and declared affiliations to *contributor*.

    Args:
        contributor: Contributor dict, mutated in place.
        key: Contribution key of the contributor.
        role_type: Role under which the contributor was collected.
        contribution_types: role -> key -> list of contribution types.
        affiliations: role -> key -> affiliation id -> tracker entry with an ``"obj"`` member.
    """
    if declared_types := contribution_types.get(role_type, {}).get(key):
        contributor["contribution_types"] = declared_types
    if entries := affiliations.get(role_type, {}).get(key):
        contributor["declared_affiliations"] = [entry["obj"] for entry in entries.values()]
272@dataclass
273class _ContributorAccumulator:
274 by_role_type: dict[str, dict[str, dict]] = dataclass_field(default_factory=dict)
275 next_map: dict[str, dict[str, str | None]] = dataclass_field(default_factory=dict)
276 types: dict[str, dict[str, list[str]]] = dataclass_field(default_factory=dict)
277 affiliations: dict[str, dict[str, dict]] = dataclass_field(default_factory=dict)
280def _process_contributor_row(
281 row: dict,
282 rows: list[dict],
283 acc: _ContributorAccumulator,
284) -> None:
285 role = row["contribution_role"]
286 key = row["_contribution_key"]
287 if not role or not key:
288 return
290 if role not in acc.by_role_type:
291 acc.by_role_type[role] = {}
292 acc.next_map[role] = {}
293 acc.types[role] = {}
295 contribution_type = row["contribution_type"]
296 if contribution_type:
297 type_list = acc.types[role].setdefault(key, [])
298 if contribution_type not in type_list:
299 type_list.append(contribution_type)
301 if key in acc.by_role_type[role]:
302 existing = acc.by_role_type[role][key]
303 id_scheme = row["contribution_by_identifier_scheme"]
304 id_value = row["contribution_by_identifier_value"]
305 if id_scheme and id_value and not existing["by"].get("identifiers"):
306 existing["by"]["identifiers"] = [{"value": id_value, "scheme": id_scheme}]
307 return
309 _collect_declared_affiliations(rows, role, key, acc.affiliations)
310 agent = _build_agent(row)
311 if not agent:
312 return
313 acc.by_role_type[role][key] = {"role": role, "by": agent}
314 acc.next_map[role][key] = row["_contribution_next_key"] or None
def _collect_contributors(rows: list[dict]) -> list[dict]:
    """Build the ranked contribution list of one entity.

    Contributors are emitted role by role in the fixed order author, editor,
    publisher; rows with any other role are not emitted. Within a role they
    follow the ``_contribution_next_key`` links and receive a 1-based ``rank``.

    Args:
        rows: All rows of the entity.

    Returns:
        Contributor dicts enriched with rank, contribution types and declared affiliations.
    """
    acc = _ContributorAccumulator()
    for row in rows:
        _process_contributor_row(row, rows, acc)

    result: list[dict] = []
    for role_type in ("author", "editor", "publisher"):
        contributors = acc.by_role_type.get(role_type)
        if contributors is None:
            continue
        # The ordering helper returns values only; map each contributor object
        # back to its key once instead of scanning the dict per contributor.
        key_by_identity = {id(contributor): key for key, contributor in contributors.items()}
        ordered = _order_linked_list(contributors, acc.next_map[role_type])
        for rank, contributor in enumerate(ordered, start=1):
            contributor["rank"] = rank
            key = key_by_identity[id(contributor)]
            _enrich_contributor(contributor, key, role_type, acc.types, acc.affiliations)
            result.append(contributor)
    return result
def _build_venue(rows: list[dict], venue_name: str, venue_local_id: str) -> dict:
    """Build the venue object a manifestation is published in.

    Args:
        rows: All rows of the entity; the acronym is read from the first row
            and venue identifiers are gathered across all rows.
        venue_name: Name of the venue.
        venue_local_id: Local identifier of the venue.

    Returns:
        The venue dict, with ``acronym`` and ``identifiers`` only when present.

    Raises:
        ValueError: If *venue_local_id* is empty.
    """
    if not venue_local_id:
        raise ValueError(f"Missing required local_identifier for venue '{venue_name}'")

    venue: dict = {"name": venue_name, "entity_type": "venue", "local_identifier": venue_local_id}
    if acronym := rows[0]["manifestation_biblio_in_acronym"]:
        venue["acronym"] = acronym

    # dict.fromkeys de-duplicates the pairs while keeping first-seen order.
    pairs = dict.fromkeys(
        (row["manifestation_biblio_in_identifier_scheme"], row["manifestation_biblio_in_identifier_value"])
        for row in rows
    )
    identifiers = [{"value": value, "scheme": scheme} for scheme, value in pairs if scheme and value]
    if identifiers:
        venue["identifiers"] = identifiers
    return venue
360def _normalize_datetime(date_str: str) -> str:
361 parts = date_str.split("-")
362 if len(parts) == 1:
363 return f"{parts[0]}-01-01T00:00:00"
364 if len(parts) == _YEAR_MONTH_PART_COUNT:
365 return f"{parts[0]}-{parts[1]}-01T00:00:00"
366 if "T" not in date_str:
367 return f"{date_str}T00:00:00"
368 return date_str
def _collect_manifestation_dates(rows: list[dict]) -> dict[str, list[str]]:
    """Group the distinct manifestation dates of *rows* by date type.

    Args:
        rows: Row dicts holding ``manifestation_dates_type`` and ``manifestation_dates_value``.

    Returns:
        Date type -> normalized date strings, in order of first appearance.
        Rows with an empty type or value are ignored, and each distinct
        (type, raw value) pair is taken once.
    """
    grouped: dict[str, list[str]] = {}
    handled: set[tuple[str, str]] = set()
    for row in rows:
        pair = (row["manifestation_dates_type"], row["manifestation_dates_value"])
        if not all(pair) or pair in handled:
            continue
        handled.add(pair)
        kind, raw_value = pair
        grouped.setdefault(kind, []).append(_normalize_datetime(raw_value))
    return grouped
# (row column, output key) pairs for the bibliographic fields copied through unchanged.
_BIBLIO_SIMPLE_FIELDS: tuple[tuple[str, str], ...] = tuple(
    (f"manifestation_biblio_{json_key}", json_key) for json_key in ("volume", "issue", "edition", "number")
)
391def _build_biblio_venue(rows: list[dict], first_row: dict) -> dict | None:
392 venue_name = first_row["manifestation_biblio_in_name"]
393 if not venue_name:
394 return None
395 venue_local_id = first_row["manifestation_biblio_in_local_identifier"]
396 return _build_venue(rows, venue_name, venue_local_id)
399def _build_biblio_hosting(rows: list[dict], first_row: dict) -> dict | None:
400 hosting_local_id = first_row["manifestation_biblio_hosting_data_source_local_identifier"]
401 if not hosting_local_id:
402 return None
403 hosting: dict = {"local_identifier": hosting_local_id, "entity_type": "datasource"}
404 hosting_name = first_row["manifestation_biblio_hosting_data_source_name"]
405 if hosting_name:
406 hosting["name"] = hosting_name
407 hosting_identifiers = _collect_identifiers(
408 rows,
409 "manifestation_biblio_hosting_data_source_identifier_scheme",
410 "manifestation_biblio_hosting_data_source_identifier_value",
411 )
412 if hosting_identifiers:
413 hosting["identifiers"] = hosting_identifiers
414 return hosting
def _build_biblio(rows: list[dict]) -> dict:
    """Build the bibliographic section of a manifestation.

    Scalar fields and pages come from the first row; the venue and hosting
    data source additionally gather identifiers across all rows.

    Args:
        rows: All rows of the entity (must not be empty).

    Returns:
        The biblio dict, possibly empty. ``pages`` is present only when both
        the first and the last page are set.
    """
    first_row = rows[0]
    biblio: dict = {
        json_key: first_row[column] for column, json_key in _BIBLIO_SIMPLE_FIELDS if first_row[column]
    }

    pages = {
        "first": first_row["manifestation_biblio_pages_first"],
        "last": first_row["manifestation_biblio_pages_last"],
    }
    if all(pages.values()):
        biblio["pages"] = pages

    if venue := _build_biblio_venue(rows, first_row):
        biblio["in"] = venue
    if hosting := _build_biblio_hosting(rows, first_row):
        biblio["hosting_data_source"] = hosting
    return biblio
437def _build_manifestation_type(first_row: dict) -> dict | None:
438 type_class = first_row["manifestation_type_class"]
439 if not type_class:
440 return None
441 separator = "#" if "#" in type_class else "/"
442 defined_in = type_class.rsplit(separator, 1)[0]
443 manifestation_type: dict = {"class": type_class, "defined_in": defined_in}
444 type_label = first_row["manifestation_type_label"]
445 if type_label:
446 label_lang = first_row["manifestation_type_label_lang"] or "none"
447 manifestation_type["labels"] = {label_lang: type_label}
448 return manifestation_type
451def _build_status_with_description(first_row: dict, status_field: str, desc_field: str) -> dict | None:
452 status = first_row[status_field]
453 if not status:
454 return None
455 result: dict = {"status": status}
456 desc = first_row[desc_field]
457 if desc:
458 result["description"] = desc
459 return result
def _build_manifestation(rows: list[dict]) -> dict | None:
    """Assemble the manifestation object of an entity.

    Args:
        rows: All rows of the entity (must not be empty).

    Returns:
        The manifestation dict holding only its non-empty sections, or ``None``
        when every section is empty.
    """
    first_row = rows[0]
    # Sections are evaluated in output order; empty ones are dropped below.
    sections = {
        "type": _build_manifestation_type(first_row),
        "dates": _collect_manifestation_dates(rows),
        "identifiers": _collect_identifiers(
            rows, "manifestation_identifier_scheme", "manifestation_identifier_value"
        ),
        "peer_review": _build_status_with_description(
            first_row, "manifestation_peer_review_status", "manifestation_peer_review_description"
        ),
        "access_rights": _build_status_with_description(
            first_row, "manifestation_access_rights_status", "manifestation_access_rights_description"
        ),
        "licence": first_row["manifestation_licence"],
        "version": first_row["manifestation_version"],
        "biblio": _build_biblio(rows),
    }
    manifestation = {name: content for name, content in sections.items() if content}
    return manifestation or None
# Columns holding links to other products; the part after "related_products_"
# becomes the relation key in the output.
_RELATED_PRODUCT_COLUMNS = [
    "related_products_cites",
    "related_products_is_supplemented_by",
    "related_products_is_documented_by",
    "related_products_is_new_version_of",
    "related_products_is_part_of",
]


def _collect_related_products(rows: list[dict]) -> dict:
    """Collect the distinct related products of *rows*, grouped by relation.

    Args:
        rows: Row dicts holding every column of ``_RELATED_PRODUCT_COLUMNS``.

    Returns:
        Relation name -> distinct non-empty values in order of first
        appearance. Relations without any value are omitted.
    """
    related: dict[str, list[str]] = {}
    for column in _RELATED_PRODUCT_COLUMNS:
        # dict.fromkeys de-duplicates while keeping first-seen order.
        targets = list(dict.fromkeys(row[column] for row in rows if row[column]))
        if targets:
            related[column.replace("related_products_", "")] = targets
    return related
def _collect_topics(rows: list[dict]) -> list[dict]:
    """Collect the topics of an entity, one object per distinct ``topic_term``.

    Labels are keyed by language (``"none"`` when the language is empty), with
    a later row overwriting an earlier label of the same language. Identifiers
    are de-duplicated per topic by (scheme, value), provenance entries per
    topic by the associated agent.

    Args:
        rows: Row dicts holding the ``topic_*`` columns.

    Returns:
        Topic dicts in order of first appearance.

    Raises:
        ValueError: If a ``topic_provenance_trust`` value is not a valid float.
    """
    topics: dict[str, dict] = {}
    ids_by_topic: dict[str, set] = {}
    agents_by_topic: dict[str, set] = {}

    for row in rows:
        uri = row["topic_term"]
        if not uri:
            continue

        topic = topics.setdefault(uri, {"term": {"local_identifier": uri, "entity_type": "topic"}})
        ids_seen = ids_by_topic.setdefault(uri, set())
        agents_seen = agents_by_topic.setdefault(uri, set())
        term = topic["term"]

        if label := row["topic_label"]:
            term.setdefault("labels", {})[row["topic_label_lang"] or "none"] = label

        pair = (row["topic_identifier_scheme"], row["topic_identifier_value"])
        if all(pair) and pair not in ids_seen:
            ids_seen.add(pair)
            term.setdefault("identifiers", []).append({"scheme": pair[0], "value": pair[1]})

        agent = row["topic_provenance_associated_with"]
        trust = row["topic_provenance_trust"]
        if agent and trust and agent not in agents_seen:
            agents_seen.add(agent)
            topic.setdefault("provenance", []).append({"associated_with": agent, "trust": float(trust)})

    return list(topics.values())
def _collect_organisation(rows: list[dict], prefix: str) -> list[dict]:
    """Collect the distinct organisations described by the ``<prefix>_*`` columns.

    Organisations are keyed by local identifier; rows with neither a name nor
    a local identifier are ignored.

    Args:
        rows: All rows of the entity.
        prefix: Column prefix selecting the organisation group.

    Returns:
        Organisation dicts in order of first appearance.

    Raises:
        ValueError: Propagated from ``_build_org`` when an organisation has a
            name but no local identifier.
    """
    name_col, id_col = f"{prefix}_name", f"{prefix}_local_identifier"
    by_local_id: dict[str, dict] = {}
    for row in rows:
        name, local_id = row[name_col], row[id_col]
        if not (name or local_id):
            continue
        entry = by_local_id.get(local_id)
        if entry is None:
            entry = by_local_id[local_id] = {
                "obj": _build_org(row, prefix),
                "seen_ids": set(),
                "seen_types": set(),
                "seen_other_names": set(),
            }
        _merge_org_multivalued(entry, row, prefix)
    return [entry["obj"] for entry in by_local_id.values()]
def _build_grant(row: dict) -> dict:
    """Build a grant object from the ``funding_*`` columns of *row*.

    Args:
        row: Row dict holding the ``funding_*`` columns.

    Returns:
        The grant dict. Titles and abstracts are keyed by language (``"none"``
        when the language is empty); the funding agency is included only when
        it has a name.

    Raises:
        ValueError: If the grant has no local identifier, or (from
            ``_build_org``) the named funding agency has none.
    """
    grant_id = row["funding_local_identifier"]
    if not grant_id:
        raise ValueError("Missing required local_identifier for grant")

    grant: dict = {"local_identifier": grant_id, "entity_type": "grant"}
    scalar_columns = {
        "grant_number": "funding_grant_number",
        "acronym": "funding_acronym",
        "funding_stream": "funding_stream",
    }
    grant.update({field: row[column] for field, column in scalar_columns.items() if row[column]})

    for output_key, text_column in (("titles", "funding_title"), ("abstracts", "funding_abstract")):
        if text := row[text_column]:
            grant[output_key] = {row[f"{text_column}_lang"] or "none": text}

    if row["funding_agency_name"]:
        grant["funding_agency"] = _build_org(row, "funding_agency")
    return grant
def _collect_funding(rows: list[dict]) -> list[dict]:
    """Collect the distinct grants of an entity, keyed by ``funding_local_identifier``.

    The first row of each grant builds it; every row of the grant may add new
    grant identifiers and, when the grant has a funding agency, new agency
    identifiers and types.

    Args:
        rows: All rows of the entity.

    Returns:
        Grant dicts in order of first appearance. Rows without a funding
        local identifier are ignored.
    """
    grants: dict[str, dict] = {}
    grant_ids_seen: dict[str, set[tuple[str, str]]] = {}
    agency_seen: dict[str, dict] = {}

    for row in rows:
        grant_id = row["funding_local_identifier"]
        if not grant_id:
            continue
        if grant_id not in grants:
            grants[grant_id] = _build_grant(row)
            grant_ids_seen[grant_id] = set()
            agency_seen[grant_id] = {"seen_ids": set(), "seen_types": set()}
        grant = grants[grant_id]

        grant_pair = (row["funding_identifier_scheme"], row["funding_identifier_value"])
        if all(grant_pair) and grant_pair not in grant_ids_seen[grant_id]:
            grant_ids_seen[grant_id].add(grant_pair)
            grant.setdefault("identifiers", []).append({"value": grant_pair[1], "scheme": grant_pair[0]})

        if "funding_agency" not in grant:
            continue
        agency = grant["funding_agency"]
        seen = agency_seen[grant_id]

        agency_pair = (row["funding_agency_identifier_scheme"], row["funding_agency_identifier_value"])
        if all(agency_pair) and agency_pair not in seen["seen_ids"]:
            seen["seen_ids"].add(agency_pair)
            agency.setdefault("identifiers", []).append({"value": agency_pair[1], "scheme": agency_pair[0]})

        agency_type = row["funding_agency_type"]
        if agency_type and agency_type not in seen["seen_types"]:
            seen["seen_types"].add(agency_type)
            agency.setdefault("types", []).append(agency_type)

    return list(grants.values())
def normalize_local_identifier_url(local_identifier: str) -> tuple[str]:
    """Restore the ``://`` after a leading ``http``/``https`` scheme.

    Returns:
        A one-element tuple holding the repaired identifier.
    """
    # Reverse proxies (e.g. Traefik) merge duplicate slashes in request paths,
    # turning "https://example.org/..." into "https:/example.org/...": restore the scheme separator.
    repaired = sub(r"^(https?):/+", r"\1://", local_identifier)
    return (repaired,)
def _canonical_path(path: str) -> str:
    """Rebuild *path* around its first entity-type segment that is followed by an identifier.

    Everything after that segment is treated as a single local identifier and
    passed through ``normalize_local_identifier_url``. A path without such a
    segment is returned unchanged.
    """
    segments = [segment for segment in path.split("/") if segment]
    for position, segment in enumerate(segments, start=1):
        if segment not in ENTITY_TYPES or position >= len(segments):
            continue
        head = "/".join(segments[:position])
        identifier = normalize_local_identifier_url("/".join(segments[position:]))[0]
        return f"/{head}/{identifier}"
    return path
def _build_search_result_page(url: str) -> dict:
    """Return the ``search_result_page`` object identified by *url*."""
    return dict(local_identifier=url, entity_type="search_result_page")
def _meta_base_url(request_url: str) -> str:
    """Return *request_url* without query or fragment and with a canonical path.

    When the URL has no scheme or no network location, only the canonical
    path is returned.
    """
    parts = urlsplit(request_url)
    canonical = _canonical_path(parts.path)
    has_origin = bool(parts.scheme and parts.netloc)
    return f"{parts.scheme}://{parts.netloc}{canonical}" if has_origin else canonical
def _page_url(base_path: str, params: dict[str, list[str]], page: int) -> str:
    """Return *base_path* with *params* as its query string and ``page`` set to *page*.

    *params* is not modified; ``:`` and ``,`` are left unescaped in the query.
    """
    query = dict(params)
    query["page"] = [str(page)]
    encoded = urlencode(query, doseq=True, safe=":,")
    return f"{base_path}?{encoded}"
def _raise_unprocessable(message: str) -> NoReturn:
    """Raise an ``HttpError`` carrying status 422 and *message*; never returns."""
    detail = f"HTTP status code {_UNPROCESSABLE_CONTENT}: {message}"
    raise HttpError(_UNPROCESSABLE_CONTENT, detail)
def _parse_positive_int_param(params: dict[str, list[str]], name: str) -> int:
    """Return the first value of query parameter *name* as an integer >= 1.

    Args:
        params: Parsed query parameters; *name* must be present.
        name: Parameter to read.

    Raises:
        HttpError: With status 422 when the value is not an integer or is below 1.
        KeyError: If *name* is absent from *params*.
    """
    text = params[name][0]
    try:
        number = int(text)
    except ValueError:
        _raise_unprocessable(f"{name} must be an integer, got {text!r}")
    if number >= 1:
        return number
    _raise_unprocessable(f"{name} must be >= 1, got {number}")
def _validate_page_range(page: int, total_items: int, total_pages: int) -> None:
    """Reject a page beyond the last one.

    An empty result set (``total_items == 0``) accepts any page.

    Raises:
        HttpError: With status 422 when there are items and *page* exceeds *total_pages*.
    """
    if not total_items or page <= total_pages:
        return
    _raise_unprocessable(f"page {page} exceeds total pages {total_pages}")
def _build_meta(request_url: str, graph_size: int) -> dict:
    """Build the ``meta`` object describing the response.

    A single-entity request yields a ``single_entity`` object. Any other
    request yields a ``search_result_page`` with optional ``next_page`` /
    ``prev_page`` links and a ``part_of`` search result carrying the totals.

    Pagination is read from the query string:

    * ``total_items`` present: it is taken as the overall item count and
      ``page`` and ``page_size`` are both required;
    * only ``page_size`` (optionally ``page``): the total is *graph_size*;
    * neither: one page holding everything.

    Args:
        request_url: URL of the request being answered.
        graph_size: Number of entities built from the input rows.

    Returns:
        The meta dict.

    Raises:
        HttpError: With status 422 when ``total_items`` is not a non-negative
            integer, when ``total_items`` comes without ``page`` or
            ``page_size``, when ``page`` comes without ``page_size``, or when
            ``page`` / ``page_size`` is not a positive integer.
    """
    parsed = urlsplit(request_url)
    base_url = _meta_base_url(request_url)
    if _is_single_entity_request(request_url):
        return {"local_identifier": base_url, "entity_type": "single_entity"}

    params = parse_qs(parsed.query)
    if "total_items" in params:
        raw_total = params["total_items"][0]
        # Report malformed totals as a client error instead of an unhandled exception.
        try:
            total_items = int(raw_total)
        except ValueError:
            _raise_unprocessable(f"total_items must be an integer, got {raw_total!r}")
        if total_items < 0:
            _raise_unprocessable(f"total_items must be >= 0, got {total_items}")
        missing = [name for name in ("page", "page_size") if name not in params]
        if missing:
            _raise_unprocessable(f"total_items requires {' and '.join(missing)}")
        page = _parse_positive_int_param(params, "page")
        page_size = _parse_positive_int_param(params, "page_size")
    elif "page_size" in params:
        total_items = graph_size
        page = _parse_positive_int_param(params, "page") if "page" in params else 1
        page_size = _parse_positive_int_param(params, "page_size")
    elif "page" in params:
        _raise_unprocessable("page requires page_size")
    else:
        total_items = graph_size
        page = 1
        page_size = max(graph_size, 1)

    # page_size is >= 1 on every path above, so the division is always safe.
    total_pages = ceil(total_items / page_size)

    non_pagination_params = {k: v for k, v in params.items() if k not in ("page", "page_size", "total_items")}
    clean_params = {**non_pagination_params, "page": [str(page)], "page_size": [str(page_size)]}
    self_url = f"{base_url}?{urlencode(clean_params, doseq=True, safe=':,')}"
    meta = _build_search_result_page(self_url)
    if page < total_pages:
        meta["next_page"] = _build_search_result_page(_page_url(base_url, clean_params, page + 1))
    if page > 1:
        meta["prev_page"] = _build_search_result_page(_page_url(base_url, clean_params, page - 1))

    # The search result itself is identified by the URL without pagination parameters.
    base_params = {k: v for k, v in clean_params.items() if k not in ("page", "page_size")}
    search_result_url = f"{base_url}?{urlencode(base_params, doseq=True, safe=':,')}" if base_params else base_url
    meta["part_of"] = {
        "local_identifier": search_result_url,
        "entity_type": "search_result",
        "total_items": total_items,
        "first_page": _build_search_result_page(_page_url(base_url, clean_params, 1)),
        "last_page": _build_search_result_page(_page_url(base_url, clean_params, max(total_pages, 1))),
    }
    return meta
# Prefixes of the columns that are turned into structured sections and
# therefore never copied through as plain fields.
_BUILDER_COLUMN_PREFIXES = (
    "identifier_",
    "contribution_",
    "manifestation_",
    "related_products_",
    "topic_",
    "funding_",
    "relevant_organisation_",
)


def _collect_passthrough_fields(first_row: dict, active_formatted: set[str]) -> dict:
    """Return the non-empty plain columns of *first_row*, copied as they are.

    Skipped are columns starting with ``_``, columns listed in
    *active_formatted*, columns starting with one of
    ``_BUILDER_COLUMN_PREFIXES``, and columns with an empty value.
    """
    return {
        column: value
        for column, value in first_row.items()
        if not column.startswith("_")
        and column not in active_formatted
        and not column.startswith(_BUILDER_COLUMN_PREFIXES)
        and value
    }
def _add_formatted_text(entity: dict, first_row: dict, field: str, lang_field: str, output_key: str) -> None:
    """Store a text column on *entity* as ``{language: [text]}`` under *output_key*.

    Nothing is written when the text column is missing or empty; an empty or
    missing language becomes ``"none"``.
    """
    text = first_row.get(field)
    if not text:
        return
    language = first_row.get(lang_field) or "none"
    entity[output_key] = {language: [text]}
# (anchor column, output key, builder) triples: a section is built only when
# its anchor column is present in the rows, and stored only when non-empty.
_SECTION_BUILDERS: tuple[tuple[str, str, Callable], ...] = (
    ("identifier_scheme", "identifiers", _collect_identifiers),
    ("contribution_role", "contributions", _collect_contributors),
    ("topic_term", "topics", _collect_topics),
    ("funding_local_identifier", "funding", _collect_funding),
)
def _build_entity(rows: list[dict]) -> dict:
    """Build one entity object from all rows sharing its local identifier.

    Plain columns are copied from the first row, ``title`` / ``abstract``
    become language-keyed ``titles`` / ``abstracts``, and each structured
    section is added when its columns are present and it is non-empty.

    Args:
        rows: Rows of a single entity (must not be empty).

    Returns:
        The entity dict.
    """
    first_row = rows[0]
    columns = set(first_row)
    text_fields = (("title", "titles"), ("abstract", "abstracts"))

    # Text columns that exist are rendered specially, so keep them out of the pass-through copy.
    active_formatted: set[str] = set()
    for text_col, _ in text_fields:
        if text_col in columns:
            active_formatted |= {text_col, f"{text_col}_lang"}

    entity = _collect_passthrough_fields(first_row, active_formatted)
    for text_col, output_key in text_fields:
        _add_formatted_text(entity, first_row, text_col, f"{text_col}_lang", output_key)

    for anchor, key, builder in _SECTION_BUILDERS:
        if anchor not in columns:
            continue
        section = builder(rows)
        if section:
            entity[key] = section

    if "manifestation_type_class" in columns:
        manifestation = _build_manifestation(rows)
        if manifestation:
            entity["manifestations"] = [manifestation]

    if not columns.isdisjoint(_RELATED_PRODUCT_COLUMNS):
        related = _collect_related_products(rows)
        if related:
            entity["related_products"] = related

    if "relevant_organisation_name" in columns:
        organisations = _collect_organisation(rows, "relevant_organisation")
        if organisations:
            entity["relevant_organisations"] = organisations

    return entity
def _build_entities(rows: list[dict]) -> list[dict]:
    """Group *rows* by ``local_identifier`` and build one entity per group.

    Returns:
        Entities in order of first appearance of their local identifier;
        an empty list for empty input.
    """
    grouped: dict[str, list[dict]] = {}
    for row in rows:
        local_id = row["local_identifier"]
        if local_id in grouped:
            grouped[local_id].append(row)
        else:
            grouped[local_id] = [row]
    return [_build_entity(group) for group in grouped.values()]
# URL path segment (plural) -> value written to "entity_type" (singular).
_ENTITY_TYPE_MAP: dict[str, str] = {
    "products": "product",
    "persons": "person",
    "organisations": "organisation",
    "grants": "grant",
    "venues": "venue",
    "topics": "topic",
    "datasources": "datasource",
}

# Path segments recognised as entity collections.
ENTITY_TYPES = frozenset(_ENTITY_TYPE_MAP)
def _extract_entity_type(request_url: str) -> str | None:
    """Return the entity type named by the first known path segment of *request_url*.

    Returns:
        The singular type mapped from the first path segment found in
        ``_ENTITY_TYPE_MAP``, or ``None`` when no segment matches.
    """
    segments = urlsplit(request_url).path.split("/")
    return next((_ENTITY_TYPE_MAP[segment] for segment in segments if segment in _ENTITY_TYPE_MAP), None)
def _is_single_entity_request(request_url: str) -> bool:
    """Tell whether the path of *request_url* addresses one entity rather than a collection.

    True when the first path segment found in ``ENTITY_TYPES`` is followed by
    at least one more non-empty segment.
    """
    segments = [part for part in urlsplit(request_url).path.split("/") if part]
    first_match = next((index for index, part in enumerate(segments) if part in ENTITY_TYPES), None)
    return first_match is not None and first_match + 1 < len(segments)
# Prefixes that identify the optional column groups of a product row.
_COLUMN_GROUP_PREFIXES = (
    "identifier_",
    "contribution_",
    "_contribution_",
    "manifestation_",
    "related_products_",
    "topic_",
    "funding_",
    "relevant_organisation_",
)

# Product columns that belong to none of the optional groups.
_CORE_COLUMNS = frozenset(
    column for column in _PRODUCT_COLUMNS if not column.startswith(_COLUMN_GROUP_PREFIXES)
)
def _fill_missing_columns(rows: list[dict[str, str]]) -> list[dict[str, str]]:
    """Pad *rows* with empty values for the product columns they lack.

    The columns of the first row decide which column groups are active. Every
    missing column of an active group is added, and so are the missing core
    columns as soon as at least one group is active.

    Returns:
        *rows* itself when it is empty or nothing is missing; otherwise new
        row dicts with the padding placed before the original columns.
    """
    if not rows:
        return rows

    present = set(rows[0])
    active = tuple(
        prefix for prefix in _COLUMN_GROUP_PREFIXES if any(column.startswith(prefix) for column in present)
    )
    padding = {
        column: ""
        for column in _PRODUCT_COLUMNS
        if column not in present
        # str.startswith with an empty tuple is False, matching "no active group".
        and ((bool(active) and column in _CORE_COLUMNS) or column.startswith(active))
    }
    if not padding:
        return rows
    return [{**padding, **row} for row in rows]
def to_skg_if(csv_str: str, request_url: str = "") -> str:
    """Convert CSV rows into a serialized SKG-IF JSON document.

    Rows are grouped into entities by ``local_identifier``. When the query
    string of *request_url* has ``page_size`` but no ``total_items``, the
    entity list is sliced to the requested page here; with ``total_items``
    present no slicing is done.

    Args:
        csv_str: CSV text with a header line.
        request_url: URL of the request, used for pagination, the entity type
            and the ``meta`` object.

    Returns:
        The document (``@context``, ``meta``, ``@graph``) as indented JSON text.

    Raises:
        HttpError: With status 404 when a single-entity request produced no
            rows, or 422 for invalid pagination parameters.
    """
    rows = _fill_missing_columns(list(csv.DictReader(StringIO(csv_str))))
    if not rows and _is_single_entity_request(request_url):
        raise HttpError(404, "HTTP status code 404: entity not found")

    graph = _build_entities(rows)
    total_entities = len(graph)

    params = parse_qs(urlsplit(request_url).query)
    if "total_items" not in params:
        if "page_size" in params:
            page_size = _parse_positive_int_param(params, "page_size")
            page = _parse_positive_int_param(params, "page") if "page" in params else 1
            total_pages = ceil(total_entities / page_size) if page_size > 0 else 0
            _validate_page_range(page, total_entities, total_pages)
            offset = (page - 1) * page_size
            graph = graph[offset : offset + page_size]
        elif "page" in params:
            _raise_unprocessable("page requires page_size")

    entity_type = _extract_entity_type(request_url)
    if entity_type:
        for entity in graph:
            entity["entity_type"] = entity_type

    document = {
        "@context": SKGIF_CONTEXT,
        # The meta object reports the size before slicing.
        "meta": _build_meta(request_url, total_entities),
        "@graph": graph,
    }
    return json.dumps(document, ensure_ascii=False, indent=4)