Skip to content

Commit 3ed0f84

Browse files
committed
feat(sdk): stub ipc
1 parent 588e681 commit 3ed0f84

File tree

1 file changed

+104
-0
lines changed

1 file changed

+104
-0
lines changed

pkg/stub/ipc.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package stub
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"net"
8+
"net/http"
9+
"sync"
10+
11+
"github.com/containerd/nri/pkg/net/multiplex"
12+
)
13+
14+
type ipcImpl struct {
15+
mConn net.Conn
16+
doneC chan struct{}
17+
serverErr error
18+
teardown func() error
19+
}
20+
21+
type Ipc interface {
22+
conn() net.Conn
23+
wait(ctx context.Context) error
24+
close() error
25+
}
26+
27+
func NewIPC(sockConn net.Conn, handler http.Handler) (result Ipc, retErr error) {
28+
mux := multiplex.Multiplex(sockConn)
29+
defer func() {
30+
if retErr != nil {
31+
mux.Close()
32+
}
33+
}()
34+
doneC := make(chan struct{})
35+
listener, err := mux.Listen(multiplex.PluginServiceConn)
36+
if err != nil {
37+
listener.Close()
38+
return nil, err
39+
}
40+
server := &http.Server{
41+
Handler: handler,
42+
}
43+
var serverErr error
44+
go func() {
45+
err := server.Serve(listener)
46+
if !errors.Is(err, http.ErrServerClosed) {
47+
serverErr = err
48+
}
49+
close(doneC)
50+
}()
51+
defer func() {
52+
if retErr != nil {
53+
server.Close()
54+
}
55+
}()
56+
conn, err := mux.Open(multiplex.RuntimeServiceConn)
57+
if err != nil {
58+
return nil, fmt.Errorf("failed to multiplex grcp client connection: %w", err)
59+
}
60+
defer func() {
61+
if retErr != nil {
62+
_ = conn.Close()
63+
}
64+
}()
65+
return &ipcImpl{
66+
mConn: conn,
67+
doneC: doneC,
68+
serverErr: serverErr,
69+
teardown: sync.OnceValue(func() error {
70+
var errList []error
71+
if err := listener.Close(); err != nil {
72+
errList = append(errList, err)
73+
}
74+
if err := server.Close(); err != nil {
75+
errList = append(errList, err)
76+
}
77+
if err := conn.Close(); err != nil {
78+
errList = append(errList, err)
79+
}
80+
if err := mux.Close(); err != nil {
81+
errList = append(errList, err)
82+
}
83+
<-doneC
84+
return errors.Join(errList...)
85+
}),
86+
}, nil
87+
}
88+
89+
func (i *ipcImpl) conn() net.Conn {
90+
return i.mConn
91+
}
92+
93+
func (i *ipcImpl) wait(ctx context.Context) error {
94+
select {
95+
case <-i.doneC:
96+
return i.serverErr
97+
case <-ctx.Done():
98+
return nil
99+
}
100+
}
101+
102+
func (i *ipcImpl) close() error {
103+
return i.teardown()
104+
}

0 commit comments

Comments
 (0)