diff --git a/config/daemon/daemon.yaml b/config/daemon/daemon.yaml index af5d7c8..fefaa14 100644 --- a/config/daemon/daemon.yaml +++ b/config/daemon/daemon.yaml @@ -38,7 +38,7 @@ spec: resources: limits: cpu: 500m - memory: 512Mi + memory: 128Mi requests: cpu: 10m memory: 64Mi diff --git a/docs/IMPLEMENTATION_PLAN.md b/docs/IMPLEMENTATION_PLAN.md index eda8bd0..71e9f54 100644 --- a/docs/IMPLEMENTATION_PLAN.md +++ b/docs/IMPLEMENTATION_PLAN.md @@ -207,7 +207,7 @@ the full controller+daemon loop can be tested end-to-end. DaemonSet, label a node with `bootc.dev/managed`, and verify the daemon pod starts on that node. -### 4b. Core loop +### 4b. Core loop ✅ - Binary identifies its node name (downward API env var) - Watches its single BootcNode via field selector on `metadata.name` @@ -221,7 +221,7 @@ pod starts on that node. **Validation (e2e):** enhance the existing e2e test to verify the daemon populates BootcNode status from `bootc status`. -### 4c. State machine +### 4c. State machine ✅ - Detect `spec.desiredImage != booted` → set `Idle=False reason=Staging`, run `bootc switch ` (no `--download-only` for now; diff --git a/internal/bootc/executor.go b/internal/bootc/executor.go index 83ff77f..9a8711b 100644 --- a/internal/bootc/executor.go +++ b/internal/bootc/executor.go @@ -7,7 +7,9 @@ import ( "fmt" "os/exec" "strings" + "time" + "github.com/go-logr/logr" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -31,7 +33,7 @@ func NewHostExecutor() *HostExecutor { func (e *HostExecutor) nsenterCmd(ctx context.Context, args ...string) *exec.Cmd { base := []string{ "--target", "1", - "--mount", "--pid", + "--mount", "--pid", "--cgroup", "--setuid", "0", "--setgid", "0", "--env", "--", } @@ -47,19 +49,92 @@ func (e *HostExecutor) Status(ctx context.Context) ([]byte, error) { return out, nil } +// stageUnitName is the systemd transient unit name used for bootc switch. +const stageUnitName = "bootc-operator-switch.service" + func (e *HostExecutor) Stage(ctx context.Context, image string) error { log := logf.FromContext(ctx) - // TODO: use --download-only once available (https://github.com/bootc-dev/bootc/issues/2137) - cmd := e.nsenterCmd(ctx, "bootc", "switch", image) + // Stop any stale unit from a previous daemon incarnation. + e.stopStageUnit() + + // Ideally we'd use systemd-run's `--pipe` here, which would avoid + // having to fetch the unit journal down below, but SELinux blocks it + // (dbus-broker can't access container-labeled fds). + cmd := e.nsenterCmd(ctx, + "systemd-run", "--wait", "--collect", + "--unit", stageUnitName, + // TODO: use --download-only once available + // (https://github.com/bootc-dev/bootc/issues/2137) + "bootc", "switch", image, + ) + cmd.Cancel = func() error { + e.stopStageUnit() + return nil + } log.Info("Executing", "cmd", strings.Join(cmd.Args, " ")) - out, err := cmd.CombinedOutput() - if err != nil { - return fmt.Errorf("running bootc switch: %s: %w", out, err) + cursor := e.journalCursor() + if err := cmd.Run(); err != nil { + // Were we cancelled? + if ctx.Err() != nil { + return ctx.Err() + } + e.copyJournalUnitLogs(log, stageUnitName, cursor) + return fmt.Errorf("running bootc switch: %w", err) } return nil } +// stopStageUnit stops the bootc-operator-switch transient unit if it +// is running. Errors are ignored: the unit may not exist. +func (e *HostExecutor) stopStageUnit() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _ = e.nsenterCmd(ctx, "systemctl", "stop", stageUnitName).Run() +} + +// journalCursor returns the current journal cursor position. Used to scope a +// future journal query to only entries after the cursor position. +func (e *HostExecutor) journalCursor() string { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + out, err := e.nsenterCmd(ctx, + "journalctl", "--show-cursor", "-n", "0", "-o", "cat", + ).Output() + if err != nil { + return "" + } + // Output format: "-- cursor: s=...;i=...;b=...;..." + if after, ok := strings.CutPrefix(strings.TrimSpace(string(out)), "-- cursor: "); ok { + return after + } + return "" +} + +// copyJournalUnitLogs logs recent journal output from the given systemd unit. +// If cursor is non-empty, only entries after that cursor are shown; otherwise +// shows all entries. +func (e *HostExecutor) copyJournalUnitLogs(log logr.Logger, unit string, cursor string) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + args := []string{"journalctl", "-o", "cat", "--no-pager", + "-u", unit, + } + if cursor != "" { + args = append(args, "--after-cursor="+cursor) + } + out, err := e.nsenterCmd(ctx, args...).Output() + if err != nil { + log.Error(err, "Failed to read unit journal", "unit", unit) + return + } + for _, line := range strings.Split(strings.TrimSpace(string(out)), "\n") { + if line != "" { + log.Info(line, "unit", unit) + } + } +} + func (e *HostExecutor) Reboot(ctx context.Context) error { log := logf.FromContext(ctx) diff --git a/internal/daemon/reconciler.go b/internal/daemon/reconciler.go index cecd60b..8e05a0f 100644 --- a/internal/daemon/reconciler.go +++ b/internal/daemon/reconciler.go @@ -56,8 +56,8 @@ type BootcNodeReconciler struct { inflight stageOp stageDone chan event.GenericEvent - // rebootIssued tracks whether a reboot has been issued so classifyAction - // can distinguish the Staged→Rebooting. + // rebootIssued tracks whether a reboot has been issued so Reconcile + // can skip reconciliation while the node is shutting down. rebootIssued bool } @@ -78,6 +78,11 @@ func (r *BootcNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, nil } + if r.rebootIssued { + log.Info("Reboot already issued, skipping reconcile") + return ctrl.Result{}, nil + } + var bn bootcv1alpha1.BootcNode if err := r.Get(ctx, req.NamespacedName, &bn); err != nil { if apierrors.IsNotFound(err) { @@ -173,9 +178,9 @@ func (r *BootcNodeReconciler) reconcileBootcNode(ctx context.Context, bn *bootcv } // Nothing to do the desired image matches the booted ones. - // Rest the reconciler to start from a clean state + // Reset the stage backoff to start from a clean state. if digested.Digest().String() == bn.Status.Booted.ImageDigest { - r.reset() + r.inflight.reset() return reconcileResult{}, nil } @@ -211,9 +216,6 @@ func (r *BootcNodeReconciler) reconcileBootcNode(ctx context.Context, bn *bootcv reason = bootcv1alpha1.NodeReasonRebooting res.needsReboot = true - case actionAwaitReboot: - reason = bootcv1alpha1.NodeReasonRebooting - case actionAwaitBooted: reason = bootcv1alpha1.NodeReasonStaged log.Info("Image staged", "image", desiredImage) @@ -266,21 +268,31 @@ func (s *stageOp) acquire(log logr.Logger, image string, cancel context.CancelFu s.err = nil } -func (r *BootcNodeReconciler) reset() { - r.rebootIssued = false - r.inflight.mu.Lock() - defer r.inflight.mu.Unlock() - r.inflight.retries = 0 +func (s *stageOp) reset() { + s.mu.Lock() + defer s.mu.Unlock() + s.retries = 0 } // run executes bootc stage in a goroutine. The results are delivered via the done channel. +// +// Technically, multiple goroutines may pile up here waiting on runMu if +// there's multiple rapid image changes (e.g. B→C→D spawns three goroutines). +// This is harmless: each cancelled goroutine checks ctx.Err() after acquiring +// the lock and exits immediately without starting a process. func (s *stageOp) run(ctx context.Context, nodeName, image string, executor bootc.Executor, done chan<- event.GenericEvent) { s.runMu.Lock() defer s.runMu.Unlock() log := logf.FromContext(context.Background()).WithValues("node", nodeName, "image", image) - // TODO: exec bootc switch async and select on the cancel channel to send SIGINT for graceful shutdown. + if ctx.Err() != nil { + log.Info("Stage skipped, context already cancelled") + return + } + + // NB: this uses exec.CommandContext, which will SIGKILL bootc if the + // context is cancelled. err := executor.Stage(ctx, image) s.mu.Lock() @@ -331,7 +343,6 @@ const ( actionAwaitStage // stage in-flight, waiting for completion actionAwaitBooted // staged, waiting for reboot approval actionReboot // staged + approved, issue reboot - actionAwaitReboot // reboot issued, waiting for completion ) func (r *BootcNodeReconciler) classifyAction(bn *bootcv1alpha1.BootcNode, digested reference.Digested, desiredImage string) updateAction { @@ -348,12 +359,6 @@ func (r *BootcNodeReconciler) classifyAction(bn *bootcv1alpha1.BootcNode, digest return actionAwaitBooted } - // rebootIssued is volatile: if the daemon restarts it resets to false. - // That is safe because either the reboot already landed (booted digest - // matches and we return idle earlier) or it hasn't and we re-issue it. - if r.rebootIssued { - return actionAwaitReboot - } return actionReboot } diff --git a/internal/daemon/reconciler_test.go b/internal/daemon/reconciler_test.go index b8f2985..2ba1123 100644 --- a/internal/daemon/reconciler_test.go +++ b/internal/daemon/reconciler_test.go @@ -99,7 +99,7 @@ func TestReconcileBootcStatusError(t *testing.T) { g.SetDefaultEventuallyPollingInterval(pollInterval) ctx := context.Background() - fake.reset() + resetDaemon() fake.setStatusErr(errors.New(bootcStatusErrMsg)) bn := testutil.NewNode(testNodeName, testutil.ImageDigestRefA) @@ -126,7 +126,7 @@ func TestStagingTriggered(t *testing.T) { g.SetDefaultEventuallyPollingInterval(pollInterval) ctx := context.Background() - fake.reset() + resetDaemon() fake.status = newBootcStatus(testutil.DigestA) bn := testutil.NewNode(testNodeName, testutil.ImageDigestRefB) @@ -163,7 +163,7 @@ func TestStagingError(t *testing.T) { g.SetDefaultEventuallyPollingInterval(pollInterval) ctx := context.Background() - fake.reset() + resetDaemon() fake.status = newBootcStatus(testutil.DigestA) fake.setStageErr(errors.New(stageErrMsg)) @@ -196,7 +196,7 @@ func TestAlreadyStaged(t *testing.T) { g.SetDefaultEventuallyPollingInterval(pollInterval) ctx := context.Background() - fake.reset() + resetDaemon() fake.status = newBootcStatus(testutil.DigestA) fake.status.Status.Staged = newBootEntry(testutil.ImageDigestRefB, testutil.DigestB) @@ -225,7 +225,7 @@ func TestRebootingSet(t *testing.T) { g.SetDefaultEventuallyPollingInterval(pollInterval) ctx := context.Background() - fake.reset() + resetDaemon() fake.status = newBootcStatus(testutil.DigestA) fake.status.Status.Staged = newBootEntry(testutil.ImageDigestRefB, testutil.DigestB) @@ -254,7 +254,7 @@ func TestRollback(t *testing.T) { g.SetDefaultEventuallyPollingInterval(pollInterval) ctx := context.Background() - fake.reset() + resetDaemon() fake.status = newBootcStatus(testutil.DigestA) fake.status.Status.Staged = newBootEntry(testutil.ImageDigestRefB, testutil.DigestB) @@ -285,7 +285,7 @@ func TestCancelInflightStage(t *testing.T) { g.SetDefaultEventuallyPollingInterval(pollInterval) ctx := context.Background() - fake.reset() + resetDaemon() fake.status = newBootcStatus(testutil.DigestA) firstBlock := make(chan struct{}) diff --git a/internal/daemon/suite_test.go b/internal/daemon/suite_test.go index 9f1a094..7a94775 100644 --- a/internal/daemon/suite_test.go +++ b/internal/daemon/suite_test.go @@ -22,11 +22,20 @@ import ( const testNodeName = "test-node" var ( - testEnv *envtest.Environment - k8sClient client.Client - fake *fakeExecutor + testEnv *envtest.Environment + k8sClient client.Client + fake *fakeExecutor + reconciler *BootcNodeReconciler ) +// resetDaemon resets all volatile state on the shared reconciler and +// fake executor so each test starts from a clean slate. +func resetDaemon() { + fake.reset() + reconciler.rebootIssued = false + reconciler.inflight.reset() +} + func TestMain(m *testing.M) { ctrl.SetLogger(zap.New(zap.UseDevMode(true))) @@ -65,12 +74,13 @@ func TestMain(m *testing.M) { os.Exit(1) } - if err := (&BootcNodeReconciler{ + reconciler = &BootcNodeReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), NodeName: testNodeName, Executor: fake, - }).SetupWithManager(mgr); err != nil { + } + if err := reconciler.SetupWithManager(mgr); err != nil { fmt.Fprintf(os.Stderr, "Failed to setup reconciler: %v\n", err) os.Exit(1) } diff --git a/test/e2e/bootcnode_test.go b/test/e2e/bootcnode_test.go index 1b82178..7f607cf 100644 --- a/test/e2e/bootcnode_test.go +++ b/test/e2e/bootcnode_test.go @@ -131,10 +131,21 @@ func TestUpdateReboot(t *testing.T) { t.Logf("Patched pool to update image %s", updateRef) - // Phase 3: Wait for Idle with the update digest — proves the full + // Phase 3: Wait for Rebooting — the daemon skips reconciliation after + // issuing a reboot, so this state is durable until the node goes down. + g.Eventually(func(g Gomega) { + g.Expect(env.Client.Get(ctx, client.ObjectKey{Name: nodeName}, &bn)).To(Succeed()) + g.Expect(bn.Status.Conditions).To(ContainElement(And( + HaveField("Type", bootcv1alpha1.NodeIdle), + HaveField("Status", metav1.ConditionFalse), + HaveField("Reason", bootcv1alpha1.NodeReasonRebooting), + ))) + }).WithTimeout(5*time.Minute).Should(Succeed(), "expected node to reach Rebooting state") + + t.Logf("Node %q is Rebooting", nodeName) + + // Phase 4: Wait for Idle with the update digest — proves the full // update lifecycle completed (staging, reboot, boot into new image). - // We don't assert on intermediate states (Staging, Rebooting) because - // they are too transient to catch reliably with polling. g.Eventually(func(g Gomega) { g.Expect(env.Client.Get(ctx, client.ObjectKey{Name: nodeName}, &bn)).To(Succeed()) g.Expect(bn.Status.Booted).NotTo(BeNil())