Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/binaries.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ jobs:
needs:
- draft-release
env:
X_GO_DISTRIBUTION: "https://go.dev/dl/go1.23.8.linux-amd64.tar.gz"
X_GO_DISTRIBUTION: "https://go.dev/dl/go1.23.10.linux-amd64.tar.gz"
APIFIREWALL_NAMESPACE: "github.com/wallarm/api-firewall"
strategy:
matrix:
Expand Down Expand Up @@ -162,7 +162,7 @@ jobs:
needs:
- draft-release
env:
X_GO_VERSION: "1.23.8"
X_GO_VERSION: "1.23.10"
APIFIREWALL_NAMESPACE: "github.com/wallarm/api-firewall"
strategy:
matrix:
Expand Down Expand Up @@ -272,19 +272,19 @@ jobs:
include:
- arch: armv6
distro: bookworm
go_distribution: https://go.dev/dl/go1.23.8.linux-armv6l.tar.gz
go_distribution: https://go.dev/dl/go1.23.10.linux-armv6l.tar.gz
artifact: armv6-libc
- arch: aarch64
distro: bookworm
go_distribution: https://go.dev/dl/go1.23.8.linux-arm64.tar.gz
go_distribution: https://go.dev/dl/go1.23.10.linux-arm64.tar.gz
artifact: arm64-libc
- arch: armv6
distro: alpine_latest
go_distribution: https://go.dev/dl/go1.23.8.linux-armv6l.tar.gz
go_distribution: https://go.dev/dl/go1.23.10.linux-armv6l.tar.gz
artifact: armv6-musl
- arch: aarch64
distro: alpine_latest
go_distribution: https://go.dev/dl/go1.23.8.linux-arm64.tar.gz
go_distribution: https://go.dev/dl/go1.23.10.linux-arm64.tar.gz
artifact: arm64-musl
steps:
- uses: actions/checkout@v4
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
VERSION := 0.9.1
VERSION := 0.9.2
NAMESPACE := github.com/wallarm/api-firewall

.DEFAULT_GOAL := build
Expand Down Expand Up @@ -43,7 +43,7 @@ stop_k6_tests:

run_k6_tests: stop_k6_tests
@docker compose -f resources/test/docker-compose-api-mode.yml up --build --detach --force-recreate
docker run --rm -i --network host grafana/k6 run --vus 100 --iterations 1200 -v - <resources/test/specification/script.js || true
docker run --rm -i --network host grafana/k6 run -v - <resources/test/specification/script.js || true
$(MAKE) stop_k6_tests

.PHONY: lint tidy test fmt build genmocks vulncheck run_k6_tests stop_k6_tests
25 changes: 24 additions & 1 deletion cmd/api-firewall/internal/handlers/api/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"strings"
"sync"
"syscall"
"time"

"github.com/google/uuid"
"github.com/rs/zerolog"
"github.com/savsgio/gotils/strconv"
"github.com/valyala/fasthttp"

"github.com/wallarm/api-firewall/internal/platform/metrics"
"github.com/wallarm/api-firewall/internal/platform/router"
"github.com/wallarm/api-firewall/internal/platform/storage"
"github.com/wallarm/api-firewall/internal/platform/web"
Expand All @@ -32,6 +34,7 @@ var (
type App struct {
Routers map[int]*router.Mux
Log zerolog.Logger
Metrics metrics.Metrics
passOPTIONS bool
maxErrorsInResponse int
shutdown chan os.Signal
Expand All @@ -41,7 +44,7 @@ type App struct {
}

// NewApp creates an App value that handle a set of routes for the set of application.
func NewApp(lock *sync.RWMutex, passOPTIONS bool, maxErrorsInResponse int, storedSpecs storage.DBOpenAPILoader, shutdown chan os.Signal, logger zerolog.Logger, mw ...web.Middleware) *App {
func NewApp(lock *sync.RWMutex, passOPTIONS bool, maxErrorsInResponse int, storedSpecs storage.DBOpenAPILoader, shutdown chan os.Signal, logger zerolog.Logger, pMetrics metrics.Metrics, mw ...web.Middleware) *App {

schemaIDs := storedSpecs.SchemaIDs()

Expand All @@ -56,6 +59,7 @@ func NewApp(lock *sync.RWMutex, passOPTIONS bool, maxErrorsInResponse int, store
shutdown: shutdown,
mw: mw,
Log: logger,
Metrics: pMetrics,
storedSpecs: storedSpecs,
lock: lock,
passOPTIONS: passOPTIONS,
Expand Down Expand Up @@ -138,6 +142,9 @@ func (a *App) APIModeMainHandler(ctx *fasthttp.RequestCtx) {
// handle panic
defer func() {
if r := recover(); r != nil {

a.Metrics.IncErrorTypeCounter("request processing error", 0)

a.Log.Error().Msgf("panic: %v", r)

// Log the Go stack trace for this panic'd goroutine.
Expand All @@ -149,10 +156,15 @@ func (a *App) APIModeMainHandler(ctx *fasthttp.RequestCtx) {
// Add request ID
ctx.SetUserValue(web.RequestID, uuid.NewString())

// Request handling start time
start := time.Now()

schemaIDs, notFoundSchemaIDs, err := getWallarmSchemaID(ctx, a.storedSpecs)
if err != nil {
defer web.LogRequestResponseAtTraceLevel(ctx, a.Log)

a.Metrics.IncErrorTypeCounter("schema not found", 0)

a.Log.Error().
Err(err).
Bytes("host", ctx.Request.Header.Host()).
Expand All @@ -161,6 +173,8 @@ func (a *App) APIModeMainHandler(ctx *fasthttp.RequestCtx) {
Interface("request_id", ctx.UserValue(web.RequestID)).
Msg("error while getting schema ID")

a.Metrics.IncHTTPRequestTotalCountOnly(0, fasthttp.StatusInternalServerError)

if err := web.RespondError(ctx, fasthttp.StatusInternalServerError, ""); err != nil {
a.Log.Error().
Err(err).
Expand Down Expand Up @@ -256,6 +270,8 @@ func (a *App) APIModeMainHandler(ctx *fasthttp.RequestCtx) {
continue
}

a.Metrics.IncErrorTypeCounter("request processing error", schemaIDs[i])

// Didn't receive the response code. It means that the router respond to the request because it was not valid.
// The API Firewall should respond by 500 status code in this case.
ctx.Response.Header.Reset()
Expand All @@ -274,6 +290,8 @@ func (a *App) APIModeMainHandler(ctx *fasthttp.RequestCtx) {

// Add schema IDs that were not found in the DB to the response
for i := 0; i < len(notFoundSchemaIDs); i++ {
a.Metrics.IncErrorTypeCounter("schema not found", notFoundSchemaIDs[i])
a.Metrics.IncHTTPRequestTotalCountOnly(notFoundSchemaIDs[i], fasthttp.StatusOK)
responseSummary = append(responseSummary, &validator.ValidationResponseSummary{
SchemaID: &notFoundSchemaIDs[i],
StatusCode: &statusInternalError,
Expand All @@ -288,6 +306,11 @@ func (a *App) APIModeMainHandler(ctx *fasthttp.RequestCtx) {
ctx.Request.Header.SetMethod(fasthttp.MethodGet)
}

// save http request count for each schema ID
for _, schemaID := range schemaIDs {
a.Metrics.IncHTTPRequestStat(start, schemaID, fasthttp.StatusOK)
}

// limit amount of errors to reduce the total size of the response
limitedResponseErrors := validator.SampleSlice(responseErrors, a.maxErrorsInResponse)

Expand Down
4 changes: 3 additions & 1 deletion cmd/api-firewall/internal/handlers/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/wallarm/api-firewall/internal/config"
"github.com/wallarm/api-firewall/internal/platform/loader"
"github.com/wallarm/api-firewall/internal/platform/metrics"
apiMode "github.com/wallarm/api-firewall/internal/platform/validator"
"github.com/wallarm/api-firewall/internal/platform/web"
"github.com/wallarm/api-firewall/pkg/APIMode/validator"
Expand All @@ -21,6 +22,7 @@ type RequestValidator struct {
Log zerolog.Logger
Cfg *config.APIMode
ParserPool *fastjson.ParserPool
Metrics metrics.Metrics
SchemaID int
}

Expand Down Expand Up @@ -55,7 +57,7 @@ func (s *RequestValidator) Handler(ctx *fasthttp.RequestCtx) error {
return nil
}

validationErrors, err := apiMode.APIModeValidateRequest(ctx, s.ParserPool, s.CustomRoute, s.Cfg.UnknownParametersDetection)
validationErrors, err := apiMode.APIModeValidateRequest(ctx, s.Metrics, s.SchemaID, s.ParserPool, s.CustomRoute, s.Cfg.UnknownParametersDetection)
if err != nil {
s.Log.Error().
Err(err).
Expand Down
8 changes: 5 additions & 3 deletions cmd/api-firewall/internal/handlers/api/routes.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package api

import (
"github.com/rs/zerolog"
"net/url"
"os"
"runtime/debug"
"sync"

"github.com/corazawaf/coraza/v3"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/valyala/fasthttp"
"github.com/valyala/fastjson"
Expand All @@ -16,11 +16,12 @@ import (
"github.com/wallarm/api-firewall/internal/mid"
"github.com/wallarm/api-firewall/internal/platform/allowiplist"
"github.com/wallarm/api-firewall/internal/platform/loader"
"github.com/wallarm/api-firewall/internal/platform/metrics"
"github.com/wallarm/api-firewall/internal/platform/storage"
"github.com/wallarm/api-firewall/internal/platform/web"
)

func Handlers(lock *sync.RWMutex, cfg *config.APIMode, shutdown chan os.Signal, logger zerolog.Logger, storedSpecs storage.DBOpenAPILoader, AllowedIPCache *allowiplist.AllowedIPsType, waf coraza.WAF) fasthttp.RequestHandler {
func Handlers(lock *sync.RWMutex, cfg *config.APIMode, shutdown chan os.Signal, logger zerolog.Logger, metrics metrics.Metrics, storedSpecs storage.DBOpenAPILoader, AllowedIPCache *allowiplist.AllowedIPsType, waf coraza.WAF) fasthttp.RequestHandler {

// handle panic
defer func() {
Expand Down Expand Up @@ -52,7 +53,7 @@ func Handlers(lock *sync.RWMutex, cfg *config.APIMode, shutdown chan os.Signal,
}

// Construct the App which holds all routes as well as common Middleware.
apps := NewApp(lock, cfg.PassOptionsRequests, cfg.MaxErrorsInResponse, storedSpecs, shutdown, logger, mid.IPAllowlist(&ipAllowlistOptions), mid.WAFModSecurity(&modSecOptions), mid.Logger(logger), mid.MIMETypeIdentifier(logger), mid.Errors(logger), mid.Panics(logger))
apps := NewApp(lock, cfg.PassOptionsRequests, cfg.MaxErrorsInResponse, storedSpecs, shutdown, logger, metrics, mid.IPAllowlist(&ipAllowlistOptions), mid.WAFModSecurity(&modSecOptions), mid.Logger(logger), mid.MIMETypeIdentifier(logger), mid.Errors(logger), mid.Panics(logger))

for _, schemaID := range schemaIDs {

Expand Down Expand Up @@ -89,6 +90,7 @@ func Handlers(lock *sync.RWMutex, cfg *config.APIMode, shutdown chan os.Signal,
ParserPool: &parserPool,
OpenAPIRouter: newSwagRouter,
SchemaID: schemaID,
Metrics: metrics,
}
updRoutePathEsc, err := url.JoinPath(serverURL.Path, newSwagRouter.Routes[i].Path)
if err != nil {
Expand Down
35 changes: 33 additions & 2 deletions cmd/api-firewall/internal/handlers/api/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/wallarm/api-firewall/internal/config"
"github.com/wallarm/api-firewall/internal/platform/allowiplist"
"github.com/wallarm/api-firewall/internal/platform/metrics"
"github.com/wallarm/api-firewall/internal/platform/storage"
"github.com/wallarm/api-firewall/internal/version"
)
Expand Down Expand Up @@ -105,10 +106,34 @@ func Run(logger zerolog.Logger) error {

zeroLogger := &config.ZerologAdapter{Logger: logger}

// =========================================================================
// Init Metrics

// make a channel to listen for errors coming from the metrics listener. Use a
// buffered channel so the goroutine can exit if we don't collect this error.
metricsErrors := make(chan error, 1)

options := metrics.Options{
EndpointName: cfg.Metrics.EndpointName,
Host: cfg.Metrics.Host,
ReadTimeout: cfg.Metrics.ReadTimeout,
WriteTimeout: cfg.Metrics.WriteTimeout,
}

metricsController := metrics.NewPrometheusMetrics(cfg.Metrics.Enabled)

if cfg.Metrics.Enabled {
go func() {
// Start the service listening for requests.
logger.Info().Msgf("Prometheus metrics: API listening on %s/%s", options.Host, options.EndpointName)
metricsErrors <- metricsController.StartService(&logger, &options)
}()
}

// =========================================================================
// Init Handlers

requestHandlers := Handlers(&dbLock, &cfg, shutdown, logger, specStorage, allowedIPCache, waf)
requestHandlers := Handlers(&dbLock, &cfg, shutdown, logger, metricsController, specStorage, allowedIPCache, waf)

// =========================================================================
// Start Health API Service
Expand Down Expand Up @@ -184,6 +209,9 @@ func Run(logger zerolog.Logger) error {
Bytes("method", ctx.Request.Header.Method()).
Msg("request processing error")

metricsController.IncHTTPRequestTotalCountOnly(0, fasthttp.StatusInternalServerError)
metricsController.IncErrorTypeCounter("request processing error", 0)

ctx.Error("", fasthttp.StatusInternalServerError)
},
Logger: zeroLogger,
Expand All @@ -195,7 +223,7 @@ func Run(logger zerolog.Logger) error {

updSpecErrors := make(chan error, 1)

updOpenAPISpec := NewHandlerUpdater(&dbLock, logger, specStorage, &cfg, &api, shutdown, &healthData, allowedIPCache, waf)
updOpenAPISpec := NewHandlerUpdater(&dbLock, logger, metricsController, specStorage, &cfg, &api, shutdown, &healthData, allowedIPCache, waf)

// disable updater if SpecificationUpdatePeriod == 0
if cfg.SpecificationUpdatePeriod.Seconds() > 0 {
Expand Down Expand Up @@ -228,6 +256,9 @@ func Run(logger zerolog.Logger) error {
case err := <-updSpecErrors:
return errors.Wrap(err, "regular updater error")

case err := <-metricsErrors:
return errors.Wrap(err, "metrics error")

case sig := <-shutdown:
logger.Info().Msgf("%s: %v: Start shutdown", logPrefix, sig)

Expand Down
7 changes: 5 additions & 2 deletions cmd/api-firewall/internal/handlers/api/updater.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"github.com/wallarm/api-firewall/internal/platform/metrics"
"os"
"runtime/debug"
"sync"
Expand Down Expand Up @@ -35,10 +36,11 @@ type Specification struct {
health *Health
lock *sync.RWMutex
allowedIPCache *allowiplist.AllowedIPsType
metrics metrics.Metrics
}

// NewHandlerUpdater function defines configuration updater controller
func NewHandlerUpdater(lock *sync.RWMutex, logger zerolog.Logger, sqlLiteStorage storage.DBOpenAPILoader, cfg *config.APIMode, api *fasthttp.Server, shutdown chan os.Signal, health *Health, allowedIPCache *allowiplist.AllowedIPsType, waf coraza.WAF) updater.Updater {
func NewHandlerUpdater(lock *sync.RWMutex, logger zerolog.Logger, metrics metrics.Metrics, sqlLiteStorage storage.DBOpenAPILoader, cfg *config.APIMode, api *fasthttp.Server, shutdown chan os.Signal, health *Health, allowedIPCache *allowiplist.AllowedIPsType, waf coraza.WAF) updater.Updater {
return &Specification{
logger: logger,
waf: waf,
Expand All @@ -51,6 +53,7 @@ func NewHandlerUpdater(lock *sync.RWMutex, logger zerolog.Logger, sqlLiteStorage
health: health,
lock: lock,
allowedIPCache: allowedIPCache,
metrics: metrics,
}
}

Expand Down Expand Up @@ -100,7 +103,7 @@ func (s *Specification) Run() {

s.lock.Lock()
s.sqlLiteStorage = newSpecDB
s.api.Handler = Handlers(s.lock, s.cfg, s.shutdown, s.logger, s.sqlLiteStorage, s.allowedIPCache, s.waf)
s.api.Handler = Handlers(s.lock, s.cfg, s.shutdown, s.logger, s.metrics, s.sqlLiteStorage, s.allowedIPCache, s.waf)
s.health.OpenAPIDB = s.sqlLiteStorage
if err := s.sqlLiteStorage.AfterLoad(s.cfg.PathToSpecDB); err != nil {
s.logger.Error().Err(err).Msgf("%s: error in after specification loading function", logPrefix)
Expand Down
3 changes: 2 additions & 1 deletion cmd/api-firewall/tests/main_api_mode_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/valyala/fasthttp"

handlersAPI "github.com/wallarm/api-firewall/cmd/api-firewall/internal/handlers/api"
"github.com/wallarm/api-firewall/internal/platform/metrics"
"github.com/wallarm/api-firewall/internal/platform/storage"
"github.com/wallarm/api-firewall/internal/platform/web"
)
Expand All @@ -35,7 +36,7 @@ func BenchmarkAPIModeBasic(b *testing.B) {
shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)

handler := handlersAPI.Handlers(&lock, &cfg, shutdown, logger, specStorage, nil, nil)
handler := handlersAPI.Handlers(&lock, &cfg, shutdown, logger, metrics.NewPrometheusMetrics(false), specStorage, nil, nil)

p, err := json.Marshal(map[string]any{
"firstname": "test",
Expand Down
Loading