Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions pkg/scraper/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,18 @@ func (e ScrapeType) MarshalGQL(w io.Writer) {
fmt.Fprint(w, strconv.Quote(e.String()))
}

// HTTPError represents an HTTP response error with its status code.
// Callers can inspect the StatusCode to decide whether to skip an item
// and continue with the rest of a batch (e.g. 404, 500) or whether the
// error was a rate-limit that exhausted all retries (429).
type HTTPError struct {
	StatusCode int
}

// Error formats the status code together with its standard status text,
// e.g. "http error 404: Not Found".
func (e *HTTPError) Error() string {
	statusText := http.StatusText(e.StatusCode)
	return fmt.Sprintf("http error %d: %s", e.StatusCode, statusText)
}

var (
// ErrMaxRedirects is returned if the max number of HTTP redirects are reached.
ErrMaxRedirects = errors.New("maximum number of HTTP redirects reached")
Expand Down
166 changes: 134 additions & 32 deletions pkg/scraper/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/url"
"os"
"regexp"
"strconv"
"strings"
"time"

Expand All @@ -25,18 +26,27 @@ import (

const scrapeDefaultSleep = time.Second * 2

const (
	// maxRateLimitRetries is the maximum number of retries when receiving HTTP 429 responses.
	// The initial request is attempt 0, so up to maxRateLimitRetries+1 requests may be sent.
	maxRateLimitRetries = 5

	// rateLimitBaseDelay is the initial backoff delay for 429 retries; it is also
	// used as the fallback when the Retry-After header is absent or unparseable.
	rateLimitBaseDelay = time.Second * 2

	// rateLimitMaxDelay caps the exponential backoff to prevent excessively long waits.
	rateLimitMaxDelay = time.Minute

	// rateLimitTotalTimeout bounds the total wall-clock time for a single loadURL call
	// including all retry delays, so that rate-limit retries don't run indefinitely.
	rateLimitTotalTimeout = 5 * time.Minute
)

func loadURL(ctx context.Context, loadURL string, client *http.Client, def Definition, globalConfig GlobalConfig) (io.Reader, error) {
driverOptions := def.DriverOptions
if driverOptions != nil && driverOptions.UseCDP {
// get the page using chrome dp
return urlFromCDP(ctx, loadURL, *driverOptions, globalConfig)
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, loadURL, nil)
if err != nil {
return nil, err
}

jar, err := def.jar()
if err != nil {
return nil, fmt.Errorf("error creating cookie jar: %w", err)
Expand All @@ -47,44 +57,136 @@ func loadURL(ctx context.Context, loadURL string, client *http.Client, def Defin
return nil, fmt.Errorf("error parsing url %s: %w", loadURL, err)
}

// Fetch relevant cookies from the jar for url u and add them to the request
cookies := jar.Cookies(u)
for _, cookie := range cookies {
req.AddCookie(cookie)
}

userAgent := globalConfig.GetScraperUserAgent()
if userAgent != "" {
req.Header.Set("User-Agent", userAgent)
}

if driverOptions != nil { // setting the Headers after the UA allows us to override it from inside the scraper
for _, h := range driverOptions.Headers {
if h.Key != "" {
req.Header.Set(h.Key, h.Value)
logger.Debugf("[scraper] adding header <%s:%s>", h.Key, h.Value)
// Apply an overall deadline so retry delays don't run indefinitely.
ctx, cancel := context.WithTimeout(ctx, rateLimitTotalTimeout)
defer cancel()

for attempt := 0; ; attempt++ {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, loadURL, nil)
if err != nil {
return nil, err
}

// Fetch relevant cookies from the jar for url u and add them to the request
cookies := jar.Cookies(u)
for _, cookie := range cookies {
req.AddCookie(cookie)
}

if userAgent != "" {
req.Header.Set("User-Agent", userAgent)
}

if driverOptions != nil { // setting the Headers after the UA allows us to override it from inside the scraper
for _, h := range driverOptions.Headers {
if h.Key != "" {
req.Header.Set(h.Key, h.Value)
logger.Debugf("[scraper] adding header <%s:%s>", h.Key, h.Value)
}
}
}

resp, err := client.Do(req)
if err != nil {
return nil, err
}

if resp.StatusCode == http.StatusTooManyRequests {
resp.Body.Close()

// attempt counts from 0: attempt 0 is the initial request,
// attempts 1..maxRateLimitRetries are retries.
if attempt >= maxRateLimitRetries {
logger.Warnf("[scraper] rate limited on %s, all %d retries exhausted", loadURL, maxRateLimitRetries)
return nil, &HTTPError{StatusCode: resp.StatusCode}
}

delay, ok := rateLimitBackoff(resp, attempt)
if !ok {
logger.Warnf("[scraper] rate limited on %s, server requested wait exceeds maximum", loadURL)
return nil, &HTTPError{StatusCode: resp.StatusCode}
}
logger.Infof("[scraper] rate limited on %s (retry %d/%d), waiting %v", loadURL, attempt+1, maxRateLimitRetries, delay)

select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(delay):
continue
}
}

if resp.StatusCode >= 400 {
resp.Body.Close()
return nil, &HTTPError{StatusCode: resp.StatusCode}
}

body, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, err
}

bodyReader := bytes.NewReader(body)
printCookies(jar, def, "Jar cookies found for scraper urls")
return charset.NewReader(bodyReader, resp.Header.Get("Content-Type"))
}
}

resp, err := client.Do(req)
if err != nil {
return nil, err
// rateLimitBackoff calculates the delay before retrying a rate-limited request.
// The delay is the sum of the parsed Retry-After value (defaulting to
// rateLimitBaseDelay when absent) and an exponential backoff (2s, 4s, 8s, ...,
// capped at rateLimitMaxDelay). Returns ok=false if the server's Retry-After
// exceeds rateLimitMaxDelay, signalling that the caller should stop retrying.
func rateLimitBackoff(resp *http.Response, attempt int) (time.Duration, bool) {
retryAfter := parseRetryAfter(resp)

// If the server asks us to wait longer than our max, give up immediately.
if retryAfter > rateLimitMaxDelay {
return 0, false
}
if resp.StatusCode >= 400 {
return nil, fmt.Errorf("http error %d:%s", resp.StatusCode, http.StatusText(resp.StatusCode))

// Exponential backoff: 2s, 4s, 8s, 16s, 32s, ...
// Guard against int64 overflow for large attempt values.
if attempt >= 30 {
return rateLimitMaxDelay, true
}
backoff := rateLimitBaseDelay << uint(attempt)

defer resp.Body.Close()
return clampDelay(retryAfter + backoff), true
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
// parseRetryAfter extracts a duration from the Retry-After header.
// Both forms allowed for the header are supported: a non-negative delay in
// seconds, and an HTTP-date. Returns rateLimitBaseDelay if the header is
// absent, unparseable, or resolves to a non-positive duration.
func parseRetryAfter(resp *http.Response) time.Duration {
	retryAfter := resp.Header.Get("Retry-After")
	if retryAfter == "" {
		return rateLimitBaseDelay
	}

	// Try parsing as seconds (e.g. "Retry-After: 120").
	if seconds, err := strconv.Atoi(retryAfter); err == nil && seconds >= 0 {
		return time.Duration(seconds) * time.Second
	}

	// Try parsing as HTTP-date (e.g. "Retry-After: Fri, 31 Dec 1999 23:59:59 GMT").
	// A date in the past yields a non-positive duration and falls through to
	// the default below.
	if t, err := http.ParseTime(retryAfter); err == nil {
		if d := time.Until(t); d > 0 {
			return d
		}
	}

	return rateLimitBaseDelay
}

// clampDelay caps a duration to rateLimitMaxDelay.
func clampDelay(d time.Duration) time.Duration {
	if d <= rateLimitMaxDelay {
		return d
	}
	return rateLimitMaxDelay
}

// func urlFromCDP uses chrome cdp and DOM to load and process the url
Expand Down
Loading