Skip to content

Commit 7c2cbe2

Browse files
committed
better error handling
1 parent abbeb6d commit 7c2cbe2

File tree

1 file changed

+19
-7
lines changed

1 file changed

+19
-7
lines changed

broker/http_broker.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -533,7 +533,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
533533
}
534534
h.RUnlock()
535535

536-
pub := func(node *registry.Node, t string, b []byte) {
536+
pub := func(node *registry.Node, t string, b []byte) error {
537537
scheme := "http"
538538

539539
// check if secure is added in metadata
@@ -547,14 +547,13 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
547547
uri := fmt.Sprintf("%s://%s:%d%s?%s", scheme, node.Address, node.Port, DefaultSubPath, vals.Encode())
548548
r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
549549
if err != nil {
550-
// save on error
551-
h.saveMessage(t, b)
552-
return
550+
return err
553551
}
554552

555553
// discard response body
556554
io.Copy(ioutil.Discard, r.Body)
557555
r.Body.Close()
556+
return nil
558557
}
559558

560559
srv := func(s []*registry.Service, b []byte) {
@@ -567,16 +566,29 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
567566
switch service.Version {
568567
// broadcast version means broadcast to all nodes
569568
case broadcastVersion:
569+
var success bool
570+
571+
// publish to all nodes
570572
for _, node := range service.Nodes {
571573
// publish async
572-
pub(node, topic, b)
574+
if err := pub(node, topic, b); err == nil {
575+
success = true
576+
}
577+
}
578+
579+
// save if it failed to publish at least once
580+
if !success {
581+
h.saveMessage(topic, b)
573582
}
574583
default:
575584
// select node to publish to
576585
node := service.Nodes[rand.Int()%len(service.Nodes)]
577586

578-
// publish async
579-
pub(node, topic, b)
587+
// publish async to one node
588+
if err := pub(node, topic, b); err != nil {
589+
// if failed save it
590+
h.saveMessage(topic, b)
591+
}
580592
}
581593
}
582594
}

0 commit comments

Comments
 (0)