Source code for time_agnostic_library.agnostic_query

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2022-2025, Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.


import json
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy
from typing import Dict, List, Set, Tuple, Union

from rdflib import ConjunctiveGraph, Graph, Literal, URIRef, Variable
from rdflib.paths import InvPath
from rdflib.plugins.sparql.parser import parseUpdate
from rdflib.plugins.sparql.parserutils import CompValue
from rdflib.plugins.sparql.processor import prepareQuery
from time_agnostic_library.agnostic_entity import (
    AgnosticEntity, _filter_timestamps_by_interval)
from time_agnostic_library.prov_entity import ProvEntity
from time_agnostic_library.sparql import Sparql
from time_agnostic_library.support import convert_to_datetime
from tqdm import tqdm

CONFIG_PATH = "./config.json"


class AgnosticQuery(object):
    def __init__(self, query: str, on_time: Tuple[Union[str, None]] = (None, None), other_snapshots: bool = False, config_path: str = CONFIG_PATH, config_dict: dict = None):
        self.query = query
        self.other_snapshots = other_snapshots
        self.config_path = config_path
        self.other_snapshots_metadata = dict()
        if config_dict is not None:
            self.config = config_dict
        else:
            with open(config_path, encoding="utf8") as json_file:
                self.config = json.load(json_file)
        self.__init_text_index(self.config)
        if on_time:
            after_time = convert_to_datetime(on_time[0], stringify=True)
            before_time = convert_to_datetime(on_time[1], stringify=True)
            self.on_time = (after_time, before_time)
        else:
            self.on_time = None
        self.reconstructed_entities = set()
        self.vars_to_explicit_by_time: Dict[str, Set[Tuple]] = dict()
        self.relevant_entities_graphs: Dict[URIRef, Dict[str, ConjunctiveGraph]] = dict()
        self.relevant_graphs: Dict[str, Union[ConjunctiveGraph, Set]] = dict()
        self.cache_insert_queries = list()
        self._rebuild_relevant_graphs()

    def __init_text_index(self, config: dict):
        for full_text_search in {"blazegraph_full_text_search", "fuseki_full_text_search", "virtuoso_full_text_search"}:
            ts_full_text_search: str = config[full_text_search]
            if ts_full_text_search.lower() in {"true", "1", 1, "t", "y", "yes", "ok"}:
                setattr(self, full_text_search, True)
            elif ts_full_text_search.lower() in {"false", "0", 0, "n", "f", "no"} or not ts_full_text_search:
                setattr(self, full_text_search, False)
            else:
                raise ValueError(f"Enter a valid value for '{full_text_search}' in the configuration file, for example 'yes' or 'no'.")
        self.graphdb_connector_name = config["graphdb_connector_name"]
        if len([index for index in [self.blazegraph_full_text_search, self.fuseki_full_text_search, self.virtuoso_full_text_search, self.graphdb_connector_name] if index]) > 1:
            raise ValueError("The use of multiple indexing systems simultaneously is currently not supported.")

    def _process_query(self) -> List[Tuple]:
        algebra: CompValue = prepareQuery(self.query).algebra
        if algebra.name != "SelectQuery":
            raise ValueError("Only SELECT queries are allowed.")
        triples = list()
        # The algebra can be extremely variable in case of one or more OPTIONAL in the query:
        # it is necessary to navigate the dictionary recursively in search of the values of the "triples" keys.
        self._tree_traverse(algebra, "triples", triples)
        triples_without_hook = [triple for triple in triples if isinstance(triple[0], Variable) and isinstance(triple[1], Variable) and isinstance(triple[2], Variable)]
        if triples_without_hook:
            raise ValueError("Could not perform a generic time agnostic query. Please, specify at least one URI or Literal within the query.")
        return triples

    def _tree_traverse(self, tree: dict, key: str, values: List[Tuple]) -> None:
        # The recursion extends 'values' in place and always returns None,
        # so the 'found' guard below is purely defensive.
        for k, v in tree.items():
            if k == key:
                values.extend(v)
            elif isinstance(v, dict):
                found = self._tree_traverse(v, key, values)
                if found is not None:
                    values.extend(found)

    def _rebuild_relevant_graphs(self) -> None:
        # First, the graphs of the hooks are reconstructed
        triples_checked = set()
        all_isolated = True
        self.triples = self._process_query()
        for triple in self.triples:
            if self._is_isolated(triple) and self._is_a_new_triple(triple, triples_checked):
                query_to_identify = self._get_query_to_identify(triple)
                present_results = Sparql(query_to_identify, self.config).run_construct_query()
                present_entities = {result[0] for result in present_results}
                self._rebuild_relevant_entity(triple[0])
                self._find_entities_in_update_queries(triple, present_entities)
            else:
                all_isolated = False
                self._rebuild_relevant_entity(triple[0])
            triples_checked.add(triple)
        self._align_snapshots()
        # Then, the graphs of the entities obtained from the hooks are reconstructed
        if not all_isolated:
            self._solve_variables()

    def _is_isolated(self, triple: tuple) -> bool:
        # A triple is isolated if its variables cannot be grounded by
        # following other triples back to a URI subject
        if isinstance(triple[0], URIRef):
            return False
        variables = [el for el in triple if isinstance(el, Variable)]
        if not variables:
            return False
        for variable in variables:
            other_triples = {t for t in self.triples if t != triple}
            if self._there_is_transitive_closure(variable, other_triples):
                return False
        return True

    def _there_is_transitive_closure(self, variable: Variable, triples: Set[Tuple]) -> bool:
        there_is_transitive_closure = False
        for triple in triples:
            if variable in triple and triple.index(variable) == 2:
                if isinstance(triple[0], URIRef):
                    return True
                elif isinstance(triple[0], Variable):
                    other_triples = {t for t in triples if t != triple}
                    there_is_transitive_closure = self._there_is_transitive_closure(triple[0], other_triples)
        return there_is_transitive_closure

    def _rebuild_relevant_entity(self, entity: Union[URIRef, Literal]) -> None:
        if isinstance(entity, URIRef) and entity not in self.reconstructed_entities:
            self.reconstructed_entities.add(entity)
            agnostic_entity = AgnosticEntity(entity, config=self.config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False)
            if self.on_time:
                entity_graphs, entity_snapshots, other_snapshots = agnostic_entity.get_state_at_time(time=self.on_time, include_prov_metadata=self.other_snapshots)
                if other_snapshots:
                    self.other_snapshots_metadata.update(other_snapshots)
                if entity_graphs:
                    for relevant_timestamp, cg in entity_graphs.items():
                        self.relevant_entities_graphs.setdefault(entity, dict())[relevant_timestamp] = cg
            else:
                entity_history = agnostic_entity.get_history(include_prov_metadata=True)
                if entity_history[0][entity]:
                    self.relevant_entities_graphs.update(entity_history[0])

    def _get_query_to_identify(self, triple: list) -> str:
        solvable_triple = [el.n3() for el in triple]
        if isinstance(triple[1], InvPath):
            predicate = solvable_triple[1].replace("^", "", 1)
            query_to_identify = f"""
                CONSTRUCT {{{solvable_triple[2]} {predicate} {solvable_triple[0]}}}
                WHERE {{{solvable_triple[0]} {solvable_triple[1]} {solvable_triple[2]}}}
            """
        elif isinstance(triple[1], URIRef) or isinstance(triple[1], Variable):
            query_to_identify = f"""
                CONSTRUCT {{{solvable_triple[0]} {solvable_triple[1]} {solvable_triple[2]}}}
                WHERE {{{solvable_triple[0]} {solvable_triple[1]} {solvable_triple[2]}}}
            """
        return query_to_identify

    def _is_a_new_triple(self, triple: tuple, triples_checked: set) -> bool:
        for triple_checked in triples_checked:
            uris_in_triple = {el for el in triple if isinstance(el, URIRef)}
            uris_in_triple_checked = {el for el in triple_checked if isinstance(el, URIRef)}
            new_uris = uris_in_triple.difference(uris_in_triple_checked)
            if not new_uris:
                return False
        return True

    def _get_query_to_update_queries(self, triple: tuple) -> str:
        uris_in_triple = {el for el in triple if isinstance(el, URIRef)}
        query_to_identify = self.get_full_text_search(uris_in_triple)
        return query_to_identify

    def _find_entities_in_update_queries(self, triple: tuple, present_entities: set):
        def _process_triple(processed_triple, uris_in_triple: set, relevant_entities_found: set):
            processed_triple = [el["string"] if "string" in el else el for el in processed_triple]
            relevant_entities = {processed_triple[0]} if len(uris_in_triple.intersection(processed_triple)) == len(uris_in_triple) else None
            if relevant_entities is not None:
                relevant_entities_found.update(relevant_entities)
        uris_in_triple = {el for el in triple if isinstance(el, URIRef)}
        relevant_entities_found = present_entities
        query_to_identify = self._get_query_to_update_queries(triple)
        results = Sparql(query_to_identify, self.config).run_select_query()
        bindings = results['results']['bindings']
        if bindings:
            for result in bindings:
                update_query = result.get('updateQuery')
                if update_query and update_query.get('value'):
                    try:
                        update = parseUpdate(update_query['value'])
                    except Exception as e:
                        print(e)
                        print(update_query['value'])
                        raise
                    for request in update["request"]:
                        if "quadsNotTriples" in request["quads"]:
                            for quadsNotTriples in request["quads"]["quadsNotTriples"]:
                                for inner_triple in quadsNotTriples["triples"]:
                                    _process_triple(inner_triple, uris_in_triple, relevant_entities_found)
                        elif "triples" in request["quads"]:
                            for inner_triple in request["quads"]["triples"]:
                                _process_triple(inner_triple, uris_in_triple, relevant_entities_found)
        if relevant_entities_found:
            print("[VersionQuery:INFO] Rebuilding relevant entities' history.")
            pbar = tqdm(total=len(relevant_entities_found))
            for new_entity_found in relevant_entities_found:
                self._rebuild_relevant_entity(new_entity_found)
                pbar.update()
            # with ThreadPoolExecutor() as executor:
            #     results = [executor.submit(self._rebuild_relevant_entity, new_entity_found) for new_entity_found in relevant_entities_found]
            #     for _ in as_completed(results):
            #         pbar.update()
            pbar.close()

    def _solve_variables(self) -> None:
        self.vars_to_explicit_by_time = dict()
        self._get_vars_to_explicit_by_time()
        while self._there_are_variables():
            solved_variables = self._explicit_solvable_variables()
            self._align_snapshots()
            if not solved_variables:
                return
            self._update_vars_to_explicit(solved_variables)
            self._get_vars_to_explicit_by_time()

    def _there_are_variables(self) -> bool:
        for _, triples in self.vars_to_explicit_by_time.items():
            for triple in triples:
                vars = [el for el in triple if isinstance(el, Variable)]
                if vars:
                    return True
        return False

    def _explicit_solvable_variables(self) -> Dict[str, Dict[str, str]]:
        explicit_triples: Dict[str, Dict[str, set]] = dict()
        for se, triples in self.vars_to_explicit_by_time.items():
            for triple in triples:
                variables = [el for el in triple if isinstance(el, Variable)]
                if len(variables) == 1:
                    solvable_triple = [el.n3() for el in triple]
                    variable = variables[0]
                    variable_index = triple.index(variable)
                    if variable_index == 2:
                        query_to_identify = f"""
                            CONSTRUCT {{{solvable_triple[0]} {solvable_triple[1]} {solvable_triple[2]}}}
                            WHERE {{{solvable_triple[0]} {solvable_triple[1]} {solvable_triple[2]}.}}
                        """
                        results = self.relevant_graphs[se].query(query_to_identify)
                        for result in results:
                            explicit_triples.setdefault(se, dict())
                            explicit_triples[se].setdefault(variable, set())
                            explicit_triples[se][variable].add(result)
                        with ThreadPoolExecutor() as executor:
                            executor.map(self._rebuild_relevant_entity, [result[variable_index] for result in results])
        return explicit_triples

    def _align_snapshots(self) -> None:
        # Merge entities based on snapshots
        for _, snapshots in self.relevant_entities_graphs.items():
            for snapshot, graph in snapshots.items():
                if snapshot in self.relevant_graphs:
                    for quad in graph.quads():
                        self.relevant_graphs[snapshot].add(quad)
                else:
                    self.relevant_graphs[snapshot] = deepcopy(graph)
        # To copy the entity two conditions must be met:
        # 1) the entity is present in tn but not in tn+1;
        # 2) the entity is absent in tn+1 because it has not changed and not because it has been deleted.
        ordered_data = self._sort_relevant_graphs()
        for index, se_cg in enumerate(ordered_data):
            se = se_cg[0]
            cg = se_cg[1]
            if index > 0:
                previous_se = ordered_data[index-1][0]
                for subject in self.relevant_graphs[previous_se].subjects():
                    if (subject, None, None, None) not in cg:
                        if subject in self.relevant_entities_graphs:
                            if se not in self.relevant_entities_graphs[subject]:
                                graph_to_cache = Graph()
                                for quad in self.relevant_graphs[previous_se].quads((subject, None, None, None)):
                                    self.relevant_graphs[se].add(quad)
                                    graph_to_cache.add(quad[:3])

    def _sort_relevant_graphs(self):
        ordered_data: List[Tuple[str, ConjunctiveGraph]] = sorted(
            self.relevant_graphs.items(),
            key=lambda x: convert_to_datetime(x[0]),
            reverse=False  # That is, from past to present, assuming that the past influences the present and not the opposite like in Dark
        )
        return ordered_data

    def _update_vars_to_explicit(self, solved_variables: Dict[str, Dict[Variable, Set[Tuple]]]):
        vars_to_explicit_by_time: Dict[str, Dict[Variable, set]] = dict()
        for se, triples in self.vars_to_explicit_by_time.items():
            vars_to_explicit_by_time.setdefault(se, set())
            new_triples = set()
            for triple in triples:
                if se in solved_variables:
                    for solved_var, solved_triples in solved_variables[se].items():
                        if solved_var in triple:
                            for solved_triple in solved_triples:
                                new_triple = None
                                if solved_triple[0] != triple[0] and solved_triple[1] == triple[1]:
                                    continue
                                elif solved_triple[0] == triple[0] and solved_triple[1] == triple[1]:
                                    new_triple = solved_triple
                                else:
                                    new_triple = (solved_triple[2], triple[1], triple[2])
                                new_triples.add(new_triple)
                        elif not any(isinstance(el, Variable) for el in triple):
                            new_triples.add(triple)
                        elif not any(var for var in solved_variables[se] if var in triple):
                            new_triples.add(triple)
            vars_to_explicit_by_time[se] = new_triples
        self.vars_to_explicit_by_time = vars_to_explicit_by_time

    def _get_vars_to_explicit_by_time(self) -> None:
        for se, _ in self.relevant_graphs.items():
            if se not in self.vars_to_explicit_by_time:
                self.vars_to_explicit_by_time[se] = set()
                for triple in self.triples:
                    if any(el for el in triple if isinstance(el, Variable) and not self._is_a_dead_end(el, triple)) and not self._is_isolated(triple):
                        self.vars_to_explicit_by_time[se].add(triple)

    def _is_a_dead_end(self, el: Union[URIRef, Variable, Literal], triple: tuple) -> bool:
        # A variable is a dead end if it appears as an object here
        # and never as a subject in any other triple pattern
        if isinstance(el, Variable):
            if triple.index(el) == 2 and not any(other_triple for other_triple in self.triples if el in other_triple and other_triple.index(el) == 0):
                return True
        return False
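
# Illustrative note (a sketch, not library code): _process_query relies on rdflib's
# SPARQL algebra, in which the triple patterns of a SELECT query, including those
# nested under OPTIONAL, appear under "triples" keys at varying depths; hence the
# recursive _tree_traverse above. The example query below is an assumption.
#
#     >>> from rdflib.plugins.sparql.processor import prepareQuery
#     >>> algebra = prepareQuery(
#     ...     "SELECT ?o WHERE {<https://example.org/s> <https://example.org/p> ?o}"
#     ... ).algebra
#     >>> algebra.name
#     'SelectQuery'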

class VersionQuery(AgnosticQuery):
    """
    This class allows time-travel queries, both on a single version and on all versions of the dataset.

    :param query: The SPARQL query string.
    :type query: str
    :param on_time: If you want to query a specific version, specify the time interval here. The format is (START, END). If one of the two values is None, only the other is considered. Finally, the time can be specified using any existing standard.
    :type on_time: Tuple[Union[str, None]], optional
    :param config_path: The path to the configuration file.
    :type config_path: str, optional
    """
    def __init__(self, query: str, on_time: Tuple[Union[str, None]] = "", other_snapshots: bool = False, config_path: str = CONFIG_PATH, config_dict: dict = None):
        super(VersionQuery, self).__init__(query, on_time, other_snapshots, config_path, config_dict)

    def _query_reconstructed_graph(self, timestamp: str, graph: ConjunctiveGraph) -> tuple:
        output = set()
        query_results = graph.query(self.query)
        vars_list = query_results.vars
        results = query_results.bindings
        for result in results:
            Sparql._get_tuples_set(result, output, vars_list)
        normalized_timestamp = convert_to_datetime(timestamp, stringify=True)
        return normalized_timestamp, output

    def run_agnostic_query(self) -> Tuple[Dict[str, Set[Tuple]], dict]:
        """
        Run the query provided as a time-travel query. If the **on_time** argument was specified, it runs on versions within the specified interval; otherwise, it runs on all versions.

        :returns Tuple[Dict[str, Set[Tuple]], dict] -- The first element is a dictionary in which the keys are the snapshots relevant to that query. The values correspond to sets of tuples containing the query results at the time specified by the key. The positional value of the elements in the tuples is equivalent to the variables indicated in the query. The second element is the set of generation times of the other snapshots, if their metadata were requested.
        """
        agnostic_result: Dict[str, Set[Tuple]] = dict()
        with ThreadPoolExecutor() as executor:
            for future in [executor.submit(self._query_reconstructed_graph, timestamp, graph) for timestamp, graph in self.relevant_graphs.items()]:
                normalized_timestamp, output = future.result()
                agnostic_result[normalized_timestamp] = output
        agnostic_result = {timestamp: {tuple(Literal(el, datatype=None) if isinstance(el, Literal) else el for el in result_tuple) for result_tuple in results} for timestamp, results in agnostic_result.items()}
        return agnostic_result, {data["generatedAtTime"] for _, data in self.other_snapshots_metadata.items()}
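
# Usage sketch (illustrative only, not part of the library): materializing and
# querying every version of the dataset. The SPARQL query, the entity IRI, and
# the configuration path below are assumptions for the example.
#
#     query = """
#         PREFIX cito: <http://purl.org/spar/cito/>
#         SELECT DISTINCT ?cited
#         WHERE {<https://example.org/br/1> cito:cites ?cited.}
#     """
#     version_query = VersionQuery(query, config_path="./config.json")
#     results, other_snapshots = version_query.run_agnostic_query()
#     # 'results' maps each snapshot timestamp to a set of result tuples,
#     # one position per SELECT variable.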

class DeltaQuery(AgnosticQuery):
    """
    This class allows single-time and cross-time structured delta queries.

    :param query: A SPARQL query string. It is useful to identify the entities whose change you want to investigate.
    :type query: str
    :param on_time: If you want to query specific snapshots, specify the time interval here. The format is (START, END). If one of the two values is None, only the other is considered. Finally, the time can be specified using any existing standard.
    :type on_time: Tuple[Union[str, None]], optional
    :param changed_properties: A set of properties. It narrows the field to those entities where the properties specified in the set have changed.
    :type changed_properties: Set[str], optional
    :param config_path: The path to the configuration file.
    :type config_path: str, optional
    """
    def __init__(self, query: str, on_time: Tuple[Union[str, None]] = (), changed_properties: Set[str] = set(), config_path: str = CONFIG_PATH, config_dict: dict = None):
        super(DeltaQuery, self).__init__(query=query, on_time=on_time, config_path=config_path, config_dict=config_dict)
        self.changed_properties = changed_properties

    def _rebuild_relevant_graphs(self) -> None:
        # First, the graphs of the hooks are reconstructed
        triples_checked = set()
        self.triples = self._process_query()
        for triple in self.triples:
            if self._is_isolated(triple) and self._is_a_new_triple(triple, triples_checked):
                query_to_identify = self._get_query_to_identify(triple)
                present_results = Sparql(query_to_identify, self.config).run_construct_query()
                pbar = tqdm(total=len(present_results))
                for result in present_results:
                    if isinstance(result[0], URIRef):
                        self.reconstructed_entities.add(result[0])
                    pbar.update()
                pbar.close()
                if isinstance(triple[0], URIRef):
                    self.reconstructed_entities.add(triple[0])
                self._find_entities_in_update_queries(triple)
            else:
                self._rebuild_relevant_entity(triple[0])
            triples_checked.add(triple)
        self._align_snapshots()
        # Then, the graphs of the entities obtained from the hooks are reconstructed
        self._solve_variables()

    def _find_entities_in_update_queries(self, triple: tuple):
        def _process_triple(processed_triple, uris_in_triple: set, relevant_entities_found: set):
            processed_triple = [el["string"] if "string" in el else el for el in processed_triple]
            relevant_entities = set(processed_triple).difference(uris_in_triple) if len(uris_in_triple.intersection(processed_triple)) == len(uris_in_triple) else None
            if relevant_entities is not None:
                relevant_entities_found.update(relevant_entities)
        uris_in_triple = {el for el in triple if isinstance(el, URIRef)}
        relevant_entities_found = set()
        query_to_identify = self._get_query_to_update_queries(triple)
        results = Sparql(query_to_identify, self.config).run_select_query()
        bindings = results['results']['bindings']
        if bindings:
            for result in bindings:
                update_query = result.get('updateQuery')
                if update_query and update_query.get('value'):
                    update = parseUpdate(update_query['value'])
                    # Guard against update queries expressed as plain triples,
                    # mirroring the handling in the parent class
                    for request in update["request"]:
                        if "quadsNotTriples" in request["quads"]:
                            for quadsNotTriples in request["quads"]["quadsNotTriples"]:
                                for inner_triple in quadsNotTriples["triples"]:
                                    _process_triple(inner_triple, uris_in_triple, relevant_entities_found)
                        elif "triples" in request["quads"]:
                            for inner_triple in request["quads"]["triples"]:
                                _process_triple(inner_triple, uris_in_triple, relevant_entities_found)
        for relevant_entity_found in relevant_entities_found:
            if isinstance(relevant_entity_found, URIRef):
                self.reconstructed_entities.add(relevant_entity_found)

    def _get_query_to_identify(self, triple: list) -> str:
        solvable_triple = [el.n3() for el in triple]
        if isinstance(triple[1], InvPath):
            predicate = solvable_triple[1].replace("^", "", 1)
            query_to_identify = f"""
                CONSTRUCT {{{solvable_triple[2]} {predicate} {solvable_triple[0]}}}
                WHERE {{{solvable_triple[0]} {solvable_triple[1]} {solvable_triple[2]}}}
            """
        elif isinstance(triple[1], URIRef) or isinstance(triple[1], Variable):
            query_to_identify = f"""
                CONSTRUCT {{{solvable_triple[0]} {solvable_triple[1]} {solvable_triple[2]}}}
                WHERE {{{solvable_triple[0]} {solvable_triple[1]} {solvable_triple[2]}}}
            """
        return query_to_identify

    def _get_query_to_update_queries(self, triple: tuple) -> str:
        uris_in_triple = {el for el in triple if isinstance(el, URIRef)}
        query_to_identify = self.get_full_text_search(uris_in_triple)
        return query_to_identify

    def __identify_changed_entities(self, identified_entity: URIRef):
        output = dict()
        query = f"""
            SELECT DISTINCT ?time ?updateQuery ?description
            WHERE {{
                ?se <{ProvEntity.iri_specialization_of}> <{identified_entity}>;
                    <{ProvEntity.iri_generated_at_time}> ?time;
                    <{ProvEntity.iri_description}> ?description.
                OPTIONAL {{
                    ?se <{ProvEntity.iri_has_update_query}> ?updateQuery.
                }}
            }}
        """
        query_existence = f"""
            ASK WHERE {{
                <{identified_entity}> ?p ?o.
            }}
        """
        # Run the query and get the bindings
        results = Sparql(query, self.config).run_select_query()
        bindings = results['results']['bindings']
        if bindings:
            # Filter timestamps by interval
            relevant_results = _filter_timestamps_by_interval(self.on_time, bindings, time_index='time')
            if relevant_results:
                identified_entity_str = str(identified_entity)
                output[identified_entity_str] = {
                    "created": None,
                    "modified": dict(),
                    "deleted": None
                }
                # Sort the results by time
                results_sorted = sorted(bindings, key=lambda x: convert_to_datetime(x['time']['value']))
                creation_date = convert_to_datetime(results_sorted[0]['time']['value'], stringify=True)
                # Check if the entity still exists
                exists = Sparql(query_existence, self.config).run_ask_query()
                if not exists:
                    # The entity has been deleted
                    deletion_time = convert_to_datetime(results_sorted[-1]['time']['value'], stringify=True)
                    output[identified_entity_str]["deleted"] = deletion_time
                for result_binding in relevant_results:
                    time = convert_to_datetime(result_binding['time']['value'], stringify=True)
                    if time != creation_date:
                        update_query = result_binding.get('updateQuery', {}).get('value')
                        description = result_binding.get('description', {}).get('value')
                        if update_query and self.changed_properties:
                            for changed_property in self.changed_properties:
                                if changed_property in update_query:
                                    output[identified_entity_str]["modified"][time] = update_query
                        elif update_query and not self.changed_properties:
                            output[identified_entity_str]["modified"][time] = update_query
                        elif not update_query and not self.changed_properties:
                            output[identified_entity_str]["modified"][time] = description
                    else:
                        output[identified_entity_str]["created"] = creation_date
        return output

    def run_agnostic_query(self) -> Dict[str, Dict[str, str]]:
        """
        Queries the deltas relevant to the query and to the properties set within the specified time interval. If no property was indicated, any change is considered. If no time interval was selected, the whole dataset's history is considered.
        The output has the following format: ::

            {
                RES_URI_1: {
                    "created": TIMESTAMP_CREATION,
                    "modified": {
                        TIMESTAMP_1: UPDATE_QUERY_1,
                        TIMESTAMP_2: UPDATE_QUERY_2,
                        TIMESTAMP_N: UPDATE_QUERY_N
                    },
                    "deleted": TIMESTAMP_DELETION
                },
                RES_URI_2: {
                    "created": TIMESTAMP_CREATION,
                    "modified": {
                        TIMESTAMP_1: UPDATE_QUERY_1,
                        TIMESTAMP_2: UPDATE_QUERY_2,
                        TIMESTAMP_N: UPDATE_QUERY_N
                    },
                    "deleted": TIMESTAMP_DELETION
                },
                RES_URI_N: {
                    "created": TIMESTAMP_CREATION,
                    "modified": {
                        TIMESTAMP_1: UPDATE_QUERY_1,
                        TIMESTAMP_2: UPDATE_QUERY_2,
                        TIMESTAMP_N: UPDATE_QUERY_N
                    },
                    "deleted": TIMESTAMP_DELETION
                },
            }

        :returns Dict[str, Dict[str, str]] -- The output is a dictionary that reports the modified entities, when they were created, modified, and deleted. Changes are reported as SPARQL UPDATE queries. If the entity was not created or deleted within the indicated range, the "created" or "deleted" value is None. On the other hand, if the entity does not exist within the input interval, the "modified" value is an empty dictionary.
        """
        output = dict()
        print("[DeltaQuery:INFO] Identifying changed entities.")
        pbar = tqdm(total=len(self.reconstructed_entities))
        with ThreadPoolExecutor() as executor:
            for result in executor.map(self.__identify_changed_entities, self.reconstructed_entities):
                output.update(result)
                pbar.update()
        pbar.close()
        return output
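
# Usage sketch (illustrative only, not part of the library): tracking when the
# citations of matching entities changed. The query, property IRI, time interval,
# and configuration path below are assumptions for the example.
#
#     delta_query = DeltaQuery(
#         query="SELECT ?s WHERE {?s <http://purl.org/spar/cito/cites> <https://example.org/br/2>.}",
#         on_time=("2021-01-01", "2022-01-01"),
#         changed_properties={"http://purl.org/spar/cito/cites"},
#         config_path="./config.json"
#     )
#     deltas = delta_query.run_agnostic_query()
#     # 'deltas' maps each changed entity IRI to its creation time, a dictionary
#     # of modification times to SPARQL UPDATE strings, and a deletion time (or None).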

def get_insert_query(graph_iri: URIRef, data: Graph) -> Tuple[str, int]:
    num_of_statements: int = len(data)
    if num_of_statements <= 0:
        return "", 0
    else:
        statements: str = data.serialize(format="nt11", encoding="utf-8") \
            .decode("utf-8") \
            .replace('\n\n', '')
        insert_string: str = f"INSERT DATA {{ GRAPH <{graph_iri}> {{ {statements} }} }}"
        return insert_string, num_of_statements
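
# Usage sketch (illustrative only): building an INSERT DATA update for a small
# in-memory graph. The graph IRI and the triple below are assumptions.
#
#     from rdflib import Graph, Literal, URIRef
#     g = Graph()
#     g.add((URIRef("https://example.org/s"), URIRef("https://example.org/p"), Literal("o")))
#     update, count = get_insert_query(URIRef("https://example.org/graph"), g)
#     # 'update' wraps the N-Triples serialization of g in
#     # INSERT DATA { GRAPH <https://example.org/graph> { ... } }, and count == 1.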