Coverage for src/api/indexapi_v2.py: 100%

88 statements  

« prev     ^ index     » next       coverage.py v7.10.0, created at 2026-04-03 13:54 +0000

1#!/usr/bin/python 

2# -*- coding: utf-8 -*- 

3# Copyright (c) 2023, Silvio Peroni <essepuntato@gmail.com> 

4# 

5# Permission to use, copy, modify, and/or distribute this software for any purpose 

6# with or without fee is hereby granted, provided that the above copyright notice 

7# and this permission notice appear in all copies. 

8# 

9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, 

12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, 

13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 

14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS 

15# SOFTWARE. 

16 

17__author__ = 'Arcangelo Massari & Ivan Heibi' 

18 

19from requests import RequestException, post 

20from json import loads 

21from indexapi_common import ( 

22 lower, # noqa: F401 - used by ramose via getattr 

23 env_config, 

24 get_unique_brs_metadata, 

25 get_pub_date, 

26 get_source, 

27 get_author, 

28 get_id_val, 

29 cit_journal_sc, 

30 cit_author_sc, 

31 cit_duration, 

32) 

33 

34 

def id2omids(s):
    """Convert an identifier string into a 1-tuple of SPARQL-ready OMID value(s).

    If ``s`` contains the substring ``"omid"``, every ``"omid:br/"`` in it is
    rewritten to the ``<https://w3id.org/oc/meta/br/`` IRI prefix and a closing
    ``>`` is appended. Any other identifier is resolved through
    ``__get_omid_of``.

    :param s: the identifier string (e.g. ``"omid:br/0612345"``).
    :return: a 1-tuple whose only item is either the bracketed IRI string or
        whatever ``__get_omid_of`` returned for ``s``.
    """
    # NOTE(review): the 1-tuple wrapper is presumably what the calling
    # framework expects from a preprocessing function - confirm before changing.
    if "omid" in s:
        omid_iri = s.replace("omid:br/", "<https://w3id.org/oc/meta/br/") + ">"
        return (omid_iri,)
    return (__get_omid_of(s),)

39 

def count_unique_cits(res, *args):
    """Count the distinct (citing, cited) pairs after entity deduplication.

    :param res: result table; ``res[0]`` is the header row and every following
        row holds, in the citing/cited columns, an iterable of values.
    :param args: ``args[1]`` is the header name of the citing column and
        ``args[2]`` the header name of the cited column (``args[0]`` is not
        used here).
    :return: ``([["count"], [<int>]], True)``.
    :raises ValueError: if either column name is not in the header.
    """
    header = res[0]
    citing_idx = header.index(args[1])
    cited_idx = header.index(args[2])

    unique_count = 0

    if len(res) > 1:
        citing_to_dedup = []
        cited_to_dedup = []
        for row in res[1:]:
            # extend() adds every element of the cell, not the cell itself.
            # NOTE(review): cells are presumably (typed, plain) value pairs - confirm.
            citing_to_dedup.extend(row[citing_idx])
            cited_to_dedup.extend(row[cited_idx])

        citing_to_dedup_meta = get_unique_brs_metadata(
            list(set(citing_to_dedup)), ids_only=True
        )
        cited_to_dedup_meta = get_unique_brs_metadata(
            list(set(cited_to_dedup)), ids_only=True
        )

        # Every citing key is paired with every cited key, and keys are unique
        # within each mapping, so the number of distinct pairs is simply the
        # product of the two sizes - no need to materialise the pair set.
        unique_count = len(citing_to_dedup_meta) * len(cited_to_dedup_meta)

    return [["count"], [unique_count]], True

61 

62# args must contain the <citing> and <cited> 

def citations_info(res, *args):
    """Build the citation table for every deduplicated citing/cited pair.

    :param res: result table; ``res[0]`` is the header row and every following
        row holds, in the citing/cited columns, an iterable of values.
    :param args: ``args[1]`` is the header name of the citing column and
        ``args[2]`` the header name of the cited column (``args[0]`` is not
        used here).
    :return: ``(table, True)`` where ``table`` starts with the header
        ``["oci", "citing", "cited", "creation", "timespan", "journal_sc",
        "author_sc"]`` followed by one row per (citing, cited) combination.
    :raises ValueError: if either column name is not in the header.
    """
    header = res[0]
    citing_idx = header.index(args[1])
    cited_idx = header.index(args[2])

    f_res = [
        ["oci", "citing", "cited", "creation", "timespan", "journal_sc", "author_sc"]
    ]

    if len(res) > 1:
        citing_to_dedup = []
        cited_to_dedup = []
        for row in res[1:]:
            # extend() adds every element of the cell, not the cell itself.
            citing_to_dedup.extend(row[citing_idx])
            cited_to_dedup.extend(row[cited_idx])

        citing_to_dedup_meta = get_unique_brs_metadata(list(set(citing_to_dedup)))
        cited_to_dedup_meta = get_unique_brs_metadata(list(set(cited_to_dedup)))

        for citing_entity in citing_to_dedup_meta:
            # Everything derived from the citing entity alone is computed once
            # here instead of once per cited entity.
            _citing = citing_to_dedup_meta[citing_entity]
            citing_id = get_id_val(citing_entity)
            citing_pids = __get_all_pids(_citing, citing_entity)
            citing_date = get_pub_date(_citing)
            citing_source = get_source(_citing)
            citing_author = get_author(_citing)

            for cited_entity in cited_to_dedup_meta:
                _cited = cited_to_dedup_meta[cited_entity]

                res_row = [
                    # oci value
                    citing_id + "-" + get_id_val(cited_entity),
                    # citing
                    citing_pids,
                    # cited
                    __get_all_pids(_cited, cited_entity),
                    # creation = citing[pub_date]
                    citing_date,
                    # timespan = citing[pub_date] - cited[pub_date]
                    cit_duration(citing_date, get_pub_date(_cited)),
                    # journal_sc = compare citing[source] and cited[source]
                    cit_journal_sc(citing_source, get_source(_cited)),
                    # author_sc = compare citing[author] and cited[author]
                    cit_author_sc(citing_author, get_author(_cited)),
                ]
                f_res.append(res_row)

    return f_res, True

109 

110# args must contain the <count> 

def sum_all(res, *args):
    """Collapse a result table into a single row holding the total count.

    :param res: result table; ``res[0]`` is the header row. In every following
        row the count cell is indexable and its element at position 1 is
        parsed with ``int``.
    :param args: ``args[0]`` is the header name of the count column.
    :return: ``([header, [<total as str>]], True)``; the total is ``"0"`` when
        there are no data rows.
    :raises ValueError: if the column name is not in the header or a count
        cannot be parsed as an integer.
    """
    header = res[0]
    count_idx = header.index(args[0])

    # NOTE(review): index 1 is presumably the plain value of a (typed, plain)
    # cell pair - confirm against the caller.
    tot_count = sum(int(row[count_idx][1]) for row in res[1:])

    return [header, [str(tot_count)]], True

122 

123 

124# --- 

125# Local methods 

126# --- 

127 

def __get_omid_of(s):
    """Resolve an external identifier to the OMID IRIs of its bibliographic resources.

    The identifier's scheme prefix (``doi:``, ``issn:``, ...) is stripped and
    the remaining value is looked up on the SPARQL endpoint configured under
    ``env_config["sparql_endpoint_meta"]``. For ``issn:`` identifiers the query
    returns the journal articles contained (up to four ``partOf`` levels deep)
    in the venue carrying that identifier.

    :param s: the identifier, optionally prefixed with its scheme.
    :return: ``""`` when the HTTP request fails or nothing is found; otherwise
        a list of strings, each a space-separated run of at most
        ``MULTI_VAL_MAX`` bracketed OMID IRIs.
    """
    MULTI_VAL_MAX = 9000
    sparql_endpoint = env_config["sparql_endpoint_meta"]

    is_journal = False
    br_pre_l = ["doi", "issn", "isbn", "pmid", "pmcid", "url", "wikidata", "wikipedia", "jid", "arxiv"]
    for br_pre in br_pre_l:
        if s.startswith(br_pre + ":"):
            # Drop only the leading scheme prefix; str.replace would also
            # remove any later occurrence inside the identifier value.
            s = s[len(br_pre) + 1:]
            # check if is journal
            is_journal = br_pre == "issn"
            break

    # The value ends up inside a single-quoted SPARQL string literal, so the
    # characters that could terminate or corrupt that literal are escaped.
    literal = (
        s.replace("\\", "\\\\")
        .replace("'", "\\'")
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )

    if is_journal:
        # in case it is a journal the SPARQL query retrieves all associated BRs
        sparql_query = """
            PREFIX datacite: <http://purl.org/spar/datacite/>
            PREFIX literal: <http://www.essepuntato.it/2010/06/literalreification/>
            PREFIX ns1: <http://purl.org/vocab/frbr/core#>
            PREFIX fabio: <http://purl.org/spar/fabio/>
            SELECT ?br {
                { ?identifier literal:hasLiteralValue '""" + literal + """'^^<http://www.w3.org/2001/XMLSchema#string>. }
                UNION
                {?identifier literal:hasLiteralValue '""" + literal + """'.}
                ?venue datacite:hasIdentifier ?identifier .
                {?br ns1:partOf ?venue .}
                UNION { ?br ns1:partOf/ns1:partOf ?venue . }
                UNION { ?br ns1:partOf/ns1:partOf/ns1:partOf ?venue . }
                UNION { ?br ns1:partOf/ns1:partOf/ns1:partOf/ns1:partOf ?venue .}
                ?br a fabio:JournalArticle .
            }
        """
    else:
        sparql_query = """
            PREFIX datacite: <http://purl.org/spar/datacite/>
            PREFIX literal: <http://www.essepuntato.it/2010/06/literalreification/>
            SELECT ?br {
                { ?identifier literal:hasLiteralValue '""" + literal + """'^^<http://www.w3.org/2001/XMLSchema#string>. }
                UNION
                {?identifier literal:hasLiteralValue '""" + literal + """'.}
                ?br datacite:hasIdentifier ?identifier
            }
        """

    headers = {"Accept": "application/sparql-results+json", "Content-Type": "application/sparql-query"}
    try:
        response = post(sparql_endpoint, headers=headers, data=sparql_query, timeout=45)
        response.raise_for_status()
    except RequestException:
        # Best effort: a failed lookup is reported as "no OMID found".
        return ""
    r = loads(response.text)
    results = r["results"]["bindings"]
    omid_l = [elem["br"]["value"].split("meta/br/")[1] for elem in results]

    if len(omid_l) == 0:
        return ""

    # Split the IRIs into chunks of at most MULTI_VAL_MAX entries each.
    sparql_values = []
    for i in range(0, len(omid_l), MULTI_VAL_MAX):
        sparql_values.append(
            " ".join(["<https://w3id.org/oc/meta/br/" + e + ">" for e in omid_l[i:i + MULTI_VAL_MAX]])
        )
    return sparql_values

190 

def __get_all_pids(elem, uri_omid):
    """Return all identifiers of an entity as one space-separated string.

    The OMID (``"omid:br/"`` followed by ``get_id_val(uri_omid)``) always comes
    first; when ``elem`` has an ``"ids"`` entry, its ``" __ "``-separated
    values follow in their original order.

    :param elem: the metadata mapping of the entity.
    :param uri_omid: the entity's OMID URI, passed to ``get_id_val``.
    :return: the space-joined identifier string.
    """
    str_ids = ["omid:br/" + get_id_val(uri_omid)]
    if "ids" in elem:
        str_ids.extend(elem["ids"].split(" __ "))

    return " ".join(str_ids)