Coverage for oc_ds_converter / jalc / jalc_processing.py: 95%
130 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2022-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2# SPDX-FileCopyrightText: 2023 Marta Soricetti <marta.soricetti@unibo.it>
3#
4# SPDX-License-Identifier: ISC
6from __future__ import annotations
8from oc_ds_converter.lib.crossref_style_processing import CrossrefStyleProcessing
9from oc_ds_converter.oc_idmanager.jid import JIDManager
10from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager
class JalcProcessing(CrossrefStyleProcessing):
    """JALC (Japan Link Center) source processing.

    Specializes :class:`CrossrefStyleProcessing` with JALC-specific metadata
    extraction and registers JID managers for journal-identifier handling.
    """

    # Publisher prefix mapping is disabled for JALC. Unlike Crossref, which provides
    # a /members API endpoint with authoritative publisher names and their DOI prefixes,
    # JALC has no equivalent endpoint. The JALC /prefixes API only returns prefix, ra,
    # siteId, and updated_date - no publisher names. Since 99.8% of JALC prefixes are
    # not in the Crossref mapping anyway (JALC is a separate DOI registration agency),
    # we use publisher names directly from the source data's publisher_list field.
    def __init__(
        self,
        orcid_index: str | None = None,
        storage_manager: StorageManager | None = None,
        testing: bool = True,
        citing: bool = True,
        exclude_existing: bool = False,
        use_redis_orcid_index: bool = False,
    ):
        """Initialize JALC processing.

        :param orcid_index: DOI-to-ORCID index location, forwarded to the base class.
        :param storage_manager: validated-ID storage backend, forwarded to the base class.
        :param testing: testing mode, forwarded to the base class and to both JID managers.
        :param citing: forwarded to the base class (citing-entity flag).
        :param exclude_existing: stored on the instance; consumed outside this block.
        :param use_redis_orcid_index: forwarded to the base class (Redis-backed ORCID index).
        """
        super().__init__(
            orcid_index=orcid_index,
            publishers_filepath=None,  # no publisher mapping file for JALC (see class comment)
            storage_manager=storage_manager,
            testing=testing,
            citing=citing,
            use_redis_orcid_index=use_redis_orcid_index,
            use_redis_publishers=False,
        )
        self.exclude_existing = exclude_existing
        # Twin JID managers: one on the persistent storage, one on the temporary
        # manager. NOTE(review): self.storage_manager / self.temporary_manager are
        # presumably set by super().__init__ — confirm in the base class.
        self.jid_m = JIDManager(storage_manager=self.storage_manager, testing=testing)
        self.tmp_jid_m = JIDManager(storage_manager=self.temporary_manager, testing=testing)
        # Register the managers under the 'jid' schema for venue-ID normalisation.
        self.venue_id_man_dict["jid"] = self.jid_m
        self.venue_tmp_id_man_dict["jid"] = self.tmp_jid_m
47 @classmethod
48 def get_ja(cls, field: list) -> list:
49 """Select Japanese version of metadata, falling back to English."""
50 if all('lang' in item for item in field):
51 ja = [item for item in field if item['lang'] == 'ja']
52 ja = list(filter(lambda x: x['type'] != 'before' if 'type' in x else x, ja))
53 if ja:
54 return ja
55 en = [item for item in field if item['lang'] == 'en']
56 en = list(filter(lambda x: x['type'] != 'before' if 'type' in x else x, en))
57 if en:
58 return en
59 return field
61 def _extract_doi(self, item: dict) -> str:
62 return item.get("doi", "")
64 def _extract_title(self, item: dict) -> str:
65 title_list = item.get('title_list')
66 if title_list:
67 return self.get_ja(title_list)[0].get('title', '')
68 return ''
70 def _extract_agents(self, item: dict) -> list[dict]:
71 authors: list[dict[str, str]] = []
72 creator_list = item.get("creator_list")
73 if creator_list:
74 for creator in creator_list:
75 agent: dict[str, str] = {"role": "author"}
76 names = creator.get('names', [])
77 if names:
78 ja_name = self.get_ja(names)[0]
79 last_name = ja_name.get('last_name', '')
80 first_name = ja_name.get('first_name', '')
81 else:
82 last_name = ''
83 first_name = ''
84 full_name = ''
85 if last_name:
86 full_name += last_name
87 if first_name:
88 full_name += f', {first_name}'
89 agent["name"] = full_name
90 agent["family"] = last_name
91 agent["given"] = first_name
92 researcher_id_list = creator.get('researcher_id_list', [])
93 for researcher_id in researcher_id_list:
94 if researcher_id.get('type') == 'ORCID' and researcher_id.get('id_code'):
95 agent['orcid'] = researcher_id['id_code']
96 break
97 authors.append(agent)
98 return authors
100 def _extract_venue(self, item: dict) -> str:
101 venue_name = ''
102 journal_ids: list[str] = []
103 if 'journal_title_name_list' in item:
104 candidate_venues = self.get_ja(item['journal_title_name_list'])
105 if candidate_venues:
106 full_venue = [v for v in candidate_venues if v.get('type') == 'full']
107 if full_venue:
108 venue_name = full_venue[0].get('journal_title_name', '')
109 elif candidate_venues:
110 venue_name = candidate_venues[0].get('journal_title_name', '')
111 if 'journal_id_list' in item:
112 for v in item['journal_id_list']:
113 if isinstance(v, dict):
114 journal_id = v.get("journal_id")
115 id_type = v.get("type")
116 if journal_id and id_type:
117 schema = id_type.lower().strip()
118 if schema in ["issn", "jid"]:
119 tmp_id_man = self.venue_tmp_id_man_dict.get(schema)
120 if tmp_id_man and hasattr(tmp_id_man, 'normalise'):
121 norm_id = getattr(tmp_id_man, 'normalise')(journal_id, include_prefix=True)
122 if norm_id:
123 journal_ids.append(norm_id)
124 return f"{venue_name} [{' '.join(journal_ids)}]" if journal_ids else venue_name
126 def _extract_pub_date(self, item: dict) -> str:
127 pub_date_dict = item.get('publication_date')
128 if not pub_date_dict or not isinstance(pub_date_dict, dict):
129 return ''
130 pub_date_list: list[str] = []
131 year = pub_date_dict.get('publication_year', '')
132 if year:
133 pub_date_list.append(str(year))
134 month = pub_date_dict.get('publication_month', '')
135 if month:
136 pub_date_list.append(str(month))
137 day = pub_date_dict.get('publication_day', '')
138 if day:
139 pub_date_list.append(str(day))
140 return '-'.join(pub_date_list)
142 def _extract_pages(self, item: dict) -> str:
143 first_page = item.get('first_page', '')
144 last_page = item.get('last_page', '')
145 page_list: list[str] = []
146 if first_page:
147 page_list.append(first_page)
148 if last_page:
149 page_list.append(last_page)
150 return self.get_pages(page_list)
152 def _extract_type(self, item: dict) -> str:
153 content_type = item.get('content_type')
154 if not content_type:
155 return ''
156 type_map = {
157 'JA': 'journal article',
158 'BK': 'book',
159 'RD': 'dataset',
160 'EL': 'other',
161 'GD': 'other',
162 }
163 return type_map.get(content_type, '')
165 def _extract_publisher(self, item: dict) -> str:
166 if 'publisher_list' in item:
167 return self.get_ja(item['publisher_list'])[0].get('publisher_name', '')
168 return ''
170 def extract_all_ids(self, entity_dict: dict, is_citing: bool) -> tuple[list[str], list[str]]:
171 all_br: list[str] = []
172 all_ra: list[str] = []
174 if not is_citing:
175 citation_list = entity_dict.get("data", {}).get("citation_list", [])
176 for citation in citation_list:
177 doi = citation.get("doi")
178 if doi:
179 norm_id = self.doi_m.normalise(doi, include_prefix=True)
180 if norm_id:
181 all_br.append(norm_id)
182 return all_br, all_ra