Skip to content

Commit 4a5761c

Browse files
committed
rbd: implement volume group using go-ceph
This adds the required functionality to call the go-ceph APIs for the rbd volume group. Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
1 parent c45cdc9 commit 4a5761c

File tree

3 files changed

+313
-6
lines changed

3 files changed

+313
-6
lines changed

internal/csi-addons/rbd/replication.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
3030
"github.com/ceph/ceph-csi/internal/rbd"
3131
corerbd "github.com/ceph/ceph-csi/internal/rbd"
32+
rbd_group "github.com/ceph/ceph-csi/internal/rbd/group"
3233
"github.com/ceph/ceph-csi/internal/rbd/types"
3334
"github.com/ceph/ceph-csi/internal/util"
3435
"github.com/ceph/ceph-csi/internal/util/log"
@@ -818,11 +819,12 @@ func getGRPCError(err error) error {
818819
}
819820

820821
errorStatusMap := map[error]codes.Code{
821-
corerbd.ErrInvalidArgument: codes.InvalidArgument,
822-
corerbd.ErrFlattenInProgress: codes.Aborted,
823-
corerbd.ErrAborted: codes.Aborted,
824-
corerbd.ErrFailedPrecondition: codes.FailedPrecondition,
825-
corerbd.ErrUnavailable: codes.Unavailable,
822+
corerbd.ErrInvalidArgument: codes.InvalidArgument,
823+
corerbd.ErrFlattenInProgress: codes.Aborted,
824+
corerbd.ErrAborted: codes.Aborted,
825+
corerbd.ErrFailedPrecondition: codes.FailedPrecondition,
826+
corerbd.ErrUnavailable: codes.Unavailable,
827+
rbd_group.ErrRBDGroupUnAvailable: codes.Unavailable,
826828
}
827829

828830
for e, code := range errorStatusMap {

internal/rbd/group/volume_group.go

Lines changed: 303 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ import (
2121
"errors"
2222
"fmt"
2323
"strings"
24+
"time"
2425

2526
"github.com/ceph/go-ceph/rados"
2627
librbd "github.com/ceph/go-ceph/rbd"
28+
"github.com/ceph/go-ceph/rbd/admin"
2729
"github.com/container-storage-interface/spec/lib/go/csi"
2830
"github.com/csi-addons/spec/lib/go/volumegroup"
2931

@@ -33,7 +35,10 @@ import (
3335
"github.com/ceph/ceph-csi/internal/util/log"
3436
)
3537

36-
var ErrRBDGroupNotConnected = errors.New("RBD group is not connected")
38+
var (
39+
// ErrRBDGroupNotConnected is the sentinel error for an RBD group that is
// not connected.
ErrRBDGroupNotConnected = errors.New("RBD group is not connected")
40+
// ErrRBDGroupUnAvailable is the sentinel error for an RBD group that is
// unavailable. Resync wraps it in the error it returns after a resync has
// been issued.
ErrRBDGroupUnAvailable = errors.New("RBD group is unavailable")
41+
)
3742

3843
// volumeGroup handles all requests for 'rbd group' operations.
3944
type volumeGroup struct {
@@ -465,3 +470,300 @@ func (vg *volumeGroup) RemoveVolume(ctx context.Context, vol types.Volume) error
465470
func (vg *volumeGroup) ListVolumes(ctx context.Context) ([]types.Volume, error) {
466471
return vg.volumes, nil
467472
}
473+
474+
func (vg *volumeGroup) EnableMirroring(ctx context.Context, mode librbd.ImageMirrorMode) error {
475+
name, err := vg.GetName(ctx)
476+
if err != nil {
477+
return err
478+
}
479+
480+
ioctx, err := vg.GetIOContext(ctx)
481+
if err != nil {
482+
return err
483+
}
484+
485+
err = librbd.MirrorGroupEnable(ioctx, name, mode)
486+
if err != nil {
487+
return fmt.Errorf("failed to enable mirroring on volume group %q: %w", vg, err)
488+
}
489+
490+
log.DebugLog(ctx, "mirroring is enabled on the volume group %q", vg)
491+
492+
return nil
493+
}
494+
495+
func (vg *volumeGroup) DisableMirroring(ctx context.Context, force bool) error {
496+
name, err := vg.GetName(ctx)
497+
if err != nil {
498+
return err
499+
}
500+
501+
ioctx, err := vg.GetIOContext(ctx)
502+
if err != nil {
503+
return err
504+
}
505+
506+
err = librbd.MirrorGroupDisable(ioctx, name, force)
507+
if err != nil && !errors.Is(rados.ErrNotFound, err) {
508+
return fmt.Errorf("failed to disable mirroring on volume group %q: %w", vg, err)
509+
}
510+
511+
log.DebugLog(ctx, "mirroring is disabled on the volume group %q", vg)
512+
513+
return nil
514+
}
515+
516+
func (vg *volumeGroup) Promote(ctx context.Context, force bool) error {
517+
name, err := vg.GetName(ctx)
518+
if err != nil {
519+
return err
520+
}
521+
522+
ioctx, err := vg.GetIOContext(ctx)
523+
if err != nil {
524+
return err
525+
}
526+
527+
err = librbd.MirrorGroupPromote(ioctx, name, force)
528+
if err != nil {
529+
return fmt.Errorf("failed to promote volume group %q: %w", vg, err)
530+
}
531+
532+
log.DebugLog(ctx, "volume group %q has been promoted", vg)
533+
534+
return nil
535+
}
536+
537+
func (vg *volumeGroup) ForcePromote(ctx context.Context, cr *util.Credentials) error {
538+
promoteArgs := []string{
539+
"mirror", "group", "promote",
540+
vg.String(),
541+
"--force",
542+
"--id", cr.ID,
543+
"-m", vg.monitors,
544+
"--keyfile=" + cr.KeyFile,
545+
}
546+
_, stderr, err := util.ExecCommandWithTimeout(
547+
ctx,
548+
// 2 minutes timeout as the Replication RPC timeout is 2.5 minutes.
549+
2*time.Minute,
550+
"rbd",
551+
promoteArgs...,
552+
)
553+
if err != nil {
554+
return fmt.Errorf("failed to promote group %q with error: %w", vg, err)
555+
}
556+
557+
if stderr != "" {
558+
return fmt.Errorf("failed to promote group %q with stderror: %s", vg, stderr)
559+
}
560+
561+
log.DebugLog(ctx, "volume group %q has been force promoted", vg)
562+
563+
return nil
564+
}
565+
566+
func (vg *volumeGroup) Demote(ctx context.Context) error {
567+
name, err := vg.GetName(ctx)
568+
if err != nil {
569+
return err
570+
}
571+
572+
ioctx, err := vg.GetIOContext(ctx)
573+
if err != nil {
574+
return err
575+
}
576+
577+
err = librbd.MirrorGroupDemote(ioctx, name)
578+
if err != nil {
579+
return fmt.Errorf("failed to demote volume group %q: %w", vg, err)
580+
}
581+
582+
log.DebugLog(ctx, "volume group %q has been demoted", vg)
583+
584+
return nil
585+
}
586+
587+
func (vg *volumeGroup) Resync(ctx context.Context) error {
588+
name, err := vg.GetName(ctx)
589+
if err != nil {
590+
return err
591+
}
592+
593+
ioctx, err := vg.GetIOContext(ctx)
594+
if err != nil {
595+
return err
596+
}
597+
598+
err = librbd.MirrorGroupResync(ioctx, name)
599+
if err != nil {
600+
return fmt.Errorf("failed to resync volume group %q: %w", vg, err)
601+
}
602+
603+
log.DebugLog(ctx, "issued resync on volume group %q", vg)
604+
// If we issued a resync, return a non-final error as image needs to be recreated
605+
// locally. Caller retries till RBD syncs an initial version of the image to
606+
// report its status in the resync request.
607+
return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrRBDGroupUnAvailable)
608+
}
609+
610+
func (vg *volumeGroup) GetMirroringInfo(ctx context.Context) (types.MirrorInfo, error) {
611+
name, err := vg.GetName(ctx)
612+
if err != nil {
613+
return nil, err
614+
}
615+
616+
ioctx, err := vg.GetIOContext(ctx)
617+
if err != nil {
618+
return nil, err
619+
}
620+
621+
info, err := librbd.GetMirrorGroupInfo(ioctx, name)
622+
if err != nil {
623+
return nil, fmt.Errorf("failed to get volume group mirroring info %q: %w", vg, err)
624+
}
625+
626+
return GroupStatus{MirrorGroupInfo: info}, nil
627+
}
628+
629+
func (vg *volumeGroup) GetGlobalMirroringStatus(ctx context.Context) (types.GlobalStatus, error) {
630+
name, err := vg.GetName(ctx)
631+
if err != nil {
632+
return nil, err
633+
}
634+
635+
ioctx, err := vg.GetIOContext(ctx)
636+
if err != nil {
637+
return nil, err
638+
}
639+
statusInfo, err := librbd.GetGlobalMirrorGroupStatus(ioctx, name)
640+
if err != nil {
641+
return nil, fmt.Errorf("failed to get volume group mirroring status %q: %w", vg, err)
642+
}
643+
644+
return GlobalMirrorGroupStatus{GlobalMirrorGroupStatus: &statusInfo}, nil
645+
}
646+
647+
func (vg *volumeGroup) AddSnapshotScheduling(interval admin.Interval, startTime admin.StartTime) error {
648+
ls := admin.NewLevelSpec(vg.pool, vg.namespace, "")
649+
ra, err := vg.conn.GetRBDAdmin()
650+
if err != nil {
651+
return err
652+
}
653+
adminConn := ra.MirrorSnashotSchedule()
654+
err = adminConn.Add(ls, interval, startTime)
655+
if err != nil {
656+
return err
657+
}
658+
659+
return nil
660+
}
661+
662+
func (vg *volumeGroup) ToMirror() (types.Mirror, error) {
663+
return vg, nil
664+
}
665+
666+
// GroupStatus is a wrapper around librbd.MirrorGroupInfo that contains the
667+
// group mirror info.
668+
type GroupStatus struct {
669+
*librbd.MirrorGroupInfo
670+
}
671+
672+
func (status GroupStatus) GetState() string {
673+
return status.State.String()
674+
}
675+
676+
func (status GroupStatus) IsPrimary() bool {
677+
return status.Primary
678+
}
679+
680+
// GlobalGroupMirrorStatus is a wrapper around librbd.GlobalGroupMirrorImageStatus that contains the
681+
// global mirror group status.
682+
type GlobalMirrorGroupStatus struct {
683+
*librbd.GlobalMirrorGroupStatus
684+
}
685+
686+
func (status GlobalMirrorGroupStatus) GetState() string {
687+
return status.GlobalMirrorGroupStatus.Info.State.String()
688+
}
689+
690+
func (status GlobalMirrorGroupStatus) IsPrimary() bool {
691+
return status.GlobalMirrorGroupStatus.Info.Primary
692+
}
693+
694+
func (status GlobalMirrorGroupStatus) GetLocalSiteStatus() (types.SiteStatus, error) {
695+
s, err := status.GlobalMirrorGroupStatus.LocalStatus()
696+
if err != nil {
697+
err = fmt.Errorf("failed to get local site status: %w", err)
698+
}
699+
700+
return SiteMirrorGroupStatus{
701+
SiteMirrorGroupStatus: &s,
702+
}, err
703+
}
704+
705+
func (status GlobalMirrorGroupStatus) GetAllSitesStatus() []types.SiteStatus {
706+
var siteStatuses []types.SiteStatus
707+
for i := range status.SiteStatuses {
708+
siteStatuses = append(siteStatuses, SiteMirrorGroupStatus{SiteMirrorGroupStatus: &status.SiteStatuses[i]})
709+
}
710+
711+
return siteStatuses
712+
}
713+
714+
// RemoteStatus returns one SiteMirrorGroupStatus item from the SiteStatuses
715+
// slice that corresponds to the remote site's status. If the remote status
716+
// is not found than the error ErrNotExist will be returned.
717+
func (status GlobalMirrorGroupStatus) GetRemoteSiteStatus(ctx context.Context) (types.SiteStatus, error) {
718+
var (
719+
ss librbd.SiteMirrorGroupStatus
720+
err error = librbd.ErrNotExist
721+
)
722+
723+
for i := range status.SiteStatuses {
724+
log.DebugLog(
725+
ctx,
726+
"Site status of MirrorUUID: %s, state: %s, description: %s, lastUpdate: %v, up: %t",
727+
status.SiteStatuses[i].MirrorUUID,
728+
status.SiteStatuses[i].State,
729+
status.SiteStatuses[i].Description,
730+
status.SiteStatuses[i].LastUpdate,
731+
status.SiteStatuses[i].Up)
732+
733+
if status.SiteStatuses[i].MirrorUUID != "" {
734+
ss = status.SiteStatuses[i]
735+
err = nil
736+
737+
break
738+
}
739+
}
740+
741+
return SiteMirrorGroupStatus{SiteMirrorGroupStatus: &ss}, err
742+
}
743+
744+
// SiteMirrorGroupStatus is a wrapper around librbd.SiteMirrorGroupStatus that contains the
745+
// site mirror group status.
746+
type SiteMirrorGroupStatus struct {
747+
*librbd.SiteMirrorGroupStatus
748+
}
749+
750+
func (status SiteMirrorGroupStatus) GetMirrorUUID() string {
751+
return status.MirrorUUID
752+
}
753+
754+
func (status SiteMirrorGroupStatus) GetState() string {
755+
return status.State.String()
756+
}
757+
758+
func (status SiteMirrorGroupStatus) GetDescription() string {
759+
return status.Description
760+
}
761+
762+
func (status SiteMirrorGroupStatus) IsUP() bool {
763+
return status.Up
764+
}
765+
766+
func (status SiteMirrorGroupStatus) GetLastUpdate() time.Time {
767+
// convert the last update time to UTC
768+
return time.Unix(status.LastUpdate, 0).UTC()
769+
}

internal/rbd/types/group.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,7 @@ type VolumeGroup interface {
6666

6767
// ListVolumes returns a slice with all Volumes in the VolumeGroup.
6868
ListVolumes(ctx context.Context) ([]Volume, error)
69+
70+
// ToMirror converts the VolumeGroup to a Mirror.
71+
ToMirror() (Mirror, error)
6972
}

0 commit comments

Comments
 (0)