@@ -12,6 +12,7 @@ use std::pin::Pin;
1212use std:: sync:: Arc ;
1313use std:: time:: Duration ;
1414
15+ use crate :: core:: cache:: Cache ;
1516use crate :: core:: context:: Context ;
1617use crate :: core:: event:: prelude:: * ;
1718use crate :: core:: event:: resolve_event;
@@ -98,6 +99,7 @@ impl Business {
9899 reconnecting : Mutex :: new ( ( ) ) ,
99100 pending_requests : DashMap :: new ( ) ,
100101 context,
102+ cache : Arc :: new ( Cache :: new ( ) ) ,
101103 } ) ;
102104
103105 Ok ( Self {
@@ -120,7 +122,7 @@ impl Business {
120122 let handle = self . handle . clone ( ) ;
121123 tokio:: spawn ( async move {
122124 if let Err ( e) = handle. dispatch_sso_packet ( packet) . await {
123- tracing:: error!( "Error handling packet: {}" , e) ;
125+ tracing:: error!( "Unhandled error occurred when handling packet: {}" , e) ;
124126 // FIXME: Non-required reconnections, except for serious errors
125127 // self.reconnect().await;
126128 }
@@ -168,8 +170,9 @@ impl Business {
168170pub struct BusinessHandle {
169171 sender : ArcSwap < PacketSender > ,
170172 reconnecting : Mutex < ( ) > ,
171- pending_requests : DashMap < u32 , oneshot:: Sender < Box < dyn ServerEvent > > > ,
173+ pending_requests : DashMap < u32 , oneshot:: Sender < Result < Box < dyn ServerEvent > > > > ,
172174 pub ( crate ) context : Arc < Context > ,
175+ pub ( crate ) cache : Arc < Cache > ,
173176 // TODO: (outer) event dispatcher, highway
174177}
175178
@@ -197,18 +200,21 @@ impl BusinessHandle {
197200 packet : SsoPacket ,
198201 ) -> std:: result:: Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > {
199202 let sequence = packet. sequence ( ) ;
200- let mut event = resolve_event ( packet, & self . context ) . await ?;
203+ let result: Result < Box < dyn ServerEvent > > = async {
204+ // TODO: refactor error type?
205+ let mut event = resolve_event ( packet, & self . context )
206+ . await
207+ . map_err ( |e| std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , e. to_string ( ) ) ) ?;
208+ dispatch_logic ( & mut * event, self . clone ( ) , LogicFlow :: InComing ) . await ;
209+ Ok ( event)
210+ }
211+ . await ;
201212 // Lagrange.Core.Internal.Context.BusinessContext.HandleIncomingEvent
202- // 在 send_event 中的 handle incoming event 合并到这里来 (aka dispatch_logic)
203- // GroupSysDecreaseEvent, ... -> Lagrange.Core.Internal.Context.Logic.Implementation.CachingLogic.Incoming
204- // KickNTEvent -> Lagrange.Core.Internal.Context.Logic.Implementation.WtExchangeLogic.Incoming
205- // PushMessageEvent, ... -> Lagrange.Core.Internal.Context.Logic.Implementation.MessagingLogic.Incoming
206- dispatch_logic ( & mut * event, self . clone ( ) , LogicFlow :: InComing ) . await ;
207213 // TODO: timeout auto remove
208214 if let Some ( ( _, tx) ) = self . pending_requests . remove ( & sequence) {
209- tx. send ( event ) . unwrap ( ) ;
215+ tx. send ( result ) . unwrap ( ) ;
210216 } else {
211- tracing:: warn!( "unhandled packet: {:?}" , event ) ;
217+ tracing:: warn!( "unhandled packet: {:?}" , result ) ;
212218 }
213219 Ok ( ( ) )
214220 }
@@ -245,10 +251,9 @@ impl BusinessHandle {
245251 async fn send_packet ( & self , packet : SsoPacket ) -> Result < Box < dyn ServerEvent > > {
246252 tracing:: debug!( "sending packet: {:?}" , packet) ;
247253 let sequence = packet. sequence ( ) ;
248- let ( tx, rx) = oneshot:: channel ( ) ;
254+ let ( tx, rx) = oneshot:: channel :: < Result < Box < dyn ServerEvent > > > ( ) ;
249255 self . pending_requests . insert ( sequence, tx) ;
250256 self . post_packet ( packet) . await ?;
251- let events = rx. await . expect ( "response not received" ) ;
252- Ok ( events)
257+ rx. await . expect ( "response not received" )
253258 }
254259}
0 commit comments