Coverage for ramose / skg_if / _base.py: 74%

559 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-01 13:49 +0000

1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import csv 

6import json 

7from collections.abc import Callable 

8from dataclasses import dataclass 

9from dataclasses import field as dataclass_field 

10from io import StringIO 

11from math import ceil 

12from re import sub 

13from typing import NoReturn 

14from urllib.parse import parse_qs, urlencode, urlsplit 

15 

16from ramose import HttpError 

17 

18_YEAR_MONTH_PART_COUNT = 2 

19_UNPROCESSABLE_CONTENT = 422 

20 

21_PRODUCT_COLUMNS: dict[str, str] = dict.fromkeys( 

22 ( 

23 "local_identifier", 

24 "identifier_scheme", 

25 "identifier_value", 

26 "title", 

27 "title_lang", 

28 "abstract", 

29 "abstract_lang", 

30 "product_type", 

31 "topic_term", 

32 "topic_identifier_scheme", 

33 "topic_identifier_value", 

34 "topic_label", 

35 "topic_label_lang", 

36 "topic_provenance_associated_with", 

37 "topic_provenance_trust", 

38 "contribution_by_family_name", 

39 "contribution_by_given_name", 

40 "contribution_by_name", 

41 "contribution_by_identifier_scheme", 

42 "contribution_by_identifier_value", 

43 "contribution_by_local_identifier", 

44 "contribution_role", 

45 "contribution_type", 

46 "_contribution_key", 

47 "_contribution_next_key", 

48 "contribution_declared_affiliation_name", 

49 "contribution_declared_affiliation_short_name", 

50 "contribution_declared_affiliation_country", 

51 "contribution_declared_affiliation_local_identifier", 

52 "contribution_declared_affiliation_identifier_scheme", 

53 "contribution_declared_affiliation_identifier_value", 

54 "contribution_declared_affiliation_type", 

55 "contribution_declared_affiliation_website", 

56 "contribution_declared_affiliation_other_name", 

57 "manifestation_type_class", 

58 "manifestation_type_label", 

59 "manifestation_type_label_lang", 

60 "manifestation_identifier_scheme", 

61 "manifestation_identifier_value", 

62 "manifestation_dates_type", 

63 "manifestation_dates_value", 

64 "manifestation_peer_review_status", 

65 "manifestation_peer_review_description", 

66 "manifestation_access_rights_status", 

67 "manifestation_access_rights_description", 

68 "manifestation_licence", 

69 "manifestation_version", 

70 "manifestation_biblio_volume", 

71 "manifestation_biblio_issue", 

72 "manifestation_biblio_edition", 

73 "manifestation_biblio_number", 

74 "manifestation_biblio_pages_first", 

75 "manifestation_biblio_pages_last", 

76 "manifestation_biblio_in_name", 

77 "manifestation_biblio_in_local_identifier", 

78 "manifestation_biblio_in_identifier_scheme", 

79 "manifestation_biblio_in_identifier_value", 

80 "manifestation_biblio_in_acronym", 

81 "manifestation_biblio_hosting_data_source_local_identifier", 

82 "manifestation_biblio_hosting_data_source_name", 

83 "manifestation_biblio_hosting_data_source_identifier_scheme", 

84 "manifestation_biblio_hosting_data_source_identifier_value", 

85 "related_products_cites", 

86 "related_products_is_supplemented_by", 

87 "related_products_is_documented_by", 

88 "related_products_is_new_version_of", 

89 "related_products_is_part_of", 

90 "funding_local_identifier", 

91 "funding_grant_number", 

92 "funding_title", 

93 "funding_title_lang", 

94 "funding_abstract", 

95 "funding_abstract_lang", 

96 "funding_acronym", 

97 "funding_identifier_scheme", 

98 "funding_identifier_value", 

99 "funding_stream", 

100 "funding_agency_name", 

101 "funding_agency_short_name", 

102 "funding_agency_country", 

103 "funding_agency_local_identifier", 

104 "funding_agency_identifier_scheme", 

105 "funding_agency_identifier_value", 

106 "funding_agency_type", 

107 "funding_agency_website", 

108 "relevant_organisation_name", 

109 "relevant_organisation_short_name", 

110 "relevant_organisation_country", 

111 "relevant_organisation_local_identifier", 

112 "relevant_organisation_identifier_scheme", 

113 "relevant_organisation_identifier_value", 

114 "relevant_organisation_type", 

115 "relevant_organisation_website", 

116 "relevant_organisation_other_name", 

117 ), 

118 "", 

119) 

120 

# Value emitted under "@context" in every serialised response.
SKGIF_CONTEXT = [
    "https://w3id.org/skg-if/context/1.1.0/skg-if.json",
    "https://w3id.org/skg-if/context/1.0.0/skg-if-api.json",
    {"@base": "https://w3id.org/skg-if/sandbox/oc/"},
]

126 

127 

128def _collect_identifiers( 

129 rows: list[dict], 

130 scheme_col: str = "identifier_scheme", 

131 value_col: str = "identifier_value", 

132) -> list[dict]: 

133 seen = set() 

134 identifiers = [] 

135 for row in rows: 

136 scheme = row[scheme_col] 

137 value = row[value_col] 

138 if scheme and value and (scheme, value) not in seen: 

139 seen.add((scheme, value)) 

140 identifiers.append({"value": value, "scheme": scheme}) 

141 return identifiers 

142 

143 

144def _order_linked_list(items: dict[str, dict], next_map: dict[str, str | None]) -> list[dict]: 

145 if not items: 

146 return [] 

147 

148 next_values = set(next_map.values()) - {None} 

149 start_candidates = [key for key in items if key not in next_values] 

150 if not start_candidates: 

151 return list(items.values()) 

152 

153 ordered = [] 

154 current = start_candidates[0] 

155 visited = set() 

156 while current and current in items and current not in visited: 

157 visited.add(current) 

158 ordered.append(items[current]) 

159 current = next_map.get(current) 

160 

161 for key, contributor in items.items(): 

162 if key not in visited: 

163 ordered.append(contributor) 

164 

165 return ordered 

166 

167 

168def _build_agent(row: dict) -> dict | None: 

169 family_name = row["contribution_by_family_name"] 

170 given_name = row["contribution_by_given_name"] 

171 full_name = row["contribution_by_name"] 

172 id_scheme = row["contribution_by_identifier_scheme"] 

173 id_value = row["contribution_by_identifier_value"] 

174 agent_local_id = row["contribution_by_local_identifier"] 

175 role = row["contribution_role"] 

176 

177 is_person = bool(family_name or given_name) 

178 

179 if is_person: 

180 display_name = f"{family_name}, {given_name}" if family_name and given_name else (family_name or given_name) 

181 entity_type = "person" 

182 elif full_name: 

183 display_name = full_name 

184 entity_type = "organisation" if role == "publisher" else "agent" 

185 else: 

186 return None 

187 

188 agent: dict = {"name": display_name, "entity_type": entity_type} 

189 if is_person: 

190 if family_name: 

191 agent["family_name"] = family_name 

192 if given_name: 

193 agent["given_name"] = given_name 

194 if id_scheme and id_value: 

195 agent["identifiers"] = [{"value": id_value, "scheme": id_scheme}] 

196 if not agent_local_id: 

197 msg = f"Missing required local_identifier for {entity_type} '{display_name}'" 

198 raise ValueError(msg) 

199 agent["local_identifier"] = agent_local_id 

200 return agent 

201 

202 

203def _build_org(row: dict, prefix: str) -> dict: 

204 org: dict = {"entity_type": "organisation"} 

205 name = row[f"{prefix}_name"] 

206 if name: 

207 org["name"] = name 

208 for field in ("short_name", "country", "website"): 

209 val = row[f"{prefix}_{field}"] 

210 if val: 

211 org[field] = val 

212 local_id = row[f"{prefix}_local_identifier"] 

213 if not local_id: 

214 msg = f"Missing required local_identifier for organisation '{row[f'{prefix}_name']}'" 

215 raise ValueError(msg) 

216 org["local_identifier"] = local_id 

217 return org 

218 

219 

220def _merge_org_multivalued(entry: dict, row: dict, prefix: str) -> None: 

221 id_scheme = row[f"{prefix}_identifier_scheme"] 

222 id_value = row[f"{prefix}_identifier_value"] 

223 if id_scheme and id_value and (id_scheme, id_value) not in entry["seen_ids"]: 

224 entry["seen_ids"].add((id_scheme, id_value)) 

225 entry["obj"].setdefault("identifiers", []).append({"value": id_value, "scheme": id_scheme}) 

226 org_type = row[f"{prefix}_type"] 

227 if org_type and org_type not in entry["seen_types"]: 

228 entry["seen_types"].add(org_type) 

229 entry["obj"].setdefault("types", []).append(org_type) 

230 if f"{prefix}_other_name" in row: 

231 other_name = row[f"{prefix}_other_name"] 

232 if other_name and other_name not in entry["seen_other_names"]: 

233 entry["seen_other_names"].add(other_name) 

234 entry["obj"].setdefault("other_names", []).append(other_name) 

235 

236 

237def _collect_declared_affiliations(rows: list[dict], role: str, key: str, store: dict) -> None: 

238 prefix = "contribution_declared_affiliation" 

239 role_store = store.setdefault(role, {}).setdefault(key, {}) 

240 for row in rows: 

241 if row["contribution_role"] != role or row["_contribution_key"] != key: 

242 continue 

243 aff_name = row[f"{prefix}_name"] 

244 aff_local_id = row[f"{prefix}_local_identifier"] 

245 if not aff_name and not aff_local_id: 

246 continue 

247 if aff_local_id not in role_store: 

248 role_store[aff_local_id] = { 

249 "obj": _build_org(row, prefix), 

250 "seen_ids": set(), 

251 "seen_types": set(), 

252 "seen_other_names": set(), 

253 } 

254 _merge_org_multivalued(role_store[aff_local_id], row, prefix) 

255 

256 

257def _enrich_contributor( 

258 contributor: dict, 

259 key: str, 

260 role_type: str, 

261 contribution_types: dict, 

262 affiliations: dict, 

263) -> None: 

264 types = contribution_types.get(role_type, {}).get(key) 

265 if types: 

266 contributor["contribution_types"] = types 

267 affs = affiliations.get(role_type, {}).get(key) 

268 if affs: 

269 contributor["declared_affiliations"] = [entry["obj"] for entry in affs.values()] 

270 

271 

272@dataclass 

273class _ContributorAccumulator: 

274 by_role_type: dict[str, dict[str, dict]] = dataclass_field(default_factory=dict) 

275 next_map: dict[str, dict[str, str | None]] = dataclass_field(default_factory=dict) 

276 types: dict[str, dict[str, list[str]]] = dataclass_field(default_factory=dict) 

277 affiliations: dict[str, dict[str, dict]] = dataclass_field(default_factory=dict) 

278 

279 

280def _process_contributor_row( 

281 row: dict, 

282 rows: list[dict], 

283 acc: _ContributorAccumulator, 

284) -> None: 

285 role = row["contribution_role"] 

286 key = row["_contribution_key"] 

287 if not role or not key: 

288 return 

289 

290 if role not in acc.by_role_type: 

291 acc.by_role_type[role] = {} 

292 acc.next_map[role] = {} 

293 acc.types[role] = {} 

294 

295 contribution_type = row["contribution_type"] 

296 if contribution_type: 

297 type_list = acc.types[role].setdefault(key, []) 

298 if contribution_type not in type_list: 

299 type_list.append(contribution_type) 

300 

301 if key in acc.by_role_type[role]: 

302 existing = acc.by_role_type[role][key] 

303 id_scheme = row["contribution_by_identifier_scheme"] 

304 id_value = row["contribution_by_identifier_value"] 

305 if id_scheme and id_value and not existing["by"].get("identifiers"): 

306 existing["by"]["identifiers"] = [{"value": id_value, "scheme": id_scheme}] 

307 return 

308 

309 _collect_declared_affiliations(rows, role, key, acc.affiliations) 

310 agent = _build_agent(row) 

311 if not agent: 

312 return 

313 acc.by_role_type[role][key] = {"role": role, "by": agent} 

314 acc.next_map[role][key] = row["_contribution_next_key"] or None 

315 

316 

def _collect_contributors(rows: list[dict]) -> list[dict]:
    """Return the ranked contributions described by *rows*.

    Contributions are emitted role by role (authors, then editors, then
    publishers); other roles are not emitted. Within a role they follow the
    order produced by ``_order_linked_list`` and receive a 1-based ``rank``,
    their contribution types and their declared affiliations.
    """
    acc = _ContributorAccumulator()
    for row in rows:
        _process_contributor_row(row, rows, acc)

    result = []
    for role_type in ("author", "editor", "publisher"):
        contributions = acc.by_role_type.get(role_type)
        if contributions is None:
            continue
        # _order_linked_list hands back the very same dict objects, so the key of
        # each one can be recovered by identity in O(1) instead of rescanning
        # the whole mapping for every contributor.
        key_by_identity = {id(contribution): key for key, contribution in contributions.items()}
        ordered = _order_linked_list(contributions, acc.next_map[role_type])
        for rank, contributor in enumerate(ordered, start=1):
            contributor["rank"] = rank
            key = key_by_identity[id(contributor)]
            _enrich_contributor(contributor, key, role_type, acc.types, acc.affiliations)
            result.append(contributor)

    return result

335 

336 

337def _build_venue(rows: list[dict], venue_name: str, venue_local_id: str) -> dict: 

338 venue: dict = {"name": venue_name, "entity_type": "venue"} 

339 if not venue_local_id: 

340 msg = f"Missing required local_identifier for venue '{venue_name}'" 

341 raise ValueError(msg) 

342 venue["local_identifier"] = venue_local_id 

343 acronym = rows[0]["manifestation_biblio_in_acronym"] 

344 if acronym: 

345 venue["acronym"] = acronym 

346 

347 venue_ids_seen = set() 

348 venue_identifiers = [] 

349 for row in rows: 

350 venue_scheme = row["manifestation_biblio_in_identifier_scheme"] 

351 venue_value = row["manifestation_biblio_in_identifier_value"] 

352 if venue_scheme and venue_value and (venue_scheme, venue_value) not in venue_ids_seen: 

353 venue_ids_seen.add((venue_scheme, venue_value)) 

354 venue_identifiers.append({"value": venue_value, "scheme": venue_scheme}) 

355 if venue_identifiers: 

356 venue["identifiers"] = venue_identifiers 

357 return venue 

358 

359 

360def _normalize_datetime(date_str: str) -> str: 

361 parts = date_str.split("-") 

362 if len(parts) == 1: 

363 return f"{parts[0]}-01-01T00:00:00" 

364 if len(parts) == _YEAR_MONTH_PART_COUNT: 

365 return f"{parts[0]}-{parts[1]}-01T00:00:00" 

366 if "T" not in date_str: 

367 return f"{date_str}T00:00:00" 

368 return date_str 

369 

370 

371def _collect_manifestation_dates(rows: list[dict]) -> dict[str, list[str]]: 

372 dates: dict[str, list[str]] = {} 

373 seen: set[tuple[str, str]] = set() 

374 for row in rows: 

375 date_type = row["manifestation_dates_type"] 

376 date_value = row["manifestation_dates_value"] 

377 if date_type and date_value and (date_type, date_value) not in seen: 

378 seen.add((date_type, date_value)) 

379 dates.setdefault(date_type, []).append(_normalize_datetime(date_value)) 

380 return dates 

381 

382 

383_BIBLIO_SIMPLE_FIELDS: tuple[tuple[str, str], ...] = ( 

384 ("manifestation_biblio_volume", "volume"), 

385 ("manifestation_biblio_issue", "issue"), 

386 ("manifestation_biblio_edition", "edition"), 

387 ("manifestation_biblio_number", "number"), 

388) 

389 

390 

391def _build_biblio_venue(rows: list[dict], first_row: dict) -> dict | None: 

392 venue_name = first_row["manifestation_biblio_in_name"] 

393 if not venue_name: 

394 return None 

395 venue_local_id = first_row["manifestation_biblio_in_local_identifier"] 

396 return _build_venue(rows, venue_name, venue_local_id) 

397 

398 

399def _build_biblio_hosting(rows: list[dict], first_row: dict) -> dict | None: 

400 hosting_local_id = first_row["manifestation_biblio_hosting_data_source_local_identifier"] 

401 if not hosting_local_id: 

402 return None 

403 hosting: dict = {"local_identifier": hosting_local_id, "entity_type": "datasource"} 

404 hosting_name = first_row["manifestation_biblio_hosting_data_source_name"] 

405 if hosting_name: 

406 hosting["name"] = hosting_name 

407 hosting_identifiers = _collect_identifiers( 

408 rows, 

409 "manifestation_biblio_hosting_data_source_identifier_scheme", 

410 "manifestation_biblio_hosting_data_source_identifier_value", 

411 ) 

412 if hosting_identifiers: 

413 hosting["identifiers"] = hosting_identifiers 

414 return hosting 

415 

416 

def _build_biblio(rows: list[dict]) -> dict:
    """Build the bibliographic block of a manifestation.

    Scalar fields and pages are read from the first row. Pages are emitted only
    when both the first and the last page are present. The result may be empty.
    """
    first_row = rows[0]
    biblio: dict = {
        json_key: value for sparql_var, json_key in _BIBLIO_SIMPLE_FIELDS if (value := first_row[sparql_var])
    }

    page_range = {
        "first": first_row["manifestation_biblio_pages_first"],
        "last": first_row["manifestation_biblio_pages_last"],
    }
    if all(page_range.values()):
        biblio["pages"] = page_range

    if venue := _build_biblio_venue(rows, first_row):
        biblio["in"] = venue
    if hosting := _build_biblio_hosting(rows, first_row):
        biblio["hosting_data_source"] = hosting
    return biblio

435 

436 

437def _build_manifestation_type(first_row: dict) -> dict | None: 

438 type_class = first_row["manifestation_type_class"] 

439 if not type_class: 

440 return None 

441 separator = "#" if "#" in type_class else "/" 

442 defined_in = type_class.rsplit(separator, 1)[0] 

443 manifestation_type: dict = {"class": type_class, "defined_in": defined_in} 

444 type_label = first_row["manifestation_type_label"] 

445 if type_label: 

446 label_lang = first_row["manifestation_type_label_lang"] or "none" 

447 manifestation_type["labels"] = {label_lang: type_label} 

448 return manifestation_type 

449 

450 

451def _build_status_with_description(first_row: dict, status_field: str, desc_field: str) -> dict | None: 

452 status = first_row[status_field] 

453 if not status: 

454 return None 

455 result: dict = {"status": status} 

456 desc = first_row[desc_field] 

457 if desc: 

458 result["description"] = desc 

459 return result 

460 

461 

def _build_manifestation(rows: list[dict]) -> dict | None:
    """Assemble the manifestation object for one entity.

    Single-valued sections are read from the first row; dates and identifiers
    are gathered from every row. Empty sections are omitted, and ``None`` is
    returned when nothing at all is left.
    """
    first_row = rows[0]
    # Sections are listed in output order; each value is computed in that same order.
    sections = (
        ("type", _build_manifestation_type(first_row)),
        ("dates", _collect_manifestation_dates(rows)),
        (
            "identifiers",
            _collect_identifiers(rows, "manifestation_identifier_scheme", "manifestation_identifier_value"),
        ),
        (
            "peer_review",
            _build_status_with_description(
                first_row, "manifestation_peer_review_status", "manifestation_peer_review_description"
            ),
        ),
        (
            "access_rights",
            _build_status_with_description(
                first_row, "manifestation_access_rights_status", "manifestation_access_rights_description"
            ),
        ),
        ("licence", first_row["manifestation_licence"]),
        ("version", first_row["manifestation_version"]),
        ("biblio", _build_biblio(rows)),
    )
    manifestation: dict = {name: content for name, content in sections if content}
    return manifestation or None

503 

504 

505_RELATED_PRODUCT_COLUMNS = [ 

506 "related_products_cites", 

507 "related_products_is_supplemented_by", 

508 "related_products_is_documented_by", 

509 "related_products_is_new_version_of", 

510 "related_products_is_part_of", 

511] 

512 

513 

def _collect_related_products(rows: list[dict]) -> dict:
    """Map each relation name to the distinct related products found in *rows*.

    Values keep their first-seen order; relations without any value are omitted.
    """
    related: dict[str, list[str]] = {}
    for column in _RELATED_PRODUCT_COLUMNS:
        # dict.fromkeys de-duplicates while preserving first-seen order.
        distinct = list(dict.fromkeys(value for row in rows if (value := row[column])))
        if distinct:
            related[column.replace("related_products_", "")] = distinct
    return related

528 

529 

530def _collect_topics(rows: list[dict]) -> list[dict]: 

531 topics_by_uri: dict[str, dict] = {} 

532 seen_identifiers: dict[str, set] = {} 

533 seen_provenance: dict[str, set] = {} 

534 

535 for row in rows: 

536 uri = row["topic_term"] 

537 if not uri: 

538 continue 

539 

540 if uri not in topics_by_uri: 

541 topics_by_uri[uri] = {"term": {"local_identifier": uri, "entity_type": "topic"}} 

542 seen_identifiers[uri] = set() 

543 seen_provenance[uri] = set() 

544 

545 topic = topics_by_uri[uri] 

546 term = topic["term"] 

547 

548 label = row["topic_label"] 

549 if label: 

550 lang = row["topic_label_lang"] or "none" 

551 term.setdefault("labels", {})[lang] = label 

552 

553 id_scheme = row["topic_identifier_scheme"] 

554 id_value = row["topic_identifier_value"] 

555 if id_scheme and id_value and (id_scheme, id_value) not in seen_identifiers[uri]: 

556 seen_identifiers[uri].add((id_scheme, id_value)) 

557 term.setdefault("identifiers", []).append({"scheme": id_scheme, "value": id_value}) 

558 

559 prov_agent = row["topic_provenance_associated_with"] 

560 prov_trust = row["topic_provenance_trust"] 

561 if prov_agent and prov_trust and prov_agent not in seen_provenance[uri]: 

562 seen_provenance[uri].add(prov_agent) 

563 topic.setdefault("provenance", []).append({"associated_with": prov_agent, "trust": float(prov_trust)}) 

564 

565 return list(topics_by_uri.values()) 

566 

567 

568def _collect_organisation(rows: list[dict], prefix: str) -> list[dict]: 

569 entries: dict[str, dict] = {} 

570 for row in rows: 

571 name = row[f"{prefix}_name"] 

572 local_id = row[f"{prefix}_local_identifier"] 

573 if not name and not local_id: 

574 continue 

575 if local_id not in entries: 

576 entries[local_id] = { 

577 "obj": _build_org(row, prefix), 

578 "seen_ids": set(), 

579 "seen_types": set(), 

580 "seen_other_names": set(), 

581 } 

582 _merge_org_multivalued(entries[local_id], row, prefix) 

583 return [entry["obj"] for entry in entries.values()] 

584 

585 

586def _build_grant(row: dict) -> dict: 

587 funding_local_id = row["funding_local_identifier"] 

588 if not funding_local_id: 

589 msg = "Missing required local_identifier for grant" 

590 raise ValueError(msg) 

591 grant: dict = {"local_identifier": funding_local_id, "entity_type": "grant"} 

592 for field, csv_col in ( 

593 ("grant_number", "funding_grant_number"), 

594 ("acronym", "funding_acronym"), 

595 ("funding_stream", "funding_stream"), 

596 ): 

597 val = row[csv_col] 

598 if val: 

599 grant[field] = val 

600 title = row["funding_title"] 

601 if title: 

602 grant["titles"] = {row["funding_title_lang"] or "none": title} 

603 abstract = row["funding_abstract"] 

604 if abstract: 

605 grant["abstracts"] = {row["funding_abstract_lang"] or "none": abstract} 

606 agency_name = row["funding_agency_name"] 

607 if agency_name: 

608 grant["funding_agency"] = _build_org(row, "funding_agency") 

609 return grant 

610 

611 

612def _collect_funding(rows: list[dict]) -> list[dict]: 

613 funding_by_key: dict[str, dict] = {} 

614 seen_ids: dict[str, set[tuple[str, str]]] = {} 

615 agency_trackers: dict[str, dict] = {} 

616 

617 for row in rows: 

618 local_id = row["funding_local_identifier"] 

619 if not local_id: 

620 continue 

621 if local_id not in funding_by_key: 

622 funding_by_key[local_id] = _build_grant(row) 

623 seen_ids[local_id] = set() 

624 agency_trackers[local_id] = {"seen_ids": set(), "seen_types": set()} 

625 

626 id_scheme = row["funding_identifier_scheme"] 

627 id_value = row["funding_identifier_value"] 

628 if id_scheme and id_value and (id_scheme, id_value) not in seen_ids[local_id]: 

629 seen_ids[local_id].add((id_scheme, id_value)) 

630 funding_by_key[local_id].setdefault("identifiers", []).append({"value": id_value, "scheme": id_scheme}) 

631 

632 if "funding_agency" not in funding_by_key[local_id]: 

633 continue 

634 tracker = agency_trackers[local_id] 

635 agency = funding_by_key[local_id]["funding_agency"] 

636 a_scheme = row["funding_agency_identifier_scheme"] 

637 a_value = row["funding_agency_identifier_value"] 

638 if a_scheme and a_value and (a_scheme, a_value) not in tracker["seen_ids"]: 

639 tracker["seen_ids"].add((a_scheme, a_value)) 

640 agency.setdefault("identifiers", []).append({"value": a_value, "scheme": a_scheme}) 

641 a_type = row["funding_agency_type"] 

642 if a_type and a_type not in tracker["seen_types"]: 

643 tracker["seen_types"].add(a_type) 

644 agency.setdefault("types", []).append(a_type) 

645 

646 return list(funding_by_key.values()) 

647 

648 

def normalize_local_identifier_url(local_identifier: str) -> tuple[str]:
    """Return *local_identifier* as a 1-tuple with its ``http(s)`` scheme separator restored."""
    # Reverse proxies (e.g. Traefik) merge duplicate slashes in request paths,
    # turning "https://example.org/..." into "https:/example.org/...": restore the scheme separator.
    restored = sub(r"^(https?):/+", r"\1://", local_identifier)
    return (restored,)

653 

654 

def _canonical_path(path: str) -> str:
    """Rebuild *path* with a normalised identifier after the entity-type segment.

    Empty segments are dropped. Everything after the first entity-type segment
    that is followed by at least one more segment is joined and passed through
    ``normalize_local_identifier_url``. Paths with no such segment are returned
    unchanged.
    """
    segments = [segment for segment in path.split("/") if segment]
    for position, segment in enumerate(segments, start=1):
        if segment in ENTITY_TYPES and position < len(segments):
            prefix = "/".join(segments[:position])
            (identifier,) = normalize_local_identifier_url("/".join(segments[position:]))
            return "/" + prefix + "/" + identifier
    return path

662 

663 

664def _build_search_result_page(url: str) -> dict: 

665 return {"local_identifier": url, "entity_type": "search_result_page"} 

666 

667 

def _meta_base_url(request_url: str) -> str:
    """Return *request_url* without query or fragment, with a canonical path.

    Scheme and host are kept when both are present; otherwise only the
    canonical path is returned.
    """
    scheme, netloc, raw_path, _query, _fragment = urlsplit(request_url)
    path = _canonical_path(raw_path)
    return f"{scheme}://{netloc}{path}" if scheme and netloc else path

674 

675 

676def _page_url(base_path: str, params: dict[str, list[str]], page: int) -> str: 

677 page_params = {**params, "page": [str(page)]} 

678 return f"{base_path}?{urlencode(page_params, doseq=True, safe=':,')}" 

679 

680 

def _raise_unprocessable(message: str) -> NoReturn:
    """Raise an ``HttpError`` carrying the unprocessable-content status code and *message*."""
    detail = f"HTTP status code {_UNPROCESSABLE_CONTENT}: {message}"
    raise HttpError(_UNPROCESSABLE_CONTENT, detail)

683 

684 

685def _parse_positive_int_param(params: dict[str, list[str]], name: str) -> int: 

686 raw_value = params[name][0] 

687 try: 

688 value = int(raw_value) 

689 except ValueError: 

690 _raise_unprocessable(f"{name} must be an integer, got {raw_value!r}") 

691 if value < 1: 

692 _raise_unprocessable(f"{name} must be >= 1, got {value}") 

693 return value 

694 

695 

696def _validate_page_range(page: int, total_items: int, total_pages: int) -> None: 

697 if total_items and page > total_pages: 

698 _raise_unprocessable(f"page {page} exceeds total pages {total_pages}") 

699 

700 

def _build_meta(request_url: str, graph_size: int) -> dict:
    """Build the ``meta`` object describing the response to *request_url*.

    A single-entity request yields a ``single_entity`` object. Any other
    request yields a ``search_result_page`` with optional ``next_page`` /
    ``prev_page`` links and a ``part_of`` search result carrying the total
    number of items and the first and last page.

    The total comes from the ``total_items`` query parameter when present,
    otherwise from *graph_size*. Without any pagination parameter the whole
    result is a single page.

    Raises:
        HttpError: (422, via ``_raise_unprocessable``) when ``total_items`` is not
            a non-negative integer or comes without ``page_size``, when ``page``
            comes without ``page_size``, or when ``page`` / ``page_size`` are not
            integers >= 1.
    """
    base_url = _meta_base_url(request_url)
    if _is_single_entity_request(request_url):
        return {"local_identifier": base_url, "entity_type": "single_entity"}

    params = parse_qs(urlsplit(request_url).query)
    if "total_items" in params:
        raw_total = params["total_items"][0]
        try:
            total_items = int(raw_total)
        except ValueError:
            _raise_unprocessable(f"total_items must be an integer, got {raw_total!r}")
        if total_items < 0:
            _raise_unprocessable(f"total_items must be >= 0, got {total_items}")
        if "page_size" not in params:
            _raise_unprocessable("total_items requires page_size")
        page = _parse_positive_int_param(params, "page") if "page" in params else 1
        page_size = _parse_positive_int_param(params, "page_size")
    elif "page_size" in params:
        total_items = graph_size
        page = _parse_positive_int_param(params, "page") if "page" in params else 1
        page_size = _parse_positive_int_param(params, "page_size")
    elif "page" in params:
        _raise_unprocessable("page requires page_size")
    else:
        total_items = graph_size
        page = 1
        page_size = max(graph_size, 1)

    # page_size is >= 1 on every branch above, so the division is always safe.
    total_pages = ceil(total_items / page_size)
    extra_params = {k: v for k, v in params.items() if k not in ("page", "page_size", "total_items")}
    clean_params = {**extra_params, "page": [str(page)], "page_size": [str(page_size)]}

    meta = _build_search_result_page(_page_url(base_url, clean_params, page))
    if page < total_pages:
        meta["next_page"] = _build_search_result_page(_page_url(base_url, clean_params, page + 1))
    if page > 1:
        meta["prev_page"] = _build_search_result_page(_page_url(base_url, clean_params, page - 1))

    # The search result itself is identified by the URL without pagination parameters.
    if extra_params:
        search_result_url = f"{base_url}?{urlencode(extra_params, doseq=True, safe=':,')}"
    else:
        search_result_url = base_url
    meta["part_of"] = {
        "local_identifier": search_result_url,
        "entity_type": "search_result",
        "total_items": total_items,
        "first_page": _build_search_result_page(_page_url(base_url, clean_params, 1)),
        "last_page": _build_search_result_page(_page_url(base_url, clean_params, max(total_pages, 1))),
    }
    return meta

740 

741 

742_BUILDER_COLUMN_PREFIXES = ( 

743 "identifier_", 

744 "contribution_", 

745 "manifestation_", 

746 "related_products_", 

747 "topic_", 

748 "funding_", 

749 "relevant_organisation_", 

750) 

751 

752 

753def _collect_passthrough_fields(first_row: dict, active_formatted: set[str]) -> dict: 

754 entity: dict = {} 

755 for col, val in first_row.items(): 

756 if col.startswith("_") or col in active_formatted: 

757 continue 

758 if any(col.startswith(prefix) for prefix in _BUILDER_COLUMN_PREFIXES): 

759 continue 

760 if val: 

761 entity[col] = val 

762 return entity 

763 

764 

765def _add_formatted_text(entity: dict, first_row: dict, field: str, lang_field: str, output_key: str) -> None: 

766 if first_row.get(field): 

767 lang = first_row.get(lang_field) or "none" 

768 entity[output_key] = {lang: [first_row[field]]} 

769 

770 

# (anchor column, output key, builder): a section is built with builder(rows)
# only when its anchor column is present in the CSV.
_SECTION_BUILDERS: tuple[tuple[str, str, Callable], ...] = (
    ("identifier_scheme", "identifiers", _collect_identifiers),
    ("contribution_role", "contributions", _collect_contributors),
    ("topic_term", "topics", _collect_topics),
    ("funding_local_identifier", "funding", _collect_funding),
)

777 

778 

def _build_entity(rows: list[dict]) -> dict:
    """Build one entity from the CSV rows that share its local identifier.

    Plain columns are copied from the first row, title and abstract become
    language-keyed lists, and each structured section is added only when its
    columns are present and it turns out non-empty.
    """
    first_row = rows[0]
    columns = set(first_row)

    text_fields = (("title", "titles"), ("abstract", "abstracts"))
    active_formatted: set[str] = set()
    for text_col, _output_key in text_fields:
        if text_col in columns:
            active_formatted.update((text_col, f"{text_col}_lang"))

    entity = _collect_passthrough_fields(first_row, active_formatted)
    for text_col, output_key in text_fields:
        _add_formatted_text(entity, first_row, text_col, f"{text_col}_lang", output_key)

    for anchor, key, builder in _SECTION_BUILDERS:
        if anchor not in columns:
            continue
        if section := builder(rows):
            entity[key] = section

    if "manifestation_type_class" in columns:
        manifestation = _build_manifestation(rows)
        if manifestation:
            entity["manifestations"] = [manifestation]

    if not columns.isdisjoint(_RELATED_PRODUCT_COLUMNS):
        related = _collect_related_products(rows)
        if related:
            entity["related_products"] = related

    if "relevant_organisation_name" in columns:
        organisations = _collect_organisation(rows, "relevant_organisation")
        if organisations:
            entity["relevant_organisations"] = organisations

    return entity

806 

807 

808def _build_entities(rows: list[dict]) -> list[dict]: 

809 if not rows: 

810 return [] 

811 groups: dict[str, list[dict]] = {} 

812 for row in rows: 

813 groups.setdefault(row["local_identifier"], []).append(row) 

814 return [_build_entity(group) for group in groups.values()] 

815 

816 

817ENTITY_TYPES = frozenset({"products", "persons", "organisations", "grants", "venues", "topics", "datasources"}) 

818 

819_ENTITY_TYPE_MAP: dict[str, str] = { 

820 "products": "product", 

821 "persons": "person", 

822 "organisations": "organisation", 

823 "grants": "grant", 

824 "venues": "venue", 

825 "topics": "topic", 

826 "datasources": "datasource", 

827} 

828 

829 

def _extract_entity_type(request_url: str) -> str | None:
    """Return the entity type for the first collection segment in the URL path, or ``None``."""
    segments = urlsplit(request_url).path.split("/")
    return next((_ENTITY_TYPE_MAP[segment] for segment in segments if segment in _ENTITY_TYPE_MAP), None)

835 

836 

837def _is_single_entity_request(request_url: str) -> bool: 

838 segments = [s for s in urlsplit(request_url).path.split("/") if s] 

839 for i, segment in enumerate(segments): 

840 if segment in ENTITY_TYPES: 

841 return i + 1 < len(segments) 

842 return False 

843 

844 

# Prefixes that partition the known columns into groups.
_COLUMN_GROUP_PREFIXES = (
    "identifier_",
    "contribution_",
    "_contribution_",
    "manifestation_",
    "related_products_",
    "topic_",
    "funding_",
    "relevant_organisation_",
)

# Known columns that belong to no prefixed group.
_CORE_COLUMNS = frozenset(col for col in _PRODUCT_COLUMNS if not col.startswith(_COLUMN_GROUP_PREFIXES))

857 

858 

859def _fill_missing_columns(rows: list[dict[str, str]]) -> list[dict[str, str]]: 

860 if not rows: 

861 return rows 

862 present = set(rows[0]) 

863 active_prefixes = {prefix for prefix in _COLUMN_GROUP_PREFIXES if any(col.startswith(prefix) for col in present)} 

864 missing = { 

865 col: "" 

866 for col in _PRODUCT_COLUMNS 

867 if col not in present 

868 and ((active_prefixes and col in _CORE_COLUMNS) or any(col.startswith(p) for p in active_prefixes)) 

869 } 

870 if not missing: 

871 return rows 

872 return [missing | row for row in rows] 

873 

874 

def to_skg_if(csv_str: str, request_url: str = "") -> str:
    """Convert CSV text into an SKG-IF JSON-LD document.

    Rows are grouped into entities. Unless ``total_items`` is among the query
    parameters of *request_url*, ``page_size`` / ``page`` select a slice of the
    entities. The entity type derived from the URL path, if any, is stamped on
    every returned entity.

    Raises:
        HttpError: 404 when a single-entity request yields no rows; 422 (via the
            pagination helpers) for invalid or out-of-range pagination parameters.
    """
    rows = _fill_missing_columns(list(csv.DictReader(StringIO(csv_str))))
    if not rows and _is_single_entity_request(request_url):
        msg = "HTTP status code 404: entity not found"
        raise HttpError(404, msg)

    graph = _build_entities(rows)
    total_entities = len(graph)

    params = parse_qs(urlsplit(request_url).query)
    if "total_items" not in params:
        if "page_size" in params:
            page_size = _parse_positive_int_param(params, "page_size")
            page = _parse_positive_int_param(params, "page") if "page" in params else 1
            # page_size is >= 1 here, as guaranteed by _parse_positive_int_param.
            total_pages = ceil(total_entities / page_size)
            _validate_page_range(page, total_entities, total_pages)
            offset = (page - 1) * page_size
            graph = graph[offset : offset + page_size]
        elif "page" in params:
            _raise_unprocessable("page requires page_size")

    entity_type = _extract_entity_type(request_url)
    if entity_type:
        for entity in graph:
            entity["entity_type"] = entity_type

    document = {
        "@context": SKGIF_CONTEXT,
        "meta": _build_meta(request_url, total_entities),
        "@graph": graph,
    }
    return json.dumps(document, ensure_ascii=False, indent=4)