Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions src/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ void set_curl_mhandle(CURLM *curl_mhandle, LoopState *lstate){
EREPORT_CURL_MULTI_SETOPT(curl_mhandle, CURLMOPT_TIMERDATA, lstate);
}

void delete_expired_responses(char *ttl, int batch_size){
uint64 delete_expired_responses(char *ttl, int batch_size){
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
PushActiveSnapshot(GetTransactionSnapshot());
Expand All @@ -160,6 +160,8 @@ void delete_expired_responses(char *ttl, int batch_size){
, Int32GetDatum(batch_size)
}, NULL, false, 0);

uint64 affected_rows = SPI_processed;

if (ret_code != SPI_OK_DELETE)
{
ereport(ERROR, errmsg("Error expiring response table rows: %s", SPI_result_code_string(ret_code)));
Expand All @@ -168,6 +170,8 @@ void delete_expired_responses(char *ttl, int batch_size){
SPI_finish();
PopActiveSnapshot();
CommitTransactionCommand();

return affected_rows;
}

static void insert_failure_response(CURL *ez_handle, CURLcode return_code, int64 id, int32 timeout_milliseconds){
Expand Down Expand Up @@ -233,7 +237,7 @@ static void insert_success_response(CurlData *cdata, long http_status_code, char
CommitTransactionCommand();
}

void consume_request_queue(CURLM *curl_mhandle, int batch_size, MemoryContext curl_memctx){
uint64 consume_request_queue(CURLM *curl_mhandle, int batch_size, MemoryContext curl_memctx){
StartTransactionCommand();
PushActiveSnapshot(GetTransactionSnapshot());
SPI_connect();
Expand All @@ -257,8 +261,9 @@ void consume_request_queue(CURLM *curl_mhandle, int batch_size, MemoryContext cu
if (ret_code != SPI_OK_DELETE_RETURNING)
ereport(ERROR, errmsg("Error getting http request queue: %s", SPI_result_code_string(ret_code)));

uint64 affected_rows = SPI_processed;

for (size_t j = 0; j < SPI_processed; j++) {
for (size_t j = 0; j < affected_rows; j++) {
bool tupIsNull = false;

int64 id = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[j], SPI_tuptable->tupdesc, 1, &tupIsNull));
Expand Down Expand Up @@ -289,6 +294,8 @@ void consume_request_queue(CURLM *curl_mhandle, int batch_size, MemoryContext cu
SPI_finish();
PopActiveSnapshot();
CommitTransactionCommand();

return affected_rows;
}

static void pfree_curl_data(CurlData *cdata){
Expand Down
4 changes: 2 additions & 2 deletions src/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ typedef struct {
CURLM *curl_mhandle;
} LoopState;

void delete_expired_responses(char *ttl, int batch_size);
uint64 delete_expired_responses(char *ttl, int batch_size);

void consume_request_queue(CURLM *curl_mhandle, int batch_size, MemoryContext curl_memctx);
uint64 consume_request_queue(CURLM *curl_mhandle, int batch_size, MemoryContext curl_memctx);

void insert_curl_responses(LoopState *lstate, MemoryContext curl_memctx);

Expand Down
11 changes: 9 additions & 2 deletions src/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,16 @@ void pg_net_worker(__attribute__ ((unused)) Datum main_arg) {
break;
}

delete_expired_responses(guc_ttl, guc_batch_size);
uint64 expired_responses = delete_expired_responses(guc_ttl, guc_batch_size);

consume_request_queue(lstate.curl_mhandle, guc_batch_size, CurlMemContext);
elog(DEBUG1, "Deleted %zu expired rows", expired_responses);

uint64 requests_consumed = consume_request_queue(lstate.curl_mhandle, guc_batch_size, CurlMemContext);

elog(DEBUG1, "Consumed %zu request rows", requests_consumed);

if(requests_consumed == 0)
continue;

int running_handles = 0;
int maxevents = guc_batch_size + 1; // 1 extra for the timer
Expand Down