Coverage for src/api/indexapi_common.py: 100%

107 statements  

« prev     ^ index     » next       coverage.py v7.10.0, created at 2026-04-03 13:54 +0000

1import json 

2import os 

3from requests import RequestException, post 

4from json import loads 

5from datetime import datetime 

6from dateutil.relativedelta import relativedelta 

7 

# Load the on-disk defaults once at import time; conf.json must sit in the
# process working directory.
with open("conf.json") as f:
    c = json.load(f)

# Runtime configuration: each value can be overridden through an environment
# variable, falling back to the conf.json defaults loaded above.
env_config = {
    "base_url": os.getenv("BASE_URL", c["base_url"]),
    "sparql_endpoint_index": os.getenv("SPARQL_ENDPOINT_INDEX", c["sparql_endpoint_index"]),
    "sparql_endpoint_meta": os.getenv("SPARQL_ENDPOINT_META", c["sparql_endpoint_meta"]),
    # Only the literal string "true" (case-insensitive) enables sync.
    "sync_enabled": os.getenv("SYNC_ENABLED", "false").lower() == "true",
}

17 

18 

def lower(s):
    """Return *s* lower-cased.

    Bug fix: the original body ended with a stray trailing comma
    (``return s.lower(),``), which made the function return the
    one-element tuple ``(s.lower(),)`` instead of the string itself.
    """
    return s.lower()

21 

22 

def br_meta_metadata(values):
    """Fetch publication date, identifiers, venue and authors for a batch of
    bibliographic resources (BRs) from the Meta SPARQL endpoint.

    Args:
        values: iterable of BR URIs already wrapped in angle brackets
            (e.g. ``"<https://w3id.org/oc/meta/br/0612>"``); they are spliced
            verbatim into the query's VALUES clause.

    Returns:
        A 2-tuple of:
        - dict mapping each BR URI to its raw SPARQL result binding;
        - the list of binding names: ``["val", "pubDate", "ids", "source",
          "author"]``.
        On any transport/HTTP failure: ``({}, [])``.
    """
    sparql_endpoint = env_config["sparql_endpoint_meta"]

    sparql_query = """
    PREFIX pro: <http://purl.org/spar/pro/>
    PREFIX frbr: <http://purl.org/vocab/frbr/core#>
    PREFIX fabio: <http://purl.org/spar/fabio/>
    PREFIX datacite: <http://purl.org/spar/datacite/>
    PREFIX literal: <http://www.essepuntato.it/2010/06/literalreification/>
    PREFIX prism: <http://prismstandard.org/namespaces/basic/2.0/>
    SELECT DISTINCT ?val ?pubDate (GROUP_CONCAT(DISTINCT ?id; SEPARATOR=' __ ') AS ?ids) (GROUP_CONCAT(?venue; separator="; ") as ?source) (GROUP_CONCAT(?raAuthor; separator="; ") as ?author)
    WHERE {
        VALUES ?val { """ + " ".join(values) + """ }
        OPTIONAL { ?val prism:publicationDate ?pubDate. }
        OPTIONAL {
            ?val datacite:hasIdentifier ?identifier.
            ?identifier datacite:usesIdentifierScheme ?scheme;
                literal:hasLiteralValue ?literalValue.
            BIND(CONCAT(STRAFTER(STR(?scheme), "http://purl.org/spar/datacite/"), ":", ?literalValue) AS ?id)
        }
        OPTIONAL {
            ?val a fabio:JournalArticle;
                frbr:partOf+ ?venue.
            ?venue a fabio:Journal.
        }
        OPTIONAL {
            ?val frbr:partOf ?venue.
        }
        OPTIONAL {
            ?val pro:isDocumentContextFor ?arAuthor.
            ?arAuthor pro:withRole pro:author;
                pro:isHeldBy ?raAuthor.
        }
    } GROUP BY ?val ?pubDate
    """

    headers = {"Accept": "application/sparql-results+json", "Content-Type": "application/sparql-query"}

    try:
        # Fix: the original call had no timeout, so a stalled endpoint could
        # hang this request forever. requests.Timeout subclasses
        # RequestException, so it falls into the same empty-result fallback.
        response = post(sparql_endpoint, headers=headers, data=sparql_query, timeout=60)
        response.raise_for_status()
    except RequestException:
        return {}, []
    r = loads(response.text)
    results = r["results"]["bindings"]
    res_json = {elem["val"]["value"]: elem for elem in results}
    return res_json, ["val", "pubDate", "ids", "source", "author"]

70 

71 

def br_meta_anyids(values):
    """Fetch only the external identifiers for a batch of bibliographic
    resources (BRs) from the Meta SPARQL endpoint.

    Args:
        values: iterable of BR URIs already wrapped in angle brackets;
            spliced verbatim into the query's VALUES clause.

    Returns:
        A 2-tuple of:
        - dict mapping each BR URI to its raw SPARQL result binding;
        - the list of binding names: ``["val", "ids"]``.
        On any transport/HTTP failure: ``({}, [])``.
    """
    sparql_endpoint = env_config["sparql_endpoint_meta"]

    sparql_query = """
    PREFIX datacite: <http://purl.org/spar/datacite/>
    PREFIX literal: <http://www.essepuntato.it/2010/06/literalreification/>
    SELECT DISTINCT ?val (GROUP_CONCAT(DISTINCT ?id; SEPARATOR=' __ ') AS ?ids)
    WHERE {
        VALUES ?val { """ + " ".join(values) + """ }
        OPTIONAL {
            ?val datacite:hasIdentifier ?identifier.
            ?identifier datacite:usesIdentifierScheme ?scheme;
                literal:hasLiteralValue ?literalValue.
            BIND(CONCAT(STRAFTER(STR(?scheme), "http://purl.org/spar/datacite/"), ":", ?literalValue) AS ?id)
        }
    } GROUP BY ?val
    """

    headers = {"Accept": "application/sparql-results+json", "Content-Type": "application/sparql-query"}

    try:
        # Fix: the original call had no timeout, so a stalled endpoint could
        # hang this request forever. requests.Timeout subclasses
        # RequestException, so it falls into the same empty-result fallback.
        response = post(sparql_endpoint, headers=headers, data=sparql_query, timeout=60)
        response.raise_for_status()
    except RequestException:
        return {}, []
    r = loads(response.text)
    results = r["results"]["bindings"]
    res_json = {elem["val"]["value"]: elem for elem in results}
    return res_json, ["val", "ids"]

101 

102 

def get_unique_brs_metadata(l_url_brs, ids_only=False):
    """Fetch metadata for the given BR URLs (in chunks of 3000) and keep only
    one BR per cluster of overlapping external identifiers.

    Args:
        l_url_brs: bare BR URLs (without angle brackets).
        ids_only: when True query only the identifiers (``br_meta_anyids``);
            otherwise fetch the full metadata (``br_meta_metadata``).

    Returns:
        dict mapping each kept BR URI to a ``{field_name: value}`` dict whose
        keys are the field list returned by the fetch function. BRs whose
        ``ids`` field is empty are skipped: without an external id they cannot
        take part in the overlap-based deduplication.
    """
    res: list[list[str]] = []  # res[0] = field names; res[1:] = value rows
    l_brs = ["<" + _url_br + ">" for _url_br in l_url_brs]

    fetch = br_meta_anyids if ids_only else br_meta_metadata
    chunk_size = 3000  # keep the VALUES clause within endpoint limits
    brs_meta: dict[str, dict[str, dict[str, str]]] = {}
    for start in range(0, len(l_brs), chunk_size):
        m_br = fetch(l_brs[start:start + chunk_size])
        brs_meta.update(m_br[0])
        # Fix: the original took the header from chunk 0 unconditionally; if
        # that chunk's request failed (header == []) while a later chunk
        # succeeded, the value rows came out empty and row[0] below raised
        # IndexError. Take the header from the first *successful* fetch.
        if not res and m_br[1]:
            res.append(m_br[1])

    if not res:
        # No chunk succeeded (or the input was empty): nothing to report.
        return {}

    unique_brs_anyid: list[set[str]] = []  # id-sets of the BRs kept so far
    for k_val in brs_meta.values():
        br_ids = k_val["ids"]["value"]
        if not br_ids:
            continue  # no external ids -> cannot deduplicate; drop the BR
        id_set = set(br_ids.split(" __ "))
        # Keep the BR only if it shares no identifier with any BR kept so far.
        if any(id_set & seen for seen in unique_brs_anyid):
            continue
        unique_brs_anyid.append(id_set)
        res.append([k_val[k]["value"] if k in k_val else "" for k in res[0]])

    # Re-key every row by its first column (the BR URI).
    return {row[0]: dict(zip(res[0], row)) for row in res[1:]}

137 

138 

def get_pub_date(elem):
    """Return the raw publication-date string stored under ``"pubDate"``."""
    return elem["pubDate"]

141 

142 

def get_source(elem):
    """Split the '; '-joined ``"source"`` field into a list of venue values."""
    joined_sources = elem["source"]
    return joined_sources.split("; ")

145 

146 

def get_author(elem):
    """Split the '; '-joined ``"author"`` field into a list of author values."""
    joined_authors = elem["author"]
    return joined_authors.split("; ")

149 

150 

def get_id_val(val):
    """Strip the OpenCitations Meta BR prefix from *val*, leaving the OMID suffix."""
    return val.replace("https://w3id.org/oc/meta/br/", "")

153 

154 

def cit_journal_sc(citing_source_ids, cited_source_ids):
    """Journal self-citation flag: "yes" when the citing and cited works
    share at least one source (venue) id, otherwise "no"."""
    shared_sources = set(citing_source_ids) & set(cited_source_ids)
    return "yes" if shared_sources else "no"

159 

160 

def cit_author_sc(citing_authors, cited_authors):
    """Author self-citation flag: "yes" when the citing and cited works
    share at least one author id, otherwise "no"."""
    shared_authors = set(citing_authors) & set(cited_authors)
    return "yes" if shared_authors else "no"

165 

166 

def cit_duration(citing_complete_pub_date, cited_complete_pub_date):
    """Return the citation timespan as an XSD-duration-like string
    (e.g. "P3Y", "P1Y2M", "-P0Y4M10D").

    Precision is the coarsest of the two dates: years always appear, months
    only when both dates carry a month, days only when both carry a day.
    Returns "" when either date lacks even a year.
    """

    def _precision(date):
        # 0 = unusable, 1 = year only, 2 = year+month, 3 = full date,
        # judged purely by string length ("YYYY", "YYYY-MM", "YYYY-MM-DD").
        if date is None or len(date) < 4:
            return 0
        if len(date) < 7:
            return 1
        if len(date) < 10:
            return 2
        return 3

    precision = min(
        _precision(citing_complete_pub_date),
        _precision(cited_complete_pub_date),
    )
    if precision == 0:
        return ""
    with_months = precision >= 2
    with_days = precision >= 3

    def _as_datetime(date):
        # Pad missing month/day with "-01-01", then keep only "YYYY-MM-DD".
        return datetime.strptime((date + "-01-01")[:10], "%Y-%m-%d")

    delta = relativedelta(
        _as_datetime(citing_complete_pub_date),
        _as_datetime(cited_complete_pub_date),
    )

    # The duration is negative (cited after citing) when the most significant
    # *considered* non-zero component is negative. NOTE(review): this assumes
    # relativedelta components all carry the sign of the overall difference.
    negative = (
        delta.years < 0
        or (delta.years == 0 and with_months and delta.months < 0)
        or (
            delta.years == 0
            and delta.months == 0
            and with_days
            and delta.days < 0
        )
    )

    parts = ["-" if negative else "", "P%sY" % abs(delta.years)]
    if with_months:
        parts.append("%sM" % abs(delta.months))
    if with_days:
        parts.append("%sD" % abs(delta.days))
    return "".join(parts)