Coverage for src/api/indexapi_v2.py: 100%
88 statements
« prev ^ index » next coverage.py v7.10.0, created at 2026-04-03 13:54 +0000
« prev ^ index » next coverage.py v7.10.0, created at 2026-04-03 13:54 +0000
1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3# Copyright (c) 2023, Silvio Peroni <essepuntato@gmail.com>
4#
5# Permission to use, copy, modify, and/or distribute this software for any purpose
6# with or without fee is hereby granted, provided that the above copyright notice
7# and this permission notice appear in all copies.
8#
9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
15# SOFTWARE.
17__author__ = 'Arcangelo Massari & Ivan Heibi'
19from requests import RequestException, post
20from json import loads
21from indexapi_common import (
22 lower, # noqa: F401 - used by ramose via getattr
23 env_config,
24 get_unique_brs_metadata,
25 get_pub_date,
26 get_source,
27 get_author,
28 get_id_val,
29 cit_journal_sc,
30 cit_author_sc,
31 cit_duration,
32)
def id2omids(s):
    """Convert an input identifier into the value(s) used to look up OMIDs.

    An identifier that starts with the ``omid:`` scheme is rewritten in place
    into an IRI wrapped in angle brackets; any other identifier is resolved
    through ``__get_omid_of``.

    The scheme is checked as a prefix rather than as a substring, so that an
    identifier of another scheme whose value merely contains the text "omid"
    (e.g. ``doi:10.1000/genomid``) is still resolved through the lookup.

    :param s: the identifier string, e.g. ``omid:br/0612`` or ``doi:10.1/x``.
    :return: a one-element tuple holding the converted value.
    """
    if s.startswith("omid:"):
        omid_iri = s.replace("omid:br/", "<https://w3id.org/oc/meta/br/") + ">"
        return (omid_iri,)
    return (__get_omid_of(s),)
def count_unique_cits(res, *args):
    """Count the distinct (citing, cited) pairs found in a result table.

    :param res: a table whose first row is the header and whose following
        rows hold, in each cell, an iterable of values.
    :param args: ``args[1]`` is the header name of the citing column and
        ``args[2]`` the header name of the cited column.
    :return: a tuple ``([["count"], [<number of pairs>]], True)``.
    """
    columns = res[0]
    citing_col = columns.index(args[1])
    cited_col = columns.index(args[2])
    unique_pairs = set()

    data_rows = res[1:]
    if data_rows:
        # Flatten every cell of the two columns into plain value lists.
        citing_values = [value for row in data_rows for value in row[citing_col]]
        cited_values = [value for row in data_rows for value in row[cited_col]]

        citing_meta = get_unique_brs_metadata(list(set(citing_values)), ids_only=True)
        cited_meta = get_unique_brs_metadata(list(set(cited_values)), ids_only=True)

        # Every key on the citing side is paired with every key on the cited side.
        unique_pairs = {
            (citing_key, cited_key)
            for citing_key in citing_meta.keys()
            for cited_key in cited_meta.keys()
        }

    return [["count"], [len(unique_pairs)]], True
62# args must contain the <citing> and <cited>
def citations_info(res, *args):
    """Build the citation table for every citing/cited pair in ``res``.

    :param res: a table whose first row is the header and whose following
        rows hold, in each cell, an iterable of values.
    :param args: ``args[1]`` is the header name of the citing column and
        ``args[2]`` the header name of the cited column.
    :return: a tuple ``(table, True)`` where ``table`` starts with the header
        ``oci, citing, cited, creation, timespan, journal_sc, author_sc`` and
        has one row per (citing, cited) combination.
    """
    columns = res[0]
    citing_col = columns.index(args[1])
    cited_col = columns.index(args[2])

    table = [
        ["oci", "citing", "cited", "creation", "timespan", "journal_sc", "author_sc"]
    ]

    data_rows = res[1:]
    if not data_rows:
        # Header-only input: nothing to look up.
        return table, True

    citing_values = []
    cited_values = []
    for row in data_rows:
        citing_values.extend(row[citing_col])
        cited_values.extend(row[cited_col])

    citing_meta = get_unique_brs_metadata(list(set(citing_values)))
    cited_meta = get_unique_brs_metadata(list(set(cited_values)))

    for citing_uri in citing_meta:
        citing_br = citing_meta[citing_uri]
        for cited_uri in cited_meta:
            cited_br = cited_meta[cited_uri]

            # oci: the two identifier values joined by a dash
            oci = get_id_val(citing_uri) + "-" + get_id_val(cited_uri)
            citing_pids = __get_all_pids(citing_br, citing_uri)
            cited_pids = __get_all_pids(cited_br, cited_uri)
            # creation: publication date of the citing entity
            creation = get_pub_date(citing_br)
            # timespan: derived from the citing and cited publication dates
            timespan = cit_duration(get_pub_date(citing_br), get_pub_date(cited_br))
            # journal_sc: derived from the sources of the two entities
            journal_sc = cit_journal_sc(get_source(citing_br), get_source(cited_br))
            # author_sc: derived from the authors of the two entities
            author_sc = cit_author_sc(get_author(citing_br), get_author(cited_br))

            table.append(
                [oci, citing_pids, cited_pids, creation, timespan, journal_sc, author_sc]
            )

    return table, True
110# args must contain the <count>
def sum_all(res, *args):
    """Add up the integer values of one column of a result table.

    :param res: a table whose first row is the header; in every following row
        the cell of the selected column is indexable and its element at
        position 1 is convertible with ``int``.
    :param args: ``args[0]`` is the header name of the column to sum.
    :return: a tuple ``([header, [<total as string>]], True)``.
    """
    columns = res[0]
    count_col = columns.index(args[0])
    total = sum(int(row[count_col][1]) for row in res[1:])
    return [columns, [str(total)]], True
124# ---
125# Local methods
126# ---
def _escape_sparql_literal(value):
    """Escape ``value`` for use inside a single-quoted SPARQL string literal."""
    return (
        value.replace("\\", "\\\\")
        .replace("'", "\\'")
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )


def __get_omid_of(s):
    """Look up, on the Meta SPARQL endpoint, the OMIDs linked to an identifier.

    The scheme prefix (``doi:``, ``issn:``, ...) is stripped from ``s`` and the
    remaining value is matched against identifier literals. For ``issn`` the
    query returns the journal articles that are part of the matching venue
    (up to four ``partOf`` levels) instead of the venue itself.

    :param s: the identifier, optionally prefixed with one of the known schemes.
    :return: an empty string when the request fails or nothing matches;
        otherwise a list of strings, each one a space-separated run of at most
        ``MULTI_VAL_MAX`` bracketed OMID IRIs.
    """
    MULTI_VAL_MAX = 9000
    sparql_endpoint = env_config["sparql_endpoint_meta"]

    is_journal = False
    br_pre_l = ["doi", "issn", "isbn", "pmid", "pmcid", "url", "wikidata", "wikipedia", "jid", "arxiv"]
    for br_pre in br_pre_l:
        if s.startswith(br_pre + ":"):
            # Strip only the leading scheme; the value itself may legitimately
            # contain the same "<scheme>:" text further on.
            s = s[len(br_pre) + 1:]
            # check if is journal
            is_journal = br_pre in ["issn"]
            break

    # The value comes from the caller: escape it before embedding it in the
    # quoted literal so that quotes or backslashes cannot alter the query.
    literal = _escape_sparql_literal(s)

    if is_journal:
        # in case is a journal the SPARQL query retrieves all associated BRs
        sparql_query = """
            PREFIX datacite: <http://purl.org/spar/datacite/>
            PREFIX literal: <http://www.essepuntato.it/2010/06/literalreification/>
            PREFIX ns1: <http://purl.org/vocab/frbr/core#>
            PREFIX fabio: <http://purl.org/spar/fabio/>
            SELECT ?br {
                { ?identifier literal:hasLiteralValue '""" + literal + """'^^<http://www.w3.org/2001/XMLSchema#string>. }
                UNION
                {?identifier literal:hasLiteralValue '""" + literal + """'.}
                ?venue datacite:hasIdentifier ?identifier .
                {?br ns1:partOf ?venue .}
                UNION { ?br ns1:partOf/ns1:partOf ?venue . }
                UNION { ?br ns1:partOf/ns1:partOf/ns1:partOf ?venue . }
                UNION { ?br ns1:partOf/ns1:partOf/ns1:partOf/ns1:partOf ?venue .}
                ?br a fabio:JournalArticle .
            }
        """
    else:
        sparql_query = """
            PREFIX datacite: <http://purl.org/spar/datacite/>
            PREFIX literal: <http://www.essepuntato.it/2010/06/literalreification/>
            SELECT ?br {
                { ?identifier literal:hasLiteralValue '""" + literal + """'^^<http://www.w3.org/2001/XMLSchema#string>. }
                UNION
                {?identifier literal:hasLiteralValue '""" + literal + """'.}
                ?br datacite:hasIdentifier ?identifier
            }
        """

    headers = {"Accept": "application/sparql-results+json", "Content-Type": "application/sparql-query"}
    try:
        response = post(sparql_endpoint, headers=headers, data=sparql_query, timeout=45)
        response.raise_for_status()
    except RequestException:
        # Best effort: a failed lookup is reported as "no OMID found".
        return ""
    r = loads(response.text)
    results = r["results"]["bindings"]
    omid_l = [elem["br"]["value"].split("meta/br/")[1] for elem in results]

    if not omid_l:
        return ""

    # Split the IRIs into runs of at most MULTI_VAL_MAX entries each.
    sparql_values = []
    for i in range(0, len(omid_l), MULTI_VAL_MAX):
        sparql_values.append(
            " ".join("<https://w3id.org/oc/meta/br/" + e + ">" for e in omid_l[i:i + MULTI_VAL_MAX])
        )
    return sparql_values
def __get_all_pids(elem, uri_omid):
    """Return the space-separated identifiers of an entity, OMID first.

    :param elem: a mapping that may hold, under the key ``"ids"``, a string of
        identifiers separated by ``" __ "``.
    :param uri_omid: the value passed to ``get_id_val`` to build the leading
        ``omid:br/...`` entry.
    :return: a single string with all identifiers joined by one space.
    """
    pids = ["omid:br/" + get_id_val(uri_omid)]
    if "ids" in elem:
        pids.extend(elem["ids"].split(" __ "))
    return " ".join(pids)