77// NOLINTNEXTLINE(misc-header-include-cycle)
88#include <components-rs/ddtrace.h>
99#include <php.h>
10+ #include <stdatomic.h>
1011#include <stdbool.h>
12+ #include <sys/mman.h>
1113
1214#define HELPER_PROCESS_C_INCLUDES
1315#include "compatibility.h"
2123#include "php_objects.h"
2224#include "version.h"
2325
26+ #define MAX_WAIT_TIME_MS (1ULL << 55)
27+ typedef struct _dd_helper_shared_state {
28+ uint8_t failed_count : 7 ;
29+ bool try_in_progress : 1 ; // only for log messages
30+ uint64_t suppressed_until_ms : 56 ;
31+ } dd_helper_shared_state ;
32+ #define MAX_FAILED_COUNT ((uint8_t)((1U << 7) - 1))
33+ #define MAX_SUPPRESSION_TIME_MS ((1ULL << 12) * 1000ULL) // a little over 1 hour
34+ _Static_assert (sizeof (dd_helper_shared_state ) == sizeof (uint64_t ),
35+ "dd_helper_shared_state should be 8 bytes" );
36+
2437typedef struct _dd_helper_mgr {
2538 dd_conn conn ;
2639
27- struct timespec next_retry ;
28- uint16_t failed_count ;
2940 bool connected_this_req ;
41+ dd_helper_shared_state hss ;
3042
31- pid_t pid ;
3243 char * nonnull socket_path ;
3344 char * nonnull lock_path ;
3445} dd_helper_mgr ;
3546
47+ static _Atomic (dd_helper_shared_state ) * _shared_state ;
48+
3649static THREAD_LOCAL_ON_ZTS dd_helper_mgr _mgr ;
3750
3851static const double _backoff_initial = 3.0 ;
@@ -41,8 +54,8 @@ static const double _backoff_base = 2.0;
4154static const double _backoff_max_exponent = 10.0 ;
4255
4356static const int timeout_send = 500 ;
44- static const int timeout_recv_initial = 7500 ;
45- static const int timeout_recv_subseq = 2000 ;
57+ static const int timeout_recv_initial = 1250 ;
58+ static const int timeout_recv_subseq = 750 ;
4659
4760#define DD_PATH_FORMAT "%s%sddappsec_" PHP_DDAPPSEC_VERSION "_%u"
4861#define DD_SOCK_PATH_FORMAT DD_PATH_FORMAT ".sock"
@@ -53,12 +66,22 @@ static void _register_testing_objects(void);
5366#endif
5467
5568static void _read_settings (void );
56- static bool _wait_for_next_retry (void );
57- static void _inc_failed_counter (void );
58- static void _reset_retry_state (void );
69+ static bool _skip_connecting (dd_helper_shared_state * nonnull s );
70+ static bool _try_lock_shared_state (dd_helper_shared_state * nonnull s );
71+ static void _inc_failed_counter (dd_helper_shared_state * nonnull s );
72+ static void _release_shared_state_lock (dd_helper_shared_state * nonnull s );
73+ static void _maybe_reset_failed_counter (void );
5974
6075void dd_helper_startup (void )
6176{
77+ _shared_state = mmap (NULL , sizeof (dd_helper_shared_state ),
78+ PROT_READ | PROT_WRITE , MAP_SHARED | MAP_ANONYMOUS , -1 , 0 );
79+
80+ if (_shared_state == MAP_FAILED ) {
81+ _shared_state = NULL ;
82+ mlog_err (dd_log_error , "Failed to mmap shared state" );
83+ }
84+
6285#ifdef TESTING
6386 _register_testing_objects ();
6487#endif
@@ -70,9 +93,18 @@ void dd_helper_gshutdown(void)
7093{
7194 pefree (_mgr .socket_path , 1 );
7295 pefree (_mgr .lock_path , 1 );
96+ if (_shared_state ) {
97+ munmap (_shared_state , sizeof (dd_helper_shared_state ));
98+ }
7399}
74100
75- void dd_helper_rshutdown (void ) { _mgr .connected_this_req = false; }
101+ void dd_helper_rshutdown (void )
102+ {
103+ _maybe_reset_failed_counter ();
104+
105+ _mgr .connected_this_req = false;
106+ _mgr .hss = (typeof (_mgr .hss )){0 };
107+ }
76108
77109dd_conn * nullable dd_helper_mgr_acquire_conn (
78110 client_init_func nonnull init_func , void * unspecnull ctx )
@@ -81,12 +113,17 @@ dd_conn *nullable dd_helper_mgr_acquire_conn(
81113 if (dd_conn_connected (conn )) {
82114 return conn ;
83115 }
84- if (_wait_for_next_retry ()) {
116+
117+ if (_skip_connecting (& _mgr .hss )) {
85118 return NULL ;
86119 }
87120
88121 _read_settings ();
89122
123+ if (!_try_lock_shared_state (& _mgr .hss )) {
124+ return NULL ;
125+ }
126+
90127 int res = dd_conn_init (conn , _mgr .socket_path , strlen (_mgr .socket_path ));
91128
92129 if (res ) {
@@ -113,12 +150,12 @@ dd_conn *nullable dd_helper_mgr_acquire_conn(
113150 mlog (dd_log_debug , "returning fresh connection" );
114151
115152 _mgr .connected_this_req = true;
116- _reset_retry_state ( );
153+ _release_shared_state_lock ( & _mgr . hss );
117154
118155 return conn ;
119156
120157error :
121- _inc_failed_counter ();
158+ _inc_failed_counter (& _mgr . hss );
122159 return NULL ;
123160}
124161
@@ -227,63 +264,203 @@ void dd_helper_close_conn(void)
227264
228265 /* we treat closing the connection on the request it was opened a failure
229266 * for the purposes of the connection backoff */
230- if (_mgr .connected_this_req ) {
267+ if (_mgr .connected_this_req && _shared_state ) {
231268 mlog (dd_log_debug , "Connection was closed on the same request as it "
232269 "opened. Incrementing backoff counter" );
233- _inc_failed_counter ();
270+
271+ _inc_failed_counter (& _mgr .hss );
234272 }
235273}
236274
237- // returns true if an attempt to connectt should not be made yet
238- static bool _wait_for_next_retry (void )
275+ static uint64_t _gettime_56bit_ms (void )
239276{
240- if (!_mgr .next_retry .tv_sec ) {
241- return false;
242- }
243-
244277 struct timespec cur_time ;
245278 if (clock_gettime (CLOCK_MONOTONIC , & cur_time ) == -1 ) {
246279 mlog_err (dd_log_warning , "Call to clock_gettime() failed" );
280+ return 0 ;
281+ }
282+
283+ #define NS_PER_MS 1000000ULL
284+ #define MS_PER_SEC 1000ULL
285+ #define MASK_56_BITS ((1ULL << 56) - 1)
286+
287+ uint64_t ms_in_sec = (uint64_t )cur_time .tv_nsec / NS_PER_MS ;
288+ uint64_t total_ms = ((uint64_t )cur_time .tv_sec * MS_PER_SEC ) + ms_in_sec ;
289+
290+ return total_ms & MASK_56_BITS ;
291+ }
292+
293+ // returns true if an attempt to connect should not be made yet
294+ static bool _skip_connecting (dd_helper_shared_state * nonnull s )
295+ {
296+ if (!_shared_state ) {
297+ return false;
298+ }
299+
300+ * s = atomic_load_explicit (_shared_state , memory_order_relaxed );
301+
302+ if (!s -> suppressed_until_ms ) {
247303 return false;
248304 }
249- if (cur_time .tv_sec < _mgr .next_retry .tv_sec ||
250- (cur_time .tv_sec == _mgr .next_retry .tv_sec &&
251- cur_time .tv_nsec < _mgr .next_retry .tv_nsec )) {
252- mlog (dd_log_debug , "Next connect retry is not due yet" );
305+
306+ uint64_t cur_time = _gettime_56bit_ms ();
307+ if (cur_time == 0 ) {
308+ return false;
309+ }
310+
311+ uint64_t time_delta_ms = (s -> suppressed_until_ms - cur_time ) & MASK_56_BITS ;
312+ // if cur_time > suppressed_until, then the suppression has expired
313+ // and the value wraps around, the condition becoming false
314+ if (time_delta_ms < MAX_SUPPRESSION_TIME_MS ) {
315+ if (s -> try_in_progress ) {
316+ mlog (dd_log_debug , "A connection attempt after a failure "
317+ "is already in progress in another PHP worker" );
318+ } else {
319+ mlog (dd_log_debug , "Next connect retry is not due yet" );
320+ }
253321 return true;
254322 }
255323
256324 mlog (dd_log_debug , "Backoff time existed, but has expired" );
257325 return false;
258326}
259327
260- static void _inc_failed_counter ( void )
328+ static bool _try_lock_shared_state ( dd_helper_shared_state * nonnull s )
261329{
262- if (_mgr .failed_count != UINT16_MAX ) {
263- _mgr .failed_count ++ ;
330+ // with no failures, every process should try to connect
331+ if (s -> failed_count == 0 ) {
332+ return true;
264333 }
265- mlog (dd_log_debug , "Failed counter is now at %u" , _mgr .failed_count );
266334
267- struct timespec cur_time ;
268- int res = clock_gettime (CLOCK_MONOTONIC , & cur_time );
269- if (res == -1 ) {
270- mlog_err (dd_log_warning , "Call to clock_gettime() failed" );
271- _mgr .next_retry = (struct timespec ){0 };
335+ // lock for up to 3 seconds
336+ // we don't use try_in_progress to lock in case this process is killed
337+ // or otherwise dies
338+ #define MAX_LOCK_TIME_MS 3000
339+
340+ uint64_t cur_time_ms = _gettime_56bit_ms ();
341+ uint64_t lock_time = cur_time_ms + MAX_LOCK_TIME_MS ;
342+
343+ dd_helper_shared_state desired_state = {
344+ .suppressed_until_ms = lock_time ,
345+ .try_in_progress = true,
346+ .failed_count = s -> failed_count ,
347+ };
348+
349+ while (!atomic_compare_exchange_strong_explicit (_shared_state , s ,
350+ desired_state , memory_order_relaxed , memory_order_relaxed )) {
351+ uint64_t time_delta_ms =
352+ (s -> suppressed_until_ms - cur_time_ms ) & MASK_56_BITS ;
353+ if (time_delta_ms < MAX_SUPPRESSION_TIME_MS ) {
354+ mlog (dd_log_debug , "Connecting was suppressed in the meantime" );
355+ return false;
356+ }
357+ if (s -> failed_count == 0 ) {
358+ return true;
359+ }
360+
361+ desired_state .failed_count = s -> failed_count ;
362+ // in theory, we could update suppressed_until_ms, but the 3 seconds
363+ // give enough margin for this to loop a few times and still fully
364+ // make our connection attempt
365+ }
366+
367+ * s = desired_state ;
368+ return true;
369+ }
370+
371+ static void _inc_failed_counter (dd_helper_shared_state * nonnull s )
372+ {
373+ if (!_shared_state ) {
272374 return ;
273375 }
274376
275- double wait =
377+ unsigned new_failed_count = s -> failed_count < MAX_FAILED_COUNT
378+ ? s -> failed_count + 1U
379+ : MAX_FAILED_COUNT ;
380+
381+ double wait_s =
276382 _backoff_initial *
277- pow (_backoff_base , MIN ((_mgr .failed_count - 1 ), _backoff_max_exponent ));
383+ pow (_backoff_base , MIN ((new_failed_count - 1 ), _backoff_max_exponent ));
384+
385+ mlog (dd_log_debug ,
386+ "Failed counter is to be set to %u and wait for %f seconds" ,
387+ new_failed_count , wait_s );
388+
389+ uint64_t new_suppressed_until_ms =
390+ // NOLINTNEXTLINE(cppcoreguidelines-avoid-magic-numbers,readability-magic-numbers)
391+ _gettime_56bit_ms () + (uint64_t )(wait_s * 1000ULL );
392+
393+ dd_helper_shared_state new_state = {
394+ .try_in_progress = false,
395+ .failed_count = new_failed_count ,
396+ .suppressed_until_ms = new_suppressed_until_ms ,
397+ };
398+
399+ // we can call this function without holding the 3 s lock:
400+ // * if failed count is 0
401+ // * or if only failed mid-request on the connecting request
402+
403+ // The interim write might've been a failure, in which case our write would
404+ // likely be duplicative (we'd have to check the suppression time),
406+ // it might've been a success (failed_count = 0), in which case we could
406+ // register a new failure with failed_count == 1, or it could be a simple
407+ // write for a lock, in which case we arguably should let the new connecting
408+ // process register success/failure.
409+ // But let's keep it simple, and just give up on our update in all cases
410+ if (!atomic_compare_exchange_strong_explicit (_shared_state , s , new_state ,
411+ memory_order_relaxed , memory_order_relaxed )) {
412+ mlog (dd_log_debug , "Failed to update shared state: concurrent update" );
413+ } else {
414+ mlog (dd_log_debug , "Successfully updated failed counter/wait time" );
415+ }
416+ }
278417
279- _mgr .next_retry = cur_time ;
280- _mgr .next_retry .tv_sec += (time_t )wait ;
418+ static void _release_shared_state_lock (dd_helper_shared_state * nonnull s )
419+ {
420+ if (!_shared_state ) {
421+ return ;
422+ }
423+
424+ // if failed is 0, we did not lock, so there is nothing to reset
425+ if (s -> failed_count == 0 ) {
426+ return ;
427+ }
428+
429+ * s = (dd_helper_shared_state ){
430+ // save the failed count, because we may still fail during this
431+ // request, which will count as a connection failure
432+ // This request may be very long though.
433+ // So we have a compromise: in this interim where we connected but
434+ // things may still fail during the request, we give up the lock;
435+ // still, because failed_count is not 0, only one process may attempt to
436+ // connect at a time.
437+ .failed_count = s -> failed_count ,
438+ };
439+ // we hold exclusivity for up to 3 seconds; write unconditionally
440+ atomic_store_explicit (_shared_state , * s , memory_order_relaxed );
441+ mlog (dd_log_debug , "Released connection lock; other processes can connect "
442+ "(though no more than one at once)" );
281443}
282444
283- static void _reset_retry_state (void )
445+ static void _maybe_reset_failed_counter (void )
284446{
285- _mgr .failed_count = 0 ;
286- _mgr .next_retry = (struct timespec ){0 };
447+ if (_shared_state && _mgr .connected_this_req && _mgr .hss .failed_count > 0 &&
448+ dd_conn_connected (& _mgr .conn )) {
449+ // we can reset the failed counter because we had a full request
450+ // processed successfully
451+ dd_helper_shared_state new_state = {
452+ .failed_count = 0 ,
453+ .suppressed_until_ms = 0 ,
454+ .try_in_progress = false,
455+ };
456+
457+ bool res = atomic_compare_exchange_strong_explicit (_shared_state ,
458+ & _mgr .hss , new_state , memory_order_relaxed , memory_order_relaxed );
459+ if (!res ) {
460+ mlog (
461+ dd_log_debug , "Failed to reset retry state: concurrent update" );
462+ }
463+ }
287464}
288465
289466#ifdef TESTING
@@ -323,11 +500,13 @@ static PHP_FUNCTION(datadog_appsec_testing_backoff_status)
323500
324501 array_init_size (return_value , 2 );
325502
503+ dd_helper_shared_state s =
504+ atomic_load_explicit (_shared_state , memory_order_relaxed );
505+
326506 add_assoc_long_ex (
327- return_value , ZEND_STRL ("failed_count" ), (zend_long )_mgr .failed_count );
328- add_assoc_double_ex (return_value , ZEND_STRL ("next_retry" ),
329- (double )_mgr .next_retry .tv_sec +
330- (double )_mgr .next_retry .tv_nsec / TEN_E9_D );
507+ return_value , ZEND_STRL ("failed_count" ), (zend_long )s .failed_count );
508+ add_assoc_double_ex (
509+ return_value , ZEND_STRL ("next_retry" ), (double )s .suppressed_until_ms );
331510}
332511
333512// clang-format off
0 commit comments