Skip to content

Commit 934c456

Browse files
creydr, knative-prow-robot, pierDipi, matzew
authored
[release-v1.17] Add missing backports (#1674)
* [release-1.17] Reduce mt-broker-controller memory usage with namespaced endpoint informer (knative#8421) * Reduce mt-broker-controller memory usage with namespaced endpoint informer Currently, the mt-broker-controller is using a cluster-wide endpoints informer but it actually only uses endpoints in the "SYSTEM_NAMESPACE". Using the namespaced informer factory ensures that the watcher is only watching endpoints in the `knative-eventing` (also known as `SYSTEM_NAMESPACE`) namespace. Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com> * Start informer Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com> --------- Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com> Co-authored-by: Pierangelo Di Pilato <pierdipi@redhat.com> * [release-1.17] Scheduler: Resync reserved periodically to keep state consistent (knative#8453) Scheduler: Resync reserved periodically to keep state consistent Add resyncReserved removes deleted vPods from reserved to keep the state consistent when leadership changes (Promote / Demote). `initReserved` is not enough since the vPod lister can be stale. 
Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com> Co-authored-by: Pierangelo Di Pilato <pierdipi@redhat.com> * [release-1.17] Scheduler: log expected vreplicas by vpod (knative#8462) Scheduler: log expected vreplicas by vpod Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com> Co-authored-by: Pierangelo Di Pilato <pierdipi@redhat.com> * [release-1.17] Compare the entire PodTemplateSpec, instead of just its PodSpec (knative#8560) Compare the entire PodTemplateSpec, instead of just its PodSpec Signed-off-by: Matthias Wessendorf <mwessend@redhat.com> Co-authored-by: Matthias Wessendorf <mwessend@redhat.com> * [release-1.17] Fix hardcoded knative-eventing namespace (knative#8577) * dizzy: Remove hard-coded knative-eventing namespace and replace with system.Namespace() and touch test to get that properly injected Signed-off-by: Matthias Wessendorf <mwessend@redhat.com> * broom: Remove yet another hard-coded instance of knative-eventing Signed-off-by: Matthias Wessendorf <mwessend@redhat.com> --------- Signed-off-by: Matthias Wessendorf <mwessend@redhat.com> Co-authored-by: Matthias Wessendorf <mwessend@redhat.com> * [release-1.17] Guard reserved access with lock and create vpods in tests (knative#8504) * Guard reserved access with lock in tests * Create vpods in test Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com> --------- Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com> Co-authored-by: Pierangelo Di Pilato <pierdipi@redhat.com> --------- Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com> Signed-off-by: Matthias Wessendorf <mwessend@redhat.com> Co-authored-by: Knative Prow Robot <automation+prow-robot@knative.team> Co-authored-by: Pierangelo Di Pilato <pierdipi@redhat.com> Co-authored-by: Matthias Wessendorf <mwessend@redhat.com>
1 parent a517782 commit 934c456

File tree

11 files changed

+134
-25
lines changed

11 files changed

+134
-25
lines changed

pkg/apis/messaging/v1/in_memory_channel_validation.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@ import (
2222

2323
"knative.dev/pkg/apis"
2424
"knative.dev/pkg/kmp"
25+
"knative.dev/pkg/system"
2526

2627
"knative.dev/eventing/pkg/apis/eventing"
28+
_ "knative.dev/pkg/system/testing"
2729
)
2830

29-
const eventingControllerSAName = "system:serviceaccount:knative-eventing:eventing-controller"
31+
var eventingControllerSAName = fmt.Sprintf("%s:%s:%s", "system:serviceaccount", system.Namespace(), "eventing-controller")
3032

3133
func (imc *InMemoryChannel) Validate(ctx context.Context) *apis.FieldError {
3234
errs := imc.Spec.Validate(ctx).ViaField("spec")

pkg/apis/messaging/v1/in_memory_channel_validation_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@ limitations under the License.
1717
package v1
1818

1919
import (
20+
"fmt"
2021
"testing"
2122

23+
"knative.dev/pkg/system"
24+
2225
"golang.org/x/net/context"
2326
authenticationv1 "k8s.io/api/authentication/v1"
2427
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -139,7 +142,7 @@ func TestInMemoryChannelValidation(t *testing.T) {
139142
want: func() *apis.FieldError {
140143
diff, _ := kmp.ShortDiff(validIMCSingleSubscriber.Spec.Subscribers, validIMCTwoSubscribers.Spec.Subscribers)
141144
return &apis.FieldError{
142-
Message: "Channel.Spec.Subscribers changed by user test-user which was not the system:serviceaccount:knative-eventing:eventing-controller service account",
145+
Message: fmt.Sprintf("%s:%s:%s", "Channel.Spec.Subscribers changed by user test-user which was not the system:serviceaccount", system.Namespace(), "eventing-controller service account"),
143146
Paths: []string{"spec.subscribers"},
144147
Details: diff,
145148
}

pkg/reconciler/broker/controller.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ import (
2525
"k8s.io/client-go/tools/cache"
2626
"knative.dev/pkg/apis"
2727
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
28-
endpointsinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints"
2928
"knative.dev/pkg/configmap"
3029
"knative.dev/pkg/controller"
3130
"knative.dev/pkg/injection/clients/dynamicclient"
3231
secretinformer "knative.dev/pkg/injection/clients/namespacedkube/informers/core/v1/secret"
32+
namespacedinformerfactory "knative.dev/pkg/injection/clients/namespacedkube/informers/factory"
3333
"knative.dev/pkg/logging"
3434
pkgreconciler "knative.dev/pkg/reconciler"
3535
"knative.dev/pkg/resolver"
@@ -69,7 +69,12 @@ func NewController(
6969
logger := logging.FromContext(ctx)
7070
brokerInformer := brokerinformer.Get(ctx)
7171
subscriptionInformer := subscriptioninformer.Get(ctx)
72-
endpointsInformer := endpointsinformer.Get(ctx)
72+
73+
endpointsInformer := namespacedinformerfactory.Get(ctx).Core().V1().Endpoints()
74+
if err := controller.StartInformers(ctx.Done(), endpointsInformer.Informer()); err != nil {
75+
logger.Fatalw("Failed to start namespaced endpoints informer", zap.Error(err))
76+
}
77+
7378
configmapInformer := configmapinformer.Get(ctx)
7479
secretInformer := secretinformer.Get(ctx)
7580
eventPolicyInformer := eventpolicyinformer.Get(ctx)

pkg/reconciler/broker/resources/eventpolicy.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,21 @@ limitations under the License.
1717
package resources
1818

1919
import (
20+
"fmt"
21+
2022
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2123
"k8s.io/utils/ptr"
2224
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
2325
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
2426
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
2527
"knative.dev/pkg/kmeta"
28+
"knative.dev/pkg/system"
2629
)
2730

31+
var OIDCBrokerSub = fmt.Sprintf("%s:%s:%s", "system:serviceaccount", system.Namespace(), "mt-broker-ingress-oidc")
32+
2833
const (
2934
BackingChannelEventPolicyLabelPrefix = "eventing.knative.dev/"
30-
OIDCBrokerSub = "system:serviceaccount:knative-eventing:mt-broker-ingress-oidc"
3135
brokerKind = "Broker"
3236
)
3337

pkg/reconciler/containersource/containersource.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,8 @@ func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *v1.Con
109109
return nil, fmt.Errorf("getting Deployment: %v", err)
110110
} else if !metav1.IsControlledBy(ra, source) {
111111
return nil, fmt.Errorf("deployment %q is not owned by ContainerSource %q", ra.Name, source.Name)
112-
} else if r.podSpecChanged(&ra.Spec.Template.Spec, &expected.Spec.Template.Spec) {
113-
ra.Spec.Template.Spec = expected.Spec.Template.Spec
112+
} else if r.podTemplateChanged(&ra.Spec.Template, &expected.Spec.Template) {
113+
ra.Spec.Template = expected.Spec.Template
114114
ra, err = r.kubeClientSet.AppsV1().Deployments(expected.Namespace).Update(ctx, ra, metav1.UpdateOptions{})
115115
if err != nil {
116116
return nil, fmt.Errorf("updating Deployment: %v", err)
@@ -159,6 +159,10 @@ func (r *Reconciler) podSpecChanged(have *corev1.PodSpec, want *corev1.PodSpec)
159159
return !equality.Semantic.DeepDerivative(want, have)
160160
}
161161

162+
func (r *Reconciler) podTemplateChanged(have *corev1.PodTemplateSpec, want *corev1.PodTemplateSpec) bool {
163+
return !equality.Semantic.DeepDerivative(want, have)
164+
}
165+
162166
func (r *Reconciler) sinkBindingSpecChanged(have *v1.SinkBindingSpec, want *v1.SinkBindingSpec) bool {
163167
return !equality.Semantic.DeepDerivative(want, have)
164168
}

pkg/scheduler/scheduler.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ func (f SchedulerFunc) Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.
6666
// VPod represents virtual replicas placed into real Kubernetes pods
6767
// The scheduler is responsible for placing VPods
6868
type VPod interface {
69+
GetDeletionTimestamp() *metav1.Time
70+
6971
// GetKey returns the VPod key (namespace/name).
7072
GetKey() types.NamespacedName
7173

pkg/scheduler/state/state.go

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -278,23 +278,25 @@ func isPodUnschedulable(pod *v1.Pod) bool {
278278
func (s *State) MarshalJSON() ([]byte, error) {
279279

280280
type S struct {
281-
FreeCap []int32 `json:"freeCap"`
282-
SchedulablePods []int32 `json:"schedulablePods"`
283-
Capacity int32 `json:"capacity"`
284-
Replicas int32 `json:"replicas"`
285-
StatefulSetName string `json:"statefulSetName"`
286-
PodSpread map[string]map[string]int32 `json:"podSpread"`
287-
Pending map[string]int32 `json:"pending"`
281+
FreeCap []int32 `json:"freeCap"`
282+
SchedulablePods []int32 `json:"schedulablePods"`
283+
Capacity int32 `json:"capacity"`
284+
Replicas int32 `json:"replicas"`
285+
StatefulSetName string `json:"statefulSetName"`
286+
PodSpread map[string]map[string]int32 `json:"podSpread"`
287+
Pending map[string]int32 `json:"pending"`
288+
ExpectedVReplicaByVPod map[string]int32 `json:"expectedVReplicaByVPod"`
288289
}
289290

290291
sj := S{
291-
FreeCap: s.FreeCap,
292-
SchedulablePods: s.SchedulablePods,
293-
Capacity: s.Capacity,
294-
Replicas: s.Replicas,
295-
StatefulSetName: s.StatefulSetName,
296-
PodSpread: ToJSONable(s.PodSpread),
297-
Pending: toJSONablePending(s.Pending),
292+
FreeCap: s.FreeCap,
293+
SchedulablePods: s.SchedulablePods,
294+
Capacity: s.Capacity,
295+
Replicas: s.Replicas,
296+
StatefulSetName: s.StatefulSetName,
297+
PodSpread: ToJSONable(s.PodSpread),
298+
Pending: toJSONablePending(s.Pending),
299+
ExpectedVReplicaByVPod: toJSONablePending(s.ExpectedVReplicaByVPod),
298300
}
299301

300302
return json.Marshal(sj)

pkg/scheduler/statefulset/autoscaler.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -205,17 +205,19 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) err
205205
return err
206206
}
207207

208-
logger.Debugw("checking adapter capacity",
209-
zap.Int32("replicas", scale.Spec.Replicas),
210-
zap.Any("state", state))
211-
212208
newReplicas := integer.Int32Max(int32(math.Ceil(float64(state.TotalExpectedVReplicas())/float64(state.Capacity))), a.minReplicas)
213209

214210
// Only scale down if permitted
215211
if !attemptScaleDown && newReplicas < scale.Spec.Replicas {
216212
newReplicas = scale.Spec.Replicas
217213
}
218214

215+
logger.Debugw("checking adapter capacity",
216+
zap.Bool("attemptScaleDown", attemptScaleDown),
217+
zap.Int32("replicas", scale.Spec.Replicas),
218+
zap.Int32("newReplicas", newReplicas),
219+
zap.Any("state", state))
220+
219221
if newReplicas != scale.Spec.Replicas {
220222
scale.Spec.Replicas = newReplicas
221223
logger.Infow("updating adapter replicas", zap.Int32("replicas", scale.Spec.Replicas))

pkg/scheduler/statefulset/scheduler.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"sort"
2323
"sync"
24+
"sync/atomic"
2425
"time"
2526

2627
"go.uber.org/zap"
@@ -114,6 +115,11 @@ type StatefulSetScheduler struct {
114115
// replicas is the (cached) number of statefulset replicas.
115116
replicas int32
116117

118+
// isLeader signals whether a given Scheduler instance is leader or not.
119+
// The autoscaler is considered the leader when ephemeralLeaderElectionObject is in a
120+
// bucket where we've been promoted.
121+
isLeader atomic.Bool
122+
117123
// reserved tracks vreplicas that have been placed (ie. scheduled) but haven't been
118124
// committed yet (ie. not appearing in vpodLister)
119125
reserved map[types.NamespacedName]map[string]int32
@@ -130,6 +136,9 @@ func (s *StatefulSetScheduler) Promote(b reconciler.Bucket, enq func(reconciler.
130136
if !b.Has(ephemeralLeaderElectionObject) {
131137
return nil
132138
}
139+
// The demoted bucket has the ephemeralLeaderElectionObject, so we are not leader anymore.
140+
// Flip the flag after running initReserved.
141+
defer s.isLeader.Store(true)
133142

134143
if v, ok := s.autoscaler.(reconciler.LeaderAware); ok {
135144
return v.Promote(b, enq)
@@ -151,6 +160,9 @@ func (s *StatefulSetScheduler) initReserved() error {
151160

152161
s.reserved = make(map[types.NamespacedName]map[string]int32, len(vPods))
153162
for _, vPod := range vPods {
163+
if !vPod.GetDeletionTimestamp().IsZero() {
164+
continue
165+
}
154166
s.reserved[vPod.GetKey()] = make(map[string]int32, len(vPod.GetPlacements()))
155167
for _, placement := range vPod.GetPlacements() {
156168
s.reserved[vPod.GetKey()][placement.PodName] += placement.VReplicas
@@ -159,8 +171,41 @@ func (s *StatefulSetScheduler) initReserved() error {
159171
return nil
160172
}
161173

174+
// resyncReserved removes deleted vPods from reserved to keep the state consistent when leadership
175+
// changes (Promote / Demote).
176+
// initReserved is not enough since the vPod lister can be stale.
177+
func (s *StatefulSetScheduler) resyncReserved() error {
178+
if !s.isLeader.Load() {
179+
return nil
180+
}
181+
182+
vPods, err := s.vpodLister()
183+
if err != nil {
184+
return fmt.Errorf("failed to list vPods during reserved resync: %w", err)
185+
}
186+
vPodsByK := vPodsByKey(vPods)
187+
188+
s.reservedMu.Lock()
189+
defer s.reservedMu.Unlock()
190+
191+
for key := range s.reserved {
192+
vPod, ok := vPodsByK[key]
193+
if !ok || vPod == nil {
194+
delete(s.reserved, key)
195+
}
196+
}
197+
198+
return nil
199+
}
200+
162201
// Demote implements reconciler.LeaderAware.
163202
func (s *StatefulSetScheduler) Demote(b reconciler.Bucket) {
203+
if !b.Has(ephemeralLeaderElectionObject) {
204+
return
205+
}
206+
// The demoted bucket has the ephemeralLeaderElectionObject, so we are not leader anymore.
207+
defer s.isLeader.Store(false)
208+
164209
if v, ok := s.autoscaler.(reconciler.LeaderAware); ok {
165210
v.Demote(b)
166211
}
@@ -208,6 +253,17 @@ func newStatefulSetScheduler(ctx context.Context,
208253
sif.Shutdown()
209254
}()
210255

256+
go func() {
257+
for {
258+
select {
259+
case <-ctx.Done():
260+
return
261+
case <-time.After(cfg.RefreshPeriod * 3):
262+
_ = s.resyncReserved()
263+
}
264+
}
265+
}()
266+
211267
return s
212268
}
213269

@@ -561,3 +617,11 @@ func upsertPlacements(placements []duckv1alpha1.Placement, placement duckv1alpha
561617
}
562618
return placements
563619
}
620+
621+
func vPodsByKey(vPods []scheduler.VPod) map[types.NamespacedName]scheduler.VPod {
622+
r := make(map[types.NamespacedName]scheduler.VPod, len(vPods))
623+
for _, vPod := range vPods {
624+
r[vPod.GetKey()] = vPod
625+
}
626+
return r
627+
}

pkg/scheduler/statefulset/scheduler_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,21 @@ func TestStatefulsetScheduler(t *testing.T) {
708708

709709
vpodClient := tscheduler.NewVPodClient()
710710
vpod := vpodClient.Create(vpodNamespace, vpodName, tc.vreplicas, tc.placements)
711+
for vPodKey, res := range tc.initialReserved {
712+
if vPodKey.Namespace == vpodNamespace && vPodKey.Name == vpodName {
713+
continue
714+
}
715+
var placements []duckv1alpha1.Placement
716+
count := int32(0)
717+
for pod, vReplicas := range res {
718+
count += vReplicas
719+
placements = append(placements, duckv1alpha1.Placement{
720+
PodName: pod,
721+
VReplicas: vReplicas,
722+
})
723+
}
724+
vpodClient.Create(vPodKey.Namespace, vPodKey.Name, count, placements)
725+
}
711726

712727
for i := int32(0); i < tc.replicas; i++ {
713728
nodeName := "node" + fmt.Sprint(i)
@@ -745,7 +760,9 @@ func TestStatefulsetScheduler(t *testing.T) {
745760
t.Fatal("unexpected error", err)
746761
}
747762
if tc.initialReserved != nil {
763+
s.reservedMu.Lock()
748764
s.reserved = tc.initialReserved
765+
s.reservedMu.Unlock()
749766
}
750767

751768
// Give some time for the informer to notify the scheduler and set the number of replicas

0 commit comments

Comments
 (0)