Coverage for oc_botwatch / classify.py: 100%

58 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-19 16:49 +0000

1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import json 

6import logging 

7import re 

8from pathlib import Path 

9 

10import polars as pl 

11 

# Module-level logger; main() configures logging via basicConfig.
logger = logging.getLogger(__name__)

# Directory two levels above this file's resolved location; every data file
# and the default input/output directories are located relative to it.
BASE_DIR = Path(__file__).resolve().parent.parent
INPUT_DIR = BASE_DIR / "input"
OUTPUT_DIR = BASE_DIR / "output"

# Keys of robots.json that are left out of the LLM-bot pattern.
_SKIP_LLM_NAMES: frozenset[str] = frozenset(("Spider", "Code"))

# Text file holding one extra bot regex per non-blank line.
_SUPPLEMENTARY_BOT_FILE = BASE_DIR / "supplementary_bots.txt"

# Host whose "?query=" and POST requests are counted as SPARQL traffic.
_SPARQL_HOST = "sparql.opencitations.net"
# "/sparql" followed by "/", "?" or the end of the path.
_SPARQL_PATH_RE = r"/sparql(/|$|\?)"
# A literal "?query=" anywhere in the path.
_SPARQL_QUERY_RE = r"\?query="
# "/index" or "/meta", optionally followed by "/api", then "/v<digits>/" and
# at least one more character.
_API_VERSIONED_PATH_RE = r"^/(index(/api)?|meta(/api)?)/v\d+/.+"
# Response codes (kept as strings) whose rows are dropped before counting.
_REDIRECT_CODES: frozenset[str] = frozenset(("301", "302", "303", "307", "308"))

27 

28 

def _build_llm_pattern() -> str:
    r"""Build a case-insensitive regex alternation of LLM crawler names.

    The names are the top-level keys of ``ai-robots-txt/robots.json`` under
    ``BASE_DIR``; keys listed in ``_SKIP_LLM_NAMES`` are left out. Each name
    is escaped with ``re.escape`` and wrapped in ``\b`` word boundaries.

    Returns:
        The alternation prefixed with the ``(?i)`` inline flag.

    NOTE: when no name survives the skip list the result is the bare flag
    ``(?i)``, i.e. an empty pattern, which matches any string.
    """
    robots_path = BASE_DIR / "ai-robots-txt" / "robots.json"
    # Decode explicitly as UTF-8 rather than relying on the locale's default
    # encoding, which differs between platforms.
    with robots_path.open(encoding="utf-8") as f:
        data: dict[str, object] = json.load(f)
    parts = [rf"\b{re.escape(name)}\b" for name in data if name not in _SKIP_LLM_NAMES]
    return "(?i)" + "|".join(parts)

38 

39 

def _build_generic_bot_pattern() -> str:
    """Build a case-insensitive regex alternation of generic bot patterns.

    Patterns are collected, in this order, from:

    * ``crawler-user-agents/crawler-user-agents.json``: the ``pattern`` of
      every entry whose ``tags`` do not include ``"ai-crawler"``;
    * ``COUNTER-Robots/COUNTER_Robots_list.json``: the ``pattern`` of every
      entry;
    * ``_SUPPLEMENTARY_BOT_FILE``: every non-blank line.

    Returns:
        The patterns joined with ``|`` and prefixed with the ``(?i)`` flag.
    """
    crawlers_path = BASE_DIR / "crawler-user-agents" / "crawler-user-agents.json"
    counter_path = BASE_DIR / "COUNTER-Robots" / "COUNTER_Robots_list.json"
    # All three inputs are decoded explicitly as UTF-8 rather than with the
    # locale's default encoding, which differs between platforms.
    with crawlers_path.open(encoding="utf-8") as f:
        crawlers = json.load(f)
    with counter_path.open(encoding="utf-8") as f:
        counter = json.load(f)

    patterns: list[str] = [
        entry["pattern"]
        for entry in crawlers
        if "ai-crawler" not in entry.get("tags", ())
    ]
    patterns.extend(entry["pattern"] for entry in counter)

    supplementary = _SUPPLEMENTARY_BOT_FILE.read_text(encoding="utf-8")
    # Lines are added verbatim: blank lines are skipped, but any leading or
    # trailing whitespace on a kept line stays part of its regex.
    patterns.extend(line for line in supplementary.splitlines() if line.strip())
    return "(?i)" + "|".join(patterns)

53 

54 

def _classify_service(host: pl.Expr, path: pl.Expr, method: pl.Expr) -> pl.Expr:
    """Return an expression labelling each request "sparql", "api" or "web".

    Rules are tried in order and the first one that holds wins:

    1. the path matches ``_SPARQL_PATH_RE`` -> "sparql";
    2. the host equals ``_SPARQL_HOST`` and the path matches
       ``_SPARQL_QUERY_RE`` or the method is "POST" -> "sparql";
    3. the path matches ``_API_VERSIONED_PATH_RE`` -> "api";
    4. anything else -> "web".

    Args:
        host: Expression yielding the request host.
        path: Expression yielding the request path.
        method: Expression yielding the request method.
    """
    hits_sparql_path = path.str.contains(_SPARQL_PATH_RE)
    on_sparql_host = host == _SPARQL_HOST
    has_query_or_is_post = path.str.contains(_SPARQL_QUERY_RE) | (method == "POST")
    hits_versioned_api = path.str.contains(_API_VERSIONED_PATH_RE)

    labelled = pl.when(hits_sparql_path).then(pl.lit("sparql"))
    labelled = labelled.when(on_sparql_host & has_query_or_is_post).then(pl.lit("sparql"))
    labelled = labelled.when(hits_versioned_api).then(pl.lit("api"))
    return labelled.otherwise(pl.lit("web"))

65 

66 

def classify_traffic(input_dir: Path = INPUT_DIR) -> pl.DataFrame:
    """Count requests per day, traffic category and service.

    Every ``*.csv`` file directly inside ``input_dir`` is scanned lazily (in
    sorted path order) with the six used columns read as strings. Dates are
    cut to their first 10 characters and rows whose result is not of the form
    ``YYYY-MM-DD`` are dropped, as are rows whose response code is in
    ``_REDIRECT_CODES``.

    A request is "llm_bot" when its user agent matches the LLM pattern,
    otherwise "generic_bot" when it matches the generic bot pattern,
    otherwise "human". The service label comes from ``_classify_service``.

    Args:
        input_dir: Directory holding the CSV logs.

    Returns:
        A DataFrame with columns ``date``, ``category``, ``service`` and
        ``count``, sorted by date, then service, then category.
    """
    llm_pat = _build_llm_pattern()
    generic_pat = _build_generic_bot_pattern()

    wanted = (
        "date",
        "user_agent",
        "request_host",
        "request_path",
        "request_method",
        "http_response_code",
    )
    # Force every used column to string so no file's values are type-inferred.
    as_text = dict.fromkeys(wanted, pl.Utf8)

    frames = []
    for csv_path in sorted(input_dir.glob("*.csv")):
        lazy_log = pl.scan_csv(csv_path, schema_overrides=as_text)
        frames.append(lazy_log.select(*wanted))

    agent = pl.col("user_agent")
    category = (
        pl.when(agent.str.contains(llm_pat))
        .then(pl.lit("llm_bot"))
        .when(agent.str.contains(generic_pat))
        .then(pl.lit("generic_bot"))
        .otherwise(pl.lit("human"))
        .alias("category")
    )
    service = _classify_service(
        pl.col("request_host"),
        pl.col("request_path"),
        pl.col("request_method"),
    ).alias("service")

    day = pl.col("date")
    is_redirect = pl.col("http_response_code").is_in(_REDIRECT_CODES)

    counted = (
        pl.concat(frames)
        .with_columns(day.str.slice(0, 10).alias("date"))
        .filter(day.str.contains(r"^\d{4}-\d{2}-\d{2}$"))
        .filter(~is_redirect)
        .with_columns(category, service)
        .group_by("date", "category", "service")
        .len()
        .rename({"len": "count"})
        .collect(engine="streaming")
    )
    ordered = counted.sort("date", "service", "category")
    return ordered.select("date", "category", "service", "count")

111 

112 

def _wide_from_long(long_df: pl.DataFrame) -> pl.DataFrame:
    """Pivot per-service counts into one row per date, one column per category.

    Counts are first summed over services for each (date, category) pair and
    then pivoted so that every category becomes a column.

    Args:
        long_df: Frame with at least ``date``, ``category`` and ``count``
            columns.

    Returns:
        A frame sorted by date with columns ``date``, ``human``,
        ``generic_bot`` and ``llm_bot``; missing combinations are 0.
    """
    categories = ("human", "generic_bot", "llm_bot")

    per_day = long_df.group_by("date", "category").agg(pl.col("count").sum())
    wide = per_day.pivot(on="category", index="date", values="count")

    # pivot only creates a column for a category that occurs in the data, so
    # a category with no traffic at all would make the final select fail.
    # Add such columns as zeros, typed like the summed counts.
    absent = [name for name in categories if name not in wide.columns]
    if absent:
        count_dtype = per_day.schema["count"]
        wide = wide.with_columns(
            [pl.lit(0, dtype=count_dtype).alias(name) for name in absent]
        )

    return wide.fill_null(0).sort("date").select("date", *categories)

122 

123 

def main(input_dir: Path = INPUT_DIR, output_dir: Path = OUTPUT_DIR) -> None:
    """Classify the logs and write the long and wide daily CSV reports.

    Writes ``daily_traffic_by_service.csv`` (one row per date, category and
    service) and ``daily_traffic.csv`` (one row per date, one column per
    category) into ``output_dir`` and logs each output path.

    Args:
        input_dir: Directory holding the CSV logs to classify.
        output_dir: Directory receiving the two reports; created if missing.
    """
    logging.basicConfig(level=logging.INFO)
    long_df = classify_traffic(input_dir)

    # Make sure the destination exists so the writes below do not fail when
    # the output directory has not been created yet.
    output_dir.mkdir(parents=True, exist_ok=True)

    long_path = output_dir / "daily_traffic_by_service.csv"
    long_df.write_csv(long_path)
    logger.info("Output: %s", long_path)

    wide_df = _wide_from_long(long_df)
    wide_path = output_dir / "daily_traffic.csv"
    wide_df.write_csv(wide_path)
    logger.info("Output: %s", wide_path)


if __name__ == "__main__":
    main()  # pragma: no cover