Skip to content

Commit fed748e

Browse files
committed
feat(sdk): stub ipc
1 parent 588e681 commit fed748e

File tree

1 file changed

+108
-0
lines changed

1 file changed

+108
-0
lines changed

pkg/stub/ipc.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package stub
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"net"
8+
"net/http"
9+
"sync"
10+
11+
"github.com/containerd/nri/pkg/net/multiplex"
12+
)
13+
14+
type ipcImpl struct {
15+
mConn net.Conn
16+
doneC chan struct{}
17+
serverErr error
18+
teardown func() error
19+
}
20+
21+
type Ipc interface {
22+
conn() net.Conn
23+
wait(ctx context.Context) error
24+
close() error
25+
}
26+
27+
func NewIPC(sockConn net.Conn, handler http.Handler) (result Ipc, retErr error) {
28+
mux := multiplex.Multiplex(sockConn)
29+
defer func() {
30+
if retErr != nil {
31+
mux.Close()
32+
}
33+
}()
34+
doneC := make(chan struct{})
35+
listener, err := mux.Listen(multiplex.PluginServiceConn)
36+
if err != nil {
37+
return nil, err
38+
}
39+
defer func() {
40+
if retErr != nil {
41+
listener.Close()
42+
}
43+
}()
44+
server := &http.Server{
45+
Handler: handler,
46+
}
47+
var serverErr error
48+
go func() {
49+
err := server.Serve(listener)
50+
if !errors.Is(err, http.ErrServerClosed) {
51+
serverErr = err
52+
}
53+
close(doneC)
54+
}()
55+
defer func() {
56+
if retErr != nil {
57+
server.Close()
58+
}
59+
}()
60+
conn, err := mux.Open(multiplex.RuntimeServiceConn)
61+
if err != nil {
62+
return nil, fmt.Errorf("failed to multiplex grcp client connection: %w", err)
63+
}
64+
defer func() {
65+
if retErr != nil {
66+
_ = conn.Close()
67+
}
68+
}()
69+
return &ipcImpl{
70+
mConn: conn,
71+
doneC: doneC,
72+
serverErr: serverErr,
73+
teardown: sync.OnceValue(func() error {
74+
var errList []error
75+
if err := listener.Close(); err != nil {
76+
errList = append(errList, err)
77+
}
78+
if err := server.Close(); err != nil {
79+
errList = append(errList, err)
80+
}
81+
if err := conn.Close(); err != nil {
82+
errList = append(errList, err)
83+
}
84+
if err := mux.Close(); err != nil {
85+
errList = append(errList, err)
86+
}
87+
<-doneC
88+
return errors.Join(errList...)
89+
}),
90+
}, nil
91+
}
92+
93+
func (i *ipcImpl) conn() net.Conn {
94+
return i.mConn
95+
}
96+
97+
func (i *ipcImpl) wait(ctx context.Context) error {
98+
select {
99+
case <-i.doneC:
100+
return i.serverErr
101+
case <-ctx.Done():
102+
return nil
103+
}
104+
}
105+
106+
func (i *ipcImpl) close() error {
107+
return i.teardown()
108+
}

0 commit comments

Comments
 (0)