Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions cmd/pitr/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,12 @@ func (c *Collector) lastGTIDSet(ctx context.Context, suffix string) (pxc.GTIDSet
}

func (c *Collector) newDB(ctx context.Context) error {
host, err := pxc.GetPXCOldestBinlogHost(ctx, c.pxcServiceName, c.pxcUser, c.pxcPass)
prevHost := ""
if c.db != nil {
prevHost = c.db.GetHost()
}

host, err := pxc.GetPXCOldestBinlogHost(ctx, c.pxcServiceName, c.pxcUser, c.pxcPass, prevHost)
if err != nil {
return errors.Wrap(err, "get host")
}
Expand All @@ -316,7 +321,7 @@ func (c *Collector) close() error {
return c.db.Close()
}

func (c *Collector) removeEmptyBinlogs(ctx context.Context, logs []pxc.Binlog) ([]pxc.Binlog, error) {
func (c *Collector) removeEmptyBinlogs(logs []pxc.Binlog) ([]pxc.Binlog, error) {
result := make([]pxc.Binlog, 0)
for _, v := range logs {
if !v.GTIDSet.IsEmpty() {
Expand All @@ -328,7 +333,7 @@ func (c *Collector) removeEmptyBinlogs(ctx context.Context, logs []pxc.Binlog) (

func (c *Collector) filterBinLogs(ctx context.Context, logs []pxc.Binlog, lastBinlogName string) ([]pxc.Binlog, error) {
if lastBinlogName == "" {
return c.removeEmptyBinlogs(ctx, logs)
return c.removeEmptyBinlogs(logs)
}

logsLen := len(logs)
Expand All @@ -353,7 +358,7 @@ func (c *Collector) filterBinLogs(ctx context.Context, logs []pxc.Binlog, lastBi
startIndex++
}

return c.removeEmptyBinlogs(ctx, logs[startIndex:])
return c.removeEmptyBinlogs(logs[startIndex:])
}

func createGapFile(gtidSet pxc.GTIDSet) error {
Expand Down
16 changes: 13 additions & 3 deletions cmd/pitr/pxc/pxc.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,17 +317,21 @@ func GetPXCFirstHost(ctx context.Context, pxcServiceName string) (string, error)
return lastHost, nil
}

func GetPXCOldestBinlogHost(ctx context.Context, pxcServiceName, user, pass string) (string, error) {
func GetPXCOldestBinlogHost(ctx context.Context, pxcServiceName, user, pass, prevHost string) (string, error) {
nodes, err := GetNodesByServiceName(ctx, pxcServiceName)
if err != nil {
return "", errors.Wrap(err, "get nodes by service name")
}

var oldestHost string
var oldestTS int64
var prevNode string
for _, node := range nodes {
nodeArr := strings.Split(node, ":")
if prevHost != "" && nodeArr[0] == prevHost {
prevNode = node
}
if strings.Contains(node, "wsrep_ready:ON:wsrep_connected:ON:wsrep_local_state_comment:Synced:wsrep_cluster_status:Primary") {
nodeArr := strings.Split(node, ":")
binlogTime, err := getBinlogTime(ctx, nodeArr[0], user, pass)
if err != nil {
log.Printf("ERROR: get binlog time: %v", err)
Expand All @@ -337,14 +341,20 @@ func GetPXCOldestBinlogHost(ctx context.Context, pxcServiceName, user, pass stri
oldestHost = nodeArr[0]
oldestTS = binlogTime
}

}
}

if len(oldestHost) == 0 {
return "", errors.New("can't find host")
}

if prevHost != "" && prevHost != oldestHost {
if !strings.Contains(prevNode, "wsrep_ready:ON:wsrep_connected:ON:wsrep_local_state_comment:Synced:wsrep_cluster_status:Primary") {
log.Printf("switching PITR binlog source from %s to %s because current source host %s is not healthy (not Synced/Primary)", prevHost, oldestHost, prevHost)
} else {
log.Printf("switching PITR binlog source from %s to %s because host %s has the oldest available binlog", prevHost, oldestHost, oldestHost)
}
}
return oldestHost, nil
}

Expand Down
Loading