Skip to content

Commit 2079b05

Browse files
committed
operator: update target rollout to handle out-of-band pod restarts
Signed-off-by: Ryan Koo <rbk65@cornell.edu>
1 parent dcba009 commit 2079b05

File tree

2 files changed

+46
-38
lines changed

2 files changed

+46
-38
lines changed

operator/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ We structure this changelog in accordance with [Keep a Changelog](https://keepac
1313
### Changed
1414

1515
- Updated all tool versions and minor dependencies
16+
- Updated target pod rollout strategy to search for lowest pod ordinal not on new revision instead of relying on `UpdatedReplicas`
1617

1718
### Added
1819

operator/pkg/controllers/target_controllers.go

Lines changed: 45 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -314,64 +314,71 @@ func (r *AIStoreReconciler) syncTargetPodSpec(ctx context.Context, ais *aisv1.AI
314314
return false, nil
315315
}
316316

317-
func (r *AIStoreReconciler) isPodReady(ctx context.Context, ais *aisv1.AIStore, podIndex int32) (ready bool, err error) {
318-
pod, err := r.k8sClient.GetPod(ctx, types.NamespacedName{
319-
Name: target.PodName(ais, podIndex),
320-
Namespace: ais.Namespace,
321-
})
317+
func isPodActive(pod *corev1.Pod) bool {
318+
return pod != nil && pod.DeletionTimestamp == nil
319+
}
322320

323-
if err != nil || pod.DeletionTimestamp != nil {
324-
return false, err
321+
func isPodReady(pod *corev1.Pod) bool {
322+
if !isPodActive(pod) {
323+
return false
325324
}
326-
327325
for _, condition := range pod.Status.Conditions {
328326
if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue {
329-
return true, nil
327+
return true
330328
}
331329
}
332-
333-
return false, nil
330+
return false
334331
}
335332

336333
func (r *AIStoreReconciler) findPodNeedingUpdate(ctx context.Context, ais *aisv1.AIStore, ss *appsv1.StatefulSet) string {
337334
logger := logf.FromContext(ctx).WithValues("statefulset", ss.Name)
338-
339-
// The next pod index is simply the count of updated replicas
340-
nextPodIndex := ss.Status.UpdatedReplicas
341-
if nextPodIndex >= *ss.Spec.Replicas {
335+
podList, err := r.k8sClient.ListPods(ctx, ais, target.RequiredPodLabels(ais))
336+
if err != nil {
337+
logger.Error(err, "Failed to list target pods")
342338
return ""
343339
}
344-
345-
// Before processing the next pod, ensure the previous pod (if any) is ready
346-
// to ensure only one pod is in a non-ready state at a time (for high availability)
347-
if nextPodIndex > 0 {
348-
if ready, err := r.isPodReady(ctx, ais, nextPodIndex-1); err != nil || !ready {
349-
logger.Info("Waiting for previous pod to be ready", "pod", target.PodName(ais, nextPodIndex-1))
350-
return ""
351-
}
340+
podMap := make(map[string]*corev1.Pod, len(podList.Items))
341+
for i := range podList.Items {
342+
pod := &podList.Items[i]
343+
podMap[pod.Name] = pod
352344
}
353345

354-
podName := target.PodName(ais, nextPodIndex)
355-
pod, err := r.k8sClient.GetPod(ctx, types.NamespacedName{
356-
Name: podName,
357-
Namespace: ais.Namespace,
358-
})
346+
for i := range int(*ss.Spec.Replicas) {
347+
idx := int32(i)
348+
// For HA, previous pod MUST be ready before proceeding
349+
if idx > 0 {
350+
prevName := target.PodName(ais, idx-1)
351+
if !isPodReady(podMap[prevName]) {
352+
logger.Info("Waiting for previous pod to be ready before proceeding with rollout", "pod", prevName)
353+
return ""
354+
}
355+
}
359356

360-
// Wait if pod doesn't exist or is being deleted
361-
if err != nil || pod.DeletionTimestamp != nil {
362-
if err != nil {
357+
podName := target.PodName(ais, idx)
358+
pod := podMap[podName]
359+
// Do not block on current pod being ready (need to be able to rollback/fix a bad upgrade)
360+
if !isPodActive(pod) {
363361
logger.Info("Pod doesn't exist or is being deleted, waiting", "pod", podName)
362+
return ""
364363
}
365-
return ""
366-
}
367364

368-
podRevision := pod.Labels["controller-revision-hash"]
369-
if podRevision == ss.Status.UpdateRevision {
370-
return ""
365+
// Proceed to checking next pod if current pod is up-to-date
366+
podRevision := pod.Labels["controller-revision-hash"]
367+
if podRevision == ss.Status.UpdateRevision {
368+
continue
369+
}
370+
371+
logger.Info(
372+
"Found pod needing update",
373+
"pod", podName,
374+
"currentRevision", podRevision,
375+
"targetRevision", ss.Status.UpdateRevision,
376+
)
377+
378+
return podName
371379
}
372380

373-
logger.Info("Found pod needing update", "pod", podName, "currentRevision", podRevision, "targetRevision", ss.Status.UpdateRevision)
374-
return podName
381+
return ""
375382
}
376383

377384
func (r *AIStoreReconciler) handleTargetRollout(ctx context.Context, ais *aisv1.AIStore, ss *appsv1.StatefulSet) (ctrl.Result, error) {

0 commit comments

Comments
 (0)