Skip to content

Commit 2188718

Browse files
authored
Merge pull request #170 from Zdanquxunhuan/feat/offlineMsg
2 parents 80d9e53 + b71e776 commit 2188718

File tree

69 files changed

+1909
-79
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

69 files changed

+1909
-79
lines changed

README.md

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,7 @@ docker run --rm --name redis -d -p 6379:6379 redis:7.4.0
125125
```shell
126126
git clone https://github.com/crossoverJie/cim.git
127127
cd cim
128-
mvn clean package -DskipTests=true
129-
cd cim-common
130-
mvn install -DskipTests=true
128+
mvn clean install -DskipTests=true
131129
cd cim-server && cim-client && cim-forward-route
132130
mvn clean package spring-boot:repackage -DskipTests=true
133131
```
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.crossoverjie.cim.client.sdk;
2+
3+
import com.crossoverjie.cim.client.sdk.impl.ClientConfigurationData;
4+
import com.crossoverjie.cim.common.data.construct.RingBufferWheel;
5+
6+
public class FetchOfflineMsgJob extends RingBufferWheel.Task{
7+
private static final int INITIAL_DELAY_SECONDS = 5;
8+
9+
private RouteManager routeManager;
10+
private ClientConfigurationData conf;
11+
12+
public FetchOfflineMsgJob(RouteManager routeManager, ClientConfigurationData conf) {
13+
this.routeManager = routeManager;
14+
this.conf = conf;
15+
setKey(INITIAL_DELAY_SECONDS); //It will be sent with a 5-second delay
16+
}
17+
18+
@Override
19+
public void run() {
20+
routeManager.fetchOfflineMsgs(conf.getAuth().getUserId());
21+
}
22+
}

cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/RouteManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.crossoverjie.cim.route.api.RouteApi;
1111
import com.crossoverjie.cim.route.api.vo.req.ChatReqVO;
1212
import com.crossoverjie.cim.route.api.vo.req.LoginReqVO;
13+
import com.crossoverjie.cim.route.api.vo.req.OfflineMsgReqVO;
1314
import com.crossoverjie.cim.route.api.vo.req.P2PReqVO;
1415
import com.crossoverjie.cim.route.api.vo.res.CIMServerResVO;
1516
import java.util.Optional;
@@ -78,4 +79,9 @@ public Set<CIMUserInfo> onlineUser() throws Exception {
7879
BaseResponse<Set<CIMUserInfo>> onlineUsersResVO = routeApi.onlineUser();
7980
return onlineUsersResVO.getDataBody();
8081
}
82+
83+
public void fetchOfflineMsgs(Long userId){
84+
OfflineMsgReqVO offlineMsgReqVO = OfflineMsgReqVO.builder().receiveUserId(userId).build();
85+
routeApi.fetchOfflineMsgs(offlineMsgReqVO);
86+
}
8187
}

cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/impl/ClientImpl.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package com.crossoverjie.cim.client.sdk.impl;
22

33
import static com.crossoverjie.cim.common.enums.StatusEnum.RECONNECT_FAIL;
4+
45
import com.crossoverjie.cim.client.sdk.Client;
56
import com.crossoverjie.cim.client.sdk.ClientState;
7+
import com.crossoverjie.cim.client.sdk.FetchOfflineMsgJob;
68
import com.crossoverjie.cim.client.sdk.ReConnectManager;
79
import com.crossoverjie.cim.client.sdk.RouteManager;
810
import com.crossoverjie.cim.client.sdk.io.CIMClientHandleInitializer;
11+
import com.crossoverjie.cim.common.data.construct.RingBufferWheel;
912
import com.crossoverjie.cim.common.exception.CIMException;
1013
import com.crossoverjie.cim.common.kit.HeartBeatHandler;
1114
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
@@ -28,13 +31,7 @@
2831
import java.util.Map;
2932
import java.util.Optional;
3033
import java.util.Set;
31-
import java.util.concurrent.BlockingQueue;
32-
import java.util.concurrent.CompletableFuture;
33-
import java.util.concurrent.ConcurrentHashMap;
34-
import java.util.concurrent.LinkedBlockingQueue;
35-
import java.util.concurrent.ThreadFactory;
36-
import java.util.concurrent.ThreadPoolExecutor;
37-
import java.util.concurrent.TimeUnit;
34+
import java.util.concurrent.*;
3835
import java.util.function.Consumer;
3936
import lombok.Getter;
4037
import lombok.extern.slf4j.Slf4j;
@@ -65,6 +62,8 @@ public class ClientImpl extends ClientState implements Client {
6562
@Getter
6663
private final Request heartBeatPacket;
6764

65+
private RingBufferWheel ringBufferWheel;
66+
6867
// Client connected server info
6968
private CIMServerResVO serverInfo;
7069

@@ -93,6 +92,16 @@ public ClientImpl(ClientConfigurationData conf) {
9392
clientMap.put(conf.getAuth().getUserId(), this);
9493

9594
connectServer(v -> this.conf.getEvent().info("Login success!"));
95+
96+
postConnectionSetup();
97+
}
98+
99+
/**
100+
* 1. Pull offline messages from the server
101+
*/
102+
private void postConnectionSetup() {
103+
ringBufferWheel = new RingBufferWheel(Executors.newFixedThreadPool(1));
104+
ringBufferWheel.addTask(new FetchOfflineMsgJob(routeManager, conf));
96105
}
97106

98107
private void connectServer(Consumer<Void> success) {
@@ -226,6 +235,7 @@ public void close() {
226235
super.setState(ClientState.State.Closed);
227236
this.routeManager.offLine(this.getAuth().getUserId());
228237
this.clientMap.remove(this.getAuth().getUserId());
238+
ringBufferWheel.stop(true);
229239
}
230240

231241
@Override

cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/io/CIMClientHandle.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ protected void channelRead0(ChannelHandlerContext ctx, Response msg) {
6767
if (msg.getCmd() != BaseCommand.PING) {
6868
String receiveUserId = msg.getPropertiesMap().get(Constants.MetaKey.RECEIVE_USER_ID);
6969
ClientImpl client = ClientImpl.getClientMap().get(Long.valueOf(receiveUserId));
70+
if (client == null) {
71+
log.error("client not found for userId: {}", receiveUserId);
72+
return;
73+
}
7074
// callback
7175
client.getConf().getCallbackThreadPool().execute(() -> {
7276
log.info("client address: {} :{}", ctx.channel().remoteAddress(), client);

cim-client-sdk/src/test/java/com/crossoverjie/cim/client/sdk/ClientTest.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import java.util.Set;
1616
import java.util.concurrent.TimeUnit;
1717
import java.util.concurrent.atomic.AtomicReference;
18+
19+
import com.crossoverjie.cim.route.constant.Constant;
1820
import lombok.Cleanup;
1921
import lombok.extern.slf4j.Slf4j;
2022
import org.awaitility.Awaitility;
@@ -34,7 +36,7 @@ public void tearDown() {
3436
@Test
3537
public void groupChat() throws Exception {
3638
super.starSingleServer();
37-
super.startRoute();
39+
super.startRoute(Constant.OfflineStoreMode.REDIS);
3840
String routeUrl = "http://localhost:8083";
3941
String cj = "crossoverJie";
4042
String zs = "zs";
@@ -102,12 +104,14 @@ public void groupChat() throws Exception {
102104
Awaitility.await().untilAsserted(
103105
() -> Assertions.assertEquals(msg, client2Receive.get()));
104106
super.stopSingle();
107+
client1.close();
108+
client2.close();
105109
}
106110

107111
@Test
108112
public void testP2PChat() throws Exception {
109113
super.starSingleServer();
110-
super.startRoute();
114+
super.startRoute(Constant.OfflineStoreMode.REDIS);
111115
String routeUrl = "http://localhost:8083";
112116
String cj = "cj";
113117
String zs = "zs";
@@ -238,7 +242,7 @@ public void testP2PChat() throws Exception {
238242
@Test
239243
public void testReconnect() throws Exception {
240244
super.startTwoServer();
241-
super.startRoute();
245+
super.startRoute(Constant.OfflineStoreMode.REDIS);
242246

243247
String routeUrl = "http://localhost:8083";
244248
String cj = "cj";
@@ -317,12 +321,14 @@ public void testReconnect() throws Exception {
317321
Awaitility.await()
318322
.untilAsserted(() -> Assertions.assertEquals(msg, client2Receive.get()));
319323
super.stopTwoServer();
324+
client1.close();
325+
client2.close();
320326
}
321327

322328
@Test
323329
public void offLineAndOnline() throws Exception {
324330
super.starSingleServer();
325-
super.startRoute();
331+
super.startRoute(Constant.OfflineStoreMode.REDIS);
326332
String routeUrl = "http://localhost:8083";
327333
String cj = "crossoverJie";
328334
String zs = "zs";
@@ -395,12 +401,14 @@ public void offLineAndOnline() throws Exception {
395401
() -> Assertions.assertEquals(msg, client2Receive.get()));
396402

397403
super.stopSingle();
404+
client1.close();
405+
client2.close();
398406
}
399407

400408
@Test
401409
public void testClose() throws Exception {
402410
super.starSingleServer();
403-
super.startRoute();
411+
super.startRoute(Constant.OfflineStoreMode.REDIS);
404412
String routeUrl = "http://localhost:8083";
405413
String cj = "crossoverJie";
406414
Long id = super.registerAccount(cj);
@@ -430,7 +438,7 @@ public void testClose() throws Exception {
430438
@Test
431439
public void testIncorrectUser() throws Exception {
432440
super.starSingleServer();
433-
super.startRoute();
441+
super.startRoute(Constant.OfflineStoreMode.REDIS);
434442
String routeUrl = "http://localhost:8083";
435443
String cj = "xx";
436444
long id = 100L;

0 commit comments

Comments
 (0)