Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions internal/integration/server_side_apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ func TestServerSideApply(t *testing.T) {
// check that the manager.Wait call blocks for at least 25 secunds
start := time.Now()

err = manager.Wait(t.Context(), object.ObjMetadataSet{object.UnstructuredToObjMetadata(deploy)}, fluxssa.WaitOptions{FailFast: true, Timeout: 2 * time.Second, Interval: 1 * time.Second})
require.ErrorContains(t, err, "timeout waiting for")

err = manager.Wait(t.Context(), object.ObjMetadataSet{object.UnstructuredToObjMetadata(deploy)}, fluxssa.WaitOptions{FailFast: true, Timeout: 1 * time.Minute, Interval: 1 * time.Second})
require.NoError(t, err)

Expand Down
55 changes: 55 additions & 0 deletions kubernetes/ssa/cli/cli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

// Package cli provides helper functions for CLI applications using the SSA manager.
package cli

import (
"context"

"github.com/fluxcd/cli-utils/pkg/object"

"github.com/siderolabs/go-kubernetes/kubernetes/ssa"
)

// LogApplyResults logs the results of an SSA apply operation.
func LogApplyResults(ctx context.Context, changes []ssa.Change, manager *ssa.Manager, logFunc func(line string, args ...any)) {
for _, change := range changes {
switch change.Action {
case ssa.CreatedAction, ssa.ConfiguredAction, ssa.DeletedAction:
logFunc(" < %s %s", change.Action, change.Subject)
logFunc("%s", change.Diff)
case ssa.SkippedAction, ssa.UnchangedAction:
logFunc(" > skipped %s: no changes", change.Subject)
default:
logFunc(" > processing manifest %s: unknown action %q", change.Subject, change.Action)
}
}
}

// Wait waits for the given set of changes to be fully reconciled.
func Wait(ctx context.Context, changes []ssa.Change, logFunc func(line string, args ...any), manager *ssa.Manager, waitOps ssa.WaitOptions) error {
waitObjects := make(map[object.ObjMetadata]struct{}, len(changes))

for _, change := range changes {
switch change.Action {
case ssa.CreatedAction, ssa.ConfiguredAction:
waitObjects[change.ObjMetadata] = struct{}{}
}
}

logFunc("waiting for kubernetes objects to be fully reconciled")

err := manager.Wait(ctx,
object.ObjMetadataSetFromMap(waitObjects),
waitOps,
)
if err != nil {
return err
}

logFunc("done")

return nil
}
14 changes: 6 additions & 8 deletions kubernetes/ssa/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@ func (m *Manager) diff(
if err != nil {
return nil, "", err
}

// inventory conflict check: only relevant for modified objects
err = checkInventoryPolicy(invID, inClusterObj, invPolicy)
if err != nil {
return nil, "", invPolicyFailureErr(inputObj, err)
}
}

// should never happen, but just in case
Expand All @@ -179,14 +185,6 @@ func (m *Manager) diff(
}
}

// inventory conflict check: only relevant for modified objects
if changeSet.Action == ssa.ConfiguredAction {
err = checkInventoryPolicy(invID, inClusterObj, invPolicy)
if err != nil {
return nil, "", invPolicyFailureErr(inputObj, err)
}
}

return changeSet, diff, nil
}

Expand Down
12 changes: 12 additions & 0 deletions kubernetes/ssa/inventory_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@ const (
InventoryPolicyAdoptAll InventoryPolicy = "AdoptAll"
)

func ParseInventoryPolicy(policy string) (InventoryPolicy, error) {
p := InventoryPolicy(policy)

switch p {
case InventoryPolicyMustMatch, InventoryPolicyAdoptIfNoInventory, InventoryPolicyAdoptAll:
return p, nil
}

return "", fmt.Errorf("invalid inventory policy %q: must be one of %s, %s, or %s",
policy, InventoryPolicyMustMatch, InventoryPolicyAdoptIfNoInventory, InventoryPolicyAdoptAll)
}

type annotated interface {
GetAnnotations() map[string]string
}
Expand Down
7 changes: 7 additions & 0 deletions kubernetes/ssa/object/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package object

import (
fluxobj "github.com/fluxcd/cli-utils/pkg/object"
"k8s.io/apimachinery/pkg/runtime"
)

// ObjMetadata is a single object metadata.
Expand All @@ -22,3 +23,9 @@ type UnstructuredSet = fluxobj.UnstructuredSet
func ParseObjMetadata(s string) (ObjMetadata, error) {
return fluxobj.ParseObjMetadata(s)
}

// RuntimeToObjMeta extracts the object metadata information from a
// runtime.Object and returns it as ObjMetadata.
func RuntimeToObjMeta(obj runtime.Object) (ObjMetadata, error) {
return fluxobj.RuntimeToObjMeta(obj)
}
5 changes: 5 additions & 0 deletions kubernetes/ssa/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,10 @@ type WaitOptions = ssa.WaitOptions

// Wait checks if the given set of objects has been fully reconciled.
func (m *Manager) Wait(ctx context.Context, set object.ObjMetadataSet, opts WaitOptions) error {
// remove once https://github.com/fluxcd/pkg/pull/1133 is released
if len(set) == 0 {
return nil
}

return m.resourceManager.WaitForSetWithContext(ctx, set, opts)
}