@@ -912,83 +912,14 @@ mod endpoint_tests {
912912 #[ tokio:: test]
913913 #[ cfg( feature = "experimental-concurrency" ) ]
914914 async fn test_concurrent_structured_logging_isolation ( ) -> Result < ( ) , Error > {
915- use std:: {
916- collections:: { HashMap , HashSet } ,
917- sync:: Mutex ,
918- } ;
919- use tracing:: { info, subscriber:: set_global_default} ;
920- use tracing_subscriber:: { layer:: SubscriberExt , Layer } ;
921-
922- #[ derive( Clone ) ]
923- struct LogCapture {
924- logs : Arc < Mutex < Vec < HashMap < String , String > > > > ,
925- span_fields : Arc < Mutex < HashMap < tracing:: Id , HashMap < String , String > > > > ,
926- }
927-
928- impl LogCapture {
929- fn new ( ) -> Self {
930- Self {
931- logs : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
932- span_fields : Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ,
933- }
934- }
935- }
936-
937- impl < S > Layer < S > for LogCapture
938- where
939- S : tracing:: Subscriber + for < ' lookup > tracing_subscriber:: registry:: LookupSpan < ' lookup > ,
940- {
941- fn on_new_span (
942- & self ,
943- attrs : & tracing:: span:: Attributes < ' _ > ,
944- id : & tracing:: Id ,
945- _ctx : tracing_subscriber:: layer:: Context < ' _ , S > ,
946- ) {
947- if attrs. metadata ( ) . name ( ) == "Lambda runtime invoke" {
948- let mut fields = HashMap :: new ( ) ;
949- struct FieldVisitor < ' a > ( & ' a mut HashMap < String , String > ) ;
950- impl < ' a > tracing:: field:: Visit for FieldVisitor < ' a > {
951- fn record_debug ( & mut self , field : & tracing:: field:: Field , value : & dyn std:: fmt:: Debug ) {
952- self . 0 . insert (
953- field. name ( ) . to_string ( ) ,
954- format ! ( "{:?}" , value) . trim_matches ( '"' ) . to_string ( ) ,
955- ) ;
956- }
957- }
958- attrs. record ( & mut FieldVisitor ( & mut fields) ) ;
959- self . span_fields . lock ( ) . unwrap ( ) . insert ( id. clone ( ) , fields) ;
960- }
961- }
915+ use std:: collections:: HashSet ;
916+ use tracing:: info;
917+ use tracing_capture:: { CaptureLayer , SharedStorage } ;
918+ use tracing_subscriber:: layer:: SubscriberExt ;
962919
963- fn on_event ( & self , event : & tracing:: Event < ' _ > , ctx : tracing_subscriber:: layer:: Context < ' _ , S > ) {
964- let mut fields = HashMap :: new ( ) ;
965- struct FieldVisitor < ' a > ( & ' a mut HashMap < String , String > ) ;
966- impl < ' a > tracing:: field:: Visit for FieldVisitor < ' a > {
967- fn record_debug ( & mut self , field : & tracing:: field:: Field , value : & dyn std:: fmt:: Debug ) {
968- self . 0 . insert (
969- field. name ( ) . to_string ( ) ,
970- format ! ( "{:?}" , value) . trim_matches ( '"' ) . to_string ( ) ,
971- ) ;
972- }
973- }
974- event. record ( & mut FieldVisitor ( & mut fields) ) ;
975-
976- // Add span requestId if we're in a Lambda runtime invoke span
977- if let Some ( span) = ctx. lookup_current ( ) {
978- if let Some ( span_fields) = self . span_fields . lock ( ) . unwrap ( ) . get ( & span. id ( ) ) {
979- if let Some ( request_id) = span_fields. get ( "requestId" ) {
980- fields. insert ( "span_request_id" . to_string ( ) , request_id. clone ( ) ) ;
981- }
982- }
983- }
984-
985- self . logs . lock ( ) . unwrap ( ) . push ( fields) ;
986- }
987- }
988-
989- let log_capture = LogCapture :: new ( ) ;
990- let subscriber = tracing_subscriber:: registry ( ) . with ( log_capture. clone ( ) ) ;
991- set_global_default ( subscriber) . unwrap ( ) ;
920+ let storage = SharedStorage :: default ( ) ;
921+ let subscriber = tracing_subscriber:: registry ( ) . with ( CaptureLayer :: new ( & storage) ) ;
922+ tracing:: subscriber:: set_global_default ( subscriber) . unwrap ( ) ;
992923
993924 let request_count = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
994925 let listener = TcpListener :: bind ( "127.0.0.1:0" ) . await ?;
@@ -1104,27 +1035,37 @@ mod endpoint_tests {
11041035 runtime_handle. abort ( ) ;
11051036 server_handle. abort ( ) ;
11061037
1107- let logs = log_capture. logs . lock ( ) . unwrap ( ) ;
1108- let relevant_logs: Vec < _ > = logs. iter ( ) . filter ( |l| l. contains_key ( "observed_request_id" ) ) . collect ( ) ;
1038+ let storage = storage. lock ( ) ;
1039+ let events: Vec < _ > = storage
1040+ . all_events ( )
1041+ . filter ( |e| e. value ( "observed_request_id" ) . is_some ( ) )
1042+ . collect ( ) ;
11091043
11101044 assert ! (
1111- relevant_logs . len( ) >= 300 ,
1045+ events . len( ) >= 300 ,
11121046 "Should have at least 300 log entries, got {}" ,
1113- relevant_logs . len( )
1047+ events . len( )
11141048 ) ;
11151049
11161050 let mut seen_ids = HashSet :: new ( ) ;
1117- for log in & relevant_logs {
1118- let observed_id = log. get ( "observed_request_id" ) . unwrap ( ) ;
1119- let span_request_id = log. get ( "span_request_id" ) . unwrap ( ) ;
1051+ for event in & events {
1052+ let observed_id = event[ "observed_request_id" ] . as_str ( ) . unwrap ( ) ;
1053+
1054+ // Find the parent "Lambda runtime invoke" span and get its requestId
1055+ let span_request_id = event
1056+ . ancestors ( )
1057+ . find ( |s| s. metadata ( ) . name ( ) == "Lambda runtime invoke" )
1058+ . and_then ( |s| s. value ( "requestId" ) )
1059+ . and_then ( |v| v. as_str ( ) )
1060+ . expect ( "Event should have a Lambda runtime invoke ancestor with requestId" ) ;
11201061
11211062 assert ! (
11221063 observed_id. starts_with( "test-request-" ) ,
11231064 "Request ID should match pattern: {}" ,
11241065 observed_id
11251066 ) ;
11261067 assert ! (
1127- seen_ids. insert( observed_id. clone ( ) ) ,
1068+ seen_ids. insert( observed_id. to_string ( ) ) ,
11281069 "Request ID should be unique: {}" ,
11291070 observed_id
11301071 ) ;
@@ -1137,10 +1078,6 @@ mod endpoint_tests {
11371078 ) ;
11381079 }
11391080
1140- println ! (
1141- "✅ Concurrent structured logging test passed with {} unique request IDs" ,
1142- seen_ids. len( )
1143- ) ;
11441081 Ok ( ( ) )
11451082 }
11461083}
0 commit comments