Coverage for oc_botwatch / classify.py: 100%
58 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 16:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 16:49 +0000
1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import json
6import logging
7import re
8from pathlib import Path
10import polars as pl
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Parent of the directory that contains this file; the data paths below hang off it.
BASE_DIR = Path(__file__).resolve().parent.parent
INPUT_DIR = BASE_DIR / "input"  # default directory scanned for *.csv by classify_traffic
OUTPUT_DIR = BASE_DIR / "output"  # default directory main writes its CSV files to

# Keys of robots.json that _build_llm_pattern leaves out of the LLM-bot pattern.
_SKIP_LLM_NAMES: frozenset[str] = frozenset({"Spider", "Code"})

# Text file whose non-blank lines _build_generic_bot_pattern appends as extra patterns.
_SUPPLEMENTARY_BOT_FILE = BASE_DIR / "supplementary_bots.txt"

# Inputs of _classify_service.
_SPARQL_HOST = "sparql.opencitations.net"
_SPARQL_PATH_RE = r"/sparql(/|$|\?)"  # "/sparql" followed by "/", "?" or end of string
_SPARQL_QUERY_RE = r"\?query="  # a literal "?query=" anywhere in the path
# "/index" or "/meta", an optional "/api", then "/v<digits>/" and at least one more character.
_API_VERSIONED_PATH_RE = r"^/(index(/api)?|meta(/api)?)/v\d+/.+"
# Response codes whose rows classify_traffic filters out before counting.
_REDIRECT_CODES: frozenset[str] = frozenset({"301", "302", "303", "307", "308"})
def _build_llm_pattern() -> str:
    """Build the case-insensitive regex used to recognise LLM-bot user agents.

    Reads ``ai-robots-txt/robots.json`` under ``BASE_DIR`` and turns every
    top-level key, except those listed in ``_SKIP_LLM_NAMES``, into an escaped
    literal wrapped in ``\\b`` word boundaries.

    Returns:
        The alternation of all retained names, prefixed with ``(?i)``.
    """
    robots_path = BASE_DIR / "ai-robots-txt" / "robots.json"
    # JSON is UTF-8 by specification; do not depend on the locale's default encoding.
    with robots_path.open(encoding="utf-8") as f:
        data: dict[str, object] = json.load(f)
    # NOTE(review): if no key survives the skip list the result is just "(?i)",
    # i.e. an empty pattern -- confirm whether that case needs a guard.
    parts = [rf"\b{re.escape(name)}\b" for name in data if name not in _SKIP_LLM_NAMES]
    return "(?i)" + "|".join(parts)
def _build_generic_bot_pattern() -> str:
    """Build the case-insensitive regex used to recognise generic (non-LLM) bots.

    The pattern is the alternation, in this order, of:

    * every ``pattern`` in ``crawler-user-agents/crawler-user-agents.json``
      whose entry is not tagged ``ai-crawler``;
    * every ``pattern`` in ``COUNTER-Robots/COUNTER_Robots_list.json``;
    * every non-blank line of ``_SUPPLEMENTARY_BOT_FILE``.

    Entries are used as regex fragments exactly as found; nothing is escaped
    or stripped.

    Returns:
        The combined alternation, prefixed with ``(?i)``.
    """
    crawlers_path = BASE_DIR / "crawler-user-agents" / "crawler-user-agents.json"
    counter_path = BASE_DIR / "COUNTER-Robots" / "COUNTER_Robots_list.json"
    # All three sources are read as UTF-8 explicitly so the result does not
    # depend on the locale's default encoding.
    with crawlers_path.open(encoding="utf-8") as f:
        crawlers = json.load(f)
    with counter_path.open(encoding="utf-8") as f:
        counter = json.load(f)

    # Entries tagged "ai-crawler" are left out of the generic pattern.
    patterns: list[str] = [
        entry["pattern"] for entry in crawlers if "ai-crawler" not in entry.get("tags", ())
    ]
    patterns.extend(entry["pattern"] for entry in counter)
    supplementary_lines = _SUPPLEMENTARY_BOT_FILE.read_text(encoding="utf-8").splitlines()
    patterns.extend(line for line in supplementary_lines if line.strip())
    return "(?i)" + "|".join(patterns)
def _classify_service(host: pl.Expr, path: pl.Expr, method: pl.Expr) -> pl.Expr:
    """Return an expression labelling each request as "sparql", "api" or "web".

    Rules are tried in order and the first one that holds wins:

    1. the path matches ``_SPARQL_PATH_RE`` -> "sparql";
    2. the host equals ``_SPARQL_HOST`` and either the path matches
       ``_SPARQL_QUERY_RE`` or the method is "POST" -> "sparql";
    3. the path matches ``_API_VERSIONED_PATH_RE`` -> "api";
    4. anything else -> "web".
    """
    has_sparql_path = path.str.contains(_SPARQL_PATH_RE)
    has_query_param = path.str.contains(_SPARQL_QUERY_RE)
    is_post = method == "POST"
    on_sparql_host = host == _SPARQL_HOST
    is_versioned_api = path.str.contains(_API_VERSIONED_PATH_RE)

    labelled = pl.when(has_sparql_path).then(pl.lit("sparql"))
    labelled = labelled.when(on_sparql_host & (has_query_param | is_post)).then(pl.lit("sparql"))
    labelled = labelled.when(is_versioned_api).then(pl.lit("api"))
    return labelled.otherwise(pl.lit("web"))
def classify_traffic(input_dir: Path = INPUT_DIR) -> pl.DataFrame:
    """Count requests per day, traffic category and service.

    Every ``*.csv`` file in *input_dir* is scanned lazily (in sorted name
    order) with the six columns used here read as strings. The ``date`` column
    is cut to its first 10 characters; rows whose cut date does not look like
    ``YYYY-MM-DD`` or whose response code is in ``_REDIRECT_CODES`` are
    filtered out. Each remaining row gets a ``category`` ("llm_bot" is tested
    before "generic_bot", everything else is "human") and a ``service`` from
    ``_classify_service``.

    Args:
        input_dir: Directory containing the CSV logs.

    Returns:
        A frame with columns ``date``, ``category``, ``service`` and ``count``,
        sorted by ``date``, ``service``, ``category``.
    """
    columns = (
        "date",
        "user_agent",
        "request_host",
        "request_path",
        "request_method",
        "http_response_code",
    )
    text_schema = {column: pl.Utf8 for column in columns}

    llm_regex = _build_llm_pattern()
    generic_regex = _build_generic_bot_pattern()

    scans = [
        pl.scan_csv(csv_file, schema_overrides=text_schema).select(*columns)
        for csv_file in sorted(input_dir.glob("*.csv"))
    ]

    user_agent = pl.col("user_agent")
    category = (
        pl.when(user_agent.str.contains(llm_regex))
        .then(pl.lit("llm_bot"))
        .when(user_agent.str.contains(generic_regex))
        .then(pl.lit("generic_bot"))
        .otherwise(pl.lit("human"))
        .alias("category")
    )
    service = _classify_service(
        pl.col("request_host"),
        pl.col("request_path"),
        pl.col("request_method"),
    ).alias("service")

    day = pl.col("date").str.slice(0, 10).alias("date")
    looks_like_day = pl.col("date").str.contains(r"^\d{4}-\d{2}-\d{2}$")
    is_redirect = pl.col("http_response_code").is_in(_REDIRECT_CODES)

    lazy_counts = (
        pl.concat(scans)
        .with_columns(day)
        .filter(looks_like_day)
        .filter(~is_redirect)
        .with_columns(category, service)
        .group_by("date", "category", "service")
        .len()
        .rename({"len": "count"})
    )
    counts = lazy_counts.collect(engine="streaming")
    return counts.sort("date", "service", "category").select("date", "category", "service", "count")
def _wide_from_long(long_df: pl.DataFrame) -> pl.DataFrame:
    """Collapse per-service counts into one row per date with a column per category.

    Args:
        long_df: Frame with at least ``date``, ``category`` and ``count``
            columns, such as the one returned by ``classify_traffic``.

    Returns:
        A frame with columns ``date``, ``human``, ``generic_bot`` and
        ``llm_bot``, sorted by ``date``. Date/category combinations that do not
        occur in *long_df* are 0.
    """
    categories = ("human", "generic_bot", "llm_bot")
    totals = long_df.group_by("date", "category").agg(pl.col("count").sum())
    wide = totals.pivot(on="category", index="date", values="count")

    # pivot only creates columns for categories that actually occur in the
    # data; add the others as zero columns so the final select cannot fail on
    # a missing column name.
    absent = [name for name in categories if name not in wide.columns]
    if absent:
        zero = pl.lit(0, dtype=totals.schema["count"])
        wide = wide.with_columns([zero.alias(name) for name in absent])

    return wide.fill_null(0).sort("date").select("date", *categories)
def main(input_dir: Path = INPUT_DIR, output_dir: Path = OUTPUT_DIR) -> None:
    """Classify the logs in *input_dir* and write two CSV reports to *output_dir*.

    Writes ``daily_traffic_by_service.csv`` (the long per-date, per-category,
    per-service counts) and ``daily_traffic.csv`` (one row per date with a
    column per category), logging each output path at INFO level.

    Args:
        input_dir: Directory containing the CSV logs to classify.
        output_dir: Directory the reports are written to; created if missing.
    """
    logging.basicConfig(level=logging.INFO)
    long_df = classify_traffic(input_dir)

    # Create the output directory if it is missing so the writes below do not
    # fail on a fresh checkout.
    output_dir.mkdir(parents=True, exist_ok=True)

    long_path = output_dir / "daily_traffic_by_service.csv"
    long_df.write_csv(long_path)
    logger.info("Output: %s", long_path)

    wide_df = _wide_from_long(long_df)
    wide_path = output_dir / "daily_traffic.csv"
    wide_df.write_csv(wide_path)
    logger.info("Output: %s", wide_path)
# Script entry point: run main() with its default input and output directories.
if __name__ == "__main__":
    main()  # pragma: no cover