Coverage for oc_ds_converter / oc_idmanager / pmcid.py: 73%

99 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2023 Arianna Moretti <arianna.moretti4@unibo.it> 

2# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

3# SPDX-FileCopyrightText: 2024 Ivan Heibi <ivan.heibi2@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7 

8from json import loads 

9from re import match, sub 

10from time import sleep 

11from urllib.parse import quote, unquote 

12 

13from oc_ds_converter.oc_idmanager.base import IdentifierManager 

14from requests import ReadTimeout, get 

15from requests.exceptions import ConnectionError 

16from oc_ds_converter.oc_idmanager.oc_data_storage.redis_manager import RedisStorageManager 

17from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager 

18 

19 

20 

class PMCIDManager(IdentifierManager):
    """Identifier manager for PMCID identifiers.

    Offers normalisation, syntactic validation and (optionally) an existence
    check against the NCBI ID Converter API. Validation verdicts are read
    from and written to a storage manager so that an identifier already
    checked is not queried again.
    """

    def __init__(
        self,
        use_api_service: bool = True,
        storage_manager: StorageManager | None = None,
        testing: bool = True,
    ) -> None:
        """PMCID manager constructor.

        Args:
            use_api_service: when False, ``exists`` never contacts the remote
                API and reports every identifier as existing.
            storage_manager: storage used to cache validation results; when
                omitted a ``RedisStorageManager`` is created.
            testing: forwarded to ``RedisStorageManager`` when the default
                storage is created; unused otherwise.
        """
        super().__init__()
        self._use_api_service = use_api_service
        if storage_manager is None:
            self.storage_manager = RedisStorageManager(testing=testing)
        else:
            self.storage_manager = storage_manager
        self._api = "https://www.ncbi.nlm.nih.gov/pmc/utils/idconv/v1.0/"
        self._p = "pmcid:"

    # If there's a need to obtain more metadata from a PMCID, consider using Entrez (aka E-Utilities) API (
    # https://eutils.ncbi.nlm.nih.gov/entrez/eutils/), which of course works with different parameters and
    # returns different responses.
    # The ID Converter API only provides alternative IDs (doi, pmid) for the work associated to the queried pmcid.

    def validated_as_id(self, id_string):
        """Return the stored validation verdict for ``id_string``, if any.

        Args:
            id_string: the key to look up in the storage manager, used as is
                (no normalisation is applied here).

        Returns:
            The stored boolean when the storage holds one, otherwise ``None``.
        """
        stored_value = self.storage_manager.get_value(id_string)
        return stored_value if isinstance(stored_value, bool) else None

    def is_valid(self, pmcid, get_extra_info=False):
        """Tell whether ``pmcid`` is a well-formed, existing PMCID.

        The identifier is normalised (with prefix) first. A boolean already
        present in the storage manager is returned directly; otherwise the
        syntax and existence checks are run and the outcome is stored.

        Args:
            pmcid: the identifier to validate, with or without the prefix.
            get_extra_info: when True and no stored verdict is available,
                return a ``(valid, info)`` tuple instead of a bare boolean.

        Returns:
            ``False`` when the identifier cannot be normalised; the stored
            boolean when one exists; otherwise a boolean, or a
            ``(bool, dict)`` tuple when ``get_extra_info`` is True.

        NOTE(review): with ``get_extra_info=True`` the normalisation-failure
        and cached paths still return a bare boolean, not a tuple. Kept as is
        because callers may rely on it -- confirm before unifying.
        """
        pmcid = self.normalise(pmcid, include_prefix=True)
        if pmcid is None:
            return False

        stored_value = self.storage_manager.get_value(pmcid)
        if isinstance(stored_value, bool):
            return stored_value

        if get_extra_info:
            found, info = self.exists(pmcid, get_extra_info=True)
            self.storage_manager.set_full_value(pmcid, info)
            return (found and self.syntax_ok(pmcid)), info

        # syntax_ok is evaluated first so that a malformed identifier never
        # triggers an API request.
        validity_check = self.syntax_ok(pmcid) and self.exists(pmcid)
        self.storage_manager.set_value(pmcid, validity_check)
        return validity_check

    def normalise(self, id_string, include_prefix=False):
        """Return the canonical form of a PMCID, or ``None`` if impossible.

        The ``pmcid:`` prefix is dropped when present, everything before the
        first ``"PMC"`` is discarded, the remainder is percent-decoded and
        all whitespace and NUL characters are removed.

        Args:
            id_string: the raw identifier.
            include_prefix: when True, prepend ``pmcid:`` to the result.

        Returns:
            The normalised identifier, or ``None`` when ``id_string`` is not
            a string or does not contain ``"PMC"``.
        """
        try:
            if id_string.startswith(self._p):
                id_string = id_string[len(self._p):]
            pmcid_string = sub(
                r"\0+", "", sub(r"\s+", "", unquote(id_string[id_string.index("PMC"):]))
            )
        except (AttributeError, TypeError, ValueError):
            # Non-string input (AttributeError/TypeError) or no "PMC"
            # marker in the string (ValueError): not a PMCID.
            return None
        return "%s%s" % (
            self._p if include_prefix else "",
            pmcid_string.strip(),
        )

    def syntax_ok(self, id_string):
        """Check ``id_string`` against the PMCID pattern.

        The prefix is added when missing. The accepted shape is ``PMC``
        followed by a non-zero digit, at least one more digit, and an
        optional ``.`` plus a one- or two-digit suffix.

        Args:
            id_string: the identifier, with or without the ``pmcid:`` prefix.

        Returns:
            True when the identifier matches the pattern, False otherwise.
        """
        if not id_string.startswith(self._p):
            id_string = self._p + id_string
        return bool(match(r"^pmcid:PMC[1-9]\d+(\.\d{1,2})?$", id_string))

    @staticmethod
    def _record_is_valid(json_res):
        """Return True unless the first record of the response is flagged as an error."""
        try:
            return json_res["records"][0].get("status") != "error"
        except (KeyError, IndexError):
            # No "records" key, or an empty record list: nothing was resolved.
            return False

    def exists(self, pmcid_full, get_extra_info=False, allow_extra_api=None):
        """Check through the ID Converter API whether a PMCID exists.

        Up to three requests are attempted. A 200 response is decided by the
        ``status`` of its first record; a 4xx response means "not found".
        Read timeouts are retried immediately, connection errors after a
        five-second pause. Any other status code consumes an attempt.

        Args:
            pmcid_full: the identifier, with or without the prefix.
            get_extra_info: when True, return ``(valid, info_dict)`` instead
                of a bare boolean.
            allow_extra_api: unused by this manager.

        Returns:
            True when the API service is disabled. Otherwise the outcome of
            the lookup; False when the identifier cannot be normalised or
            when every attempt ends without a decisive answer. With
            ``get_extra_info`` the boolean is paired with a dictionary
            holding ``"valid"`` and, when a lookup was attempted, ``"id"``.
        """
        valid_bool = True
        if self._use_api_service:
            pmcid = self.normalise(pmcid_full)
            if pmcid is None:
                if get_extra_info:
                    return False, {"id": pmcid, "valid": False}
                return False

            parameters = {
                'ids': quote(pmcid),
                'format': 'json',
                'idtype': 'pmcid'
            }
            for _ in range(3):
                try:
                    r = get(self._api, params=parameters, headers=self._headers, timeout=30)
                except ReadTimeout:
                    # Do nothing, just try again
                    continue
                except ConnectionError:
                    # Sleep 5 seconds, then try again
                    sleep(5)
                    continue

                if r.status_code == 200:
                    r.encoding = "utf-8"
                    result = self._record_is_valid(loads(r.text))
                    if get_extra_info:
                        return result, {"valid": result, "id": pmcid}
                    return result
                if 400 <= r.status_code < 500:
                    if get_extra_info:
                        return False, {"id": pmcid, "valid": False}
                    return False

            # NOTE(review): every attempt failed or returned an unexpected
            # status code; the identifier is reported as not existing.
            valid_bool = False

        if get_extra_info:
            return valid_bool, {"valid": valid_bool}
        return valid_bool

    def extra_info(self, api_response, choose_api=None, info_dict=None):
        """Build the extra-information dictionary for an API response.

        Not implemented yet: every argument is ignored and a dictionary
        marking the identifier as valid is returned.

        Args:
            api_response: ignored.
            choose_api: ignored.
            info_dict: ignored; defaults to ``None`` rather than a shared
                mutable ``{}``.

        Returns:
            ``{"valid": True}``.
        """
        # to be implemented
        return {"valid": True}

150 result["valid"] = True 

151 # to be implemented 

152 return result