Skip to content

Commit 8fb5e20

Browse files
author
Asim Aslam
authored
Merge pull request #248 from micro/rework
Rework Interfaces
2 parents ccbc1b9 + 0315b44 commit 8fb5e20

21 files changed

+207
-212
lines changed

client/backoff_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@ import (
1010
func TestBackoff(t *testing.T) {
1111
delta := time.Duration(0)
1212

13+
c := NewClient()
14+
1315
for i := 0; i < 5; i++ {
14-
d, err := exponentialBackoff(context.TODO(), NewJsonRequest("test", "test", nil), i)
16+
d, err := exponentialBackoff(context.TODO(), c.NewRequest("test", "test", nil), i)
1517
if err != nil {
1618
t.Fatal(err)
1719
}

client/client.go

Lines changed: 19 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,18 @@ import (
1212
type Client interface {
1313
Init(...Option) error
1414
Options() Options
15-
NewPublication(topic string, msg interface{}) Publication
15+
NewMessage(topic string, msg interface{}) Message
1616
NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
17-
NewProtoRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
18-
NewJsonRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
1917
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
20-
CallRemote(ctx context.Context, addr string, req Request, rsp interface{}, opts ...CallOption) error
21-
Stream(ctx context.Context, req Request, opts ...CallOption) (Streamer, error)
22-
StreamRemote(ctx context.Context, addr string, req Request, opts ...CallOption) (Streamer, error)
23-
Publish(ctx context.Context, p Publication, opts ...PublishOption) error
18+
Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
19+
Publish(ctx context.Context, msg Message, opts ...PublishOption) error
2420
String() string
2521
}
2622

27-
// Publication is the interface for a message published asynchronously
28-
type Publication interface {
23+
// Message is the interface for publishing asynchronously
24+
type Message interface {
2925
Topic() string
30-
Message() interface{}
26+
Payload() interface{}
3127
ContentType() string
3228
}
3329

@@ -41,8 +37,8 @@ type Request interface {
4137
Stream() bool
4238
}
4339

44-
// Streamer is the inteface for a bidirectional synchronous stream
45-
type Streamer interface {
40+
// Stream is the inteface for a bidirectional synchronous stream
41+
type Stream interface {
4642
Context() context.Context
4743
Request() Request
4844
Send(interface{}) error
@@ -85,52 +81,32 @@ func Call(ctx context.Context, request Request, response interface{}, opts ...Ca
8581
return DefaultClient.Call(ctx, request, response, opts...)
8682
}
8783

88-
// Makes a synchronous call to the specified address using the default client
89-
func CallRemote(ctx context.Context, address string, request Request, response interface{}, opts ...CallOption) error {
90-
return DefaultClient.CallRemote(ctx, address, request, response, opts...)
91-
}
92-
93-
// Creates a streaming connection with a service and returns responses on the
94-
// channel passed in. It's up to the user to close the streamer.
95-
func Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) {
96-
return DefaultClient.Stream(ctx, request, opts...)
97-
}
98-
99-
// Creates a streaming connection to the address specified.
100-
func StreamRemote(ctx context.Context, address string, request Request, opts ...CallOption) (Streamer, error) {
101-
return DefaultClient.StreamRemote(ctx, address, request, opts...)
102-
}
103-
10484
// Publishes a publication using the default client. Using the underlying broker
10585
// set within the options.
106-
func Publish(ctx context.Context, p Publication) error {
107-
return DefaultClient.Publish(ctx, p)
86+
func Publish(ctx context.Context, msg Message) error {
87+
return DefaultClient.Publish(ctx, msg)
88+
}
89+
90+
// Creates a new message using the default client
91+
func NewMessage(topic string, payload interface{}) Message {
92+
return DefaultClient.NewMessage(topic, payload)
10893
}
10994

11095
// Creates a new client with the options passed in
11196
func NewClient(opt ...Option) Client {
11297
return newRpcClient(opt...)
11398
}
11499

115-
// Creates a new publication using the default client
116-
func NewPublication(topic string, message interface{}) Publication {
117-
return DefaultClient.NewPublication(topic, message)
118-
}
119-
120100
// Creates a new request using the default client. Content Type will
121101
// be set to the default within options and use the appropriate codec
122102
func NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
123103
return DefaultClient.NewRequest(service, method, request, reqOpts...)
124104
}
125105

126-
// Creates a new protobuf request using the default client
127-
func NewProtoRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
128-
return DefaultClient.NewProtoRequest(service, method, request, reqOpts...)
129-
}
130-
131-
// Creates a new json request using the default client
132-
func NewJsonRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
133-
return DefaultClient.NewJsonRequest(service, method, request, reqOpts...)
106+
// Creates a streaming connection with a service and returns responses on the
107+
// channel passed in. It's up to the user to close the streamer.
108+
func NewStream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) {
109+
return DefaultClient.Stream(ctx, request, opts...)
134110
}
135111

136112
func String() string {

client/mock/mock.go

Lines changed: 4 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -49,22 +49,14 @@ func (m *MockClient) Options() client.Options {
4949
return m.Opts
5050
}
5151

52-
func (m *MockClient) NewPublication(topic string, msg interface{}) client.Publication {
53-
return m.Client.NewPublication(topic, msg)
52+
func (m *MockClient) NewMessage(topic string, msg interface{}) client.Message {
53+
return m.Client.NewMessage(topic, msg)
5454
}
5555

5656
func (m *MockClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
5757
return m.Client.NewRequest(service, method, req, reqOpts...)
5858
}
5959

60-
func (m *MockClient) NewProtoRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
61-
return m.Client.NewProtoRequest(service, method, req, reqOpts...)
62-
}
63-
64-
func (m *MockClient) NewJsonRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
65-
return m.Client.NewJsonRequest(service, method, req, reqOpts...)
66-
}
67-
6860
func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
6961
m.Lock()
7062
defer m.Unlock()
@@ -97,55 +89,15 @@ func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface
9789
return fmt.Errorf("rpc: can't find service %s", req.Method())
9890
}
9991

100-
func (m *MockClient) CallRemote(ctx context.Context, addr string, req client.Request, rsp interface{}, opts ...client.CallOption) error {
101-
m.Lock()
102-
defer m.Unlock()
103-
104-
response, ok := m.Response[req.Service()]
105-
if !ok {
106-
return errors.NotFound("go.micro.client.mock", "service not found")
107-
}
108-
109-
for _, r := range response {
110-
if r.Method != req.Method() {
111-
continue
112-
}
113-
114-
if r.Error != nil {
115-
return r.Error
116-
}
117-
118-
v := reflect.ValueOf(rsp)
119-
120-
if t := reflect.TypeOf(rsp); t.Kind() == reflect.Ptr {
121-
v = reflect.Indirect(v)
122-
}
123-
124-
v.Set(reflect.ValueOf(r.Response))
125-
126-
return nil
127-
}
128-
129-
return fmt.Errorf("rpc: can't find service %s", req.Method())
130-
}
131-
132-
func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Streamer, error) {
133-
m.Lock()
134-
defer m.Unlock()
135-
136-
// TODO: mock stream
137-
return nil, nil
138-
}
139-
140-
func (m *MockClient) StreamRemote(ctx context.Context, addr string, req client.Request, opts ...client.CallOption) (client.Streamer, error) {
92+
func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
14193
m.Lock()
14294
defer m.Unlock()
14395

14496
// TODO: mock stream
14597
return nil, nil
14698
}
14799

148-
func (m *MockClient) Publish(ctx context.Context, p client.Publication, opts ...client.PublishOption) error {
100+
func (m *MockClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
149101
return nil
150102
}
151103

client/mock/mock_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func TestClient(t *testing.T) {
2121
c := NewClient(Response("go.mock", response))
2222

2323
for _, r := range response {
24-
req := c.NewJsonRequest("go.mock", r.Method, map[string]interface{}{"foo": "bar"})
24+
req := c.NewRequest("go.mock", r.Method, map[string]interface{}{"foo": "bar"})
2525
var rsp interface{}
2626

2727
err := c.Call(context.TODO(), req, &rsp)

client/options.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ type Options struct {
4040
type CallOptions struct {
4141
SelectOptions []selector.SelectOption
4242

43+
// Address of remote host
44+
Address string
4345
// Backoff func
4446
Backoff BackoffFunc
4547
// Check if retriable func
@@ -66,7 +68,8 @@ type PublishOptions struct {
6668
}
6769

6870
type RequestOptions struct {
69-
Stream bool
71+
ContentType string
72+
Stream bool
7073

7174
// Other options for implementations of the interface
7275
// can be stored in a context
@@ -226,6 +229,13 @@ func DialTimeout(d time.Duration) Option {
226229

227230
// Call Options
228231

232+
// WithAddress sets the remote address to use rather than using service discovery
233+
func WithAddress(a string) CallOption {
234+
return func(o *CallOptions) {
235+
o.Address = a
236+
}
237+
}
238+
229239
func WithSelectOption(so ...selector.SelectOption) CallOption {
230240
return func(o *CallOptions) {
231241
o.SelectOptions = append(o.SelectOptions, so...)
@@ -281,6 +291,12 @@ func WithDialTimeout(d time.Duration) CallOption {
281291

282292
// Request Options
283293

294+
func WithContentType(ct string) RequestOption {
295+
return func(o *RequestOptions) {
296+
o.ContentType = ct
297+
}
298+
}
299+
284300
func StreamingRequest() RequestOption {
285301
return func(o *RequestOptions) {
286302
o.Stream = true

0 commit comments

Comments
 (0)