-
-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathjson2traefik.py
More file actions
154 lines (124 loc) · 5.81 KB
/
json2traefik.py
File metadata and controls
154 lines (124 loc) · 5.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import os
import json
import re
import logging
from pathlib import Path
from typing import List, Dict, Set, Tuple, Optional
from functools import lru_cache
# --- Configuration ---
LOG_LEVEL = logging.INFO # DEBUG, INFO, WARNING, ERROR
INPUT_FILE = Path(os.getenv("INPUT_FILE", "owasp_rules.json"))
OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "waf_patterns/traefik"))
MIDDLEWARE_FILE = OUTPUT_DIR / "middleware.toml"
# Unsupported patterns (for Traefik's badbot plugin, which uses regex)
UNSUPPORTED_PATTERNS = [
"@pmFromFile", # No file lookups
# Add other unsupported operators/patterns here.
]
# --- Logging Setup ---
logging.basicConfig(level=LOG_LEVEL, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
@lru_cache(maxsize=256)
def validate_regex(pattern: str) -> bool:
"""Validates a regex pattern."""
try:
re.compile(pattern)
return True
except re.error as e:
logger.warning(f"Invalid regex: {pattern} - {e}")
return False
def _sanitize_pattern(pattern: str) -> str:
"""Internal helper for pattern sanitization."""
pattern = pattern.replace("@rx ", "").strip()
pattern = re.sub(r"\(\?i\)", "", pattern) # Remove case-insensitive flag
# Convert $ to \$
pattern = pattern.replace("$", r"\$")
# Convert { or { to {
pattern = re.sub(r"&l(?:brace|cub);?", r"{", pattern)
pattern = re.sub(r"&r(?:brace|cub);?", r"}", pattern)
# Remove unnecessary \.*
pattern = re.sub(r"\\\.\*", r"\.*", pattern)
pattern = re.sub(r"(?<!\\)\.(?![\w])", r"\.", pattern) # Escape dots
# Replace non-capturing groups (?:...) with capturing groups (...)
pattern = re.sub(r"\(\?:", "(", pattern)
return pattern
def sanitize_pattern(pattern: str) -> Optional[str]:
"""Sanitizes a pattern for use with Traefik's badbot plugin."""
for unsupported in UNSUPPORTED_PATTERNS:
if unsupported in pattern:
logger.warning(f"Skipping unsupported pattern: {pattern}")
return None
# if it is not a string comparison we use regex
if not any(op in pattern for op in ["@streq", "@contains", "!@eq", "!@within", "@lt", "@ge", "@gt", "@eq", "@ipMatch", "@endsWith"]):
return _sanitize_pattern(pattern) # return the regex
else: # if it is not a regex
return None
def generate_traefik_conf(rules: List[Dict]) -> None:
"""Generates the Traefik middleware configuration (middleware.toml)."""
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
try:
with open(MIDDLEWARE_FILE, "w", encoding="utf-8") as f:
f.write("[http.middlewares]\n\n")
# Group rules by category AND location. This is important!
categorized_rules: Dict[str, Dict[str, Set[str]]] = {}
for rule in rules:
rule_id = rule.get("id", "no_id")
category = rule.get("category", "generic").lower()
location = rule.get("location", "user-agent").lower() # default value!
pattern = rule["pattern"]
severity = rule.get("severity", "medium").lower() # default
# Sanitize, but *only* if the location is User-Agent.
# We *don't* want to apply regexes to other locations here.
if location == "user-agent":
sanitized_pattern = sanitize_pattern(pattern)
if not sanitized_pattern or not validate_regex(sanitized_pattern):
continue # skip
else:
logger.warning(f"Skipping rule with unsupported location '{location}' for Traefik: {rule_id}")
continue
# Initialize category/location if needed
if category not in categorized_rules:
categorized_rules[category] = {}
if location not in categorized_rules[category]:
categorized_rules[category][location] = set() # Use a set
# Add the *escaped* pattern to the set.
categorized_rules[category][location].add(sanitized_pattern)
# Write the configuration
for category, location_rules in categorized_rules.items():
for location, patterns in location_rules.items():
# Create a unique middleware name
middleware_name = f"waf_{category}_{location}".replace("-", "_")
f.write(f"[http.middlewares.{middleware_name}]\n")
f.write(f" [http.middlewares.{middleware_name}.plugin.badbot]\n")
f.write(" userAgent = [\n")
# Properly escape for TOML (and for regex within the string)
for pattern in patterns:
# No extra escape for TOML, because we write the full regex
f.write(f' "{pattern}",\n')
f.write(" ]\n\n")
logger.info(f"Generated Traefik middleware file: {MIDDLEWARE_FILE}")
except OSError as e:
logger.error(f"Error writing to {MIDDLEWARE_FILE}: {e}")
raise
def load_owasp_rules(file_path: Path) -> List[Dict]:
"""Loads OWASP rules from a JSON file."""
try:
with open(file_path, "r", encoding="utf-8") as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError, OSError) as e:
logger.error(f"Error loading rules from {file_path}: {e}")
raise
def main():
"""Main function."""
try:
logger.info("Loading OWASP rules...")
owasp_rules = load_owasp_rules(INPUT_FILE)
logger.info(f"Loaded {len(owasp_rules)} rules.")
logger.info("Generating Traefik WAF configuration...")
generate_traefik_conf(owasp_rules)
logger.info("Traefik WAF generation complete.")
except Exception as e:
logger.critical(f"Script failed: {e}")
exit(1)
if __name__ == "__main__":
main()