Coverage for ramose / operation.py: 99%

696 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-15 15:58 +0000

1# SPDX-FileCopyrightText: 2018-2021 Silvio Peroni <silvio.peroni@unibo.it> 

2# SPDX-FileCopyrightText: 2020-2021 Marilena Daquino <marilena.daquino2@unibo.it> 

3# SPDX-FileCopyrightText: 2022 Davide Brembilla 

4# SPDX-FileCopyrightText: 2024 Ivan Heibi <ivan.heibi2@unibo.it> 

5# SPDX-FileCopyrightText: 2025 Sergei Slinkin 

6# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

7# 

8# SPDX-License-Identifier: ISC 

9 

10import time 

11from csv import DictReader, reader, writer 

12from io import StringIO 

13from itertools import product 

14from json import dumps 

15from math import ceil 

16from operator import eq, gt, itemgetter, lt 

17from re import findall, match, search, sub 

18from urllib.parse import parse_qs, quote, urlsplit 

19 

20from requests.exceptions import RequestException 

21 

22from ramose._constants import DEFAULT_HTTP_TIMEOUT, FIELD_TYPE_RE, _http_session 

23from ramose.datatype import DataType 

24from ramose.paging import PaginationInfo, build_link_header, build_pagination_info 

25 

26 

27class Operation: 

def __init__(
    self,
    op_complete_url,
    op_key,
    i,
    tp,
    sparql_http_method,
    addon,
    format_map=None,
    sources_map=None,
    engine="sparql",
    custom_params=None,
    disabled_params=None,
    cache=None,
    default_cache_ttl=86400,
):
    """This class is responsible for materialising an API operation to be run against a SPARQL endpoint
    (or, depending on configuration, through the SPARQL.Anything engine).

    It takes in input a full URL referring to a call to an operation (parameter 'op_complete_url'),
    the particular shape representing an operation (parameter 'op_key'), the definition (in JSON) of such
    operation (parameter 'i'), the URL of the triplestore to contact (parameter 'tp'), the HTTP method
    to use for the SPARQL request (parameter 'sparql_http_method', set to either 'get' or 'post'), the
    object which provides additional functions for use in the operation (parameter 'addon', looked up
    with getattr), and a mapping from format names to the names of the addon functions responsible for
    converting CSV data into those formats (parameter 'format_map').

    It also accepts a mapping of named sources to endpoint URLs referenced by @@with directives
    (parameter 'sources_map') and the engine identifier selecting the execution backend
    (parameter 'engine').

    Further optional parameters:

    - 'custom_params': mapping of custom query-parameter names to their configuration; each
      configuration is read through its "phase" and "handler" keys. Names listed here also override the
      built-in require/filter/sort handling.
    - 'disabled_params': set of built-in query-parameter names that must be ignored.
    - 'cache': stored as-is on the instance; its interface is not visible in this part of the file.
    - 'default_cache_ttl': value used as cache TTL when the operation definition has no
      'cache_duration' entry (presumably seconds -- to be confirmed against the cache implementation).
    """
    self.url_parsed = urlsplit(op_complete_url)
    self.op_url = self.url_parsed.path
    self.op = op_key
    self.i = i
    self.tp = tp
    self.sparql_http_method = sparql_http_method
    self.addon = addon
    # Falsy optional arguments are normalised to fresh empty containers.
    self.format = format_map or {}
    self.sources_map = sources_map or {}
    self.engine = engine
    self.custom_params = custom_params or {}
    self.disabled_params = disabled_params or set()
    # The SPARQL Anything engine is created lazily, on the first query that needs it.
    self._sa_engine = None
    self._cache = cache
    self._default_cache_ttl = default_cache_ttl
    self.pagination_info: PaginationInfo | None = None

    # Comparison functions selected by the leading flag of a "filter" value.
    self.operation = {"=": eq, "<": lt, ">": gt}

    self.dt = DataType()

77 

78 @staticmethod 

def get_content_type(ct):
    """Translate a short format name into its MIME type.

    'csv' maps to 'text/csv' and 'json' maps to 'application/json'; every other
    value is handed back unchanged.
    """
    if ct == "json":
        return "application/json"
    return "text/csv" if ct == "csv" else ct

90 

def conv(self, s, query_string, c_type="text/csv"):
    """Convert the CSV document 's' into the representation requested by the caller.

    The target format is taken from the 'format' query parameter (unless that
    parameter is disabled), otherwise from the operation's 'default_format',
    otherwise from 'c_type'. Formats registered in the format map are produced
    by the matching addon function, which also receives the request URL.
    Anything that is neither CSV nor JSON falls back to CSV.

    Returns a tuple (converted document, content type).
    """
    content_type = Operation.get_content_type(c_type)

    # URL of the current request, passed on to custom converters.
    pagination = self.pagination_info
    if pagination is not None:
        request_url = pagination.self_url
    else:
        raw_query = self.url_parsed.query
        request_url = f"{self.op_url}?{raw_query}" if raw_query else self.op_url

    format_requested = "format" in query_string and "format" not in self.disabled_params
    if format_requested:
        for req_format in query_string["format"]:
            content_type = Operation.get_content_type(req_format)
            if req_format in self.format:
                converter = getattr(self.addon, self.format[req_format])
                return converter(s, request_url=request_url), content_type
    elif "default_format" in self.i:
        fallback_format = self.i["default_format"].strip()
        content_type = Operation.get_content_type(fallback_format)
        if fallback_format in self.format:
            converter = getattr(self.addon, self.format[fallback_format])
            return converter(s, request_url=request_url), content_type

    if content_type not in ("text/csv", "application/json"):
        content_type = "text/csv"

    if "application/json" not in content_type:
        return s, content_type

    with StringIO(s) as buffer:
        rows = [dict(record) for record in DictReader(buffer)]

    # The 'json' parameter carries the array()/dict() restructuring rules.
    if "json" not in self.disabled_params:
        rows = Operation.structured(query_string, rows)

    return dumps(rows, ensure_ascii=False, indent=4), content_type

132 

133 @staticmethod 

def pv(i, r=None):
    """Return the plain value of a result cell.

    A cell is a pair whose second element is the plain value. When the row 'r'
    is given, 'i' is the index of the cell inside that row; otherwise 'i' is
    the cell itself.
    """
    cell = i if r is None else r[i]
    return cell[1]

142 

143 @staticmethod 

def tv(i, r=None):
    """Return the typed value of a result cell.

    A cell is a pair whose first element is the value cast according to the
    'field_type' configuration of the API specification. When the row 'r' is
    given, 'i' is the index of the cell inside that row; otherwise 'i' is the
    cell itself.
    """
    cell = i if r is None else r[i]
    return cell[0]

154 

155 @staticmethod 

def do_overlap(r1, r2):
    """Tell whether two ranges, each given as a (start, end) pair, overlap.

    The ranges overlap when the start of one of them falls within the other,
    boundaries included.
    """
    first_start, first_end = r1
    second_start, second_end = r2

    if first_start <= second_start <= first_end:
        return True
    return second_start <= first_start <= second_end

163 

164 @staticmethod 

def get_item_in_dict(d_or_l, key_list, prev=None):
    """Collect every value reachable by following 'key_list' through nested dictionaries.

    'd_or_l' is either a dictionary or a list of dictionaries. At each level
    the first key of the chain is looked up in every dictionary, and the search
    continues with the remaining keys on the value found. The matches are
    appended to a copy of 'prev' (or to a new list) which is then returned;
    'prev' itself is never modified.
    """

    def collect(node, keys, found):
        candidates = node if not isinstance(node, dict) else [node]
        for candidate in candidates:
            if not keys:
                continue
            head, rest = keys[0], keys[1:]
            if head not in candidate:
                continue
            if rest:
                collect(candidate[head], rest, found)
            else:
                found.append(candidate[head])
        return found

    return collect(d_or_l, key_list, [] if prev is None else prev.copy())

185 

186 @staticmethod 

def add_item_in_dict(d_or_l, key_list, item, idx):
    """Replace with 'item' the value found by following 'key_list' through nested dictionaries.

    'd_or_l' is either a dictionary or a list of dictionaries. When the last
    key of the chain has to be set on a list, only the element in position
    'idx' is updated. When a list is met before the last key, the same chain is
    applied to each of its elements. Keys missing from a dictionary are
    ignored. The structure is modified in place and nothing is returned.
    """

    def assign(node, keys):
        if not keys:
            return
        head = keys[0]
        is_last_key = len(keys) == 1

        if isinstance(node, list):
            if is_last_key:
                node[idx][head] = item
            else:
                for element in node:
                    assign(element, keys)
            return

        if head not in node:
            return
        if is_last_key:
            node[head] = item
        else:
            assign(node[head], keys[1:])

    assign(d_or_l, key_list)

208 

209 @staticmethod 

def structured(params, json_table):
    """Apply the transformation rules found in params["json"] to each row of 'json_table'.

    Two rules are supported:

    1. array("<separator>",<field>): it converts the string value associated to the field name '<field>'
       into an array by splitting it on '<separator>'. For instance, consider the following JSON structure:

       [
           { "names": "Doe, John; Doe, Jane" },
           { "names": "Doe, John; Smith, John" }
       ]

       Executing the rule 'array("; ",names)' returns the following new JSON structure:

       [
           { "names": [ "Doe, John", "Doe, Jane" ] },
           { "names": [ "Doe, John", "Smith, John" ] }
       ]

    2. dict("<separator>",<field>,<new_field_1>,<new_field_2>,...): it converts the string value associated
       to the field name '<field>' into a dictionary by splitting it on '<separator>' and by associating
       the new fields '<new_field_1>', '<new_field_2>', etc., to the resulting parts. When '<field>' already
       holds a list (e.g. after an 'array' rule), each item of the list is converted. For instance, consider
       the following JSON structure:

       [
           { "name": "Doe, John" },
           { "name": "Smith, John" }
       ]

       Executing the rule 'dict(", ",name,family_name,given_name)' returns the following new JSON structure:

       [
           { "name": { "family_name": "Doe", "given_name": "John" } },
           { "name": { "family_name": "Smith", "given_name": "John" } }
       ]

    '<field>' may be a dot-separated path addressing a nested value. An empty string becomes an empty
    array or an empty dictionary. Each rule is applied in order, and it works on the JSON structure
    returned after the execution of the previous rule.

    The table is modified in place and also returned. Rules with any other name are ignored.
    """
    if "json" not in params:
        return json_table

    for field in params["json"]:
        for op_type, s, es in findall(r'([a-z]+)\(("[^"]+"),([^\)]+)\)', field):
            if op_type not in ("array", "dict"):
                # Unknown rules have no effect, so there is no need to walk the table.
                continue

            separator = sub('"(.+)"', "\\1", s)
            entries = [i.strip() for i in es.split(",")]
            keys = entries[0].split(".")
            # Only used by 'dict' rules; constant for the whole rule, so computed once.
            new_fields = entries[1:]
            new_fields_max_split = len(new_fields) - 1

            for row in json_table:
                v_list = Operation.get_item_in_dict(row, keys)
                for idx, v in enumerate(v_list):
                    if op_type == "array":
                        if isinstance(v, str):
                            Operation.add_item_in_dict(row, keys, v.split(separator) if v != "" else [], idx)
                    elif isinstance(v, str):
                        new_values = v.split(separator, new_fields_max_split)
                        Operation.add_item_in_dict(
                            row,
                            keys,
                            dict(zip(new_fields, new_values, strict=False)) if v != "" else {},
                            idx,
                        )
                    elif isinstance(v, list):
                        new_list = []
                        for i in v:
                            new_values = i.split(separator, new_fields_max_split)
                            new_list.append(dict(zip(new_fields, new_values, strict=False)))
                        Operation.add_item_in_dict(row, keys, new_list, idx)

    return json_table

284 

def preprocess(self, par_dict, op_item, addon):
    """Preprocess the operation parameters with the functions listed in the '#preprocess' field.

    'par_dict' maps parameter names to their current values and 'op_item' is the item of the API
    specification describing the operation. Each entry of 'op_item["preprocess"]' has the form
    "<function>(<param_1>,<param_2>,...)" (e.g. "lower(doi)"): the function is looked up in 'addon',
    called with the current values of the named parameters, and must return a tuple of values which
    replace, position by position, the values of those parameters.

    Several functions can be chained with "-->"; whitespace around the arrow (and anywhere else in the
    field) is ignored, and each function sees the values produced by the previous one.

    'par_dict' is updated in place and returned.
    """
    result = par_dict

    if "preprocess" not in op_item:
        return result

    # Whitespace is irrelevant in this field, so it is removed before splitting: this way the chain is
    # recognised whether it is written "a(x) --> b(y)" or "a(x)-->b(y)".
    compact_spec = sub(r"\s+", "", op_item["preprocess"])
    for pre in compact_spec.split("-->"):
        if not pre:
            continue

        func_name = sub(r"^([^\(\)]+)\(.+$", r"\1", pre).strip()
        params_name = sub(r"^.+\(([^\(\)]+)\).*", r"\1", pre).split(",")

        param_list = tuple(result[param_name] for param_name in params_name)

        # run function
        func = getattr(addon, func_name)
        res = func(*param_list)

        # substitute res to the current parameter in result
        for idx, val in enumerate(res):
            result[params_name[idx]] = val

    return result

316 

def postprocess(self, res, op_item, addon):
    """Transform the result table with the functions listed in the '#postprocess' field.

    'res' is the result table returned by the query of the operation: a list whose first item is the
    header and whose other items are rows. Each cell of a row is a pair holding the typed value and the
    plain value, for instance:

    [
        ("id", "date"),
        (("my_id_1", "my_id_1"), (datetime(2018, 3, 2), "2018-03-02")),
        ...
    ]

    The typed and the plain value of a cell can be selected with the methods "tv" and "pv" respectively.

    Each entry of 'op_item["postprocess"]' has the form "<function>(<arg_1>,<arg_2>,...)" (e.g.
    'remove_date("2018")'). The function is looked up in 'addon' and called with the current table as
    first argument, followed by the listed arguments, which are parsed as one CSV record and passed as
    strings. It must return a pair (new table, flag): when the flag is true the new table is typed
    again through 'type_fields'.

    Several functions can be chained with "-->", with or without whitespace around the arrow; each
    function receives the table produced by the previous one.

    Returns the table produced by the last function, or 'res' when no '#postprocess' field is defined.
    """
    result = res

    if "postprocess" not in op_item:
        return result

    # Splitting on the bare arrow (and trimming each piece) accepts both "f() --> g()" and "f()-->g()".
    for post in (chunk.strip() for chunk in op_item["postprocess"].split("-->")):
        if not post:
            continue

        func_name = sub(r"^([^\(\)]+)\(.+$", r"\1", post).strip()
        param_str = sub(r"^.+\(([^\(\)]*)\).*", r"\1", post)
        params_values = () if param_str == "" else next(reader(param_str.splitlines(), skipinitialspace=True))

        func = getattr(addon, func_name)
        result, do_type_fields = func(result, *params_values)
        if do_type_fields:
            result = self.type_fields(result, op_item)

    return result

351 

352 @staticmethod 

353 def _apply_require(header, result, fields): 

354 """Exclude rows with empty values in the specified fields.""" 

355 for field in fields: 

356 field_idx = header.index(field) 

357 result = [row for row in result if Operation.pv(field_idx, row) not in (None, "")] 

358 return result 

359 

360 def _apply_filter(self, header, result, fields): 

361 """Filter rows by comparison operators or regex patterns.""" 

362 for field in fields: 

363 field_name, field_value = field.split(":", 1) 

364 try: 

365 field_idx = header.index(field_name) 

366 flag = field_value[0] 

367 if flag in ("<", ">", "="): 

368 value = field_value[1:].lower() 

369 result = [ 

370 row 

371 for row in result 

372 if self.operation[flag]( 

373 Operation.tv(field_idx, row), 

374 self.dt.get_func(type(Operation.tv(field_idx, row)).__name__)(value), 

375 ) 

376 ] 

377 else: 

378 pattern = field_value.lower() 

379 result = [row for row in result if search(pattern, Operation.pv(field_idx, row).lower())] 

380 except ValueError: 

381 pass 

382 return result 

383 

384 @staticmethod 

385 def _apply_sort(header, result, fields): 

386 """Sort rows by the specified fields and directions.""" 

387 for field in sorted(fields, reverse=True): 

388 order_names = findall(r"^(desc|asc)\(([^\(\)]+)\)$", field) 

389 if order_names: 

390 direction, field_name = order_names[0] 

391 else: 

392 direction, field_name = "asc", field 

393 try: 

394 field_idx = header.index(field_name) 

395 result = sorted(result, key=itemgetter(field_idx), reverse=(direction == "desc")) 

396 except ValueError: 

397 pass 

398 return result 

399 

def handling_params(self, params, table):
    """Apply the built-in 'require', 'filter' and 'sort' parameters to a result table.

    1. [require=<field_name>] (or its alias 'exclude') drops the rows that have an empty value in the
       given field - e.g. "require=doi" removes the rows without any string in the "doi" field. When
       both 'exclude' and 'require' are present only the values of 'exclude' are used.

    2. [filter=<field_name>:<operator><value>] keeps the rows whose value in the given field complies
       with the value specified. Without operator the value is interpreted as a regular expression,
       otherwise it is compared according to the type of the field. Possible operators are "=", "<",
       and ">" - e.g. "filter=title:semantics?" or "filter=date:>2016-05".

    3. [sort=<order>(<field_name>)] sorts the rows on the given field, in ascending ("asc") or
       descending ("desc") order - e.g. "sort=desc(date)".

    The three steps always run in this order, and each parameter may be repeated (e.g.
    "require=doi&require=title"). A parameter is ignored when it is disabled or overridden by a custom
    parameter with the same name; 'require' and 'exclude' are both ignored as soon as either name is.

    'params' maps each parameter name to the list of its values; 'table' is a list whose first item is
    the header. Returns a new list with the header followed by the resulting rows.
    """
    header, rows = table[0], table[1:]

    blocked = set(self.custom_params) | self.disabled_params

    wants_require = "exclude" in params or "require" in params
    if wants_require and "require" not in blocked and "exclude" not in blocked:
        require_key = "exclude" if "exclude" in params else "require"
        rows = self._apply_require(header, rows, params[require_key])

    if "filter" in params and "filter" not in blocked:
        rows = self._apply_filter(header, rows, params["filter"])

    if "sort" in params and "sort" not in blocked:
        rows = self._apply_sort(header, rows, params["sort"])

    return [header, *rows]

439 

def type_fields(self, res, op_item):
    """Pair every value of the result table with its typed counterpart.

    The type of each field comes from the 'field_type' entry of the operation
    definition; fields not mentioned there are treated as strings. Cells that
    are already tuples contribute their second element as the plain value.

    Returns a new table: the original header followed by rows of
    (typed value, plain value) pairs.
    """
    header = res[0]

    # Every column is a string unless 'field_type' says otherwise.
    casters = {heading: DataType.str for heading in header}
    if "field_type" in op_item:
        for type_name, field_name in findall(FIELD_TYPE_RE, op_item["field_type"]):
            casters[field_name] = self.dt.get_func(type_name)

    typed_rows = []
    for row in res[1:]:
        typed_row = []
        for position, heading in enumerate(header):
            plain = row[position]
            if isinstance(plain, tuple):
                plain = plain[1]
            typed_row.append((casters[heading](plain), plain))
        typed_rows.append(typed_row)

    return [header, *typed_rows]

464 

def remove_types(self, res):
    """Strip the typed values from a result table.

    The header is kept as it is, while every following row becomes a tuple
    holding only the plain value (the second element) of each cell.
    """
    header, typed_rows = res[0], res[1:]
    plain_rows = (tuple(cell[1] for cell in row) for row in typed_rows)
    return [header, *plain_rows]

471 

472 @staticmethod 

473 def _is_directive(line): 

474 return line.strip().startswith("@@") 

475 

476 @staticmethod 

477 def _parse_directive_args(tokens, param_names, defaults=None): 

478 defaults = defaults or {} 

479 all_names = set(param_names) | set(defaults) 

480 result = {} 

481 positional_index = 0 

482 seen_keyword = False 

483 

484 for token in tokens: 

485 if "=" in token: 

486 key, value = token.split("=", 1) 

487 if key in all_names: 

488 if key in result: 

489 raise ValueError(f"Duplicate parameter {key!r}") 

490 seen_keyword = True 

491 result[key] = value 

492 continue 

493 if seen_keyword: 

494 raise ValueError(f"Positional argument {token!r} cannot follow keyword argument") 

495 if positional_index >= len(param_names): 

496 raise ValueError(f"Unexpected argument {token!r}") 

497 result[param_names[positional_index]] = token 

498 positional_index += 1 

499 

500 for name, default in defaults.items(): 

501 if name not in result: 

502 result[name] = default 

503 

504 missing = [name for name in param_names if name not in result] 

505 if missing: 

506 raise ValueError(f"Missing required parameter(s): {', '.join(missing)}") 

507 

508 return result 

509 

def _handle_directive_with(self, parts):
    """Handle '@@with <source>': switch the current endpoint to a named source.

    'parts' holds the tokens of the directive, command name first. Returns the
    pair (endpoint URL registered for the source, None), i.e. a new endpoint
    and no step.

    Raises ValueError when the source is missing or not declared in the
    sources map.
    """
    name = Operation._parse_directive_args(parts[1:], ["source"])["source"]
    if name in self.sources_map:
        return self.sources_map[name], None
    raise ValueError(f"Unknown source '{name}' in @@with; declare it in #sources.")

516 

517 @staticmethod 

def _handle_directive_endpoint(parts):
    """Handle '@@endpoint <target>': switch the current endpoint to the given target.

    'parts' holds the tokens of the directive, command name first. Returns the
    pair (target, None), i.e. a new endpoint and no step. Raises ValueError
    when the target is missing.
    """
    target = Operation._parse_directive_args(parts[1:], ["target"])["target"]
    return target, None

521 

522 @staticmethod 

def _handle_directive_join(parts):
    """Handle '@@join <left_var> <right_var> [type=<how>]'.

    'parts' holds the tokens of the directive, command name first. The join
    type defaults to "inner" and is lowercased. Returns the pair
    (None, ("JOIN", left_var, right_var, how)), i.e. no endpoint change and a
    JOIN step. Raises ValueError when the arguments are malformed.
    """
    args = Operation._parse_directive_args(parts[1:], ["left_var", "right_var"], defaults={"type": "inner"})
    how = args["type"].lower()
    step = ("JOIN", args["left_var"], args["right_var"], how)
    return None, step

526 

527 @staticmethod 

528 def _handle_directive_values(parts): 

529 tokens = parts[1:] 

530 if not tokens: 

531 raise ValueError("@@values needs at least one variable") 

532 return None, ("VALUES_INJECT", tokens) 

533 

534 @staticmethod 

def _handle_directive_foreach(parts):
    """Handle '@@foreach <?variable> <placeholder> [wait=<N>]'.

    'parts' holds the tokens of the directive, command name first. The wait
    value defaults to "0" and is converted to a float. Returns the pair
    (None, ("FOREACH", variable, placeholder, delay)), i.e. no endpoint change
    and a FOREACH step.

    Raises ValueError when the arguments are malformed, when the variable does
    not start with '?', or when the wait value is not a number.
    """
    args = Operation._parse_directive_args(parts[1:], ["variable", "placeholder"], defaults={"wait": "0"})
    var_name = args["variable"]
    if var_name[:1] != "?":
        raise ValueError(f"@@foreach variable must start with '?', got {var_name!r}")

    try:
        delay = float(args["wait"])
    except ValueError:
        raise ValueError(f"Invalid wait value in @@foreach: {args['wait']!r}") from None

    step = ("FOREACH", var_name, args["placeholder"], delay)
    return None, step

545 

546 def _parse_steps(self, text, default_endpoint, params): 

547 """ 

548 Returns a list of steps: 

549 - ("QUERY", endpoint_url, query_text) 

550 - ("JOIN", left_var, right_var, how) # how in {"inner","left"} 

551 - ("REMOVE", [vars]) 

552 - ("VALUES_INJECT", [vars]) # @@values ?var1 ?var2 ... 

553 - ("FOREACH", var_name, placeholder, delay) # @@foreach ?var placeholder [wait=N] 

554 """ 

555 for p, v in params.items(): 

556 text = text.replace(f"[[{p}]]", str(v)) 

557 steps = [] 

558 cur_query = [] 

559 current_endpoint = default_endpoint 

560 

561 directive_handlers = { 

562 "with": self._handle_directive_with, 

563 "endpoint": self._handle_directive_endpoint, 

564 "join": self._handle_directive_join, 

565 "remove": lambda parts: (None, ("REMOVE", parts[1:])), 

566 "values": self._handle_directive_values, 

567 "foreach": self._handle_directive_foreach, 

568 } 

569 

570 def flush_query(): 

571 if cur_query: 

572 q = "\n".join(cur_query).strip() 

573 if not q: 

574 cur_query.clear() 

575 return 

576 for p, v in params.items(): 

577 q = q.replace(f"[[{p}]]", str(v)) 

578 steps.append(("QUERY", current_endpoint, q)) 

579 cur_query.clear() 

580 

581 for raw in text.splitlines(): 

582 line = raw.rstrip("\n") 

583 if not self._is_directive(line): 

584 cur_query.append(line) 

585 continue 

586 

587 flush_query() 

588 

589 body = line.strip()[2:].strip() 

590 parts = body.split() 

591 cmd = parts[0].lower() 

592 

593 handler = directive_handlers.get(cmd) 

594 if handler is None: 

595 raise ValueError(f"Unknown directive @@{cmd}") 

596 

597 new_endpoint, step = handler(parts) 

598 if new_endpoint is not None: 

599 current_endpoint = new_endpoint 

600 if step is not None: 

601 steps.append(step) 

602 

603 flush_query() 

604 return steps 

605 

def _run_sparql_dicts(self, endpoint_url, query_text):
    """Run a SELECT query against a SPARQL endpoint and return a list of dict rows.

    This always requests CSV and parses it via DictReader, to stay consistent
    with RAMOSE's legacy pipeline. The request is sent with GET when the
    configured SPARQL HTTP method is "get", and with POST (query as the request
    body) otherwise.

    Raises RuntimeError when the request fails or the endpoint answers with a
    status code other than 200.
    """
    try:
        if self.sparql_http_method == "get":
            # Endpoints that already carry a query string need '&', not a second '?'.
            separator = "&" if "?" in endpoint_url else "?"
            r = _http_session.get(
                endpoint_url + separator + "query=" + quote(query_text),
                headers={
                    "Accept": "text/csv",
                    "User-Agent": "RAMOSE/2.0.0",
                },
                timeout=DEFAULT_HTTP_TIMEOUT,
            )
        else:
            r = _http_session.post(
                endpoint_url,
                data=query_text,
                headers={
                    "Accept": "text/csv",
                    "Content-Type": "application/sparql-query",
                    "User-Agent": "RAMOSE/2.0.0",
                },
                timeout=DEFAULT_HTTP_TIMEOUT,
            )
    except RequestException as e:
        raise RuntimeError(f"SPARQL request failed: {e}") from e

    r.encoding = "utf-8"
    if r.status_code != 200:
        raise RuntimeError(f"SPARQL {r.status_code}: {r.reason}")
    # 'utf-8-sig' drops a leading byte-order mark, which would otherwise end up in the first column name.
    text = r.content.decode("utf-8-sig", errors="replace")
    list_of_lines = text.splitlines()
    return list(DictReader(list_of_lines))

642 

643 @staticmethod 

644 def _normalize_sparql_json_resultset(result): 

645 """Convert a SPARQL JSON ResultSet dict to a list of flat dicts.""" 

646 vars_ = result["head"].get("vars") or [] 

647 return [ 

648 {v: (b[v].get("value") if isinstance(b.get(v), dict) else b.get(v)) for v in vars_} 

649 for b in result["results"].get("bindings", []) 

650 ] 

651 

652 @staticmethod 

653 def _normalize_columnar_dict(result): 

654 """Convert a column-oriented dict {col: [values]} to a list of row dicts.""" 

655 cols = list(result.keys()) 

656 max_len = max((len(v) for v in result.values() if isinstance(v, (list, tuple))), default=0) 

657 

658 if not max_len: 

659 return [result] 

660 

661 rows = [] 

662 for i in range(max_len): 

663 row = {} 

664 for c in cols: 

665 v = result[c] 

666 row[c] = ( 

667 v[i] 

668 if isinstance(v, (list, tuple)) and i < len(v) 

669 else (v if not isinstance(v, (list, tuple)) else None) 

670 ) 

671 rows.append(row) 

672 return rows 

673 

674 def _run_sparql_anything_dicts(self, query_text, values=None): 

675 """ 

676 Execute a SPARQL Anything SELECT query via PySPARQL-Anything and return 

677 a list of dicts (one per row), in the same shape as _run_sparql_dicts. 

678 

679 query_text: full SPARQL (Anything) query string 

680 (typically containing SERVICE <x-sparql-anything:...>). 

681 values: optional dict of template parameters for the query 

682 (name -> value), passed to SPARQL Anything's `values=`. 

683 """ 

684 if self._sa_engine is None: 

685 import pysparql_anything # noqa: PLC0415 # type: ignore[import-not-found] 

686 

687 self._sa_engine = pysparql_anything.SparqlAnything() 

688 

689 kwargs = {"query": query_text} 

690 if values: 

691 kwargs["values"] = {str(k): str(v) for k, v in values.items()} 

692 

693 result = self._sa_engine.select(output_type=dict, **kwargs) 

694 

695 # Normalize to list[dict] 

696 if isinstance(result, list): 

697 if result and isinstance(result[0], dict): 

698 return result 

699 return [dict(row) for row in result] 

700 

701 if not isinstance(result, dict): 

702 return [{"result": result}] 

703 

704 # Standard SPARQL JSON ResultSet shape 

705 head = result.get("head") 

706 results_obj = result.get("results") 

707 if isinstance(head, dict) and isinstance(results_obj, dict) and "bindings" in results_obj: 

708 return self._normalize_sparql_json_resultset(result) 

709 

710 # Column-oriented dict or single-row fallback 

711 return self._normalize_columnar_dict(result) 

712 

713 def _run_query_dicts(self, endpoint_url, query_text): 

714 """ 

715 Dispatch query execution to the appropriate backend, with support 

716 for per-query engine selection in multi-source mode. 

717 

718 Rules: 

719 - If endpoint_url is the special string "sparql-anything" (case-insensitive), 

720 then always use SPARQL.ANYTHING (PySPARQL-Anything) for this query. 

721 - Otherwise, fall back to the operation-level engine: 

722 * engine == "sparql-anything" -> SPARQL.ANYTHING 

723 * else -> standard HTTP SPARQL 

724 """ 

725 

726 # Per-query override: @@endpoint sparql-anything 

727 if endpoint_url and str(endpoint_url).strip().lower() == "sparql-anything": 

728 return self._run_sparql_anything_dicts(query_text) 

729 

730 # Default behaviour: operation-level engine 

731 if self.engine == "sparql-anything": 

732 return self._run_sparql_anything_dicts(query_text) 

733 return self._run_sparql_dicts(endpoint_url, query_text) 

734 

735 def _inject_values_clause(self, query_text, vars_, acc_rows): 

736 # build distinct tuples for requested vars from the accumulator 

737 cols = [v.lstrip("?") for v in vars_] 

738 tuples, seen = [], set() 

739 for row in acc_rows or []: 

740 tup = tuple(row.get(c, "") for c in cols) 

741 if all(tup) and tup not in seen: 

742 seen.add(tup) 

743 tuples.append(tup) 

744 if not tuples: 

745 return query_text # nothing to inject 

746 

747 # format literals vs IRIs 

748 def fmt(x): 

749 s = str(x) 

750 if s.startswith(("http://", "https://")): 

751 return f"<{s}>" 

752 return '"' + s.replace("\\", "\\\\").replace('"', '\\"') + '"' 

753 

754 head = "VALUES (" + " ".join(vars_) + ") {\n" 

755 body = "\n".join(" (" + " ".join(fmt(v) for v in tup) + ")" for tup in tuples) 

756 tail = "\n}\n" 

757 

758 i = query_text.find("{") 

759 if i == -1: 

760 # no WHERE brace: put VALUES at top (legal SPARQL) 

761 return head + body + tail + query_text 

762 j = i + 1 

763 return query_text[:j] + "\n" + head + body + tail + query_text[j:] 

764 

765 @staticmethod 

766 def _drop_columns(rows, vars_): 

767 if not rows: 

768 return rows 

769 vars_set = {v.lstrip("?") for v in vars_} 

770 return [{k: v for k, v in r.items() if k not in vars_set and ("?" + k) not in vars_set} for r in rows] 

771 

772 def _norm_join_key(self, v): 

773 if v is None: 

774 return None 

775 s = str(v).strip() 

776 # unify scheme for w3id IRIs (and similar) 

777 if s.startswith("http://"): 

778 s = "https://" + s[len("http://") :] 

779 # drop a single trailing slash for stability 

780 return s.removesuffix("/") 

781 

782 def _join(self, left_rows, right_rows, lkey, rkey, how="inner"): 

783 """ 

784 Merge two row sets on lkey (from left_rows) and rkey (from right_rows). 

785 - lkey/rkey may be passed as '?var' or 'var' -> we normalize to bare names. 

786 - Keys are normalized with _norm_join_key (e.g., http -> https, trim slash). 

787 - When 'left', all left rows are preserved even if no match on the right. 

788 - Right-hand columns are copied into the merged row; collisions are avoided. 

789 """ 

790 # 1) Normalize column names (strip leading '?') 

791 lcol = lkey.lstrip("?") 

792 rcol = rkey.lstrip("?") 

793 

794 left_rows = left_rows or [] 

795 right_rows = right_rows or [] 

796 

797 # 2) Build an index for right_rows on normalized rcol values 

798 rindex = {} 

799 for r in right_rows: 

800 rk = self._norm_join_key(r.get(rcol)) 

801 if rk is None: 

802 continue 

803 rindex.setdefault(rk, []).append(r) 

804 

805 # determine right columns to copy (excluding the join key) 

806 right_cols = [c for c in (right_rows[0].keys() if right_rows else []) if c != rcol] 

807 

808 out = [] 

809 for left_row in left_rows: 

810 lk = self._norm_join_key(left_row.get(lcol)) 

811 matches = rindex.get(lk, []) 

812 if matches: 

813 for r in matches: 

814 merged = dict(left_row) 

815 for c in right_cols: 

816 rv = r.get(c) 

817 if rv is None: 

818 continue 

819 if c not in merged or merged[c] in ("", None): 

820 merged[c] = rv 

821 else: 

822 alt = f"{c}_r" 

823 if alt not in merged or merged[alt] in ("", None): 

824 merged[alt] = rv 

825 out.append(merged) 

826 elif how == "left": 

827 out.append(dict(left_row)) 

828 return out 

829 

830 def _apply_custom_postprocess_params(self, table, q_string): 

831 for param_name, param_conf in self.custom_params.items(): 

832 if param_conf["phase"] != "postprocess": 

833 continue 

834 if param_name in q_string: 

835 handler = getattr(self.addon, param_conf["handler"]) 

836 table = handler(table, q_string[param_name]) 

837 return table 

838 

839 @property 

840 def _cache_ttl(self): 

841 if "cache_duration" in self.i: 

842 return int(self.i["cache_duration"]) 

843 return self._default_cache_ttl 

844 

845 def _build_cache_key(self, q_string): 

846 presentation_params = {"page", "page_size", "format", "json"} 

847 data_params = sorted((name, values) for name, values in q_string.items() if name not in presentation_params) 

848 if data_params: 

849 query_string = "&".join(f"{name}={value}" for name, values in data_params for value in values) 

850 return f"{self.tp}:{self.op_url}?{query_string}" 

851 return f"{self.tp}:{self.op_url}" 

852 

853 def _extract_pagination_params(self, q_string): 

854 if "page_size" not in q_string or "page_size" in self.disabled_params: 

855 return None 

856 page_size = int(q_string["page_size"][0]) 

857 if page_size < 1: 

858 msg = f"page_size must be >= 1, got {page_size}" 

859 raise ValueError(msg) 

860 page = 1 

861 if "page" in q_string and "page" not in self.disabled_params: 

862 page = int(q_string["page"][0]) 

863 if page < 1: 

864 msg = f"page must be >= 1, got {page}" 

865 raise ValueError(msg) 

866 return page, page_size 

867 

868 def _has_custom_converter(self, q_string): 

869 if "format" in q_string and "format" not in self.disabled_params: 

870 for req_format in q_string["format"]: 

871 if req_format in self.format: 

872 return True 

873 elif "default_format" in self.i and self.i["default_format"].strip() in self.format: 

874 return True 

875 return False 

876 

877 def _paginate_and_format(self, table, q_string, content_type): 

878 if self._has_custom_converter(q_string): 

879 self.pagination_info = None 

880 else: 

881 page_params = self._extract_pagination_params(q_string) 

882 if page_params is not None: 

883 page, page_size = page_params 

884 total_items = len(table) - 1 

885 total_pages = ceil(total_items / page_size) 

886 if page > total_pages: 

887 msg = f"page {page} exceeds total pages {total_pages}" 

888 raise ValueError(msg) 

889 start = (page - 1) * page_size 

890 end = start + page_size 

891 table = [table[0], *table[1 + start : 1 + end]] 

892 self.pagination_info = build_pagination_info(self.op_url, q_string, page, page_size, total_items) 

893 else: 

894 self.pagination_info = None 

895 

896 s_res = StringIO() 

897 writer(s_res).writerows(table) 

898 body, ctype = self.conv(s_res.getvalue(), q_string, content_type) 

899 

900 return 200, body, ctype 

901 

902 def _finalize_result(self, csv_rows, content_type): 

903 """Run the shared pipeline: type fields, postprocess, filter, remove types, cache, paginate, format.""" 

904 q_string = parse_qs(quote(self.url_parsed.query, safe="&=")) 

905 res = self.type_fields(csv_rows, self.i) 

906 if self.addon is not None: 

907 res = self.postprocess(res, self.i, self.addon) 

908 res = self.handling_params(q_string, res) 

909 res = self.remove_types(res) 

910 if self.custom_params: 

911 res = self._apply_custom_postprocess_params(res, q_string) 

912 if self._cache is not None and "cache_disable" not in self.i: 

913 self._cache.set(self._build_cache_key(q_string), res, expire=self._cache_ttl) 

914 return self._paginate_and_format(res, q_string, content_type) 

915 

916 @staticmethod 

917 def _header_from_field_type(op_item, acc): 

918 # Respect #field_type order if provided, else derive from data 

919 if "field_type" in op_item: 

920 # FIELD_TYPE_RE is global in this file 

921 return [f for (_, f) in findall(FIELD_TYPE_RE, op_item["field_type"])] 

922 # fallback to keys of first row 

923 return list(acc[0].keys()) if acc else [] 

924 

925 @staticmethod 

926 def _to_csv_rows(header, acc): 

927 rows = [header] 

928 rows.extend([d.get(h, "") for h in header] for d in acc) 

929 return rows 

930 

931 def _extract_params(self): 

932 """Extract URL parameters and apply type conversions based on the operation spec.""" 

933 par_dict = {} 

934 par_man = match(self.op, self.op_url).groups() # type: ignore[union-attr] 

935 for idx, par in enumerate(findall("{([^{}]+)}", self.i["url"])): 

936 try: 

937 par_type = self.i[par].split("(")[0] 

938 par_value = par_man[idx] if par_type == "str" else self.dt.get_func(par_type)(par_man[idx]) 

939 except KeyError: 

940 par_value = par_man[idx] 

941 par_dict[par] = par_value 

942 return par_dict 

943 

944 def _apply_custom_preprocess_params(self, par_dict): 

945 q_string = parse_qs(quote(self.url_parsed.query, safe="&=")) 

946 for param_name, param_conf in self.custom_params.items(): 

947 if param_conf["phase"] != "preprocess": 

948 continue 

949 if param_name in q_string: 

950 handler = getattr(self.addon, param_conf["handler"]) 

951 par_dict.update(handler(q_string[param_name])) 

952 elif param_name not in par_dict: 

953 par_dict[param_name] = "" 

954 for placeholder in findall(r"\[\[(\w+)\]\]", self.i["sparql"]): 

955 if placeholder not in par_dict: 

956 par_dict[placeholder] = "" 

957 

958 def _exec_sparql_anything_single(self, par_dict, content_type): 

959 """Execute a single SPARQL Anything query and return the finalized result.""" 

960 query = self.i["sparql"] 

961 for param, val in par_dict.items(): 

962 query = query.replace(f"[[{param}]]", str(val)) 

963 rows = self._run_sparql_anything_dicts(query) 

964 header = self._header_from_field_type(self.i, rows or []) 

965 csv_rows = self._to_csv_rows(header, rows or []) 

966 return self._finalize_result(csv_rows, content_type) 

967 

def _exec_standard_sparql(self, par_dict, content_type):
    """Run the operation's query against the endpoint, once per parameter combination.

    A parameter whose value is a list contributes each of its items; the
    query is executed for every element of the cartesian product, e.g.
    ``{"id": "5", "area": ["A1", "A2"]}`` gives ``{"id": "5", "area": "A1"}``
    and ``{"id": "5", "area": "A2"}``. The CSV answers are concatenated under
    the header of the first one and passed to ``_finalize_result``.

    Returns:
        The result of ``_finalize_result``, or ``(status, message,
        "text/plain")`` as soon as the endpoint answers with a status other
        than 200.
    """
    names = list(par_dict)
    choices = [value if isinstance(value, list) else [value] for value in par_dict.values()]
    combinations = [dict(zip(names, picked, strict=False)) for picked in product(*choices)]

    csv_lines = []
    for position, combination in enumerate(combinations):
        query = self.i["sparql"]
        for name, value in combination.items():
            query = query.replace(f"[[{name}]]", str(value))

        if self.sparql_http_method == "get":
            response = _http_session.get(
                self.tp + "?query=" + quote(query),
                headers={"Accept": "text/csv"},
                timeout=DEFAULT_HTTP_TIMEOUT,
            )
        else:
            response = _http_session.post(
                self.tp,
                data=query,
                headers={"Accept": "text/csv", "Content-Type": "application/sparql-query"},
                timeout=DEFAULT_HTTP_TIMEOUT,
            )
        response.encoding = "utf-8"

        if response.status_code != 200:
            return response.status_code, f"HTTP status code {response.status_code}: {response.reason}", "text/plain"

        # Split on the encoded bytes: bytes.splitlines() only breaks on ASCII
        # line endings, whereas str.splitlines() also breaks on other Unicode
        # separators that may occur inside a value.
        lines = [raw.decode("utf-8") for raw in response.text.encode("utf-8").splitlines()]

        # Only the first answer contributes its CSV header line.
        csv_lines.extend(lines if position == 0 else lines[1:])

    return self._finalize_result(list(reader(csv_lines)), content_type)

1015 

1016 def _exec_foreach_query(self, endpoint_url, qtxt, var_name, placeholder, delay, acc): 

1017 """Run one query per distinct value collected from the accumulator (@@foreach).""" 

1018 column = var_name.lstrip("?") 

1019 

1020 values = [] 

1021 seen = set() 

1022 for row in acc or []: 

1023 v = row.get(column) 

1024 if v and v not in seen: 

1025 seen.add(v) 

1026 values.append(v) 

1027 

1028 all_rows = [] 

1029 for idx_val, val in enumerate(values): 

1030 q_one = qtxt.replace(f"[[{placeholder}]]", str(val)) 

1031 sub_rows = self._run_query_dicts(endpoint_url, q_one) 

1032 if sub_rows: 

1033 all_rows.extend(sub_rows) 

1034 if delay and idx_val + 1 < len(values): 

1035 time.sleep(delay) 

1036 

1037 return all_rows 

1038 

1039 def _exec_multi_source_query_step(self, endpoint_url, qtxt, state): 

1040 """Handle a QUERY step in the multi-source pipeline.""" 

1041 if state["pending_foreach"] is not None: 

1042 var_name, placeholder, delay = state["pending_foreach"] 

1043 rows = self._exec_foreach_query(endpoint_url, qtxt, var_name, placeholder, delay, state["acc"]) 

1044 state["pending_foreach"] = None 

1045 state["pending_values_vars"] = None 

1046 else: 

1047 if state["pending_values_vars"]: 

1048 qtxt = self._inject_values_clause(qtxt, state["pending_values_vars"], state["acc"]) 

1049 state["pending_values_vars"] = None 

1050 rows = self._run_query_dicts(endpoint_url, qtxt) 

1051 

1052 if state["acc"] is None: 

1053 state["acc"] = rows 

1054 elif state["pending_join"]: 

1055 lvar, rvar, how = state["pending_join"] 

1056 state["acc"] = self._join(state["acc"], rows, lvar, rvar, how) 

1057 state["pending_join"] = None 

1058 else: 

1059 raise ValueError("Multiple QUERY steps without an explicit @@join directive") 

1060 

1061 def _exec_multi_source(self, par_dict, content_type): 

1062 """Execute a multi-source query pipeline with @@ directives.""" 

1063 steps = self._parse_steps(self.i["sparql"], self.tp, par_dict) 

1064 

1065 state: dict[str, object] = { 

1066 "acc": None, 

1067 "pending_join": None, 

1068 "pending_values_vars": None, 

1069 "pending_foreach": None, 

1070 } 

1071 

1072 for st in steps: 

1073 tag = st[0] 

1074 

1075 if tag == "QUERY": 

1076 self._exec_multi_source_query_step(st[1], st[2], state) 

1077 elif tag == "JOIN": 

1078 state["pending_join"] = (st[1], st[2], st[3]) 

1079 elif tag == "REMOVE": 

1080 state["acc"] = self._drop_columns(state["acc"] or [], st[1]) 

1081 elif tag == "VALUES_INJECT": 

1082 state["pending_values_vars"] = st[1] 

1083 elif tag == "FOREACH": 

1084 state["pending_foreach"] = (st[1], st[2], st[3]) 

1085 else: 

1086 raise RuntimeError(f"Unknown step tag {tag}") 

1087 

1088 header = self._header_from_field_type(self.i, state["acc"] or []) 

1089 csv_rows = self._to_csv_rows(header, state["acc"] or []) 

1090 return self._finalize_result(csv_rows, content_type) 

1091 

1092 @staticmethod 

1093 def _format_error(sc, e, prefix=""): 

1094 """Format an error response tuple with traceback line info.""" 

1095 tb = e.__traceback__ 

1096 line = tb.tb_lineno if tb else "?" 

1097 msg = f"HTTP status code {sc}: {prefix}{type(e).__name__}: {e} (line {line})" 

1098 return sc, msg, "text/plain" 

1099 

def exec(self, method="get", content_type="application/json"):
    """Execute the operation for the given HTTP method and requested content type.

    The operation runs as described in the specification file, in this order:

    1. preprocessing of the parameters;
    2. the SPARQL query of the operation, filled with the parameters of the URL;
    3. typing of the fields of the rows returned;
    4. postprocessing of the result;
    5. the parameters that remove, filter and sort the result;
    6. removal of the types added at step 3;
    7. conversion to the format requested by the user.

    Returns:
        ``(status, body, content_type, headers)``. ``headers`` holds a
        ``Link`` entry when pagination produced one and is empty otherwise.
        A method the operation does not allow gives 405; a ``TimeoutError``
        gives 408, a ``TypeError``/``ValueError`` 400 and any other exception
        500.
    """
    verb = method.lower()
    if verb not in self.i["method"].split():
        return 405, f"HTTP status code 405: '{verb}' method not allowed", "text/plain", {}

    try:
        status, body, ctype = self._dispatch_exec(content_type)
    except TimeoutError as e:
        failure = self._format_error(408, e, "request timeout - ")
        return (*failure, {})
    except (TypeError, ValueError) as e:
        failure = self._format_error(400, e, "parameter in the request not compliant with the type specified - ")
        return (*failure, {})
    except Exception as e:  # noqa: BLE001
        failure = self._format_error(500, e, "something unexpected happened - ")
        return (*failure, {})

    extra_headers = {}
    info = self.pagination_info
    if info is not None and (link := build_link_header(info)):
        extra_headers["Link"] = link
    return status, body, ctype, extra_headers

1131 

1132 def _dispatch_exec(self, content_type): 

1133 """Dispatch to the appropriate execution path based on SPARQL text content.""" 

1134 par_dict = self._extract_params() 

1135 if self.addon is not None: 

1136 self.preprocess(par_dict, self.i, self.addon) 

1137 if self.custom_params: 

1138 self._apply_custom_preprocess_params(par_dict) 

1139 

1140 if self._cache is not None and "cache_disable" not in self.i: 

1141 q_string = parse_qs(quote(self.url_parsed.query, safe="&=")) 

1142 cached_table = self._cache.get(self._build_cache_key(q_string)) 

1143 if cached_table is not None: 

1144 return self._paginate_and_format(cached_table, q_string, content_type) 

1145 

1146 sparql_text = self.i["sparql"] 

1147 resolved_text = sparql_text 

1148 for param, val in par_dict.items(): 

1149 resolved_text = resolved_text.replace(f"[[{param}]]", str(val)) 

1150 

1151 if "@@" not in resolved_text: 

1152 if self.engine == "sparql-anything": 

1153 return self._exec_sparql_anything_single(par_dict, content_type) 

1154 return self._exec_standard_sparql(par_dict, content_type) 

1155 

1156 try: 

1157 return self._exec_multi_source(par_dict, content_type) 

1158 except ValueError as ve: 

1159 return 400, f"HTTP status code 400: {ve}", "text/plain" 

1160 except RuntimeError as re_err: 

1161 return 502, f"HTTP status code 502: {re_err}", "text/plain"