Coverage for oc_ds_converter / oc_idmanager / url.py: 72%

87 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2023-2024 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5 

6import urllib.parse 

7from time import sleep 

8 

9import validators 

10from oc_ds_converter.oc_idmanager import * 

11from oc_ds_converter.oc_idmanager.base import IdentifierManager 

12from requests import ReadTimeout, get 

13from requests.exceptions import ConnectionError 

14 

15 

class URLManager(IdentifierManager):
    """Identifier manager for URL identifiers (prefix ``url:``).

    Identifiers are handled in a normalised form: the ``url:`` prefix, one
    trailing slash, the ``http(s)://`` scheme and a leading ``www.`` are
    stripped, and the remainder is percent-quoted. Validation verdicts are
    memoised in the ``data`` mapping, keyed by the prefixed normalised id.
    """

    def __init__(self, data=None, use_api_service=True):
        """URL manager constructor.

        Args:
            data: Mapping used as the validation cache (normalised id ->
                ``{"valid": bool}``). When omitted, a fresh dict is created
                for this instance.
            use_api_service: When true, ``exists`` performs HTTP requests;
                when false, ``exists`` reports every URL as existing.
        """
        super().__init__()
        self._use_api_service = use_api_service
        self._p = "url:"
        # A literal ``{}`` default is evaluated once and would be shared (and
        # written to by ``is_valid``) across every instance created without
        # an explicit ``data`` argument. A caller-supplied mapping is kept
        # as-is, even when empty, so the caller still observes the updates.
        self._data = {} if data is None else data
        self._scheme_https = "https://"
        self._scheme_http = "http://"

    def is_valid(self, url, get_extra_info=False):
        """Tell whether ``url`` is syntactically valid and resolvable.

        The verdict is computed once per normalised identifier and then served
        from the cache.

        Args:
            url: The identifier, with or without the ``url:`` prefix.
            get_extra_info: When true, return ``(valid, info_dict)`` instead
                of the bare boolean.

        Returns:
            ``bool`` or, with ``get_extra_info``, ``(bool, dict)``. If the
            identifier cannot be normalised, a bare ``False`` is returned
            regardless of ``get_extra_info``.
        """
        url = self.normalise(url, include_prefix=True)
        if url is None:
            return False

        if url not in self._data or self._data[url] is None:
            # Syntax is checked first so malformed ids never hit the network,
            # and the cached verdict always combines syntax and existence.
            # Caching the raw ``exists`` answer instead would make a repeated
            # call disagree with the first one.
            if not self.syntax_ok(url):
                info = {"valid": False}
            elif get_extra_info:
                found, info = self.exists(url, get_extra_info=True)
                info["valid"] = bool(found)
            else:
                info = {"valid": bool(self.exists(url))}
            self._data[url] = info

        entry = self._data[url]
        if get_extra_info:
            return entry.get("valid"), entry
        return entry.get("valid")

    def normalise(self, id_string, include_prefix=False):
        """Return the canonical form of ``id_string``.

        Args:
            id_string: Any value; it is converted with ``str`` first.
            include_prefix: When true, prepend ``url:`` to the result.

        Returns:
            The normalised string, or ``None`` if percent-quoting fails.
        """
        url_string = str(id_string).strip()
        if url_string.startswith(self._p):
            url_string = url_string[len(self._p):]
        if url_string.endswith("/"):
            url_string = url_string[:-1]
        if url_string.startswith("https://"):
            url_string = url_string[len("https://"):]
        elif url_string.startswith("http://"):
            url_string = url_string[len("http://"):]
        if url_string.startswith("www."):
            url_string = url_string[len("www."):]
        try:
            url_string = urllib.parse.quote(url_string, safe="%/:=&?~#+!$,;'@()*[]")
        except Exception:
            # Any error in processing the URL will return None. ``Exception``
            # (not a bare ``except``) lets KeyboardInterrupt/SystemExit through.
            return None
        prefix = self._p if include_prefix else ""
        return f"{prefix}{url_string}"

    def syntax_ok(self, id_string):
        """Check the URL syntax with ``validators.url``.

        The ``url:`` prefix is removed if present and ``https://`` is prepended
        before validating, so ``id_string`` is expected without a scheme.
        """
        if id_string.startswith(self._p):
            id_string = id_string[len(self._p):]
        # ``bool`` turns the validator's result into a plain True/False.
        return bool(validators.url(self._scheme_https + id_string))

    def exists(self, url_full, get_extra_info=False, allow_extra_api=None):
        """Probe the URL over HTTP(S), with and without ``www.``.

        Each of the four scheme/host variations is requested up to three times.
        The first 200 answers ``True`` and the first 404 answers ``False``; any
        other status, a read timeout or a connection error just uses up one
        attempt. If no attempt is conclusive, the answer is ``False``. When the
        API service is disabled, the answer is always ``True``.

        Args:
            url_full: The identifier to probe; it is normalised first.
            get_extra_info: When true, return ``(found, {"valid": found})``.
            allow_extra_api: Unused by this manager.

        Returns:
            ``bool`` or, with ``get_extra_info``, ``(bool, dict)``.
        """
        def _answer(found):
            # Shape the result according to ``get_extra_info``.
            return (found, {"valid": found}) if get_extra_info else found

        if not self._use_api_service:
            return _answer(True)

        url = self.normalise(url_full)
        if url is None:
            return _answer(False)

        variations = (
            f"https://www.{url}",
            f"https://{url}",
            f"http://www.{url}",
            f"http://{url}",
        )
        for variation in variations:
            for _ in range(3):
                try:
                    # NOTE(review): ``_headers`` is not set in this class;
                    # presumably the base class provides it - confirm.
                    r = get(variation, headers=self._headers, timeout=30)
                except ReadTimeout:
                    # Do nothing, just try again
                    continue
                except ConnectionError:
                    # Sleep 5 seconds, then try again
                    sleep(5)
                    continue
                if r.status_code == 200:
                    return _answer(True)
                if r.status_code == 404:
                    return _answer(False)
        return _answer(False)

    def extra_info(self, api_response, choose_api=None, info_dict={}):
        """Return the extra-information dict for a URL: always ``{"valid": True}``.

        None of the arguments is read; ``info_dict`` is never mutated, so its
        shared default is harmless here.
        """
        return {"valid": True}

120 result["valid"] = True 

121 return result