Coverage for oc_ds_converter / oc_idmanager / url.py: 72%
87 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2023-2024 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
6import urllib.parse
7from time import sleep
9import validators
10from oc_ds_converter.oc_idmanager import *
11from oc_ds_converter.oc_idmanager.base import IdentifierManager
12from requests import ReadTimeout, get
13from requests.exceptions import ConnectionError
class URLManager(IdentifierManager):
    """Identifier manager for URL identifiers (prefix ``url:``).

    URLs are handled in a scheme-less form: ``normalise`` removes the
    ``url:`` prefix, one trailing slash, a leading ``https://`` or
    ``http://`` and a leading ``www.``. Validation outcomes are memoised in
    ``self._data``, keyed by the prefixed, normalised identifier.
    """

    def __init__(self, data=None, use_api_service=True):
        """URL manager constructor.

        Args:
            data: mapping used as the validation cache
                (``{"url:<normalised>": {"valid": bool, ...}}``). The object
                is stored by reference, so a caller-supplied mapping is
                updated in place. When omitted, each instance gets its own
                empty dict.
            use_api_service: when false, ``exists`` performs no HTTP request
                and reports every URL as existing.
        """
        super().__init__()
        self._use_api_service = use_api_service
        self._p = "url:"
        # A literal ``{}`` default is evaluated once at definition time and
        # would be shared (and mutated by ``is_valid``) across all instances.
        self._data = {} if data is None else data
        self._scheme_https = "https://"
        self._scheme_http = "http://"

    def is_valid(self, url, get_extra_info=False):
        """Tell whether ``url`` is syntactically valid and reachable.

        The result is computed once per normalised URL and then served from
        ``self._data``.

        Args:
            url: the identifier to check, with or without the ``url:`` prefix.
            get_extra_info: when true, return ``(valid, info_dict)`` instead
                of the bare validity flag.

        Returns:
            ``valid`` or ``(valid, info_dict)``. If normalisation fails a bare
            ``False`` is returned regardless of ``get_extra_info``.
        """
        url = self.normalise(url, include_prefix=True)
        if url is None:
            return False

        if url not in self._data or self._data[url] is None:
            if get_extra_info:
                found, info = self.exists(url, get_extra_info=True)
                valid = bool(found and self.syntax_ok(url))
                # Record the combined verdict so that cached look-ups agree
                # with the value returned by this first call.
                info["valid"] = valid
                self._data[url] = info
                return valid, info
            # Compute first, cache afterwards: an exception raised by
            # ``exists`` must not leave an empty entry in the cache.
            valid = bool(self.syntax_ok(url) and self.exists(url))
            self._data[url] = {"valid": valid}
            return valid

        cached = self._data[url]
        if get_extra_info:
            return cached.get("valid"), cached
        return cached.get("valid")

    def normalise(self, id_string, include_prefix=False):
        """Return the canonical, scheme-less form of ``id_string``.

        Steps, in order: convert to ``str`` and strip surrounding whitespace;
        drop the ``url:`` prefix; drop one trailing ``/``; drop a leading
        ``https://`` or ``http://``; drop a leading ``www.``; percent-encode
        every character outside the ``safe`` set.

        Args:
            id_string: the raw identifier (any object; ``str()`` is applied).
            include_prefix: when true, prepend ``url:`` to the result.

        Returns:
            The normalised string, or ``None`` if percent-encoding fails.
        """
        url_string = str(id_string).strip()
        if url_string.startswith(self._p):
            url_string = url_string[len(self._p):]
        if url_string.endswith("/"):
            url_string = url_string[:-1]
        for scheme in ("https://", "http://"):
            if url_string.startswith(scheme):
                url_string = url_string[len(scheme):]
                break
        if url_string.startswith("www."):
            url_string = url_string[len("www."):]
        try:
            url_string = urllib.parse.quote(url_string, safe="%/:=&?~#+!$,;'@()*[]")
        except Exception:
            # Any error in processing the URL yields None. ``Exception``
            # (rather than a bare ``except``) lets KeyboardInterrupt and
            # SystemExit propagate.
            return None
        prefix = self._p if include_prefix else ""
        return f"{prefix}{url_string}"

    def syntax_ok(self, id_string):
        """Return True if ``id_string`` forms a well-formed URL.

        The ``url:`` prefix is removed if present, ``https://`` is prepended
        (identifiers are stored without a scheme) and the result is checked
        with ``validators.url``.
        """
        if id_string.startswith(self._p):
            id_string = id_string[len(self._p):]
        return bool(validators.url(self._scheme_https + id_string))

    def exists(self, url_full, get_extra_info=False, allow_extra_api=None):
        """Check over HTTP whether the URL can be retrieved.

        With ``use_api_service`` disabled no request is made and the URL is
        reported as existing. Otherwise four spellings of the normalised URL
        (``https``/``http``, with and without ``www.``) are requested in
        turn, up to three attempts each. The first 200 response means the URL
        exists; the first 404 response means it does not, and the remaining
        spellings are not tried. Any other status code is simply retried.

        Args:
            url_full: the URL to check, in any form ``normalise`` accepts.
            get_extra_info: when true, return ``(found, {"valid": found})``.
            allow_extra_api: unused by this manager.

        Returns:
            ``found`` or ``(found, {"valid": found})``.
        """

        def answer(flag):
            # Shape the result according to ``get_extra_info``.
            return (flag, {"valid": flag}) if get_extra_info else flag

        if not self._use_api_service:
            return answer(True)

        url = self.normalise(url_full)
        if url is None:
            return answer(False)

        variations = (
            f"https://www.{url}",
            f"https://{url}",
            f"http://www.{url}",
            f"http://{url}",
        )
        for variation in variations:
            for _ in range(3):
                try:
                    r = get(variation, headers=self._headers, timeout=30)
                except ReadTimeout:
                    # Do nothing, just try again
                    continue
                except ConnectionError:
                    # Sleep 5 seconds, then try again
                    sleep(5)
                    continue
                if r.status_code == 200:
                    return answer(True)
                if r.status_code == 404:
                    return answer(False)

        # No spelling produced a conclusive (200 or 404) response.
        return answer(False)

    def extra_info(self, api_response, choose_api=None, info_dict=None):
        """Return the extra-information dict for a URL.

        No metadata is extracted for URLs: all arguments are ignored and the
        result is always ``{"valid": True}``.
        """
        return {"valid": True}