This repository was archived by the owner on Jul 28, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathzmq4.go
More file actions
63 lines (54 loc) · 1.27 KB
/
zmq4.go
File metadata and controls
63 lines (54 loc) · 1.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package main
import (
"context"
"fmt"
"net/rpc"
"net/rpc/jsonrpc"
"github.com/go-zeromq/zmq4"
)
// NewZmqServer builds a ZmqServer around a fresh zmq4 router socket whose
// identity is "router", paired with a one-slot channel used to stop serving.
func NewZmqServer() *ZmqServer {
	identity := zmq4.SocketIdentity("router")
	router := zmq4.NewRouter(context.Background(), zmq4.WithID(identity))

	srv := new(ZmqServer)
	srv.socket = router
	srv.end = make(chan struct{}, 1)
	return srv
}
// ZmqServer serves net/rpc JSON-RPC requests that arrive over a zmq4 socket.
// Instances are created with NewZmqServer.
type ZmqServer struct {
// socket is the zmq4 socket the server listens on, receives from and closes.
socket zmq4.Socket
// end tells the serving loop in LisenAndServeRPC to stop; Close signals it.
end chan struct{}
}
// LisenAndServeRPC binds the socket to "tcp://"+endpoint and serves JSON-RPC
// requests on it until the server is stopped or the socket fails.
//
// Each received message with at least three frames is handed to its own
// goroutine, which runs a jsonrpc server codec over
// NewZmqMessage(s.socket, msg). Shorter messages are dropped.
//
// It returns nil when a stop was signalled on s.end, and a non-nil error when
// listening, registering the RPC receiver, or receiving a message fails.
// Once Listen has succeeded the socket is closed before the method returns.
func (s *ZmqServer) LisenAndServeRPC(endpoint string) (err error) {
	if err = s.socket.Listen("tcp://" + endpoint); err != nil {
		return err
	}
	defer s.socket.Close()

	// Create the RPC server and expose MyServer's methods on it. A failed
	// registration would leave the server with nothing to dispatch to, so it
	// is reported instead of being ignored.
	server := rpc.NewServer()
	if err = server.Register(new(MyServer)); err != nil {
		return fmt.Errorf("registering rpc receiver: %w", err)
	}

	for {
		// Non-blocking check for a stop request before waiting for a message.
		select {
		case <-s.end:
			return nil
		default:
		}

		// Recv plays the role of accept, but yields a whole message instead of
		// a connection.
		msg, recvErr := s.socket.Recv()
		if recvErr != nil {
			// If a stop has been signalled, this failure is most likely the
			// socket being closed for shutdown, so report a clean stop rather
			// than an error.
			select {
			case <-s.end:
				return nil
			default:
				return recvErr
			}
		}

		// A usable message has at least 3 frames: the socket ID, an empty
		// delimiter, and the payload.
		if len(msg.Frames) < 3 {
			continue
		}
		go server.ServeCodec(jsonrpc.NewServerCodec(NewZmqMessage(s.socket, msg)))
	}
}
// Close asks the serving loop to stop by closing the end channel, then closes
// the socket and returns the error from that socket close.
//
// Closing the channel is enough to wake the receiver, so no value is sent
// first. Close may be called more than once sequentially: the channel is only
// closed on the first call. It is not safe to call Close concurrently from
// several goroutines.
func (s *ZmqServer) Close() (err error) {
	select {
	case <-s.end:
		// end is already closed; closing it again would panic.
	default:
		close(s.end)
	}
	return s.socket.Close()
}
// StartZMQServer creates a ZmqServer and serves JSON-RPC on the endpoint
// ":<port>". It blocks until LisenAndServeRPC returns. Because this function
// has no error result, a non-nil serve error is printed to standard output
// instead of being dropped.
func StartZMQServer(port int) {
	server := NewZmqServer()
	if err := server.LisenAndServeRPC(fmt.Sprintf(":%d", port)); err != nil {
		fmt.Printf("zmq rpc server on port %d stopped: %v\n", port, err)
	}
}