Skip to content

Commit 4c16bca

Browse files
committed
feat(sdk): stub ipc
1 parent 588e681 commit 4c16bca

File tree

1 file changed

+94
-0
lines changed

1 file changed

+94
-0
lines changed

pkg/stub/ipc.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package stub
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"net"
8+
"net/http"
9+
"sync"
10+
11+
"github.com/containerd/nri/pkg/net/multiplex"
12+
)
13+
14+
type ipcImpl struct {
15+
mConn net.Conn
16+
server *ipcServer
17+
teardown func() error
18+
}
19+
20+
type Ipc interface {
21+
conn() net.Conn
22+
wait(ctx context.Context) error
23+
close() error
24+
}
25+
26+
type ipcServer struct {
27+
done chan struct{}
28+
server *http.Server
29+
err error
30+
}
31+
32+
func newIpcServer(l net.Listener, handler http.Handler) *ipcServer {
33+
result := &ipcServer{
34+
done: make(chan struct{}),
35+
server: &http.Server{
36+
Handler: handler,
37+
},
38+
}
39+
go func() {
40+
err := result.server.Serve(l)
41+
if !errors.Is(err, http.ErrServerClosed) {
42+
result.err = err
43+
}
44+
close(result.done)
45+
}()
46+
return result
47+
}
48+
49+
func NewIPC(sockConn net.Conn, handler http.Handler) (result Ipc, retErr error) {
50+
mux := multiplex.Multiplex(sockConn)
51+
listener, err := mux.Listen(multiplex.PluginServiceConn)
52+
if err != nil {
53+
mux.Close()
54+
return nil, err
55+
}
56+
conn, err := mux.Open(multiplex.RuntimeServiceConn)
57+
if err != nil {
58+
mux.Close()
59+
return nil, fmt.Errorf("failed to multiplex grcp client connection: %w", err)
60+
}
61+
server := newIpcServer(listener, handler)
62+
return &ipcImpl{
63+
mConn: conn,
64+
server: server,
65+
teardown: sync.OnceValue(func() error {
66+
var errList []error
67+
if err := server.server.Close(); err != nil {
68+
errList = append(errList, err)
69+
}
70+
if err := mux.Close(); err != nil {
71+
errList = append(errList, err)
72+
}
73+
<-server.done
74+
return errors.Join(errList...)
75+
}),
76+
}, nil
77+
}
78+
79+
func (i *ipcImpl) conn() net.Conn {
80+
return i.mConn
81+
}
82+
83+
func (i *ipcImpl) wait(ctx context.Context) error {
84+
select {
85+
case <-i.server.done:
86+
return i.server.err
87+
case <-ctx.Done():
88+
return nil
89+
}
90+
}
91+
92+
func (i *ipcImpl) close() error {
93+
return i.teardown()
94+
}

0 commit comments

Comments
 (0)