Skip to content

Commit 461df8d

Browse files
authored
Merge pull request #364 from micro/inbox
Add inbox feature to http broker
2 parents ce36d01 + 7c2cbe2 commit 461df8d

File tree

1 file changed

+121
-29
lines changed

1 file changed

+121
-29
lines changed

broker/http_broker.go

Lines changed: 121 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ type httpBroker struct {
4545
subscribers map[string][]*httpSubscriber
4646
running bool
4747
exit chan chan error
48+
49+
// offline message inbox
50+
mtx sync.RWMutex
51+
inbox map[string][][]byte
4852
}
4953

5054
type httpSubscriber struct {
@@ -133,6 +137,7 @@ func newHttpBroker(opts ...Option) Broker {
133137
subscribers: make(map[string][]*httpSubscriber),
134138
exit: make(chan chan error),
135139
mux: http.NewServeMux(),
140+
inbox: make(map[string][][]byte),
136141
}
137142

138143
// specify the message handler
@@ -175,6 +180,49 @@ func (h *httpSubscriber) Unsubscribe() error {
175180
return h.hb.unsubscribe(h)
176181
}
177182

183+
func (h *httpBroker) saveMessage(topic string, msg []byte) {
184+
h.mtx.Lock()
185+
defer h.mtx.Unlock()
186+
187+
// get messages
188+
c := h.inbox[topic]
189+
190+
// save message
191+
c = append(c, msg)
192+
193+
// max length 64
194+
if len(c) > 64 {
195+
c = c[:64]
196+
}
197+
198+
// save inbox
199+
h.inbox[topic] = c
200+
}
201+
202+
func (h *httpBroker) getMessage(topic string, num int) [][]byte {
203+
h.mtx.Lock()
204+
defer h.mtx.Unlock()
205+
206+
// get messages
207+
c, ok := h.inbox[topic]
208+
if !ok {
209+
return nil
210+
}
211+
212+
// more message than requests
213+
if len(c) >= num {
214+
msg := c[:num]
215+
h.inbox[topic] = c[num:]
216+
return msg
217+
}
218+
219+
// reset inbox
220+
h.inbox[topic] = nil
221+
222+
// return all messages
223+
return c
224+
}
225+
178226
func (h *httpBroker) subscribe(s *httpSubscriber) error {
179227
h.Lock()
180228
defer h.Unlock()
@@ -454,14 +502,7 @@ func (h *httpBroker) Options() Options {
454502
}
455503

456504
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
457-
h.RLock()
458-
s, err := h.r.GetService("topic:" + topic)
459-
if err != nil {
460-
h.RUnlock()
461-
return err
462-
}
463-
h.RUnlock()
464-
505+
// create the message first
465506
m := &Message{
466507
Header: make(map[string]string),
467508
Body: msg.Body,
@@ -473,12 +514,26 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
473514

474515
m.Header[":topic"] = topic
475516

517+
// encode the message
476518
b, err := h.opts.Codec.Marshal(m)
477519
if err != nil {
478520
return err
479521
}
480522

481-
pub := func(node *registry.Node, b []byte) {
523+
// save the message
524+
h.saveMessage(topic, b)
525+
526+
// now attempt to get the service
527+
h.RLock()
528+
s, err := h.r.GetService("topic:" + topic)
529+
if err != nil {
530+
h.RUnlock()
531+
// ignore error
532+
return nil
533+
}
534+
h.RUnlock()
535+
536+
pub := func(node *registry.Node, t string, b []byte) error {
482537
scheme := "http"
483538

484539
// check if secure is added in metadata
@@ -491,34 +546,71 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
491546

492547
uri := fmt.Sprintf("%s://%s:%d%s?%s", scheme, node.Address, node.Port, DefaultSubPath, vals.Encode())
493548
r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
494-
if err == nil {
495-
io.Copy(ioutil.Discard, r.Body)
496-
r.Body.Close()
549+
if err != nil {
550+
return err
497551
}
498-
}
499552

500-
for _, service := range s {
501-
// only process if we have nodes
502-
if len(service.Nodes) == 0 {
503-
continue
504-
}
553+
// discard response body
554+
io.Copy(ioutil.Discard, r.Body)
555+
r.Body.Close()
556+
return nil
557+
}
505558

506-
switch service.Version {
507-
// broadcast version means broadcast to all nodes
508-
case broadcastVersion:
509-
for _, node := range service.Nodes {
510-
// publish async
511-
go pub(node, b)
559+
srv := func(s []*registry.Service, b []byte) {
560+
for _, service := range s {
561+
// only process if we have nodes
562+
if len(service.Nodes) == 0 {
563+
continue
512564
}
513-
default:
514-
// select node to publish to
515-
node := service.Nodes[rand.Int()%len(service.Nodes)]
516565

517-
// publish async
518-
go pub(node, b)
566+
switch service.Version {
567+
// broadcast version means broadcast to all nodes
568+
case broadcastVersion:
569+
var success bool
570+
571+
// publish to all nodes
572+
for _, node := range service.Nodes {
573+
// publish async
574+
if err := pub(node, topic, b); err == nil {
575+
success = true
576+
}
577+
}
578+
579+
// save if it failed to publish at least once
580+
if !success {
581+
h.saveMessage(topic, b)
582+
}
583+
default:
584+
// select node to publish to
585+
node := service.Nodes[rand.Int()%len(service.Nodes)]
586+
587+
// publish async to one node
588+
if err := pub(node, topic, b); err != nil {
589+
// if failed save it
590+
h.saveMessage(topic, b)
591+
}
592+
}
519593
}
520594
}
521595

596+
// do the rest async
597+
go func() {
598+
// get a third of the backlog
599+
messages := h.getMessage(topic, 8)
600+
delay := (len(messages) > 1)
601+
602+
// publish all the messages
603+
for _, msg := range messages {
604+
// serialize here
605+
srv(s, msg)
606+
607+
// sending a backlog of messages
608+
if delay {
609+
time.Sleep(time.Millisecond * 100)
610+
}
611+
}
612+
}()
613+
522614
return nil
523615
}
524616

0 commit comments

Comments
 (0)