Coverage for oc_ds_converter / oc_idmanager / pmcid.py: 73%
99 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2023 Arianna Moretti <arianna.moretti4@unibo.it>
2# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
3# SPDX-FileCopyrightText: 2024 Ivan Heibi <ivan.heibi2@unibo.it>
4#
5# SPDX-License-Identifier: ISC
8from json import loads
9from re import match, sub
10from time import sleep
11from urllib.parse import quote, unquote
13from oc_ds_converter.oc_idmanager.base import IdentifierManager
14from requests import ReadTimeout, get
15from requests.exceptions import ConnectionError
16from oc_ds_converter.oc_idmanager.oc_data_storage.redis_manager import RedisStorageManager
17from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager
class PMCIDManager(IdentifierManager):
    """Identifier manager for PMCID identifiers.

    Provides normalisation, syntactic validation and (optionally) an
    existence check against the NCBI ID Converter API, caching validation
    results in the configured storage manager.
    """

    def __init__(self, use_api_service: bool = True, storage_manager: StorageManager | None = None, testing: bool = True) -> None:
        """PMCID manager constructor.

        Args:
            use_api_service: when True, ``exists`` queries the ID Converter
                API; when False no HTTP request is made.
            storage_manager: storage used to cache validation results. When
                None, a ``RedisStorageManager`` is created.
            testing: forwarded to ``RedisStorageManager`` when no storage
                manager is supplied.
        """
        super().__init__()
        self._use_api_service = use_api_service
        if storage_manager is None:
            self.storage_manager = RedisStorageManager(testing=testing)
        else:
            self.storage_manager = storage_manager
        self._api = "https://www.ncbi.nlm.nih.gov/pmc/utils/idconv/v1.0/"
        self._p = "pmcid:"

    # If there's a need to obtain more metadata from a PMCID, consider using the Entrez (aka E-Utilities) API
    # (https://eutils.ncbi.nlm.nih.gov/entrez/eutils/), which works with different parameters and returns
    # different responses.
    # The ID Converter API only provides alternative IDs (doi, pmid) for the work associated to the queried pmcid.

    def validated_as_id(self, id_string):
        """Return the cached validation result for ``id_string``.

        Returns:
            The boolean stored in the storage manager for ``id_string``, or
            None when the stored value is not a boolean. The id is looked up
            exactly as given (it is not normalised here).
        """
        cached_value = self.storage_manager.get_value(id_string)
        if isinstance(cached_value, bool):
            return cached_value
        return None

    def is_valid(self, pmcid, get_extra_info=False):
        """Tell whether ``pmcid`` is syntactically correct and exists.

        The id is normalised (with prefix) first; a cached boolean in the
        storage manager short-circuits any further check.

        Args:
            pmcid: the identifier to validate, with or without prefix.
            get_extra_info: when True and no cached value is available, also
                return the info dictionary produced by ``exists``.

        Returns:
            A bool when ``get_extra_info`` is False, when the id cannot be
            normalised, or when a cached boolean is found. Otherwise a
            ``(bool, dict)`` tuple.
        """
        pmcid = self.normalise(pmcid, include_prefix=True)
        if pmcid is None:
            return False

        cached_value = self.storage_manager.get_value(pmcid)
        if isinstance(cached_value, bool):
            return cached_value

        if get_extra_info:
            info = self.exists(pmcid, get_extra_info=True)
            self.storage_manager.set_full_value(pmcid, info[1])
            return (info[0] and self.syntax_ok(pmcid)), info[1]

        # The syntax check comes first so that malformed ids never hit the API.
        validity_check = self.syntax_ok(pmcid) and self.exists(pmcid)
        self.storage_manager.set_value(pmcid, validity_check)
        return validity_check

    def normalise(self, id_string, include_prefix=False):
        """Return the normalised form of ``id_string``, or None on failure.

        The ``pmcid:`` prefix is removed if present, everything before the
        first ``PMC`` marker is dropped, and the remainder is percent-decoded
        and stripped of whitespace and NUL characters.

        Args:
            id_string: the raw identifier.
            include_prefix: when True, the result is prefixed with ``pmcid:``.

        Returns:
            The normalised string, or None when ``id_string`` is not a string
            or does not contain ``PMC``.
        """
        try:
            if id_string.startswith(self._p):
                id_string = id_string[len(self._p):]
            pmcid_string = sub(
                r"\0+", "", sub(r"\s+", "", unquote(id_string[id_string.index("PMC"):]))
            )
        except (AttributeError, TypeError, ValueError):
            # Non-string input or missing "PMC" marker: the PMCID cannot be normalised.
            return None
        return "%s%s" % (
            self._p if include_prefix else "",
            pmcid_string.strip(),
        )

    def syntax_ok(self, id_string):
        """Check ``id_string`` against the PMCID pattern.

        The prefix is added when missing. The pattern requires ``PMC``
        followed by a non-zero digit and at least one more digit, optionally
        followed by a dot and a one- or two-digit suffix.
        """
        if not id_string.startswith(self._p):
            id_string = self._p + id_string
        return match(r"^pmcid:PMC[1-9]\d+(\.\d{1,2})?$", id_string) is not None

    @staticmethod
    def _record_is_valid(json_res):
        """Return True unless the first record of the API response has status 'error'."""
        try:
            return json_res['records'][0].get('status') != 'error'
        except (KeyError, IndexError):
            # No "records" key, or an empty record list: nothing confirms the id.
            return False

    def exists(self, pmcid_full, get_extra_info=False, allow_extra_api=None):
        """Check through the ID Converter API whether the PMCID exists.

        Up to three requests are attempted. A 200 response is decided by the
        first returned record; a 4xx response means the id does not exist.
        Timeouts, connection errors and other status codes trigger a retry.

        Args:
            pmcid_full: the identifier, with or without prefix.
            get_extra_info: when True, return a ``(bool, dict)`` tuple
                instead of a bool.
            allow_extra_api: unused here.

        Returns:
            True when the API service is disabled, otherwise the outcome of
            the API check (False when the id cannot be normalised or all
            attempts fail). With ``get_extra_info`` the boolean is paired
            with a dictionary carrying at least the ``valid`` key.
        """
        valid_bool = True
        if self._use_api_service:
            pmcid = self.normalise(pmcid_full)
            if pmcid is None:
                if get_extra_info:
                    return False, {"id": pmcid, "valid": False}
                return False

            parameters = {
                'ids': quote(pmcid),
                'format': 'json',
                'idtype': 'pmcid'
            }
            for _ in range(3):
                try:
                    r = get(self._api, params=parameters, headers=self._headers, timeout=30)
                except ReadTimeout:
                    # Do nothing, just try again
                    continue
                except ConnectionError:
                    # Sleep 5 seconds, then try again
                    sleep(5)
                    continue

                if r.status_code == 200:
                    r.encoding = "utf-8"
                    result = self._record_is_valid(loads(r.text))
                    if get_extra_info:
                        return result, {'valid': result, 'id': pmcid}
                    return result
                if 400 <= r.status_code < 500:
                    if get_extra_info:
                        return False, {"id": pmcid, "valid": False}
                    return False

            # Every attempt failed without a conclusive answer.
            valid_bool = False

        if get_extra_info:
            return valid_bool, {"valid": valid_bool}
        return valid_bool

    def extra_info(self, api_response, choose_api=None, info_dict=None):
        """Return the extra information extracted from ``api_response``.

        Not implemented yet: the arguments are currently ignored and the
        result only carries ``{"valid": True}``.
        """
        result = {}
        result["valid"] = True
        # to be implemented
        return result