Coverage for ramose / operation.py: 99%
696 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-15 15:58 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-15 15:58 +0000
1# SPDX-FileCopyrightText: 2018-2021 Silvio Peroni <silvio.peroni@unibo.it>
2# SPDX-FileCopyrightText: 2020-2021 Marilena Daquino <marilena.daquino2@unibo.it>
3# SPDX-FileCopyrightText: 2022 Davide Brembilla
4# SPDX-FileCopyrightText: 2024 Ivan Heibi <ivan.heibi2@unibo.it>
5# SPDX-FileCopyrightText: 2025 Sergei Slinkin
6# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
7#
8# SPDX-License-Identifier: ISC
10import time
11from csv import DictReader, reader, writer
12from io import StringIO
13from itertools import product
14from json import dumps
15from math import ceil
16from operator import eq, gt, itemgetter, lt
17from re import findall, match, search, sub
18from urllib.parse import parse_qs, quote, urlsplit
20from requests.exceptions import RequestException
22from ramose._constants import DEFAULT_HTTP_TIMEOUT, FIELD_TYPE_RE, _http_session
23from ramose.datatype import DataType
24from ramose.paging import PaginationInfo, build_link_header, build_pagination_info
27class Operation:
28 def __init__(
29 self,
30 op_complete_url,
31 op_key,
32 i,
33 tp,
34 sparql_http_method,
35 addon,
36 format_map=None,
37 sources_map=None,
38 engine="sparql",
39 custom_params=None,
40 disabled_params=None,
41 cache=None,
42 default_cache_ttl=86400,
43 ):
44 """This class is responsible for materialising a API operation to be run against a SPARQL endpoint
45 (or, depending on configuration, through the SPARQL.Anything engine).
47 It takes in input a full URL referring to a call to an operation (parameter 'op_complete_url'),
48 the particular shape representing an operation (parameter 'op_key'), the definition (in JSON) of such
49 operation (parameter 'i'), the URL of the triplestore to contact (parameter 'tp'), the HTTP method
50 to use for the SPARQL request (parameter 'sparql_http_method', set to either 'get' or 'post'), the path
51 of the Python file which defines additional functions for use in the operation (parameter 'addon'), and formats
52 with the names of the corresponding functions responsible for converting CSV data into the specified formats
53 (parameter 'format').
54 It also accepts a mapping of named sources to endpoint URLs referenced by @@with directives
55 (parameter 'sources_map') and the engine identifier selecting the execution
56 backend (parameter 'engine')."""
57 self.url_parsed = urlsplit(op_complete_url)
58 self.op_url = self.url_parsed.path
59 self.op = op_key
60 self.i = i
61 self.tp = tp
62 self.sparql_http_method = sparql_http_method
63 self.addon = addon
64 self.format = format_map or {}
65 self.sources_map = sources_map or {}
66 self.engine = engine
67 self.custom_params = custom_params or {}
68 self.disabled_params = disabled_params or set()
69 self._sa_engine = None
70 self._cache = cache
71 self._default_cache_ttl = default_cache_ttl
72 self.pagination_info: PaginationInfo | None = None
74 self.operation = {"=": eq, "<": lt, ">": gt}
76 self.dt = DataType()
78 @staticmethod
79 def get_content_type(ct):
80 """It returns the mime type of a given textual representation of a format, being it either
81 'csv' or 'json."""
82 content_type = ct
84 if ct == "csv":
85 content_type = "text/csv"
86 elif ct == "json":
87 content_type = "application/json"
89 return content_type
91 def conv(self, s, query_string, c_type="text/csv"):
92 """This method takes a string representing a CSV document and converts it in the requested format according
93 to what content type is specified as input."""
95 content_type = Operation.get_content_type(c_type)
96 if self.pagination_info is not None:
97 request_url = self.pagination_info.self_url
98 elif self.url_parsed.query:
99 request_url = f"{self.op_url}?{self.url_parsed.query}"
100 else:
101 request_url = self.op_url
103 if "format" in query_string and "format" not in self.disabled_params:
104 req_formats = query_string["format"]
106 for req_format in req_formats:
107 content_type = Operation.get_content_type(req_format)
109 if req_format in self.format:
110 converter_func = getattr(self.addon, self.format[req_format])
111 return converter_func(s, request_url=request_url), content_type
112 elif "default_format" in self.i:
113 default_fmt = self.i["default_format"].strip()
114 content_type = Operation.get_content_type(default_fmt)
115 if default_fmt in self.format:
116 converter_func = getattr(self.addon, self.format[default_fmt])
117 return converter_func(s, request_url=request_url), content_type
119 if content_type not in ("text/csv", "application/json"):
120 content_type = "text/csv"
122 if "application/json" in content_type:
123 with StringIO(s) as f:
124 r = [dict(i) for i in DictReader(f)]
126 if "json" not in self.disabled_params:
127 r = Operation.structured(query_string, r)
129 return dumps(r, ensure_ascii=False, indent=4), content_type
130 else:
131 return s, content_type
133 @staticmethod
134 def pv(i, r=None):
135 """This method returns the plain value of a particular item 'i' of the result returned by the SPARQL query.
137 In case 'r' is specified (i.e. a row containing a set of results), then 'i' must be the index of the item
138 within that row."""
139 if r is None:
140 return i[1]
141 return Operation.pv(r[i])
143 @staticmethod
144 def tv(i, r=None):
145 """This method returns the typed value of a particular item 'i' of the result returned by the SPARQL query.
146 The type associated to that value is actually specified by means of the particular configuration provided
147 in the specification file of the API - field 'field_type'.
149 In case 'r' is specified (i.e. a row containing a set of results), then 'i' must be the index of the item
150 within that row."""
151 if r is None:
152 return i[0]
153 return Operation.tv(r[i])
155 @staticmethod
156 def do_overlap(r1, r2):
157 """This method returns a boolean that says if the two ranges (i.e. two pairs of integers) passed as inputs
158 actually overlap one with the other."""
159 r1_s, r1_e = r1
160 r2_s, r2_e = r2
162 return r1_s <= r2_s <= r1_e or r2_s <= r1_s <= r2_e
164 @staticmethod
165 def get_item_in_dict(d_or_l, key_list, prev=None):
166 """This method takes as input a dictionary or a list of dictionaries and browses it until the value
167 specified following the chain indicated in 'key_list' is not found. It returns a list of all the
168 values that matched with such search."""
169 res = [] if prev is None else prev.copy()
171 d_list = [d_or_l] if isinstance(d_or_l, dict) else d_or_l
173 for d in d_list:
174 key_list_len = len(key_list)
176 if key_list_len >= 1:
177 key = key_list[0]
178 if key in d:
179 if key_list_len == 1:
180 res.append(d[key])
181 else:
182 res = Operation.get_item_in_dict(d[key], key_list[1:], res)
184 return res
186 @staticmethod
187 def add_item_in_dict(d_or_l, key_list, item, idx):
188 """This method takes as input a dictionary or a list of dictionaries, browses it until the value
189 specified following the chain indicated in 'key_list' is not found, and then substitutes it with 'item'.
190 In case the final object retrieved is a list, it selects the object in position 'idx' before the
191 substitution."""
192 key_list_len = len(key_list)
194 if key_list_len >= 1:
195 key = key_list[0]
197 if isinstance(d_or_l, list):
198 if key_list_len == 1:
199 d_or_l[idx][key] = item
200 else:
201 for i in d_or_l:
202 Operation.add_item_in_dict(i, key_list, item, idx)
203 elif key in d_or_l:
204 if key_list_len == 1:
205 d_or_l[key] = item
206 else:
207 Operation.add_item_in_dict(d_or_l[key], key_list[1:], item, idx)
209 @staticmethod
210 def structured(params, json_table):
211 """This method checks if there are particular transformation rules specified in 'params' for a JSON output,
212 and convert each row of the input table ('json_table') according to these rules.
213 There are two specific rules that can be applied:
215 1. array("<separator>",<field>): it converts the string value associated to the field name '<field>' into
216 an array by splitting the various textual parts by means of '<separator>'. For instance, consider the
217 following JSON structure:
219 [
220 { "names": "Doe, John; Doe, Jane" },
221 { "names": "Doe, John; Smith, John" }
222 ]
224 Executing the rule 'array("; ",names)' returns the following new JSON structure:
226 [
227 { "names": [ "Doe, John", "Doe, Jane" ],
228 { "names": [ "Doe, John", "Smith, John" ]
229 ]
231 2. dict("separator",<field>,<new_field_1>,<new_field_2>,...): it converts the string value associated to
232 the field name '<field>' into an dictionary by splitting the various textual parts by means of
233 '<separator>' and by associating the new fields '<new_field_1>', '<new_field_2>', etc., to these new
234 parts. For instance, consider the following JSON structure:
236 [
237 { "name": "Doe, John" },
238 { "name": "Smith, John" }
239 ]
241 Executing the rule 'array(", ",name,family_name,given_name)' returns the following new JSON structure:
243 [
244 { "name": { "family_name": "Doe", "given_name: "John" } },
245 { "name": { "family_name": "Smith", "given_name: "John" } }
246 ]
248 Each of the specified rules is applied in order, and it works on the JSON structure returned after
249 the execution of the previous rule."""
250 if "json" in params:
251 fields = params["json"]
252 for field in fields:
253 ops = findall(r'([a-z]+)\(("[^"]+"),([^\)]+)\)', field)
254 for op_type, s, es in ops:
255 separator = sub('"(.+)"', "\\1", s)
256 entries = [i.strip() for i in es.split(",")]
257 keys = entries[0].split(".")
259 for row in json_table:
260 v_list = Operation.get_item_in_dict(row, keys)
261 for idx, v in enumerate(v_list):
262 if op_type == "array":
263 if isinstance(v, str):
264 Operation.add_item_in_dict(row, keys, v.split(separator) if v != "" else [], idx)
265 elif op_type == "dict":
266 new_fields = entries[1:]
267 new_fields_max_split = len(new_fields) - 1
268 if isinstance(v, str):
269 new_values = v.split(separator, new_fields_max_split)
270 Operation.add_item_in_dict(
271 row,
272 keys,
273 dict(zip(new_fields, new_values, strict=False)) if v != "" else {},
274 idx,
275 )
276 elif isinstance(v, list):
277 new_list = []
278 for i in v:
279 new_values = i.split(separator, new_fields_max_split)
280 new_list.append(dict(zip(new_fields, new_values, strict=False)))
281 Operation.add_item_in_dict(row, keys, new_list, idx)
283 return json_table
285 def preprocess(self, par_dict, op_item, addon):
286 """This method takes the a dictionary of parameters with the current typed values associated to them and
287 the item of the API specification defining the behaviour of that operation, and preprocesses the parameters
288 according to the functions specified in the '#preprocess' field (e.g. "#preprocess lower(doi)"), which is
289 applied to the specified parameters as input of the function in consideration (e.g.
290 "/api/v1/citations/10.1108/jd-12-2013-0166", converting the DOI in lowercase).
292 It is possible to run multiple functions sequentially by concatenating them with "-->" in the API
293 specification document. In this case the output of the function f_i will becomes the input operation URL
294 of the function f_i+1.
296 Finally, it is worth mentioning that all the functions specified in the "#preprocess" field must return
297 a tuple of values defining how the particular value passed in the dictionary must be changed."""
298 result = par_dict
300 if "preprocess" in op_item:
301 for pre in [sub(r"\s+", "", i) for i in op_item["preprocess"].split(" --> ")]:
302 func_name = sub(r"^([^\(\)]+)\(.+$", r"\1", pre).strip()
303 params_name = sub(r"^.+\(([^\(\)]+)\).*", r"\1", pre).split(",")
305 param_list = tuple(result[param_name] for param_name in params_name)
307 # run function
308 func = getattr(addon, func_name)
309 res = func(*param_list)
311 # substitute res to the current parameter in result
312 for idx, val in enumerate(res):
313 result[params_name[idx]] = val
315 return result
317 def postprocess(self, res, op_item, addon):
318 """This method takes the result table returned by running the SPARQL query in an API operation (specified
319 as input) and change some of such results according to the functions specified in the '#postprocess'
320 field (e.g. "#postprocess remove_date("2018")"). These functions can take parameters as input, while the first
321 unspecified parameters will be always the result table. It is worth mentioning that this result table (i.e.
322 a list of tuples) actually contains, in each cell, a tuple defining the plain value as well as the typed
323 value for enabling better comparisons and operations if needed. An example of this table of result is shown as
324 follows:
326 [
327 ("id", "date"),
328 ("my_id_1", "my_id_1"), (datetime(2018, 3, 2), "2018-03-02"),
329 ...
330 ]
332 Note that the typed value and the plain value of each cell can be selected by using the methods "tv" and "pv"
333 respectively. In addition, it is possible to run multiple functions sequentially by concatenating them
334 with "-->" in the API specification document. In this case the output of the function f_i will becomes
335 the input result table of the function f_i+1."""
336 result = res
338 if "postprocess" in op_item:
339 for post in [i.strip() for i in op_item["postprocess"].split(" --> ")]:
340 func_name = sub(r"^([^\(\)]+)\(.+$", r"\1", post).strip()
341 param_str = sub(r"^.+\(([^\(\)]*)\).*", r"\1", post)
342 params_values = () if param_str == "" else next(reader(param_str.splitlines(), skipinitialspace=True))
344 func = getattr(addon, func_name)
345 func_params = (result, *tuple(params_values))
346 result, do_type_fields = func(*func_params)
347 if do_type_fields:
348 result = self.type_fields(result, op_item)
350 return result
352 @staticmethod
353 def _apply_require(header, result, fields):
354 """Exclude rows with empty values in the specified fields."""
355 for field in fields:
356 field_idx = header.index(field)
357 result = [row for row in result if Operation.pv(field_idx, row) not in (None, "")]
358 return result
360 def _apply_filter(self, header, result, fields):
361 """Filter rows by comparison operators or regex patterns."""
362 for field in fields:
363 field_name, field_value = field.split(":", 1)
364 try:
365 field_idx = header.index(field_name)
366 flag = field_value[0]
367 if flag in ("<", ">", "="):
368 value = field_value[1:].lower()
369 result = [
370 row
371 for row in result
372 if self.operation[flag](
373 Operation.tv(field_idx, row),
374 self.dt.get_func(type(Operation.tv(field_idx, row)).__name__)(value),
375 )
376 ]
377 else:
378 pattern = field_value.lower()
379 result = [row for row in result if search(pattern, Operation.pv(field_idx, row).lower())]
380 except ValueError:
381 pass
382 return result
384 @staticmethod
385 def _apply_sort(header, result, fields):
386 """Sort rows by the specified fields and directions."""
387 for field in sorted(fields, reverse=True):
388 order_names = findall(r"^(desc|asc)\(([^\(\)]+)\)$", field)
389 if order_names:
390 direction, field_name = order_names[0]
391 else:
392 direction, field_name = "asc", field
393 try:
394 field_idx = header.index(field_name)
395 result = sorted(result, key=itemgetter(field_idx), reverse=(direction == "desc"))
396 except ValueError:
397 pass
398 return result
400 def handling_params(self, params, table):
401 """This method is used for filtering the results that are returned after the post-processing
402 phase. In particular, it is possible to:
404 1. [require=<field_name>] exclude all the rows that have an empty value in the field specified - e.g. the
405 "require=doi" remove all the rows that do not have any string specified in the "doi" field;
407 2. [filter=<field_name>:<operator><value>] consider only the rows where the string in the input field
408 is compliant with the value specified. If no operation is specified, the value is interpreted as a
409 regular expression, otherwise it is compared according to the particular type associated to that field.
410 Possible operators are "=", "<", and ">" - e.g. "filter=title:semantics?" returns all the rows that contain
411 the string "semantic" or "semantics" in the field title, while "filter=date:>2016-05" returns all the rows
412 that have a date greater than May 2016;
414 3. [sort=<order>(<field_name>)] sort all the results according to the value and type of the particular
415 field specified in input. It is possible to sort the rows either in ascending ("asc") or descending
416 ("desc") order - e.g. "sort=desc(date)" sort all the rows according to the value specified in the
417 field "date" in descending order.
419 Note that these filtering operations are applied in the order presented above - first the "require", then
420 the "filter", and finally the "sort". It is possible to specify one or more filtering operation of the
421 same kind (e.g. "require=doi&require=title").
422 """
423 header = table[0]
424 result = table[1:]
426 overridden = set(self.custom_params) | self.disabled_params
428 if ("exclude" in params or "require" in params) and "require" not in overridden and "exclude" not in overridden:
429 fields = params["exclude"] if "exclude" in params else params["require"]
430 result = self._apply_require(header, result, fields)
432 if "filter" in params and "filter" not in overridden:
433 result = self._apply_filter(header, result, params["filter"])
435 if "sort" in params and "sort" not in overridden:
436 result = self._apply_sort(header, result, params["sort"])
438 return [header, *result]
440 def type_fields(self, res, op_item):
441 """It creates a version of the results 'res' that adds, to each value of the fields, the same value interpreted
442 with the type specified in the specification file (field 'field_type'). Note that 'str' is used as default in
443 case no further specifications are provided."""
444 result = []
445 cast_func = {}
446 header = res[0]
447 for heading in header:
448 cast_func[heading] = DataType.str
450 if "field_type" in op_item:
451 for f, p in findall(FIELD_TYPE_RE, op_item["field_type"]):
452 cast_func[p] = self.dt.get_func(f)
454 for row in res[1:]:
455 new_row = []
456 for idx, heading in enumerate(header):
457 cur_value = row[idx]
458 if isinstance(cur_value, tuple):
459 cur_value = cur_value[1]
460 new_row.append((cast_func[heading](cur_value), cur_value))
461 result.append(new_row)
463 return [header, *result]
465 def remove_types(self, res):
466 """This method takes the results 'res' that include also the typed value and returns a version of such
467 results without the types that is ready to be stored on the file system."""
468 result = [res[0]]
469 result.extend(tuple(Operation.pv(idx, row) for idx in range(len(row))) for row in res[1:])
470 return result
472 @staticmethod
473 def _is_directive(line):
474 return line.strip().startswith("@@")
476 @staticmethod
477 def _parse_directive_args(tokens, param_names, defaults=None):
478 defaults = defaults or {}
479 all_names = set(param_names) | set(defaults)
480 result = {}
481 positional_index = 0
482 seen_keyword = False
484 for token in tokens:
485 if "=" in token:
486 key, value = token.split("=", 1)
487 if key in all_names:
488 if key in result:
489 raise ValueError(f"Duplicate parameter {key!r}")
490 seen_keyword = True
491 result[key] = value
492 continue
493 if seen_keyword:
494 raise ValueError(f"Positional argument {token!r} cannot follow keyword argument")
495 if positional_index >= len(param_names):
496 raise ValueError(f"Unexpected argument {token!r}")
497 result[param_names[positional_index]] = token
498 positional_index += 1
500 for name, default in defaults.items():
501 if name not in result:
502 result[name] = default
504 missing = [name for name in param_names if name not in result]
505 if missing:
506 raise ValueError(f"Missing required parameter(s): {', '.join(missing)}")
508 return result
510 def _handle_directive_with(self, parts):
511 args = Operation._parse_directive_args(parts[1:], ["source"])
512 name = args["source"]
513 if name not in self.sources_map:
514 raise ValueError(f"Unknown source '{name}' in @@with; declare it in #sources.")
515 return self.sources_map[name], None
517 @staticmethod
518 def _handle_directive_endpoint(parts):
519 args = Operation._parse_directive_args(parts[1:], ["target"])
520 return args["target"], None
522 @staticmethod
523 def _handle_directive_join(parts):
524 args = Operation._parse_directive_args(parts[1:], ["left_var", "right_var"], defaults={"type": "inner"})
525 return None, ("JOIN", args["left_var"], args["right_var"], args["type"].lower())
527 @staticmethod
528 def _handle_directive_values(parts):
529 tokens = parts[1:]
530 if not tokens:
531 raise ValueError("@@values needs at least one variable")
532 return None, ("VALUES_INJECT", tokens)
534 @staticmethod
535 def _handle_directive_foreach(parts):
536 args = Operation._parse_directive_args(parts[1:], ["variable", "placeholder"], defaults={"wait": "0"})
537 var_name = args["variable"]
538 if not var_name.startswith("?"):
539 raise ValueError(f"@@foreach variable must start with '?', got {var_name!r}")
540 try:
541 delay = float(args["wait"])
542 except ValueError:
543 raise ValueError(f"Invalid wait value in @@foreach: {args['wait']!r}") from None
544 return None, ("FOREACH", var_name, args["placeholder"], delay)
546 def _parse_steps(self, text, default_endpoint, params):
547 """
548 Returns a list of steps:
549 - ("QUERY", endpoint_url, query_text)
550 - ("JOIN", left_var, right_var, how) # how in {"inner","left"}
551 - ("REMOVE", [vars])
552 - ("VALUES_INJECT", [vars]) # @@values ?var1 ?var2 ...
553 - ("FOREACH", var_name, placeholder, delay) # @@foreach ?var placeholder [wait=N]
554 """
555 for p, v in params.items():
556 text = text.replace(f"[[{p}]]", str(v))
557 steps = []
558 cur_query = []
559 current_endpoint = default_endpoint
561 directive_handlers = {
562 "with": self._handle_directive_with,
563 "endpoint": self._handle_directive_endpoint,
564 "join": self._handle_directive_join,
565 "remove": lambda parts: (None, ("REMOVE", parts[1:])),
566 "values": self._handle_directive_values,
567 "foreach": self._handle_directive_foreach,
568 }
570 def flush_query():
571 if cur_query:
572 q = "\n".join(cur_query).strip()
573 if not q:
574 cur_query.clear()
575 return
576 for p, v in params.items():
577 q = q.replace(f"[[{p}]]", str(v))
578 steps.append(("QUERY", current_endpoint, q))
579 cur_query.clear()
581 for raw in text.splitlines():
582 line = raw.rstrip("\n")
583 if not self._is_directive(line):
584 cur_query.append(line)
585 continue
587 flush_query()
589 body = line.strip()[2:].strip()
590 parts = body.split()
591 cmd = parts[0].lower()
593 handler = directive_handlers.get(cmd)
594 if handler is None:
595 raise ValueError(f"Unknown directive @@{cmd}")
597 new_endpoint, step = handler(parts)
598 if new_endpoint is not None:
599 current_endpoint = new_endpoint
600 if step is not None:
601 steps.append(step)
603 flush_query()
604 return steps
606 def _run_sparql_dicts(self, endpoint_url, query_text):
607 """Run a SELECT query against a SPARQL endpoint and return a list of dict rows.
609 This always requests CSV and parses it via DictReader, to stay consistent
610 with RAMOSE's legacy pipeline.
611 """
612 try:
613 if self.sparql_http_method == "get":
614 r = _http_session.get(
615 endpoint_url + "?query=" + quote(query_text),
616 headers={
617 "Accept": "text/csv",
618 "User-Agent": "RAMOSE/2.0.0",
619 },
620 timeout=DEFAULT_HTTP_TIMEOUT,
621 )
622 else:
623 r = _http_session.post(
624 endpoint_url,
625 data=query_text,
626 headers={
627 "Accept": "text/csv",
628 "Content-Type": "application/sparql-query",
629 "User-Agent": "RAMOSE/2.0.0",
630 },
631 timeout=DEFAULT_HTTP_TIMEOUT,
632 )
633 except RequestException as e:
634 raise RuntimeError(f"SPARQL request failed: {e}") from e
636 r.encoding = "utf-8"
637 if r.status_code != 200:
638 raise RuntimeError(f"SPARQL {r.status_code}: {r.reason}")
639 text = r.content.decode("utf-8-sig", errors="replace")
640 list_of_lines = text.splitlines()
641 return list(DictReader(list_of_lines))
643 @staticmethod
644 def _normalize_sparql_json_resultset(result):
645 """Convert a SPARQL JSON ResultSet dict to a list of flat dicts."""
646 vars_ = result["head"].get("vars") or []
647 return [
648 {v: (b[v].get("value") if isinstance(b.get(v), dict) else b.get(v)) for v in vars_}
649 for b in result["results"].get("bindings", [])
650 ]
652 @staticmethod
653 def _normalize_columnar_dict(result):
654 """Convert a column-oriented dict {col: [values]} to a list of row dicts."""
655 cols = list(result.keys())
656 max_len = max((len(v) for v in result.values() if isinstance(v, (list, tuple))), default=0)
658 if not max_len:
659 return [result]
661 rows = []
662 for i in range(max_len):
663 row = {}
664 for c in cols:
665 v = result[c]
666 row[c] = (
667 v[i]
668 if isinstance(v, (list, tuple)) and i < len(v)
669 else (v if not isinstance(v, (list, tuple)) else None)
670 )
671 rows.append(row)
672 return rows
674 def _run_sparql_anything_dicts(self, query_text, values=None):
675 """
676 Execute a SPARQL Anything SELECT query via PySPARQL-Anything and return
677 a list of dicts (one per row), in the same shape as _run_sparql_dicts.
679 query_text: full SPARQL (Anything) query string
680 (typically containing SERVICE <x-sparql-anything:...>).
681 values: optional dict of template parameters for the query
682 (name -> value), passed to SPARQL Anything's `values=`.
683 """
684 if self._sa_engine is None:
685 import pysparql_anything # noqa: PLC0415 # type: ignore[import-not-found]
687 self._sa_engine = pysparql_anything.SparqlAnything()
689 kwargs = {"query": query_text}
690 if values:
691 kwargs["values"] = {str(k): str(v) for k, v in values.items()}
693 result = self._sa_engine.select(output_type=dict, **kwargs)
695 # Normalize to list[dict]
696 if isinstance(result, list):
697 if result and isinstance(result[0], dict):
698 return result
699 return [dict(row) for row in result]
701 if not isinstance(result, dict):
702 return [{"result": result}]
704 # Standard SPARQL JSON ResultSet shape
705 head = result.get("head")
706 results_obj = result.get("results")
707 if isinstance(head, dict) and isinstance(results_obj, dict) and "bindings" in results_obj:
708 return self._normalize_sparql_json_resultset(result)
710 # Column-oriented dict or single-row fallback
711 return self._normalize_columnar_dict(result)
713 def _run_query_dicts(self, endpoint_url, query_text):
714 """
715 Dispatch query execution to the appropriate backend, with support
716 for per-query engine selection in multi-source mode.
718 Rules:
719 - If endpoint_url is the special string "sparql-anything" (case-insensitive),
720 then always use SPARQL.ANYTHING (PySPARQL-Anything) for this query.
721 - Otherwise, fall back to the operation-level engine:
722 * engine == "sparql-anything" -> SPARQL.ANYTHING
723 * else -> standard HTTP SPARQL
724 """
726 # Per-query override: @@endpoint sparql-anything
727 if endpoint_url and str(endpoint_url).strip().lower() == "sparql-anything":
728 return self._run_sparql_anything_dicts(query_text)
730 # Default behaviour: operation-level engine
731 if self.engine == "sparql-anything":
732 return self._run_sparql_anything_dicts(query_text)
733 return self._run_sparql_dicts(endpoint_url, query_text)
735 def _inject_values_clause(self, query_text, vars_, acc_rows):
736 # build distinct tuples for requested vars from the accumulator
737 cols = [v.lstrip("?") for v in vars_]
738 tuples, seen = [], set()
739 for row in acc_rows or []:
740 tup = tuple(row.get(c, "") for c in cols)
741 if all(tup) and tup not in seen:
742 seen.add(tup)
743 tuples.append(tup)
744 if not tuples:
745 return query_text # nothing to inject
747 # format literals vs IRIs
748 def fmt(x):
749 s = str(x)
750 if s.startswith(("http://", "https://")):
751 return f"<{s}>"
752 return '"' + s.replace("\\", "\\\\").replace('"', '\\"') + '"'
754 head = "VALUES (" + " ".join(vars_) + ") {\n"
755 body = "\n".join(" (" + " ".join(fmt(v) for v in tup) + ")" for tup in tuples)
756 tail = "\n}\n"
758 i = query_text.find("{")
759 if i == -1:
760 # no WHERE brace: put VALUES at top (legal SPARQL)
761 return head + body + tail + query_text
762 j = i + 1
763 return query_text[:j] + "\n" + head + body + tail + query_text[j:]
765 @staticmethod
766 def _drop_columns(rows, vars_):
767 if not rows:
768 return rows
769 vars_set = {v.lstrip("?") for v in vars_}
770 return [{k: v for k, v in r.items() if k not in vars_set and ("?" + k) not in vars_set} for r in rows]
772 def _norm_join_key(self, v):
773 if v is None:
774 return None
775 s = str(v).strip()
776 # unify scheme for w3id IRIs (and similar)
777 if s.startswith("http://"):
778 s = "https://" + s[len("http://") :]
779 # drop a single trailing slash for stability
780 return s.removesuffix("/")
782 def _join(self, left_rows, right_rows, lkey, rkey, how="inner"):
783 """
784 Merge two row sets on lkey (from left_rows) and rkey (from right_rows).
785 - lkey/rkey may be passed as '?var' or 'var' -> we normalize to bare names.
786 - Keys are normalized with _norm_join_key (e.g., http -> https, trim slash).
787 - When 'left', all left rows are preserved even if no match on the right.
788 - Right-hand columns are copied into the merged row; collisions are avoided.
789 """
790 # 1) Normalize column names (strip leading '?')
791 lcol = lkey.lstrip("?")
792 rcol = rkey.lstrip("?")
794 left_rows = left_rows or []
795 right_rows = right_rows or []
797 # 2) Build an index for right_rows on normalized rcol values
798 rindex = {}
799 for r in right_rows:
800 rk = self._norm_join_key(r.get(rcol))
801 if rk is None:
802 continue
803 rindex.setdefault(rk, []).append(r)
805 # determine right columns to copy (excluding the join key)
806 right_cols = [c for c in (right_rows[0].keys() if right_rows else []) if c != rcol]
808 out = []
809 for left_row in left_rows:
810 lk = self._norm_join_key(left_row.get(lcol))
811 matches = rindex.get(lk, [])
812 if matches:
813 for r in matches:
814 merged = dict(left_row)
815 for c in right_cols:
816 rv = r.get(c)
817 if rv is None:
818 continue
819 if c not in merged or merged[c] in ("", None):
820 merged[c] = rv
821 else:
822 alt = f"{c}_r"
823 if alt not in merged or merged[alt] in ("", None):
824 merged[alt] = rv
825 out.append(merged)
826 elif how == "left":
827 out.append(dict(left_row))
828 return out
830 def _apply_custom_postprocess_params(self, table, q_string):
831 for param_name, param_conf in self.custom_params.items():
832 if param_conf["phase"] != "postprocess":
833 continue
834 if param_name in q_string:
835 handler = getattr(self.addon, param_conf["handler"])
836 table = handler(table, q_string[param_name])
837 return table
839 @property
840 def _cache_ttl(self):
841 if "cache_duration" in self.i:
842 return int(self.i["cache_duration"])
843 return self._default_cache_ttl
845 def _build_cache_key(self, q_string):
846 presentation_params = {"page", "page_size", "format", "json"}
847 data_params = sorted((name, values) for name, values in q_string.items() if name not in presentation_params)
848 if data_params:
849 query_string = "&".join(f"{name}={value}" for name, values in data_params for value in values)
850 return f"{self.tp}:{self.op_url}?{query_string}"
851 return f"{self.tp}:{self.op_url}"
853 def _extract_pagination_params(self, q_string):
854 if "page_size" not in q_string or "page_size" in self.disabled_params:
855 return None
856 page_size = int(q_string["page_size"][0])
857 if page_size < 1:
858 msg = f"page_size must be >= 1, got {page_size}"
859 raise ValueError(msg)
860 page = 1
861 if "page" in q_string and "page" not in self.disabled_params:
862 page = int(q_string["page"][0])
863 if page < 1:
864 msg = f"page must be >= 1, got {page}"
865 raise ValueError(msg)
866 return page, page_size
def _has_custom_converter(self, q_string):
    """Tell whether the response will be rendered by a custom format converter.

    When the client sent ``format`` (and ``format`` is not disabled), the answer is
    whether any requested value is in ``self.format``. The operation's
    ``default_format`` is NOT consulted in that case. Otherwise the answer is whether
    the operation declares a ``default_format`` whose whitespace-stripped value is
    in ``self.format``.
    """
    format_requested = "format" in q_string and "format" not in self.disabled_params
    if format_requested:
        return any(fmt in self.format for fmt in q_string["format"])
    return "default_format" in self.i and self.i["default_format"].strip() in self.format
def _paginate_and_format(self, table, q_string, content_type):
    """Slice *table* to the requested page (if any) and render it in the requested format.

    *table* is a list of rows whose first row is the header. Pagination is skipped,
    and ``self.pagination_info`` reset to ``None``, in two cases: when a custom
    converter will render the output (see ``_has_custom_converter``), or when no
    ``page_size`` was requested. Otherwise the header is kept, followed only by the
    rows of the requested page, and ``self.pagination_info`` is filled in (used by
    the caller for the ``Link`` header).

    An empty result set has one (empty) first page. ``page=1`` on an empty result
    therefore returns just the header instead of failing.

    Returns:
        ``(200, body, content_type)`` with *body*/*content_type* from ``self.conv``.

    Raises:
        ValueError: if the pagination parameters are invalid or ``page`` is past
            the last page.
    """
    if self._has_custom_converter(q_string):
        # A custom converter renders the whole table; pagination does not apply.
        self.pagination_info = None
    else:
        page_params = self._extract_pagination_params(q_string)
        if page_params is not None:
            page, page_size = page_params
            # Row 0 is the header; max() also covers a completely empty table.
            total_items = max(len(table) - 1, 0)
            total_pages = ceil(total_items / page_size)
            # Page 1 is always valid, even with no items; only pages past the
            # last page are rejected.
            if page > max(total_pages, 1):
                msg = f"page {page} exceeds total pages {total_pages}"
                raise ValueError(msg)
            start = (page - 1) * page_size
            end = start + page_size
            # table[:1] is the header (or nothing for an empty table).
            table = [*table[:1], *table[1 + start : 1 + end]]
            # NOTE(review): with total_items == 0 this now reaches
            # build_pagination_info; confirm it produces sensible links for an
            # empty result.
            self.pagination_info = build_pagination_info(self.op_url, q_string, page, page_size, total_items)
        else:
            self.pagination_info = None

    s_res = StringIO()
    writer(s_res).writerows(table)
    body, ctype = self.conv(s_res.getvalue(), q_string, content_type)

    return 200, body, ctype
def _finalize_result(self, csv_rows, content_type):
    """Turn raw CSV rows (header first) into the final ``(status, body, content_type)``.

    Steps, in order:
      1. type the fields (``type_fields``);
      2. run the addon postprocessing when an addon is configured;
      3. apply the query-string handling (``handling_params``);
      4. strip the type information again (``remove_types``);
      5. apply custom postprocess parameters when any are configured;
      6. store the table in the cache unless caching is off or disabled for this
         operation;
      7. paginate and format it.
    """
    query = parse_qs(quote(self.url_parsed.query, safe="&="))
    table = self.type_fields(csv_rows, self.i)
    if self.addon is not None:
        table = self.postprocess(table, self.i, self.addon)
    table = self.remove_types(self.handling_params(query, table))
    if self.custom_params:
        table = self._apply_custom_postprocess_params(table, query)

    caching_enabled = self._cache is not None and "cache_disable" not in self.i
    if caching_enabled:
        self._cache.set(self._build_cache_key(query), table, expire=self._cache_ttl)
    return self._paginate_and_format(table, query, content_type)
916 @staticmethod
917 def _header_from_field_type(op_item, acc):
918 # Respect #field_type order if provided, else derive from data
919 if "field_type" in op_item:
920 # FIELD_TYPE_RE is global in this file
921 return [f for (_, f) in findall(FIELD_TYPE_RE, op_item["field_type"])]
922 # fallback to keys of first row
923 return list(acc[0].keys()) if acc else []
925 @staticmethod
926 def _to_csv_rows(header, acc):
927 rows = [header]
928 rows.extend([d.get(h, "") for h in header] for d in acc)
929 return rows
def _extract_params(self):
    """Collect the path parameters of the called URL, converted to their declared types.

    Each ``{name}`` placeholder of the operation's ``url`` template is paired, by
    position, with the corresponding group of ``self.op`` matched against
    ``self.op_url``. How the raw text is treated depends on the parameter's spec in
    ``self.i``:
      - spec type ``str`` (text before ``(``): kept as text;
      - any other type: converted with ``self.dt.get_func(<type>)``;
      - no spec, or any ``KeyError`` during conversion: the raw text is kept.
    """
    matched = match(self.op, self.op_url).groups()  # type: ignore[union-attr]
    params = {}
    for position, name in enumerate(findall("{([^{}]+)}", self.i["url"])):
        raw = matched[position]
        try:
            declared = self.i[name].split("(")[0]
            value = raw if declared == "str" else self.dt.get_func(declared)(raw)
        except KeyError:
            value = raw
        params[name] = value
    return params
def _apply_custom_preprocess_params(self, par_dict):
    """Fill *par_dict* in place from the preprocess-phase custom parameters; return ``None``.

    For every custom parameter configured with ``phase == "preprocess"``:
      - if it appears in the query string, its handler (looked up by name on
        ``self.addon``) is called with the list of values, and the returned
        mapping is merged into *par_dict*;
      - otherwise the parameter defaults to ``""``, unless *par_dict* already
        has it.

    Finally, every ``[[placeholder]]`` in the operation's SPARQL text that is still
    missing from *par_dict* is set to ``""``, so the later substitution replaces it
    with an empty string.
    """
    query = parse_qs(quote(self.url_parsed.query, safe="&="))
    for name, conf in self.custom_params.items():
        if conf["phase"] == "preprocess":
            if name in query:
                par_dict.update(getattr(self.addon, conf["handler"])(query[name]))
            else:
                par_dict.setdefault(name, "")
    for placeholder in findall(r"\[\[(\w+)\]\]", self.i["sparql"]):
        par_dict.setdefault(placeholder, "")
def _exec_sparql_anything_single(self, par_dict, content_type):
    """Run the operation's query once through the SPARQL Anything engine and finalize it.

    Each ``[[name]]`` placeholder in the operation's SPARQL text is replaced by
    ``str(value)`` from *par_dict*. The rows returned by
    ``_run_sparql_anything_dicts`` are then turned into a CSV table, with the header
    from ``_header_from_field_type``, and passed to ``_finalize_result``.
    """
    query = self.i["sparql"]
    for name, value in par_dict.items():
        query = query.replace(f"[[{name}]]", str(value))
    records = self._run_sparql_anything_dicts(query) or []
    columns = self._header_from_field_type(self.i, records)
    return self._finalize_result(self._to_csv_rows(columns, records), content_type)
def _exec_standard_sparql(self, par_dict, content_type):
    """Execute standard SPARQL queries, handling parameter combinations via cartesian product.

    List-valued parameters are expanded: one query is sent for every combination of
    values. For example, ``{"id": "5", "area": ["A1", "A2"]}`` sends two queries,
    with ``area`` set to ``"A1"`` and to ``"A2"``.

    Each query is sent to ``self.tp``, asking for ``text/csv``. It uses GET
    (``?query=``) when ``self.sparql_http_method == "get"``, and POST with
    ``application/sparql-query`` otherwise. The CSV answers are concatenated; the
    header row is kept only from the first answer that contributed rows. The
    combined table is handed to ``_finalize_result``.

    Returns:
        The tuple from ``_finalize_result``, or ``(status, message, "text/plain")``
        as soon as the endpoint answers with a non-200 status.
    """
    # Wrap scalar values in lists for cartesian product
    par_dict = {k: v if isinstance(v, list) else [v] for k, v in par_dict.items()}

    parameters_comb = [
        dict(zip(par_dict.keys(), combination, strict=False)) for combination in product(*par_dict.values())
    ]

    table = []
    for comb in parameters_comb:
        query = self.i["sparql"]
        for param, val in comb.items():
            query = query.replace(f"[[{param}]]", str(val))

        if self.sparql_http_method == "get":
            r = _http_session.get(
                self.tp + "?query=" + quote(query),
                headers={"Accept": "text/csv"},
                timeout=DEFAULT_HTTP_TIMEOUT,
            )
        else:
            r = _http_session.post(
                self.tp,
                data=query,
                headers={"Accept": "text/csv", "Content-Type": "application/sparql-query"},
                timeout=DEFAULT_HTTP_TIMEOUT,
            )
        r.encoding = "utf-8"

        if r.status_code != 200:
            return r.status_code, f"HTTP status code {r.status_code}: {r.reason}", "text/plain"

        # Parse each answer as one whole CSV document. Splitting it into lines first
        # would drop line breaks inside quoted values: csv.reader ignores the end of an
        # input line that falls inside a quoted field. newline="" lets the csv module
        # see the real line terminators ("\n", "\r", "\r\n").
        rows = list(reader(StringIO(r.text, newline="")))

        # Keep the CSV header only once: drop it from every answer after the first
        # one that produced rows (an empty first answer no longer eats the header).
        if table:
            rows = rows[1:]
        table.extend(rows)

    return self._finalize_result(table, content_type)
def _exec_foreach_query(self, endpoint_url, qtxt, var_name, placeholder, delay, acc):
    """Run *qtxt* once per distinct value of a column of *acc* (``@@foreach``).

    The column name is *var_name* without its leading ``?``. Falsy values are
    skipped, and each distinct value is queried once, in first-seen order. For every
    value, ``[[placeholder]]`` in *qtxt* is replaced by ``str(value)`` and the query
    is run against *endpoint_url*. All returned rows are concatenated. When *delay*
    is truthy, ``time.sleep(delay)`` is called between consecutive queries, but not
    after the last one.
    """
    column = var_name.lstrip("?")
    # dict.fromkeys keeps first-seen order while dropping duplicates.
    distinct = list(dict.fromkeys(value for value in (row.get(column) for row in acc or []) if value))

    collected = []
    last = len(distinct) - 1
    for position, value in enumerate(distinct):
        fetched = self._run_query_dicts(endpoint_url, qtxt.replace(f"[[{placeholder}]]", str(value)))
        if fetched:
            collected.extend(fetched)
        if delay and position < last:
            time.sleep(delay)
    return collected
def _exec_multi_source_query_step(self, endpoint_url, qtxt, state):
    """Run one ``QUERY`` step of a multi-source pipeline and merge its rows into *state*.

    *state* is the mutable pipeline state, with keys ``acc``, ``pending_join``,
    ``pending_values_vars`` and ``pending_foreach``.

    Running the query:
      - A pending ``@@foreach`` takes precedence. The query runs once per value via
        ``_exec_foreach_query``, and a pending VALUES injection is discarded.
      - Otherwise, a pending VALUES injection is applied to *qtxt*
        (``_inject_values_clause``), then the query runs once.

    Merging: the rows become the accumulator if there is none yet. Otherwise they
    are merged into it with ``_join``, using the pending ``@@join`` directive.

    Raises:
        ValueError: if an accumulator already exists and no ``@@join`` is pending.
    """
    foreach = state["pending_foreach"]
    if foreach is None:
        values_vars = state["pending_values_vars"]
        if values_vars:
            qtxt = self._inject_values_clause(qtxt, values_vars, state["acc"])
            state["pending_values_vars"] = None
        rows = self._run_query_dicts(endpoint_url, qtxt)
    else:
        var_name, placeholder, delay = foreach
        rows = self._exec_foreach_query(endpoint_url, qtxt, var_name, placeholder, delay, state["acc"])
        state["pending_foreach"] = None
        state["pending_values_vars"] = None

    if state["acc"] is None:
        state["acc"] = rows
        return

    join = state["pending_join"]
    if not join:
        raise ValueError("Multiple QUERY steps without an explicit @@join directive")
    lvar, rvar, how = join
    state["acc"] = self._join(state["acc"], rows, lvar, rvar, how)
    state["pending_join"] = None
def _exec_multi_source(self, par_dict, content_type):
    """Execute a multi-source query pipeline described by ``@@`` directives.

    The operation's SPARQL text is split into steps by ``_parse_steps`` (given
    ``self.tp`` and *par_dict*). Steps are processed in order:
      - ``QUERY`` runs a query and merges its rows into the accumulator;
      - ``JOIN``, ``VALUES_INJECT`` and ``FOREACH`` only record a pending
        directive, consumed by the next ``QUERY``;
      - ``REMOVE`` drops columns from the accumulated rows immediately.

    The final rows are converted to a CSV table and finalized.

    Raises:
        RuntimeError: on an unknown step tag.
        ValueError: propagated from a ``QUERY`` step (e.g. a missing ``@@join``).
    """
    steps = self._parse_steps(self.i["sparql"], self.tp, par_dict)

    # Shared pipeline state, every slot initially None.
    state: dict[str, object] = dict.fromkeys(("acc", "pending_join", "pending_values_vars", "pending_foreach"))

    for step in steps:
        kind = step[0]
        if kind == "QUERY":
            self._exec_multi_source_query_step(step[1], step[2], state)
        elif kind == "JOIN":
            state["pending_join"] = (step[1], step[2], step[3])
        elif kind == "FOREACH":
            state["pending_foreach"] = (step[1], step[2], step[3])
        elif kind == "VALUES_INJECT":
            state["pending_values_vars"] = step[1]
        elif kind == "REMOVE":
            state["acc"] = self._drop_columns(state["acc"] or [], step[1])
        else:
            raise RuntimeError(f"Unknown step tag {kind}")

    rows = state["acc"] or []
    header = self._header_from_field_type(self.i, rows)
    return self._finalize_result(self._to_csv_rows(header, rows), content_type)
1092 @staticmethod
1093 def _format_error(sc, e, prefix=""):
1094 """Format an error response tuple with traceback line info."""
1095 tb = e.__traceback__
1096 line = tb.tb_lineno if tb else "?"
1097 msg = f"HTTP status code {sc}: {prefix}{type(e).__name__}: {e} (line {line})"
1098 return sc, msg, "text/plain"
def exec(self, method="get", content_type="application/json"):
    """Run the operation for an HTTP call made with *method*, answering in *content_type*.

    The work happens in ``_dispatch_exec``:
      1. extract and type the URL parameters, then preprocess them;
      2. run the SPARQL query (or the multi-source pipeline) with those parameters;
      3. type the returned fields;
      4. postprocess them;
      5. apply the filtering/sorting parameters;
      6. strip the types again;
      7. convert the result into the requested format.

    Returns:
        ``(status, body, content_type, headers)``. *headers* carries a ``Link``
        entry when pagination information is available. Errors are returned, not
        raised:
          - 405 if *method* (case-insensitive) is not allowed by the operation;
          - 408 on ``TimeoutError``;
          - 400 on ``TypeError``/``ValueError``;
          - 500 on any other exception.
    """
    verb = method.lower()
    if verb not in self.i["method"].split():
        return 405, f"HTTP status code 405: '{verb}' method not allowed", "text/plain", {}

    try:
        status, body, ctype = self._dispatch_exec(content_type)
    except TimeoutError as exc:
        failure = self._format_error(408, exc, "request timeout - ")
    except (TypeError, ValueError) as exc:
        failure = self._format_error(400, exc, "parameter in the request not compliant with the type specified - ")
    except Exception as exc:  # noqa: BLE001
        failure = self._format_error(500, exc, "something unexpected happened - ")
    else:
        extra_headers = {}
        info = self.pagination_info
        if info is not None:
            link = build_link_header(info)
            if link:
                extra_headers["Link"] = link
        return status, body, ctype, extra_headers
    return (*failure, {})
def _dispatch_exec(self, content_type):
    """Compute ``(status, body, content_type)`` for the current request.

    Steps:
      1. Extract the typed URL parameters.
      2. Run the addon preprocessing (when an addon is set) and the custom
         preprocess parameters (when any are configured).
      3. If caching is enabled for this operation and a table is cached for the
         request, serve it through ``_paginate_and_format``.
      4. Otherwise pick the execution path, based on whether the SPARQL text
         contains ``@@`` after placeholder substitution:
         - no ``@@``: run a single query, with SPARQL Anything or standard SPARQL
           according to ``self.engine``;
         - ``@@`` present: run the multi-source pipeline. Its ``ValueError`` is
           answered with 400 and its ``RuntimeError`` with 502.
    """
    params = self._extract_params()
    if self.addon is not None:
        self.preprocess(params, self.i, self.addon)
    if self.custom_params:
        self._apply_custom_preprocess_params(params)

    if self._cache is not None and "cache_disable" not in self.i:
        query = parse_qs(quote(self.url_parsed.query, safe="&="))
        hit = self._cache.get(self._build_cache_key(query))
        if hit is not None:
            return self._paginate_and_format(hit, query, content_type)

    # NOTE(review): the "@@" check runs on the text *after* request parameters are
    # substituted. A parameter value containing "@@" therefore also routes the
    # request to the multi-source pipeline. Confirm whether directives should come
    # only from the configured template (or from preprocess hooks).
    substituted = self.i["sparql"]
    for name, value in params.items():
        substituted = substituted.replace(f"[[{name}]]", str(value))

    if "@@" in substituted:
        try:
            return self._exec_multi_source(params, content_type)
        except ValueError as err:
            return 400, f"HTTP status code 400: {err}", "text/plain"
        except RuntimeError as err:
            return 502, f"HTTP status code 502: {err}", "text/plain"

    if self.engine == "sparql-anything":
        return self._exec_sparql_anything_single(params, content_type)
    return self._exec_standard_sparql(params, content_type)