Skip to content

Commit 6726fc3

Browse files
authored
Merge pull request #11 from swisscom/master
add elasticdump direct S3 option
2 parents 94e1456 + 53551ec commit 6726fc3

File tree

18 files changed

+199
-109
lines changed

18 files changed

+199
-109
lines changed

.env

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ export BACKMAN_CONFIG='{
1414
"encryption_key":"a_super_strong_example_key"
1515
},
1616
"services": {
17+
"my-elasticsearch": {
18+
"direct_s3": false
19+
},
1720
"my_mysql_db": {
1821
"force_import": true
1922
},

README.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,12 @@ Possible JSON properties:
9494
- `services.<service-instance>.timeout`: optional, backman will abort a running backup/restore if the timeout is exceeded
9595
- `services.<service-instance>.retention.days`: optional, specifies how long backman will keep backups on S3 at maximum for this service instance
9696
- `services.<service-instance>.retention.files`: optional, specifies the maximum number of files backman will keep on S3 for this service instance
97+
- `services.<service-instance>.direct_s3`: optional / Elasticsearch-specific, bypasses backman's internal backup stream and encryption entirely, streaming directly from/to S3 via elasticdump
9798
- `services.<service-instance>.disable_column_statistics`: optional / MySQL-specific, allows for disabling export of column statistics. Set to `true` to avoid issues with pre-8.0 versions of MySQL
98-
- `services.<service-instance>.force_import`: optional / MySQL-specific. Set to `true` to use the `--force` flag for mysql, ignoring any errors that might occur while importing backups.
99-
- `services.<service-instance>.local_backup_path`: optional / PostgreSQL-specific, path where to store backup files locally first before uploading them. Otherwise streams directly onto s3 if not specified.
99+
- `services.<service-instance>.force_import`: optional / MySQL-specific. Set to `true` to use the `--force` flag for mysql, ignoring any errors that might occur while importing backups
100+
- `services.<service-instance>.local_backup_path`: optional / PostgreSQL-specific, path where to store backup files locally first before uploading them. Otherwise streams directly to S3 if not specified
101+
- `services.<service-instance>.backup_options`: optional, allows specifying additional parameters and flags for service backup executable
102+
- `services.<service-instance>.restore_options`: optional, allows specifying additional parameters and flags for service restore executable
100103

101104
Note: Usage of `s3.encryption_key` is not backward compatible! Backups generated without or with a different encryption key cannot be downloaded or restored anymore.
102105

config.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
"bucket_name": "my-database-backups"
1212
},
1313
"services": {
14+
"my-elasticsearch": {
15+
"direct_s3": true
16+
},
1417
"my_postgres_db": {
1518
"schedule": "0 0 2,18 * * *",
1619
"timeout": "2h",

config/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ type ServiceConfig struct {
4444
Days int
4545
Files int
4646
}
47+
DirectS3 bool `json:"direct_s3"`
4748
DisableColumnStatistics bool `json:"disable_column_statistics"`
4849
ForceImport bool `json:"force_import"`
4950
LocalBackupPath string `json:"local_backup_path"`
@@ -163,6 +164,9 @@ func Get() *Config {
163164
if serviceConfig.Retention.Files > 0 {
164165
mergedServiceConfig.Retention.Files = serviceConfig.Retention.Files
165166
}
167+
if serviceConfig.DirectS3 {
168+
mergedServiceConfig.DirectS3 = serviceConfig.DirectS3
169+
}
166170
if serviceConfig.DisableColumnStatistics {
167171
mergedServiceConfig.DisableColumnStatistics = serviceConfig.DisableColumnStatistics
168172
}

manifest.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ applications:
2323

2424
# ### push either as docker image
2525
docker:
26-
image: jamesclonk/backman:1.27.2 # choose version from https://hub.docker.com/r/jamesclonk/backman/tags, or 'latest'
26+
image: jamesclonk/backman:1.28.0 # choose version from https://hub.docker.com/r/jamesclonk/backman/tags, or 'latest'
2727
# ### or as buildpack/src
2828
# buildpacks:
2929
# - https://github.com/cloudfoundry/apt-buildpack
@@ -62,6 +62,9 @@ applications:
6262
"files": 500
6363
}
6464
},
65+
"my-elasticsearch": {
66+
"direct_s3": true
67+
},
6568
"my_mysql_db": {
6669
"timeout": "35m",
6770
"disable_column_statistics": true

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "backman",
33
"dependencies": {
4-
"elasticdump": "6.7.5"
4+
"elasticdump": "6.69.2"
55
}
66
}

s3/client.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ import (
1616
type Client struct {
1717
Client *minio.Client
1818
BucketName string
19+
Endpoint string
20+
AccessKey string
21+
SecretKey string
1922
}
2023

2124
func New(app *cfenv.App) *Client {
@@ -95,5 +98,8 @@ func New(app *cfenv.App) *Client {
9598
return &Client{
9699
Client: minioClient,
97100
BucketName: bucketName,
101+
Endpoint: endpoint,
102+
AccessKey: accessKeyID,
103+
SecretKey: secretAccessKey,
98104
}
99105
}

service/elasticsearch/backup.go

Lines changed: 94 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -43,86 +43,118 @@ func Backup(ctx context.Context, s3 *s3.Client, service util.Service, binding *c
4343

4444
u, _ := url.Parse(host)
4545
connectstring := fmt.Sprintf("%s://%s:%s@%s", u.Scheme, username, password, u.Host)
46+
objectPath := fmt.Sprintf("%s/%s/%s", service.Label, service.Name, filename)
4647

4748
// prepare elasticdump command
4849
var command []string
4950
command = append(command, "elasticdump")
5051
command = append(command, "--quiet")
5152
command = append(command, fmt.Sprintf("--input=%s", connectstring))
52-
command = append(command, "--output=$")
5353

54-
log.Debugf("executing elasticsearch backup command: %v", strings.Join(command, " "))
55-
cmd := exec.CommandContext(ctx, command[0], command[1:]...)
54+
// stream to stdout (default behaviour) or directly onto s3 (new behaviour)?
55+
if service.DirectS3 {
56+
command = append(command, fmt.Sprintf("--output=s3://%s/%s", s3.BucketName, objectPath))
57+
command = append(command, "--s3Endpoint")
58+
command = append(command, s3.Endpoint)
59+
command = append(command, "--s3AccessKeyId")
60+
command = append(command, s3.AccessKey)
61+
command = append(command, "--s3SecretAccessKey")
62+
command = append(command, s3.SecretKey)
63+
command = append(command, "--s3Compress")
64+
command = append(command, service.BackupOptions...)
65+
66+
log.Debugf("executing elasticsearch direct S3 backup command: %v", strings.Join(command, " "))
67+
cmd := exec.CommandContext(ctx, command[0], command[1:]...)
68+
69+
if err := cmd.Run(); err != nil {
70+
log.Errorf("could not run elasticdump: %v", err)
71+
state.BackupFailure(service, filename)
5672

57-
// capture stdout to pass to gzipping buffer
58-
outPipe, err := cmd.StdoutPipe()
59-
if err != nil {
60-
log.Errorf("could not get stdout pipe for elasticdump: %v", err)
61-
state.BackupFailure(service, filename)
62-
return err
63-
}
64-
defer outPipe.Close()
65-
66-
var uploadWait sync.WaitGroup
67-
uploadCtx, uploadCancel := context.WithCancel(context.Background()) // allows upload to be cancelable, in case backup times out
68-
defer uploadCancel() // cancel upload in case Backup() exits before uploadWait is done
69-
70-
// start upload in background, streaming output onto S3
71-
uploadWait.Add(1)
72-
go func() {
73-
defer uploadWait.Done()
74-
75-
// gzipping stdout
76-
pr, pw := io.Pipe()
77-
gw := gzip.NewWriter(pw)
78-
gw.Name = strings.TrimSuffix(filename, ".gz")
79-
gw.ModTime = time.Now()
80-
go func() {
81-
_, _ = io.Copy(gw, bufio.NewReader(outPipe))
82-
if err := gw.Flush(); err != nil {
83-
log.Errorf("%v", err)
73+
// check for timeout error
74+
if ctx.Err() == context.DeadlineExceeded {
75+
return fmt.Errorf("elasticdump: timeout: %v", ctx.Err())
8476
}
85-
if err := gw.Close(); err != nil {
86-
log.Errorf("%v", err)
87-
}
88-
if err := pw.Close(); err != nil {
89-
log.Errorf("%v", err)
90-
}
91-
}()
77+
return fmt.Errorf("elasticdump: %v", err)
78+
}
79+
time.Sleep(1 * time.Second)
80+
state.BackupSuccess(service, filename)
81+
82+
} else {
83+
command = append(command, "--output=$")
84+
command = append(command, service.BackupOptions...)
9285

93-
objectPath := fmt.Sprintf("%s/%s/%s", service.Label, service.Name, filename)
94-
err = s3.UploadWithContext(uploadCtx, objectPath, pr, -1)
86+
log.Debugf("executing elasticsearch backup command: %v", strings.Join(command, " "))
87+
cmd := exec.CommandContext(ctx, command[0], command[1:]...)
88+
89+
// capture stdout to pass to gzipping buffer
90+
outPipe, err := cmd.StdoutPipe()
9591
if err != nil {
96-
log.Errorf("could not upload service backup [%s] to S3: %v", service.Name, err)
92+
log.Errorf("could not get stdout pipe for elasticdump: %v", err)
9793
state.BackupFailure(service, filename)
94+
return err
9895
}
99-
}()
100-
time.Sleep(2 * time.Second) // wait for upload goroutine to be ready
96+
defer outPipe.Close()
10197

102-
// capture and read stderr in case an error occurs
103-
var errBuf bytes.Buffer
104-
cmd.Stderr = &errBuf
98+
var uploadWait sync.WaitGroup
99+
uploadCtx, uploadCancel := context.WithCancel(context.Background()) // allows upload to be cancelable, in case backup times out
100+
defer uploadCancel() // cancel upload in case Backup() exits before uploadWait is done
105101

106-
if err := cmd.Start(); err != nil {
107-
log.Errorf("could not run elasticdump: %v", err)
108-
state.BackupFailure(service, filename)
109-
return err
110-
}
102+
// start upload in background, streaming output onto S3
103+
uploadWait.Add(1)
104+
go func() {
105+
defer uploadWait.Done()
106+
107+
// gzipping stdout
108+
pr, pw := io.Pipe()
109+
gw := gzip.NewWriter(pw)
110+
gw.Name = strings.TrimSuffix(filename, ".gz")
111+
gw.ModTime = time.Now()
112+
go func() {
113+
_, _ = io.Copy(gw, bufio.NewReader(outPipe))
114+
if err := gw.Flush(); err != nil {
115+
log.Errorf("%v", err)
116+
}
117+
if err := gw.Close(); err != nil {
118+
log.Errorf("%v", err)
119+
}
120+
if err := pw.Close(); err != nil {
121+
log.Errorf("%v", err)
122+
}
123+
}()
124+
125+
err = s3.UploadWithContext(uploadCtx, objectPath, pr, -1)
126+
if err != nil {
127+
log.Errorf("could not upload service backup [%s] to S3: %v", service.Name, err)
128+
state.BackupFailure(service, filename)
129+
}
130+
}()
131+
time.Sleep(2 * time.Second) // wait for upload goroutine to be ready
132+
133+
// capture and read stderr in case an error occurs
134+
var errBuf bytes.Buffer
135+
cmd.Stderr = &errBuf
111136

112-
if err := cmd.Wait(); err != nil {
113-
state.BackupFailure(service, filename)
114-
// check for timeout error
115-
if ctx.Err() == context.DeadlineExceeded {
116-
return fmt.Errorf("elasticdump: timeout: %v", ctx.Err())
137+
if err := cmd.Start(); err != nil {
138+
log.Errorf("could not run elasticdump: %v", err)
139+
state.BackupFailure(service, filename)
140+
return err
117141
}
118142

119-
log.Errorln(strings.TrimRight(errBuf.String(), "\r\n"))
120-
return fmt.Errorf("elasticdump: %v", err)
121-
}
143+
if err := cmd.Wait(); err != nil {
144+
state.BackupFailure(service, filename)
145+
// check for timeout error
146+
if ctx.Err() == context.DeadlineExceeded {
147+
return fmt.Errorf("elasticdump: timeout: %v", ctx.Err())
148+
}
122149

123-
uploadWait.Wait() // wait for upload to have finished
124-
if err == nil {
125-
state.BackupSuccess(service, filename)
150+
log.Errorln(strings.TrimRight(errBuf.String(), "\r\n"))
151+
return fmt.Errorf("elasticdump: %v", err)
152+
}
153+
154+
uploadWait.Wait() // wait for upload to have finished
155+
if err == nil {
156+
state.BackupSuccess(service, filename)
157+
}
126158
}
127-
return err
159+
return nil
128160
}

0 commit comments

Comments
 (0)