From 4809d16ee2945e3d1b87be6019f657271d62f6ab Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sun, 14 Jun 2026 17:19:45 +0200 Subject: [PATCH 01/15] Event: move type to a separate file, `event_error.go` The reason for the filename will be apparent in a moment. --- pipe/event_error.go | 9 +++++++++ pipe/pipeline.go | 8 -------- 2 files changed, 9 insertions(+), 8 deletions(-) create mode 100644 pipe/event_error.go diff --git a/pipe/event_error.go b/pipe/event_error.go new file mode 100644 index 0000000..1998b1c --- /dev/null +++ b/pipe/event_error.go @@ -0,0 +1,9 @@ +package pipe + +// Event represents anything that could happen during the pipeline execution +type Event struct { + Command string + Msg string + Err error + Context map[string]interface{} +} diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 878cdc6..5b1cc77 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -163,14 +163,6 @@ func WithEnvVarsFunc(valuesFunc ContextValuesFunc) Option { } } -// Event represents anything that could happen during the pipeline execution -type Event struct { - Command string - Msg string - Err error - Context map[string]interface{} -} - // WithEventHandler sets a handler for the pipeline. Setting one will emit // and event for each process. func WithEventHandler(handler func(e *Event)) Option { From e0ee1a03e8789caffc505a66581a7c5966902338 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sun, 14 Jun 2026 17:31:15 +0200 Subject: [PATCH 02/15] EventError: new name for `Event` Rename `Event` to `EventError`, and make it satisfy the `error` interface and also implement `Unwrap() error`. Add an alias `Event` for backwards-compatibility. 
--- pipe/event_error.go | 22 ++++++++++++++++++++-- pipe/pipeline.go | 10 +++++----- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/pipe/event_error.go b/pipe/event_error.go index 1998b1c..1ba811b 100644 --- a/pipe/event_error.go +++ b/pipe/event_error.go @@ -1,9 +1,27 @@ package pipe -// Event represents anything that could happen during the pipeline execution -type Event struct { +import "fmt" + +// EventError represents an error that could happen during the +// pipeline execution that we might want to report as an event. +type EventError struct { Command string Msg string Err error Context map[string]interface{} } + +func (err *EventError) Error() string { + if err.Msg == "" { + return fmt.Sprintf("%s: %v", err.Command, err.Err) + } + return fmt.Sprintf("%s in stage %q: %v", err.Msg, err.Command, err.Err) +} + +func (err *EventError) Unwrap() error { + return err.Err +} + +// Event is an alias for `EventError`, for reasons of backwards +// compatibility. +type Event = EventError diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 5b1cc77..baaa933 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -64,11 +64,11 @@ type Pipeline struct { // does not guarantee that clients are using the class correctly. started uint32 - eventHandler func(e *Event) + eventHandler func(e *EventError) panicHandler StagePanicHandler } -var emptyEventHandler = func(_ *Event) {} +var emptyEventHandler = func(_ *EventError) {} type NewPipeFn func(opts ...Option) *Pipeline @@ -165,7 +165,7 @@ func WithEnvVarsFunc(valuesFunc ContextValuesFunc) Option { // WithEventHandler sets a handler for the pipeline. Setting one will emit // and event for each process. 
-func WithEventHandler(handler func(e *Event)) Option { +func WithEventHandler(handler func(e *EventError)) Option { return func(p *Pipeline) { p.eventHandler = handler } @@ -327,7 +327,7 @@ func (p *Pipeline) Start(ctx context.Context) error { for _, s := range p.stages[:i] { _ = s.Wait() } - p.eventHandler(&Event{ + p.eventHandler(&EventError{ Command: p.stages[i].Name(), Msg: "failed to start pipeline stage", Err: err, @@ -443,7 +443,7 @@ func (p *Pipeline) Wait() error { } if earliestStageErr != nil { - p.eventHandler(&Event{ + p.eventHandler(&EventError{ Command: earliestFailedStage.Name(), Msg: "command failed", Err: earliestStageErr, From b70f374de275cb9dde5ef11284df64c180421ec2 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sun, 14 Jun 2026 17:32:25 +0200 Subject: [PATCH 03/15] EventHandler: new type --- pipe/event_error.go | 2 ++ pipe/pipeline.go | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pipe/event_error.go b/pipe/event_error.go index 1ba811b..6d24f39 100644 --- a/pipe/event_error.go +++ b/pipe/event_error.go @@ -11,6 +11,8 @@ type EventError struct { Context map[string]interface{} } +type EventHandler func(err *EventError) + func (err *EventError) Error() string { if err.Msg == "" { return fmt.Sprintf("%s: %v", err.Command, err.Err) diff --git a/pipe/pipeline.go b/pipe/pipeline.go index baaa933..04267f2 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -64,7 +64,7 @@ type Pipeline struct { // does not guarantee that clients are using the class correctly. started uint32 - eventHandler func(e *EventError) + eventHandler EventHandler panicHandler StagePanicHandler } @@ -165,7 +165,7 @@ func WithEnvVarsFunc(valuesFunc ContextValuesFunc) Option { // WithEventHandler sets a handler for the pipeline. Setting one will emit // and event for each process. 
-func WithEventHandler(handler func(e *EventError)) Option { +func WithEventHandler(handler EventHandler) Option { return func(p *Pipeline) { p.eventHandler = handler } From 72fdbc6eb8df2a41325db1cc59ff6f13c4366ea3 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sun, 14 Jun 2026 17:48:07 +0200 Subject: [PATCH 04/15] Return the `EventError` directly When there is an `EventError`, return it directly rather than passing the `EventError` to the `EventHandler` and returning a slightly differently-formatted message as the `error` result. This slightly changes the error messages, so adjust the tests accordingly. --- pipe/pipeline.go | 16 ++++++++-------- pipe/pipeline_test.go | 8 ++++---- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 04267f2..0f4d33e 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -327,14 +327,13 @@ func (p *Pipeline) Start(ctx context.Context) error { for _, s := range p.stages[:i] { _ = s.Wait() } - p.eventHandler(&EventError{ + eventErr := &EventError{ Command: p.stages[i].Name(), Msg: "failed to start pipeline stage", Err: err, - }) - return fmt.Errorf( - "starting pipeline stage %q: %w", p.stages[i].Name(), err, - ) + } + p.eventHandler(eventErr) + return eventErr } // Loop over all of the stages, starting them in order. 
@@ -443,12 +442,13 @@ func (p *Pipeline) Wait() error { } if earliestStageErr != nil { - p.eventHandler(&EventError{ + eventErr := &EventError{ Command: earliestFailedStage.Name(), Msg: "command failed", Err: earliestStageErr, - }) - return fmt.Errorf("%s: %w", earliestFailedStage.Name(), earliestStageErr) + } + p.eventHandler(eventErr) + return eventErr } return nil diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index e956964..3d7ca86 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -377,7 +377,7 @@ func TestPipelineExit(t *testing.T) { pipe.Command("false"), pipe.Command("true"), ) - assert.EqualError(t, p.Run(ctx), "false: exit status 1") + assert.EqualError(t, p.Run(ctx), `command failed in stage "false": exit status 1`) } func TestPipelineStderr(t *testing.T) { @@ -393,7 +393,7 @@ func TestPipelineStderr(t *testing.T) { _, err = p.Output(ctx) if assert.Error(t, err) { - assert.Contains(t, err.Error(), "ls: exit status") + assert.Contains(t, err.Error(), `command failed in stage "ls": exit status`) } } @@ -450,7 +450,7 @@ func TestLittleEPIPE(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() err := p.Run(ctx) - assert.EqualError(t, err, "sh: signal: broken pipe") + assert.EqualError(t, err, `command failed in stage "sh": signal: broken pipe`) } // Verify the correct error if one command in the pipeline exits @@ -469,7 +469,7 @@ func TestBigEPIPE(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() err := p.Run(ctx) - assert.EqualError(t, err, "seq: signal: broken pipe") + assert.EqualError(t, err, `command failed in stage "seq": signal: broken pipe`) } // Verify the correct error if one command in the pipeline exits From ef1c45cbe48134651ae4c56f2e9a0d1388b89f9f Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Mon, 15 Jun 2026 13:50:16 +0200 Subject: [PATCH 05/15] options.go: move the `Option` definitions to their own file --- 
pipe/options.go | 117 +++++++++++++++++++++++++++++++++++++++++++++++ pipe/pipeline.go | 102 ----------------------------------------- 2 files changed, 117 insertions(+), 102 deletions(-) create mode 100644 pipe/options.go diff --git a/pipe/options.go b/pipe/options.go new file mode 100644 index 0000000..c6703fc --- /dev/null +++ b/pipe/options.go @@ -0,0 +1,117 @@ +package pipe + +import ( + "context" + "io" +) + +// Option is a functional option that can be passed to the `Pipeline` +// constructor. +type Option func(*Pipeline) + +// WithDir sets the default directory for running external commands. +func WithDir(dir string) Option { + return func(r *Pipeline) { + r.env.Dir = dir + } +} + +// WithStdin assigns stdin to the first command in the pipeline. The +// caller retains ownership of stdin; the pipeline will not close it, +// even if `Start()` returns an error. +func WithStdin(stdin io.Reader) Option { + return func(r *Pipeline) { + r.stdin = Input(stdin) + } +} + +// WithStdout assigns stdout to the last command in the pipeline. The +// caller retains ownership of stdout; the pipeline will not close it, +// even if `Start()` returns an error. +func WithStdout(stdout io.Writer) Option { + return func(r *Pipeline) { + r.stdout = Output(stdout) + } +} + +// WithStdoutCloser assigns stdout to the last command in the +// pipeline, and closes stdout when the pipeline is done with it. The +// pipeline is responsible for closing stdout even if `Start()` returns +// an error. +func WithStdoutCloser(stdout io.WriteCloser) Option { + return func(r *Pipeline) { + r.stdout = ClosingOutput(stdout) + } +} + +// WithEnvVar appends an environment variable for the pipeline. +func WithEnvVar(key, value string) Option { + return func(r *Pipeline) { + r.env.Vars = append(r.env.Vars, func(_ context.Context, vars []EnvVar) []EnvVar { + return append(vars, EnvVar{Key: key, Value: value}) + }) + } +} + +// WithEnvVars appends several environment variable for the pipeline. 
+func WithEnvVars(b []EnvVar) Option { + return func(r *Pipeline) { + r.env.Vars = append(r.env.Vars, func(_ context.Context, a []EnvVar) []EnvVar { + return append(a, b...) + }) + } +} + +type ContextValueFunc func(context.Context) (string, bool) + +// WithEnvVarFunc appends a context-based environment variable for a +// runner. +func WithEnvVarFunc(key string, valueFunc ContextValueFunc) Option { + return func(r *Pipeline) { + r.env.Vars = append( + r.env.Vars, + func(ctx context.Context, vars []EnvVar) []EnvVar { + if val, ok := valueFunc(ctx); ok { + return append(vars, EnvVar{Key: key, Value: val}) + } + return vars + }, + ) + } +} + +type ContextValuesFunc func(context.Context) []EnvVar + +// WithEnvVarsFunc appends several context-based environment variables +// for a runner. +func WithEnvVarsFunc(valuesFunc ContextValuesFunc) Option { + return func(r *Pipeline) { + r.env.Vars = append( + r.env.Vars, + func(ctx context.Context, vars []EnvVar) []EnvVar { + return append(vars, valuesFunc(ctx)...) + }, + ) + } +} + +// WithEventHandler sets a handler for the pipeline. Setting one will emit +// and event for each process. +func WithEventHandler(handler EventHandler) Option { + return func(r *Pipeline) { + r.eventHandler = handler + } +} + +// WithStagePanicHandler sets a panic handler for the stages within a pipeline. +// When a pipeline stage panics, the provided handler will be invoked, allowing +// the client to handle the panic in whatever way they see fit. +// +// Note: +// - The client is responsible for deciding whether to recover from the panic or panicking again. +// - If a panic handler is not set, the panic will be propagated normally. 
+func WithStagePanicHandler(ph StagePanicHandler) Option { + return func(r *Pipeline) { + r.panicHandler = ph + } +} diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 0f4d33e..911240c 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -44,10 +44,6 @@ type EnvVar struct { Value string } -type ContextValueFunc func(context.Context) (string, bool) - -type ContextValuesFunc func(context.Context) []EnvVar - // Pipeline represents a Unix-like pipe that can include multiple // stages, including external processes but also and stages written in // Go. @@ -86,104 +82,6 @@ func New(options ...Option) *Pipeline { return p } -// Option is a type alias for Pipeline functional options. -type Option func(*Pipeline) - -// WithDir sets the default directory for running external commands. -func WithDir(dir string) Option { - return func(p *Pipeline) { - p.env.Dir = dir - } -} - -// WithStdin assigns stdin to the first command in the pipeline. The -// caller retains ownership of stdin; the pipeline will not close it, -// even if `Start()` returns an error. -func WithStdin(stdin io.Reader) Option { - return func(p *Pipeline) { - p.stdin = Input(stdin) - } -} - -// WithStdout assigns stdout to the last command in the pipeline. The -// caller retains ownership of stdout; the pipeline will not close it, -// even if `Start()` returns an error. -func WithStdout(stdout io.Writer) Option { - return func(p *Pipeline) { - p.stdout = Output(stdout) - } -} - -// WithStdoutCloser assigns stdout to the last command in the -// pipeline, and closes stdout when the pipeline is done with it. The -// pipeline is responsible for closing stdout even if `Start()` returns -// an error. -func WithStdoutCloser(stdout io.WriteCloser) Option { - return func(p *Pipeline) { - p.stdout = ClosingOutput(stdout) - } -} - -// WithEnvVar appends an environment variable for the pipeline. 
-func WithEnvVar(key, value string) Option { - return func(p *Pipeline) { - p.env.Vars = append(p.env.Vars, func(_ context.Context, vars []EnvVar) []EnvVar { - return append(vars, EnvVar{Key: key, Value: value}) - }) - } -} - -// WithEnvVars appends several environment variable for the pipeline. -func WithEnvVars(b []EnvVar) Option { - return func(p *Pipeline) { - p.env.Vars = append(p.env.Vars, func(_ context.Context, a []EnvVar) []EnvVar { - return append(a, b...) - }) - } -} - -// WithEnvVarFunc appends a context-based environment variable for the pipeline. -func WithEnvVarFunc(key string, valueFunc ContextValueFunc) Option { - return func(p *Pipeline) { - p.env.Vars = append(p.env.Vars, func(ctx context.Context, vars []EnvVar) []EnvVar { - if val, ok := valueFunc(ctx); ok { - return append(vars, EnvVar{Key: key, Value: val}) - } - return vars - }) - } -} - -// WithEnvVarsFunc appends several context-based environment variables for the pipeline. -func WithEnvVarsFunc(valuesFunc ContextValuesFunc) Option { - return func(p *Pipeline) { - p.env.Vars = append(p.env.Vars, func(ctx context.Context, vars []EnvVar) []EnvVar { - return append(vars, valuesFunc(ctx)...) - }) - } -} - -// WithEventHandler sets a handler for the pipeline. Setting one will emit -// and event for each process. -func WithEventHandler(handler EventHandler) Option { - return func(p *Pipeline) { - p.eventHandler = handler - } -} - -// WithStagePanicHandler sets a panic handler for the stages within a pipeline. -// When a pipeline stage panics, the provided handler will be invoked, allowing -// the client to handle the panic in whatever way they see fit. -// -// Note: -// - The client is responsible for deciding whether to recover from the panic or panicking again. -// - If a panic handler is not set, the panic will be propagated normally. 
-func WithStagePanicHandler(ph StagePanicHandler) Option { - return func(p *Pipeline) { - p.panicHandler = ph - } -} - func (p *Pipeline) hasStarted() bool { return atomic.LoadUint32(&p.started) != 0 } From 441be6d84ac20df1ee9ca33f8902ed51513a8ad6 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sun, 14 Jun 2026 18:09:37 +0200 Subject: [PATCH 06/15] Pipe: a new kind of `Stage` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `Pipe`, a `Stage` that itself runs multiple other stages with their stdouts and stdins piped to one another. This is a useful concept, because it allows pipelines to be created in modular form more easily. Also, now that we have this type, we can use it in the implementation of `Pipeline`. Specifically, `Pipeline` now embeds a single `Pipe`, which takes care of managing multiple stages and piping them together. This means that `Pipeline` only needs to know how to run a single stage. More changes are still to come… --- pipe/pipe.go | 319 +++++++++++++++++++++++++++++++++++++++++++++++ pipe/pipeline.go | 252 +++---------------------------------- 2 files changed, 337 insertions(+), 234 deletions(-) create mode 100644 pipe/pipe.go diff --git a/pipe/pipe.go b/pipe/pipe.go new file mode 100644 index 0000000..d69cabe --- /dev/null +++ b/pipe/pipe.go @@ -0,0 +1,319 @@ +package pipe + +import ( + "context" + "errors" + "fmt" + "io" + "sync/atomic" +) + +// Pipe is a `Stage` that consists of a bunch of other `Stage`s that +// are piped together and run in parallel. +type Pipe struct { + name string + + stages []Stage + + cancel func() + + // Atomically written and read value, set if the pipe has been + // started. This is only used for lifecycle sanity checks but does + // not guarantee that clients are using the class correctly. + started atomic.Bool +} + +var ( + _ Stage = (*Pipe)(nil) +) + +// NewPipe returns an initialized `*Pipe` with the specified `name`. 
+func NewPipe(name string) *Pipe { + return &Pipe{ + name: name, + } +} + +func (p *Pipe) Name() string { + return p.name +} + +func (p *Pipe) Requirements() StageRequirements { + if len(p.stages) == 0 { + return StageRequirements{} + } + + return StageRequirements{ + Stdin: p.stages[0].Requirements().Stdin, + Stdout: p.stages[len(p.stages)-1].Requirements().Stdout, + } +} + +// Add appends one or more stages to the pipe. +func (p *Pipe) Add(stages ...Stage) { + if p.started.Load() { + panic("attempt to modify a pipe that has already started") + } + + p.stages = append(p.stages, stages...) +} + +// AddWithIgnoredError appends one or more stages, suppressing any +// errors from those stages that match `em`. +func (p *Pipe) AddWithIgnoredError(em ErrorMatcher, stages ...Stage) { + if p.started.Load() { + panic("attempt to modify a pipe that has already started") + } + + for _, stage := range stages { + p.stages = append(p.stages, IgnoreError(stage, em)) + } +} + +// Start starts the commands in the pipe. If `Start()` exits without +// an error, `Wait()` must also be called, to allow all resources to +// be freed. +// +// If `Start()` returns an error, `Wait()` must not be called. Before +// returning an error, `Start()` cancels and waits for any stages that +// were started, closes any inter-stage pipes that the pipe owns, and +// closes stdout if it was supplied with `WithStdoutCloser()`. Streams +// supplied with `WithStdin()` or `WithStdout()` remain owned by the +// caller and are not closed by the pipe. +func (p *Pipe) Start( + ctx context.Context, opts StageOptions, + stdin *InputStream, stdout *OutputStream, +) error { + if !p.started.CompareAndSwap(false, true) { + panic("attempt to start a pipe that has already started") + } + + // We might need to cancel sub-stages if not all of them start up + // correctly: + ctx, p.cancel = context.WithCancel(ctx) + + if len(p.stages) == 0 { + // This is pretty pointless, but handle it by copying stdin to + // stdout. 
+ + if stdin == nil || stdout == nil { + // If `stdin` and `stout` were not both provided, then + // there's nothing to do except close the other one if it + // was provided: + return errors.Join( + stdin.Close(), + stdout.Close(), + ) + } + + // Add a stage to do the copying, then proceed as usual: + p.stages = append(p.stages, Function( + "identity", + func(_ context.Context, _ Env, stdin io.Reader, stdout io.Writer) error { + if stdin == nil { + return nil + } + _, err := io.Copy(stdout, stdin) + return err + }, + )) + } + + // We need to decide how to start the stages, especially what + // pipes to use to connect adjacent stages (`os.Pipe()` vs. + // `io.Pipe()`) based on the two stages' requirements. + stageJoiners := make([]stageJoiner, len(p.stages)+1) + + // Arrange for the input of the 0th stage to come from `stdin`: + stageJoiners[0].nextStdin = stdin + + // Arrange for the output of the last stage to go to `stdout`: + stageJoiners[len(p.stages)].prevStdout = stdout + + // closePipes closes all of the streams that are currently stored + // in the joiners. This should be called if startup fails. As we + // call `Stage.Start()` and pass that method streams, we clear + // them from the corresponding joiners to avoid closing them + // twice. 
+ closePipes := func() { + for _, sj := range stageJoiners { + _ = sj.closePipe() + } + } + + // Store the stages in the joiners, and verify that the stages' + // requirements are well-formed: + for i, s := range p.stages { + // Make sure that the stage's requirements are well-formed: + requirements := s.Requirements() + if err := requirements.Stdin.Validate(); err != nil { + closePipes() + return fmt.Errorf("stdin: %w", err) + } + if err := requirements.Stdout.Validate(); err != nil { + closePipes() + return fmt.Errorf("stdout: %w", err) + } + + stageJoiners[i].nextStage = s + stageJoiners[i].nextStageReq = requirements + stageJoiners[i+1].prevStage = s + stageJoiners[i+1].prevStageReq = requirements + } + + // Check that each of the stages' requirements are satisfiable: + for i := range stageJoiners { + if err := stageJoiners[i].validate(); err != nil { + closePipes() + return err + } + } + + // Create the "inner" pipes (i.e, all but the first and last + // `stageJoiners`): + for i := 1; i < len(stageJoiners)-1; i++ { + if err := stageJoiners[i].createPipe(); err != nil { + closePipes() + return err + } + } + + // We're about to start up the stages, one by one. If something + // goes wrong during that process, `abort` should be called to + // kill any stages that have already been started and to close any + // pipes that have not yet been passed to a stage. `i` is the + // index of the stage that failed to start. If the stage already + // received its streams, it is responsible for closing them. + abort := func(i int, err error) error { + closePipes() + + // Kill and wait for any stages that have been started + // already to finish: + p.cancel() + for _, s := range p.stages[:i] { + _ = s.Wait() + } + return &EventError{ + Command: p.stages[i].Name(), + Msg: "failed to start pipe stage", + Err: err, + } + } + + // Loop over all of the stages, starting them in order. 
+ for i, s := range p.stages { + prevSJ := &stageJoiners[i] + nextSJ := &stageJoiners[i+1] + + err := s.Start(ctx, opts, prevSJ.nextStdin, nextSJ.prevStdout) + + // Even if that stage failed to start, we are no longer + // responsible for closing its streams: + prevSJ.nextStdin = nil + nextSJ.prevStdout = nil + + if err != nil { + return abort(i, err) + } + } + + return nil +} + +// Wait waits for each stage in the pipe to exit. +func (p *Pipe) Wait() error { + if !p.started.Load() { + panic("unable to wait on a pipe that has not started") + } + + // Make sure that all of the cleanup eventually happens: + defer p.cancel() + + var earliestStageErr error + var earliestFailedStage Stage + + finishedEarly := false + for i := len(p.stages) - 1; i >= 0; i-- { + s := p.stages[i] + err := s.Wait() + + // Handle errors: + switch { + case err == nil: + // No error to handle. But unset the `finishedEarly` flag, + // because earlier stages shouldn't be affected by the + // later stage that finished early. + finishedEarly = false + continue + + case errors.Is(err, FinishEarly): + // Note `FinishEarly` errors, because that is how a stage + // informs us that it intentionally finished early. + // Moreover, if we see a `FinishEarly` error, ignore any + // pipe error from the immediately preceding stage, + // because it probably came from trying to write to this + // stage after this stage closed its stdin. + finishedEarly = true + continue + + case IsPipeError(err): + switch { + case finishedEarly: + // A successor stage finished early. It is common for + // this to cause earlier stages to fail with pipe + // errors. Such errors are uninteresting, so ignore + // them. Leave the `finishedEarly` flag set, because + // the preceding stage might get a pipe error from + // trying to write to this one. + case earliestStageErr != nil: + // A later stage has already reported an error. 
This + // means that we don't want to report the error from + // this stage: + // + // * If the later error was also a pipe error: we want + // to report the _last_ pipe error seen, which would + // be the one already recorded. + // + // * If the later error was not a pipe error: non-pipe + // errors are always considered more important than + // pipe errors, so again we would want to keep the + // error that is already recorded. + default: + // In this case, the pipe error from this stage is the + // most important error that we have seen so far, so + // remember it: + earliestFailedStage, earliestStageErr = s, err + } + + default: + // This stage exited with a non-pipe error. If multiple + // stages exited with such errors, we want to report the + // one that is most informative. We take that to be the + // error from the earliest failing stage. Since we are + // iterating through stages in reverse order, overwrite + // any existing remembered errors (which would have come + // from a later stage): + earliestFailedStage, earliestStageErr = s, err + finishedEarly = false + } + } + + if earliestStageErr != nil { + return &EventError{ + Command: earliestFailedStage.Name(), + Msg: "command failed", + Err: earliestStageErr, + } + } + + if finishedEarly { + // The earliest stage finished early, so report that as our + // final error. This might cause pipe errors from earlier + // stages to get suppressed. If this is the top-level stage, + // then this `FinishEarly` error will get ignored by + // `Pipeline.Wait()`. + return FinishEarly + } + + return nil +} diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 911240c..c201ba2 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -4,9 +4,6 @@ import ( "bytes" "context" "errors" - "fmt" - "io" - "sync/atomic" ) // Env represents the environment that a pipeline stage should run in. 
@@ -52,13 +49,8 @@ type Pipeline struct { stdin *InputStream stdout *OutputStream - stages []Stage - cancel func() - // Atomically written and read value, nonzero if the pipeline has - // been started. This is only used for lifecycle sanity checks but - // does not guarantee that clients are using the class correctly. - started uint32 + p *Pipe eventHandler EventHandler panicHandler StagePanicHandler @@ -72,6 +64,7 @@ type NewPipeFn func(opts ...Option) *Pipeline // applied. func New(options ...Option) *Pipeline { p := &Pipeline{ + p: NewPipe(""), eventHandler: emptyEventHandler, } @@ -82,29 +75,15 @@ func New(options ...Option) *Pipeline { return p } -func (p *Pipeline) hasStarted() bool { - return atomic.LoadUint32(&p.started) != 0 -} - // Add appends one or more stages to the pipeline. func (p *Pipeline) Add(stages ...Stage) { - if p.hasStarted() { - panic("attempt to modify a pipeline that has already started") - } - - p.stages = append(p.stages, stages...) + p.p.Add(stages...) } // AddWithIgnoredError appends one or more stages that are ignoring // the passed in error to the pipeline. func (p *Pipeline) AddWithIgnoredError(em ErrorMatcher, stages ...Stage) { - if p.hasStarted() { - panic("attempt to modify a pipeline that has already started") - } - - for _, stage := range stages { - p.stages = append(p.stages, IgnoreError(stage, em)) - } + p.p.AddWithIgnoredError(em, stages...) } func (p *Pipeline) stageOptions() StageOptions { @@ -122,136 +101,7 @@ func (p *Pipeline) stageOptions() StageOptions { // Streams supplied with `WithStdin()` or `WithStdout()` remain owned by // the caller and are not closed by the pipeline. 
func (p *Pipeline) Start(ctx context.Context) error { - if p.hasStarted() { - panic("attempt to start a pipeline that has already started") - } - - atomic.StoreUint32(&p.started, 1) - ctx, p.cancel = context.WithCancel(ctx) - - if len(p.stages) == 0 { - if p.stdout == nil { - // No stages and no destination: there is nothing to do - // and nowhere to put `p.stdin` even if it was set. - return nil - } - // No stages but a destination was configured: synthesize an - // identity-copy stage so that `WithStdin()` is drained into - // `WithStdout()`/`WithStdoutCloser()` and the destination - // closer (if any) is invoked. - p.stages = append(p.stages, Function( - "identity", - func(_ context.Context, _ Env, stdin io.Reader, stdout io.Writer) error { - if stdin == nil { - return nil - } - _, err := io.Copy(stdout, stdin) - return err - }, - )) - } - - // We need to decide how to start the stages, especially what - // pipes to use to connect adjacent stages (`os.Pipe()` vs. - // `io.Pipe()`) based on the two stages' requirements. - stageJoiners := make([]stageJoiner, len(p.stages)+1) - - // Arrange for the input of the 0th stage to come from `p.stdin`: - stageJoiners[0].nextStdin = p.stdin - - // Arrange for the output of the last stage to go to `p.stdout`: - stageJoiners[len(p.stages)].prevStdout = p.stdout - - // closePipes closes all of the streams that are currently stored - // in the joiners. This should be called if startup fails. As we - // call `Stage.Start()` and pass that method streams, we clear - // them from the corresponding joiners to avoid closing them - // twice. 
- closePipes := func() { - for _, sj := range stageJoiners { - _ = sj.closePipe() - } - } - - // Store the stages in the joiners, and verify that the stages' - // requirements are well-formed: - for i, s := range p.stages { - // Make sure that the stage's requirements are well-formed: - requirements := s.Requirements() - if err := requirements.Stdin.Validate(); err != nil { - closePipes() - return fmt.Errorf("stdin: %w", err) - } - if err := requirements.Stdout.Validate(); err != nil { - closePipes() - return fmt.Errorf("stdout: %w", err) - } - - stageJoiners[i].nextStage = s - stageJoiners[i].nextStageReq = requirements - stageJoiners[i+1].prevStage = s - stageJoiners[i+1].prevStageReq = requirements - } - - // Check that each of the stages' requirements are satisfiable: - for i := range stageJoiners { - if err := stageJoiners[i].validate(); err != nil { - closePipes() - return err - } - } - - // Create the "inner" pipes (i.e, all but the first and last - // `stageJoiners`): - for i := 1; i < len(stageJoiners)-1; i++ { - if err := stageJoiners[i].createPipe(); err != nil { - closePipes() - return err - } - } - - // We're about to start up the stages, one by one. If something - // goes wrong during that process, this function should be called - // to kill any stages that have already been started and to close - // any pipes that have not yet been passed to a stage. `i` is the - // index of the stage that failed to start. If the stage already - // received its streams, it is responsible for closing them. - abort := func(i int, err error) error { - closePipes() - - // Kill and wait for any stages that have been started - // already to finish: - p.cancel() - for _, s := range p.stages[:i] { - _ = s.Wait() - } - eventErr := &EventError{ - Command: p.stages[i].Name(), - Msg: "failed to start pipeline stage", - Err: err, - } - p.eventHandler(eventErr) - return eventErr - } - - // Loop over all of the stages, starting them in order. 
- for i, s := range p.stages { - prevSJ := &stageJoiners[i] - nextSJ := &stageJoiners[i+1] - - err := s.Start(ctx, p.stageOptions(), prevSJ.nextStdin, nextSJ.prevStdout) - - // Even if that stage failed to start, we are no longer - // responsible for closing its streams: - prevSJ.nextStdin = nil - nextSJ.prevStdout = nil - - if err != nil { - return abort(i, err) - } - } - - return nil + return p.p.Start(ctx, p.stageOptions(), p.stdin, p.stdout) } func (p *Pipeline) Output(ctx context.Context) ([]byte, error) { @@ -263,90 +113,24 @@ func (p *Pipeline) Output(ctx context.Context) ([]byte, error) { // Wait waits for each stage in the pipeline to exit. func (p *Pipeline) Wait() error { - if !p.hasStarted() { - panic("unable to wait on a pipeline that has not started") - } - - // Make sure that all of the cleanup eventually happens: - defer p.cancel() + err := p.p.Wait() - var earliestStageErr error - var earliestFailedStage Stage + // Handle errors: + switch { + case err == nil: + // No error to handle. - finishedEarly := false - for i := len(p.stages) - 1; i >= 0; i-- { - s := p.stages[i] - err := s.Wait() + case errors.Is(err, FinishEarly): + // We ignore `FinishEarly` errors because that is how a + // stage informs us that it intentionally finished early. - // Handle errors: - switch { - case err == nil: - // No error to handle. But unset the `finishedEarly` flag, - // because earlier stages shouldn't be affected by the - // later stage that finished early. - finishedEarly = false - continue - - case errors.Is(err, FinishEarly): - // We ignore `FinishEarly` errors because that is how a - // stage informs us that it intentionally finished early. - // Moreover, if we see a `FinishEarly` error, ignore any - // pipe error from the immediately preceding stage, - // because it probably came from trying to write to this - // stage after this stage closed its stdin. 
- finishedEarly = true - continue - - case IsPipeError(err): - switch { - case finishedEarly: - // A successor stage finished early. It is common for - // this to cause earlier stages to fail with pipe - // errors. Such errors are uninteresting, so ignore - // them. Leave the `finishedEarly` flag set, because - // the preceding stage might get a pipe error from - // trying to write to this one. - case earliestStageErr != nil: - // A later stage has already reported an error. This - // means that we don't want to report the error from - // this stage: - // - // * If the later error was also a pipe error: we want - // to report the _last_ pipe error seen, which would - // be the one already recorded. - // - // * If the later error was not a pipe error: non-pipe - // errors are always considered more important than - // pipe errors, so again we would want to keep the - // error that is already recorded. - default: - // In this case, the pipe error from this stage is the - // most important error that we have seen so far, so - // remember it: - earliestFailedStage, earliestStageErr = s, err - } - - default: - // This stage exited with a non-pipe error. If multiple - // stages exited with such errors, we want to report the - // one that is most informative. We take that to be the - // error from the earliest failing stage. 
Since we are - // iterating through stages in reverse order, overwrite - // any existing remembered errors (which would have come - // from a later stage): - earliestFailedStage, earliestStageErr = s, err - finishedEarly = false + default: + var eventErr *EventError + if errors.As(err, &eventErr) { + p.eventHandler(eventErr) } - } - if earliestStageErr != nil { - eventErr := &EventError{ - Command: earliestFailedStage.Name(), - Msg: "command failed", - Err: earliestStageErr, - } - p.eventHandler(eventErr) - return eventErr + return err } return nil From 4e8a4059311c34391026ee2b9afdaf1d108fd14b Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sun, 14 Jun 2026 22:06:14 +0200 Subject: [PATCH 07/15] runner: new type `runner` is an object that can run a single `Stage` of any type. Use `runner` in the implementation of `Pipeline`. Now `Pipeline` is just a thin wrapper around `Pipe` and `runner`. --- pipe/env_stage_test.go | 4 +- pipe/options.go | 70 +++++++++++----------- pipe/pipeline.go | 124 ++------------------------------------ pipe/runner.go | 131 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 175 insertions(+), 154 deletions(-) create mode 100644 pipe/runner.go diff --git a/pipe/env_stage_test.go b/pipe/env_stage_test.go index e33c6b7..e44dd49 100644 --- a/pipe/env_stage_test.go +++ b/pipe/env_stage_test.go @@ -112,8 +112,8 @@ func TestWithExtraEnvDoesNotShareVarsBackingArray(t *testing.T) { }) } - p := New(func(p *Pipeline) { - p.env.Vars = baseVars + p := New(func(r *runner) { + r.env.Vars = baseVars }) p.Add( WithExtraEnv( diff --git a/pipe/options.go b/pipe/options.go index c6703fc..111abe0 100644 --- a/pipe/options.go +++ b/pipe/options.go @@ -5,57 +5,58 @@ import ( "io" ) -// Option is a functional option that can be passed to the `Pipeline` +// Option is a functional option that can be passed to the `Runner` // constructor. 
-type Option func(*Pipeline) +type Option func(*runner) // WithDir sets the default directory for running external commands. func WithDir(dir string) Option { - return func(r *Pipeline) { + return func(r *runner) { r.env.Dir = dir } } -// WithStdin assigns stdin to the first command in the pipeline. The -// caller retains ownership of stdin; the pipeline will not close it, -// even if `Start()` returns an error. +// WithStdin assigns stdin for the runner. The caller retains +// ownership of stdin; the runner will not close it, even if `Start()` +// returns an error. func WithStdin(stdin io.Reader) Option { - return func(r *Pipeline) { + return func(r *runner) { r.stdin = Input(stdin) } } -// WithStdout assigns stdout to the last command in the pipeline. The -// caller retains ownership of stdout; the pipeline will not close it, -// even if `Start()` returns an error. +// WithStdout assigns stdout for the runner. The caller retains +// ownership of stdout; the runner will not close it, even if +// `Start()` returns an error. func WithStdout(stdout io.Writer) Option { - return func(r *Pipeline) { + return func(r *runner) { r.stdout = Output(stdout) } } -// WithStdoutCloser assigns stdout to the last command in the -// pipeline, and closes stdout when the pipeline is done with it. The -// pipeline is responsible for closing stdout even if `Start()` returns -// an error. +// WithStdoutCloser assigns stdout for the runner, arranging to close +// stdout when the runner is done with it. The runner is responsible +// for closing stdout even if `Start()` returns an error. func WithStdoutCloser(stdout io.WriteCloser) Option { - return func(r *Pipeline) { + return func(r *runner) { r.stdout = ClosingOutput(stdout) } } -// WithEnvVar appends an environment variable for the pipeline. +// WithEnvVar appends an environment variable for commands run within +// the runner. 
func WithEnvVar(key, value string) Option { - return func(r *Pipeline) { + return func(r *runner) { r.env.Vars = append(r.env.Vars, func(_ context.Context, vars []EnvVar) []EnvVar { return append(vars, EnvVar{Key: key, Value: value}) }) } } -// WithEnvVars appends several environment variable for the pipeline. +// WithEnvVars appends several environment variable for commands run +// within the runner. func WithEnvVars(b []EnvVar) Option { - return func(r *Pipeline) { + return func(r *runner) { r.env.Vars = append(r.env.Vars, func(_ context.Context, a []EnvVar) []EnvVar { return append(a, b...) }) @@ -64,10 +65,10 @@ func WithEnvVars(b []EnvVar) Option { type ContextValueFunc func(context.Context) (string, bool) -// WithEnvVarFunc appends a context-based environment variable for a -// runner. +// WithEnvVarFunc appends a context-based environment variable to be +// passed to commands run within the runner. func WithEnvVarFunc(key string, valueFunc ContextValueFunc) Option { - return func(r *Pipeline) { + return func(r *runner) { r.env.Vars = append( r.env.Vars, func(ctx context.Context, vars []EnvVar) []EnvVar { @@ -85,7 +86,7 @@ type ContextValuesFunc func(context.Context) []EnvVar // WithEnvVarsFunc appends several context-based environment variables // for a runner. func WithEnvVarsFunc(valuesFunc ContextValuesFunc) Option { - return func(r *Pipeline) { + return func(r *runner) { r.env.Vars = append( r.env.Vars, func(ctx context.Context, vars []EnvVar) []EnvVar { @@ -95,23 +96,26 @@ func WithEnvVarsFunc(valuesFunc ContextValuesFunc) Option { } } -// WithEventHandler sets a handler for the pipeline. Setting one will emit -// and event for each process. +// WithEventHandler sets an event handler for the runner. Setting one +// will emit an event for each process. 
func WithEventHandler(handler EventHandler) Option { - return func(r *Pipeline) { + return func(r *runner) { r.eventHandler = handler } } -// WithStagePanicHandler sets a panic handler for the stages within a pipeline. -// When a pipeline stage panics, the provided handler will be invoked, allowing -// the client to handle the panic in whatever way they see fit. +// WithStagePanicHandler sets a panic handler for the runner. When the +// code being run panics, the provided handler will be invoked, +// allowing the client to handle the panic in whatever way they see +// fit. // // Note: -// - The client is responsible for deciding whether to recover from the panic or panicking again. -// - If a panic handler is not set, the panic will be propagated normally. +// - The client is responsible for deciding whether to recover from +// the panic or panicking again. +// - If a panic handler is not set, the panic will be propagated +// normally. func WithStagePanicHandler(ph StagePanicHandler) Option { - return func(r *Pipeline) { + return func(r *runner) { r.panicHandler = ph } } diff --git a/pipe/pipeline.go b/pipe/pipeline.go index c201ba2..f1936d6 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -1,78 +1,25 @@ package pipe -import ( - "bytes" - "context" - "errors" -) - -// Env represents the environment that a pipeline stage should run in. -// It is passed to `Stage.Start()`. -type Env struct { - // The directory in which external commands should be executed by - // default. - Dir string - - // Vars are extra environment variables. These will override any - // environment variables that would be inherited from the current - // process. - Vars []AppendVars -} - -// FinishEarly is an error that can be returned by a `Stage` to -// request that the iteration be ended early (possibly without reading -// all of its input). This "error" is considered a successful return, -// and is not reported to the caller. 
-// -//revive:disable:error-naming -//nolint:staticcheck // ST1012: FinishEarly is the intentional name for this sentinel error -var FinishEarly = errors.New("finish stage early") - -//revive:enable:error-naming - -type AppendVars func(context.Context, []EnvVar) []EnvVar - -// EnvVar represents an environment variable that will be provided to any child -// process spawned in this pipeline. -type EnvVar struct { - // The name of the environment variable. - Key string - // The value. - Value string -} - // Pipeline represents a Unix-like pipe that can include multiple // stages, including external processes but also and stages written in // Go. type Pipeline struct { - env Env - - stdin *InputStream - stdout *OutputStream + *runner p *Pipe - - eventHandler EventHandler - panicHandler StagePanicHandler } -var emptyEventHandler = func(_ *EventError) {} - type NewPipeFn func(opts ...Option) *Pipeline // NewPipeline returns a Pipeline struct with all of the `options` // applied. func New(options ...Option) *Pipeline { - p := &Pipeline{ - p: NewPipe(""), - eventHandler: emptyEventHandler, - } + p := NewPipe("") - for _, option := range options { - option(p) + return &Pipeline{ + runner: newRunner(p, options...), + p: p, } - - return p } // Add appends one or more stages to the pipeline. @@ -85,64 +32,3 @@ func (p *Pipeline) Add(stages ...Stage) { func (p *Pipeline) AddWithIgnoredError(em ErrorMatcher, stages ...Stage) { p.p.AddWithIgnoredError(em, stages...) } - -func (p *Pipeline) stageOptions() StageOptions { - return StageOptions{Env: p.env, PanicHandler: p.panicHandler} -} - -// Start starts the commands in the pipeline. If `Start()` exits -// without an error, `Wait()` must also be called, to allow all -// resources to be freed. -// -// If `Start()` returns an error, `Wait()` must not be called. 
Before -// returning an error, `Start()` cancels and waits for any stages that -// were started, closes any inter-stage pipes that the pipeline owns, -// and closes stdout if it was supplied with `WithStdoutCloser()`. -// Streams supplied with `WithStdin()` or `WithStdout()` remain owned by -// the caller and are not closed by the pipeline. -func (p *Pipeline) Start(ctx context.Context) error { - return p.p.Start(ctx, p.stageOptions(), p.stdin, p.stdout) -} - -func (p *Pipeline) Output(ctx context.Context) ([]byte, error) { - var buf bytes.Buffer - p.stdout = Output(&buf) - err := p.Run(ctx) - return buf.Bytes(), err -} - -// Wait waits for each stage in the pipeline to exit. -func (p *Pipeline) Wait() error { - err := p.p.Wait() - - // Handle errors: - switch { - case err == nil: - // No error to handle. - - case errors.Is(err, FinishEarly): - // We ignore `FinishEarly` errors because that is how a - // stage informs us that it intentionally finished early. - - default: - var eventErr *EventError - if errors.As(err, &eventErr) { - p.eventHandler(eventErr) - } - - return err - } - - return nil -} - -// Run starts and waits for the commands in the pipeline. If startup -// fails, it returns the `Start()` error after `Start()` has performed -// its failure cleanup. -func (p *Pipeline) Run(ctx context.Context) error { - if err := p.Start(ctx); err != nil { - return err - } - - return p.Wait() -} diff --git a/pipe/runner.go b/pipe/runner.go new file mode 100644 index 0000000..269dc77 --- /dev/null +++ b/pipe/runner.go @@ -0,0 +1,131 @@ +package pipe + +import ( + "bytes" + "context" + "errors" +) + +// Env represents the environment that a pipeline stage should run in. +// It is passed to `Stage.Start()`. +type Env struct { + // The directory in which external commands should be executed by + // default. + Dir string + + // Vars are extra environment variables. These will override any + // environment variables that would be inherited from the current + // process. 
+ Vars []AppendVars +} + +// FinishEarly is an error that can be returned by a `Stage` to +// request that the iteration be ended early (possibly without reading +// all of its input). This "error" is considered a successful return, +// and is not reported to the caller. +// +//revive:disable:error-naming +//nolint:staticcheck // ST1012: FinishEarly is the intentional name for this sentinel error +var FinishEarly = errors.New("finish stage early") + +//revive:enable:error-naming + +type AppendVars func(context.Context, []EnvVar) []EnvVar + +// EnvVar represents an environment variable that will be provided to any child +// process spawned in this pipeline. +type EnvVar struct { + // The name of the environment variable. + Key string + // The value. + Value string +} + +// runner is able to run a single `Stage`. +type runner struct { + env Env + + stdin *InputStream + stdout *OutputStream + + stage Stage + + eventHandler EventHandler + panicHandler StagePanicHandler +} + +var emptyEventHandler = func(_ *EventError) {} + +// newRunner returns a Runner with all of the `options` applied. +func newRunner(stage Stage, options ...Option) *runner { + r := &runner{ + stage: stage, + eventHandler: emptyEventHandler, + } + + for _, option := range options { + option(r) + } + + return r +} + +func (r *runner) stageOptions() StageOptions { + return StageOptions{Env: r.env, PanicHandler: r.panicHandler} +} + +// Start starts the stage. If `Start()` exits without an error, +// `Wait()` must also be called, to allow all resources to be freed. +// +// If `Start()` returns an error, `Wait()` must not be called. Before +// returning an error, `Start()` cancels and waits for any stages that +// were started, closes any inter-stage pipes that the pipeline owns, +// and closes stdout if it was supplied with `WithStdoutCloser()`. +// Streams supplied with `WithStdin()` or `WithStdout()` remain owned by +// the caller and are not closed by the pipeline. 
+func (r *runner) Start(ctx context.Context) error { + return r.stage.Start(ctx, r.stageOptions(), r.stdin, r.stdout) +} + +func (r *runner) Output(ctx context.Context) ([]byte, error) { + var buf bytes.Buffer + r.stdout = Output(&buf) + err := r.Run(ctx) + return buf.Bytes(), err +} + +// Wait waits for each stage in the pipeline to exit. +func (r *runner) Wait() error { + err := r.stage.Wait() + + // Handle errors: + switch { + case err == nil: + // No error to handle. + + case errors.Is(err, FinishEarly): + // We ignore `FinishEarly` errors because that is how a + // stage informs us that it intentionally finished early. + + default: + var eventErr *EventError + if errors.As(err, &eventErr) { + r.eventHandler(eventErr) + } + + return err + } + + return nil +} + +// Run starts and waits for the commands in the pipeline. If startup +// fails, it returns the `Start()` error after `Start()` has performed +// its failure cleanup. +func (r *runner) Run(ctx context.Context) error { + if err := r.Start(ctx); err != nil { + return err + } + + return r.Wait() +} From 67078d5c5e13dde5902e1607377c5d8089c0977d Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Tue, 16 Jun 2026 11:06:43 +0200 Subject: [PATCH 08/15] Config: new type, for managing pipe configurations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `Config`, a new type to hold a shared pipe configuration that can be used when creating multiple pipes. `Config` can run a `Stage` directly, in the usual ways: * Start(ctx, stage, options...) * Run(ctx, stage, options...) * Output(ctx, stage, options...) It can also create a pre-configured `Pipeline` for you: * NewPipeline(options...) But we don't want to allow just any `Option` to be included in a `Config`. For example, it makes no sense to use `WithStdin()` for a `Config`, because stdin can't be reused for multiple pipes. 
So a big change to make this possible is splitting up the concept of `Option` into two concepts: * `ConfigOption`: — an option that can be applied to a `Config` _or_ at start time. * `Option`: — an option that can be applied _only_ at start time. Currently these are `WithStdin()`, `WithStdout()`, and `WithStdoutCloser()`. This involves some boilerplate, but means that the client code to use a `Pipeline` doesn't have to change unless/until it wants to use the new features. --- pipe/config.go | 79 +++++++++++++++ pipe/env_stage_test.go | 10 +- pipe/options.go | 218 ++++++++++++++++++++++++++++++----------- pipe/pipeline.go | 42 +++++++- pipe/runner.go | 81 +++++++++------ 5 files changed, 333 insertions(+), 97 deletions(-) create mode 100644 pipe/config.go diff --git a/pipe/config.go b/pipe/config.go new file mode 100644 index 0000000..75470a7 --- /dev/null +++ b/pipe/config.go @@ -0,0 +1,79 @@ +package pipe + +import "context" + +// Config represents a configuration for running pipes. A config +// instance is immutable. If you want a new configuration, either use +// `NewConfig()` to create one from scratch, or call +// `cfg.WithOption()` or `cfg.WithOptions()` to derive a new +// configuration from an existing one by adding more options. `Config` +// itself implements `Option` and `ConfigOption`, so it can be used +// when constructing a `Pipeline`. Or the Pipeline can be created +// using the helper method `Config.NewPipeline()`. +type Config struct { + options sliceConfigOption +} + +func newConfig(options []ConfigOption, extraSlots int) *Config { + cfg := Config{ + options: make(sliceConfigOption, 0, len(options)+extraSlots), + } + cfg.options = append(cfg.options, options...) + return &cfg +} + +// NewConfig returns a `Config` with the specified `options`. +func NewConfig(options ...ConfigOption) *Config { + return newConfig(options, 0) +} + +// WithOption returns a new `Config` with the same options as `cfg` +// plus the additional `option`. 
+func (cfg *Config) WithOption(option ConfigOption) *Config { + newCfg := newConfig(cfg.options, 1) + newCfg.options = append(newCfg.options, option) + return newCfg +} + +// WithOptions returns a new `Config` with the same options as `cfg` +// plus the additional `options`. +func (cfg *Config) WithOptions(options ...ConfigOption) *Config { + newCfg := newConfig(cfg.options, len(options)) + newCfg.options = append(newCfg.options, options...) + return newCfg +} + +func (cfg *Config) newRunner(stage Stage, options ...Option) *runner { + r := newRunner(stage, cfg.options) + r.applyOptions(options...) + return r +} + +func (cfg *Config) NewPipeline(options ...Option) *Pipeline { + return New(cfg.options, sliceOption(options)) +} + +// Start starts `stage` using a runner created using `cfg` plus any +// additional start options from `options`. If `Start()` exits without +// an error, the returned `WaitFunc` must also be called, to learn about +// any errors and to ensure that all resources are freed. See +// [runner.Start] for more information. +func (cfg *Config) Start( + ctx context.Context, stage Stage, options ...Option, +) (WaitFunc, error) { + return cfg.newRunner(stage, options...).start(ctx) +} + +// Run runs `stage` using a runner created using `cfg` plus any +// additional start options from `options`. 
+func (cfg *Config) Run( + ctx context.Context, stage Stage, options ...Option, +) error { + return cfg.newRunner(stage, options...).run(ctx) +} + +func (cfg *Config) Output( + ctx context.Context, stage Stage, options ...Option, +) ([]byte, error) { + return cfg.newRunner(stage, options...).output(ctx) +} diff --git a/pipe/env_stage_test.go b/pipe/env_stage_test.go index e44dd49..ccad81a 100644 --- a/pipe/env_stage_test.go +++ b/pipe/env_stage_test.go @@ -112,9 +112,13 @@ func TestWithExtraEnvDoesNotShareVarsBackingArray(t *testing.T) { }) } - p := New(func(r *runner) { - r.env.Vars = baseVars - }) + p := New( + newConfigOption( + func(r *runner) { + r.env.Vars = baseVars + }, + ), + ) p.Add( WithExtraEnv( Function("first", func(ctx context.Context, env Env, _ io.Reader, _ io.Writer) error { diff --git a/pipe/options.go b/pipe/options.go index 111abe0..101d71c 100644 --- a/pipe/options.go +++ b/pipe/options.go @@ -5,103 +5,205 @@ import ( "io" ) -// Option is a functional option that can be passed to the `Runner` -// constructor. -type Option func(*runner) +// ConfigOption is an option that can be applied to a `Config` or at +// `Start()` time. Options that are not reusable for multiple +// pipelines should implement only `Option`, not this interface. +type ConfigOption interface { + apply(*runner) + Option +} -// WithDir sets the default directory for running external commands. -func WithDir(dir string) Option { - return func(r *runner) { - r.env.Dir = dir +// sliceConfigOption is a `ConfigOption` based on a slice of other +// `ConfigOption`s. +type sliceConfigOption []ConfigOption + +var _ ConfigOption = (sliceConfigOption)(nil) + +func (sco sliceConfigOption) apply(r *runner) { + for _, opt := range sco { + opt.apply(r) + } +} + +func (sco sliceConfigOption) applyAtStart(r *runner) { + for _, opt := range sco { + opt.applyAtStart(r) + } +} + +// funcConfigOption implements `ConfigOption` by calling a function. 
+type funcConfigOption struct { + fn func(r *runner) +} + +var _ ConfigOption = funcConfigOption{} + +func (opt funcConfigOption) apply(r *runner) { + opt.fn(r) +} + +func (opt funcConfigOption) applyAtStart(r *runner) { + opt.fn(r) +} + +// newConfigOption returns a `ConfigOption` that does its work by +// calling `fn`. +func newConfigOption(fn func(r *runner)) funcConfigOption { + return funcConfigOption{fn} +} + +// Option is an option that can be applied only at `Start()` time +// but is not suitable to be applied to a `Config` (e.g., because the +// option can only be used once, like `WithStdin()`). +type Option interface { + applyAtStart(*runner) +} + +// sliceOption is an `Option` based on a slice of other `Option`s. +type sliceOption []Option + +var _ ConfigOption = (sliceConfigOption)(nil) + +func (so sliceOption) applyAtStart(r *runner) { + for _, opt := range so { + opt.applyAtStart(r) } } +// funcOption implements `Option` by calling a function. +type funcOption struct { + fn func(r *runner) +} + +var _ Option = funcOption{} + +func (opt funcOption) applyAtStart(r *runner) { + opt.fn(r) +} + +// newFuncOption returns an `Option` that does its work by +// calling `fn`. +func newFuncOption(fn func(r *runner)) funcOption { + return funcOption{fn} +} + +// WithDir sets the default directory for running external commands. +func WithDir(dir string) ConfigOption { + return newConfigOption( + func(r *runner) { + r.env.Dir = dir + }, + ) +} + // WithStdin assigns stdin for the runner. The caller retains // ownership of stdin; the runner will not close it, even if `Start()` // returns an error. func WithStdin(stdin io.Reader) Option { - return func(r *runner) { - r.stdin = Input(stdin) - } + return newFuncOption( + func(r *runner) { + r.stdin = Input(stdin) + }, + ) } // WithStdout assigns stdout for the runner. The caller retains // ownership of stdout; the runner will not close it, even if // `Start()` returns an error. 
func WithStdout(stdout io.Writer) Option { - return func(r *runner) { - r.stdout = Output(stdout) - } + return newFuncOption( + func(r *runner) { + r.stdout = Output(stdout) + }, + ) } // WithStdoutCloser assigns stdout for the runner, arranging to close // stdout when the runner is done with it. The runner is responsible // for closing stdout even if `Start()` returns an error. func WithStdoutCloser(stdout io.WriteCloser) Option { - return func(r *runner) { - r.stdout = ClosingOutput(stdout) - } + return newFuncOption( + func(r *runner) { + r.stdout = ClosingOutput(stdout) + }, + ) } // WithEnvVar appends an environment variable for commands run within // the runner. -func WithEnvVar(key, value string) Option { - return func(r *runner) { - r.env.Vars = append(r.env.Vars, func(_ context.Context, vars []EnvVar) []EnvVar { - return append(vars, EnvVar{Key: key, Value: value}) - }) - } +func WithEnvVar(key, value string) ConfigOption { + return newConfigOption( + func(r *runner) { + r.env.Vars = append( + r.env.Vars, + func(_ context.Context, vars []EnvVar) []EnvVar { + return append(vars, EnvVar{Key: key, Value: value}) + }, + ) + }, + ) } // WithEnvVars appends several environment variable for commands run // within the runner. -func WithEnvVars(b []EnvVar) Option { - return func(r *runner) { - r.env.Vars = append(r.env.Vars, func(_ context.Context, a []EnvVar) []EnvVar { - return append(a, b...) - }) - } +func WithEnvVars(b []EnvVar) ConfigOption { + return newConfigOption( + func(r *runner) { + r.env.Vars = append( + r.env.Vars, + func(_ context.Context, a []EnvVar) []EnvVar { + return append(a, b...) + }, + ) + }, + ) } type ContextValueFunc func(context.Context) (string, bool) // WithEnvVarFunc appends a context-based environment variable to be // passed to commands run within the runner. 
-func WithEnvVarFunc(key string, valueFunc ContextValueFunc) Option { - return func(r *runner) { - r.env.Vars = append( - r.env.Vars, - func(ctx context.Context, vars []EnvVar) []EnvVar { - if val, ok := valueFunc(ctx); ok { - return append(vars, EnvVar{Key: key, Value: val}) - } - return vars - }, - ) - } +func WithEnvVarFunc(key string, valueFunc ContextValueFunc) ConfigOption { + return newConfigOption( + func(r *runner) { + r.env.Vars = append( + r.env.Vars, + func(ctx context.Context, vars []EnvVar) []EnvVar { + if val, ok := valueFunc(ctx); ok { + return append(vars, EnvVar{Key: key, Value: val}) + } + return vars + }, + ) + }, + ) } type ContextValuesFunc func(context.Context) []EnvVar // WithEnvVarsFunc appends several context-based environment variables // for a runner. -func WithEnvVarsFunc(valuesFunc ContextValuesFunc) Option { - return func(r *runner) { - r.env.Vars = append( - r.env.Vars, - func(ctx context.Context, vars []EnvVar) []EnvVar { - return append(vars, valuesFunc(ctx)...) - }, - ) - } +func WithEnvVarsFunc(valuesFunc ContextValuesFunc) ConfigOption { + return newConfigOption( + func(r *runner) { + r.env.Vars = append( + r.env.Vars, + func(ctx context.Context, vars []EnvVar) []EnvVar { + return append(vars, valuesFunc(ctx)...) + }, + ) + }, + ) } // WithEventHandler sets an event handler for the runner. Setting one // will emit an event for each process. -func WithEventHandler(handler EventHandler) Option { - return func(r *runner) { - r.eventHandler = handler - } +func WithEventHandler(handler EventHandler) ConfigOption { + return newConfigOption( + func(r *runner) { + r.eventHandler = handler + }, + ) } // WithStagePanicHandler sets a panic handler for the runner. When the @@ -114,8 +216,10 @@ func WithEventHandler(handler EventHandler) Option { // the panic or panicking again. // - If a panic handler is not set, the panic will be propagated // normally. 
-func WithStagePanicHandler(ph StagePanicHandler) Option { - return func(r *runner) { - r.panicHandler = ph - } +func WithStagePanicHandler(ph StagePanicHandler) ConfigOption { + return newConfigOption( + func(r *runner) { + r.panicHandler = ph + }, + ) } diff --git a/pipe/pipeline.go b/pipe/pipeline.go index f1936d6..0d86810 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -1,24 +1,32 @@ package pipe +import ( + "context" +) + // Pipeline represents a Unix-like pipe that can include multiple // stages, including external processes but also and stages written in // Go. type Pipeline struct { - *runner + r *runner p *Pipe + + wait WaitFunc } type NewPipeFn func(opts ...Option) *Pipeline // NewPipeline returns a Pipeline struct with all of the `options` -// applied. +// applied. Since `Pipeline` doesn't allow external access to its +// `Runner`, it permits any `StartOption`s as options (not only +// `RunnerOption`s). func New(options ...Option) *Pipeline { p := NewPipe("") - + r := newRunner(p, options...) return &Pipeline{ - runner: newRunner(p, options...), - p: p, + r: r, + p: p, } } @@ -32,3 +40,27 @@ func (p *Pipeline) Add(stages ...Stage) { func (p *Pipeline) AddWithIgnoredError(em ErrorMatcher, stages ...Stage) { p.p.AddWithIgnoredError(em, stages...) } + +func (p *Pipeline) Start(ctx context.Context, options ...Option) error { + p.r.applyOptions(options...) + wait, err := p.r.start(ctx) + if err != nil { + return err + } + p.wait = wait + return nil +} + +func (p *Pipeline) Wait() error { + return p.wait() +} + +func (p *Pipeline) Run(ctx context.Context, options ...Option) error { + p.r.applyOptions(options...) + return p.r.run(ctx) +} + +func (p *Pipeline) Output(ctx context.Context, options ...Option) ([]byte, error) { + p.r.applyOptions(options...) 
+ return p.r.output(ctx) +} diff --git a/pipe/runner.go b/pipe/runner.go index 269dc77..aa8dacc 100644 --- a/pipe/runner.go +++ b/pipe/runner.go @@ -32,8 +32,8 @@ var FinishEarly = errors.New("finish stage early") type AppendVars func(context.Context, []EnvVar) []EnvVar -// EnvVar represents an environment variable that will be provided to any child -// process spawned in this pipeline. +// EnvVar represents an environment variable that will be provided to +// any child process. type EnvVar struct { // The name of the environment variable. Key string @@ -41,7 +41,7 @@ type EnvVar struct { Value string } -// runner is able to run a single `Stage`. +// runner runs a `Stage`. A `runner` can only be used once. type runner struct { env Env @@ -63,39 +63,50 @@ func newRunner(stage Stage, options ...Option) *runner { eventHandler: emptyEventHandler, } - for _, option := range options { - option(r) - } + r.applyOptions(options...) return r } +// applyOptions applies `options` to `r` in place (in addition to any +// options that have already been applied). +func (r *runner) applyOptions(options ...Option) { + for _, option := range options { + option.applyAtStart(r) + } +} + func (r *runner) stageOptions() StageOptions { return StageOptions{Env: r.env, PanicHandler: r.panicHandler} } -// Start starts the stage. If `Start()` exits without an error, -// `Wait()` must also be called, to allow all resources to be freed. -// -// If `Start()` returns an error, `Wait()` must not be called. Before -// returning an error, `Start()` cancels and waits for any stages that -// were started, closes any inter-stage pipes that the pipeline owns, -// and closes stdout if it was supplied with `WithStdoutCloser()`. -// Streams supplied with `WithStdin()` or `WithStdout()` remain owned by -// the caller and are not closed by the pipeline. 
-func (r *runner) Start(ctx context.Context) error { - return r.stage.Start(ctx, r.stageOptions(), r.stdin, r.stdout) +type WaitFunc func() error + +func nopWait(err error) WaitFunc { + return func() error { return err } } -func (r *runner) Output(ctx context.Context) ([]byte, error) { - var buf bytes.Buffer - r.stdout = Output(&buf) - err := r.Run(ctx) - return buf.Bytes(), err +// start starts the stage. If `start()` exits without an error, the +// returned `WaitFunc` must also be called exactly once, to learn about +// any errors and to allow all resources to be freed. +// +// If `start()` returns an error, the returned `WaitFunc` is a NOP that +// returns the same error. Before returning an error, `start()` +// cancels and waits for any stages that were started, closes any +// inter-stage pipes that the pipeline owns, and closes stdin/stdout +// if required. Streams that were supplied with `WithStdin()` or +// `WithStdout()` remain owned by the caller and are never closed by +// `runner`. +func (r *runner) start(ctx context.Context) (WaitFunc, error) { + if err := r.stage.Start(ctx, r.stageOptions(), r.stdin, r.stdout); err != nil { + return nopWait(err), err + } + + return r.wait, nil } -// Wait waits for each stage in the pipeline to exit. -func (r *runner) Wait() error { +// wait is the `WaitFunc` that is normally returned by `start()`. +func (r *runner) wait() error { err := r.stage.Wait() // Handle errors: @@ -119,13 +130,19 @@ func (r *runner) Wait() error { return nil } -// Run starts and waits for the commands in the pipeline. If startup -// fails, it returns the `Start()` error after `Start()` has performed -// its failure cleanup. -func (r *runner) Run(ctx context.Context) error { - if err := r.Start(ctx); err != nil { - return err - } +// run starts `stage` and waits for it to finish. +func (r *runner) run(ctx context.Context) error { + // If start returns an error, the same error is returned by + // `wait`. 
+ wait, _ := r.start(ctx) + return wait() +} - return r.Wait() +// output starts `stage`, waits for it to finish, and collects and +// returns its stdout. +func (r *runner) output(ctx context.Context) ([]byte, error) { + var buf bytes.Buffer + r.applyOptions(WithStdout(&buf)) + err := r.run(ctx) + return buf.Bytes(), err } From 2e7209798483119ac0f733942f09eac6487528f9 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sun, 14 Jun 2026 23:25:17 +0200 Subject: [PATCH 09/15] NewPipeFn: remove type Consumers, not producers, should define interfaces. This one is not used anywhere in the library so it shouldn't be defined here. But you might notice that `Config.NewPipeline()` satisfies what used to be called the `NewPipeFn` interface. This is no accident; `Config` is meant to be a better replacement for `NewPipeFn`. --- pipe/pipeline.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 0d86810..6ce3398 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -15,8 +15,6 @@ type Pipeline struct { wait WaitFunc } -type NewPipeFn func(opts ...Option) *Pipeline - // NewPipeline returns a Pipeline struct with all of the `options` // applied. Since `Pipeline` doesn't allow external access to its // `Runner`, it permits any `StartOption`s as options (not only From ca18da99779d6566a144cb69a6995efcb1206ae6 Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Tue, 16 Jun 2026 15:35:33 +0200 Subject: [PATCH 10/15] Document WithStdin blocking-reader limitation Explain the known deadlock risk for borrowed non-file stdin readers feeding a command that exits without draining stdin. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- pipe/options.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pipe/options.go b/pipe/options.go index 101d71c..30d1c38 100644 --- a/pipe/options.go +++ b/pipe/options.go @@ -99,6 +99,16 @@ func WithDir(dir string) ConfigOption { // WithStdin assigns stdin for the runner. 
The caller retains // ownership of stdin; the runner will not close it, even if `Start()` // returns an error. +// +// If this stdin is connected to a `Command` stage and is not an +// `*os.File`, `exec.Cmd` has to copy stdin through an internal +// goroutine, and `Cmd.Wait()` waits for that copy to finish. This is +// fine for bounded readers such as `strings.Reader` and +// `bytes.Reader`, and for `*os.File` values, which are passed to the +// command directly. But a borrowed, non-file reader that can block +// forever can also block the runner forever if the command exits +// without consuming all of its stdin. See +// `TestPipelineIOPipeStdinThatIsNeverClosed` for the known limitation. func WithStdin(stdin io.Reader) Option { return newFuncOption( func(r *runner) { From 9f867127d9b8f8b5e2759a5f8b0a06c68d6c71e4 Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Tue, 16 Jun 2026 00:45:05 +0200 Subject: [PATCH 11/15] Expose Function stream requirements Add Function options for declaring stdin and stdout stream requirements, so Function stages can participate in file-preference negotiation. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- pipe/function.go | 24 ++++++++++++++++++------ pipe/pipeline_test.go | 18 ++++++++++++++++++ 2 files changed, 36 insertions(+), 6 deletions(-) diff --git a/pipe/function.go b/pipe/function.go index a80947f..2970a12 100644 --- a/pipe/function.go +++ b/pipe/function.go @@ -33,20 +33,32 @@ type StageFunc func(ctx context.Context, env Env, stdin io.Reader, stdout io.Wri // FunctionOption configures a Function stage. type FunctionOption func(*goStage) +// WithStdinRequirement returns a FunctionOption declaring the stage's stdin +// requirement. +func WithStdinRequirement(requirement StreamRequirement) FunctionOption { + return func(s *goStage) { + s.requirements.Stdin = requirement + } +} + +// WithStdoutRequirement returns a FunctionOption declaring the stage's stdout +// requirement. 
+func WithStdoutRequirement(requirement StreamRequirement) FunctionOption { + return func(s *goStage) { + s.requirements.Stdout = requirement + } +} + // ForbidStdin returns a FunctionOption declaring that the stage must not be // connected to stdin. func ForbidStdin() FunctionOption { - return func(s *goStage) { - s.requirements.Stdin = StreamForbidden - } + return WithStdinRequirement(StreamForbidden) } // ForbidStdout returns a FunctionOption declaring that the stage must not be // connected to stdout. func ForbidStdout() FunctionOption { - return func(s *goStage) { - s.requirements.Stdout = StreamForbidden - } + return WithStdoutRequirement(StreamForbidden) } // Function returns a pipeline `Stage` that will run a `StageFunc` in diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index 3d7ca86..44ad7e9 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -951,6 +951,24 @@ func TestFunctionOptionsForbidStreams(t *testing.T) { }) } +func TestFunctionOptionsSetStreamRequirements(t *testing.T) { + t.Parallel() + + stage := pipe.Function( + "file-preferring", + func(_ context.Context, _ pipe.Env, _ io.Reader, _ io.Writer) error { + return nil + }, + pipe.WithStdinRequirement(pipe.StreamPreferFile), + pipe.WithStdoutRequirement(pipe.StreamPreferFile), + ) + + assert.Equal(t, pipe.StageRequirements{ + Stdin: pipe.StreamPreferFile, + Stdout: pipe.StreamPreferFile, + }, stage.Requirements()) +} + func TestStreamForbiddenStdin(t *testing.T) { t.Parallel() ctx := context.Background() From 98b35387a4476578164b236b6aa57af13573556d Mon Sep 17 00:00:00 2001 From: Jason Lunz Date: Tue, 16 Jun 2026 00:44:13 +0200 Subject: [PATCH 12/15] Pipe.Start(): improve errors In `Pipe.Start()`, include the offending stage name in invalid stream-requirement errors. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- pipe/pipe.go | 8 ++++++-- pipe/pipeline_test.go | 15 ++++++++++++--- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/pipe/pipe.go b/pipe/pipe.go index d69cabe..6d16e5c 100644 --- a/pipe/pipe.go +++ b/pipe/pipe.go @@ -148,11 +148,15 @@ func (p *Pipe) Start( requirements := s.Requirements() if err := requirements.Stdin.Validate(); err != nil { closePipes() - return fmt.Errorf("stdin: %w", err) + return fmt.Errorf( + "stage %q has invalid stdin requirement: %w", s.Name(), err, + ) } if err := requirements.Stdout.Validate(); err != nil { closePipes() - return fmt.Errorf("stdout: %w", err) + return fmt.Errorf( + "stage %q has invalid stdout requirement: %w", s.Name(), err, + ) } stageJoiners[i].nextStage = s diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index 44ad7e9..7fe7ab5 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -1042,7 +1042,10 @@ func TestInvalidStreamRequirements(t *testing.T) { Stdin: pipe.StreamRequirement(123), }, }) - require.ErrorContains(t, p.Run(ctx), `stdin: invalid stream requirement 123`) + require.ErrorContains( + t, p.Run(ctx), + `stage "source" has invalid stdin requirement: invalid stream requirement 123`, + ) assert.True(t, stdout.closed, "WithStdoutCloser destination should be closed") }) @@ -1056,7 +1059,10 @@ func TestInvalidStreamRequirements(t *testing.T) { Stdout: pipe.StreamRequirement(123), }, }) - require.ErrorContains(t, p.Run(ctx), `stdout: invalid stream requirement 123`) + require.ErrorContains( + t, p.Run(ctx), + `stage "sink" has invalid stdout requirement: invalid stream requirement 123`, + ) assert.True(t, stdout.closed, "WithStdoutCloser destination should be closed") }) } @@ -1089,7 +1095,10 @@ func TestInvalidStreamRequirement(t *testing.T) { Stdin: pipe.StreamRequirement(99), }, }) - require.ErrorContains(t, p.Run(ctx), `stdin: invalid stream requirement 99`) + require.ErrorContains( + t, p.Run(ctx), + 
`stage "invalid" has invalid stdin requirement: invalid stream requirement 99`, + ) } func TestFunctionNoInput(t *testing.T) { From ce08d4d61a03b038a2dd04c3285a5cabcd2a1600 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Tue, 16 Jun 2026 23:24:48 +0200 Subject: [PATCH 13/15] More systematically check the states of objects before use Add a `oneUse` helper type, which makes it easy to keep track of the state of an object that must only be started once, and to add assertions about the object's current state. Add such assertions for the important `Stage`s and for `runner`. --- pipe/command.go | 13 ++++++++++--- pipe/function.go | 8 ++++++++ pipe/one_use.go | 47 +++++++++++++++++++++++++++++++++++++++++++++++ pipe/options.go | 4 ++++ pipe/pipe.go | 25 +++++++------------------ pipe/runner.go | 11 +++++++++++ 6 files changed, 87 insertions(+), 21 deletions(-) create mode 100644 pipe/one_use.go diff --git a/pipe/command.go b/pipe/command.go index 27d34c6..874f76c 100644 --- a/pipe/command.go +++ b/pipe/command.go @@ -32,6 +32,8 @@ type commandStage struct { // If the context expired, and we attempted to kill the command, // `ctx.Err()` is stored here. ctxErr atomic.Value + + oneUse oneUse } var ( @@ -65,9 +67,10 @@ func Command(command string, args ...string) Stage { // the command might emit. 
func CommandStage(name string, cmd *exec.Cmd) Stage { return &commandStage{ - name: name, - cmd: cmd, - done: make(chan struct{}), + name: name, + cmd: cmd, + done: make(chan struct{}), + oneUse: oneUse{thing: "command " + name}, } } @@ -90,6 +93,8 @@ func (s *commandStage) Start( ctx context.Context, opts StageOptions, stdin *InputStream, stdout *OutputStream, ) error { + s.oneUse.assertStarting("start") + r := stdin.Reader() w := stdout.Writer() @@ -307,6 +312,8 @@ func (s *commandStage) filterCmdError(err error) error { } func (s *commandStage) Wait() error { + s.oneUse.assertStarted("wait") + defer close(s.done) // Make sure that any stderr is copied before `s.cmd.Wait()` diff --git a/pipe/function.go b/pipe/function.go index 2970a12..a4c89c9 100644 --- a/pipe/function.go +++ b/pipe/function.go @@ -37,6 +37,7 @@ type FunctionOption func(*goStage) // requirement. func WithStdinRequirement(requirement StreamRequirement) FunctionOption { return func(s *goStage) { + s.oneUse.assertNotStarted("set stdin requirement") s.requirements.Stdin = requirement } } @@ -45,6 +46,7 @@ func WithStdinRequirement(requirement StreamRequirement) FunctionOption { // requirement. 
func WithStdoutRequirement(requirement StreamRequirement) FunctionOption { return func(s *goStage) { + s.oneUse.assertNotStarted("set stdout requirement") s.requirements.Stdout = requirement } } @@ -70,6 +72,7 @@ func Function(name string, f StageFunc, opts ...FunctionOption) Stage { f: f, done: make(chan struct{}), requirements: StageRequirements{}, + oneUse: oneUse{thing: "function " + name}, } for _, opt := range opts { opt(stage) @@ -85,6 +88,8 @@ type goStage struct { done chan struct{} requirements StageRequirements err error + + oneUse oneUse } var _ Stage = (*goStage)(nil) @@ -101,6 +106,8 @@ func (s *goStage) Start( ctx context.Context, opts StageOptions, stdin *InputStream, stdout *OutputStream, ) error { + s.oneUse.assertStarting("start") + r := stdin.Reader() if r == nil { // treat nil as empty input. @@ -135,6 +142,7 @@ func (s *goStage) Start( } func (s *goStage) Wait() error { + s.oneUse.assertStarted("wait") <-s.done return s.err } diff --git a/pipe/one_use.go b/pipe/one_use.go new file mode 100644 index 0000000..3ce012a --- /dev/null +++ b/pipe/one_use.go @@ -0,0 +1,47 @@ +package pipe + +import ( + "fmt" + "sync/atomic" +) + +// oneUse is a helper class that can be used to enforce consistency +// checks that some `thing` is only started once, and can also check +// that it is in the correct state (either not started yet, or already +// started). Any violations of the assertions cause panics; this is +// only meant as a sanity consistency check. +type oneUse struct { + // thing is a description of the thing that should only be started + // once. + thing string + + // started records whether the thing has been started yet. + started atomic.Bool +} + +// assertNotStarted panics if the thing has already been started. +// `action` is used in the `panic` message to describe the action that +// is being attempted. 
+func (ou *oneUse) assertNotStarted(action string) { + if ou.started.Load() { + panic(fmt.Sprintf("tried to %s after %s was already started", action, ou.thing)) + } +} + +// assertStarting panics if the thing has already been started, and +// otherwise records that it has now been started. `action` is used in +// the `panic` message to describe the action that is being attempted. +func (ou *oneUse) assertStarting(action string) { + if !ou.started.CompareAndSwap(false, true) { + panic(fmt.Sprintf("tried to %s after %s was already started", action, ou.thing)) + } +} + +// assertStarted panics if the thing has not yet been started. +// `action` is used in the `panic` message to describe the action that +// is being attempted. +func (ou *oneUse) assertStarted(action string) { + if !ou.started.Load() { + panic(fmt.Sprintf("tried to %s but %s hasn't been started", action, ou.thing)) + } +} diff --git a/pipe/options.go b/pipe/options.go index 30d1c38..95341ec 100644 --- a/pipe/options.go +++ b/pipe/options.go @@ -39,10 +39,12 @@ type funcConfigOption struct { var _ ConfigOption = funcConfigOption{} func (opt funcConfigOption) apply(r *runner) { + r.oneUse.assertNotStarted("apply option") opt.fn(r) } func (opt funcConfigOption) applyAtStart(r *runner) { + r.oneUse.assertNotStarted("apply option at start") opt.fn(r) } @@ -78,6 +80,8 @@ type funcOption struct { var _ Option = funcOption{} func (opt funcOption) applyAtStart(r *runner) { + r.oneUse.assertNotStarted("apply option at start") + opt.fn(r) } diff --git a/pipe/pipe.go b/pipe/pipe.go index 6d16e5c..4530be3 100644 --- a/pipe/pipe.go +++ b/pipe/pipe.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "sync/atomic" ) // Pipe is a `Stage` that consists of a bunch of other `Stage`s that @@ -17,10 +16,7 @@ type Pipe struct { cancel func() - // Atomically written and read value, set if the pipe has been - // started. 
This is only used for lifecycle sanity checks but does - // not guarantee that clients are using the class correctly. - started atomic.Bool + oneUse oneUse } var ( @@ -30,7 +26,8 @@ var ( // NewPipe returns an initialized `*Pipe` with the specified `name`. func NewPipe(name string) *Pipe { return &Pipe{ - name: name, + name: name, + oneUse: oneUse{thing: "Pipe"}, } } @@ -51,9 +48,7 @@ func (p *Pipe) Requirements() StageRequirements { // Add appends one or more stages to the pipe. func (p *Pipe) Add(stages ...Stage) { - if p.started.Load() { - panic("attempt to modify a pipe that has already started") - } + p.oneUse.assertNotStarted("modify a pipe") p.stages = append(p.stages, stages...) } @@ -61,9 +56,7 @@ func (p *Pipe) Add(stages ...Stage) { // AddWithIgnoredError appends one or more stages, suppressing any // errors from those stages that match `em`. func (p *Pipe) AddWithIgnoredError(em ErrorMatcher, stages ...Stage) { - if p.started.Load() { - panic("attempt to modify a pipe that has already started") - } + p.oneUse.assertNotStarted("modify a pipe") for _, stage := range stages { p.stages = append(p.stages, IgnoreError(stage, em)) @@ -84,9 +77,7 @@ func (p *Pipe) Start( ctx context.Context, opts StageOptions, stdin *InputStream, stdout *OutputStream, ) error { - if !p.started.CompareAndSwap(false, true) { - panic("attempt to start a pipe that has already started") - } + p.oneUse.assertStarting("start") // We might need to cancel sub-stages if not all of them start up // correctly: @@ -226,9 +217,7 @@ func (p *Pipe) Start( // Wait waits for each stage in the pipe to exit. 
func (p *Pipe) Wait() error { - if !p.started.Load() { - panic("unable to wait on a pipe that has not started") - } + p.oneUse.assertStarted("wait") // Make sure that all of the cleanup eventually happens: defer p.cancel() diff --git a/pipe/runner.go b/pipe/runner.go index aa8dacc..12e8984 100644 --- a/pipe/runner.go +++ b/pipe/runner.go @@ -52,6 +52,8 @@ type runner struct { eventHandler EventHandler panicHandler StagePanicHandler + + oneUse oneUse } var emptyEventHandler = func(_ *EventError) {} @@ -61,6 +63,7 @@ func newRunner(stage Stage, options ...Option) *runner { r := &runner{ stage: stage, eventHandler: emptyEventHandler, + oneUse: oneUse{thing: "runner for " + stage.Name()}, } r.applyOptions(options...) @@ -71,6 +74,8 @@ func newRunner(stage Stage, options ...Option) *runner { // applyOptions applies `options` to `r` in place (in addition to any // options that have already been applied). func (r *runner) applyOptions(options ...Option) { + r.oneUse.assertNotStarted("apply options") + for _, option := range options { option.applyAtStart(r) } @@ -98,6 +103,8 @@ func nopWait(err error) WaitFunc { // `WithStdout()` remain owned by the caller and are never closed by // `runner`. func (r *runner) start(ctx context.Context) (WaitFunc, error) { + r.oneUse.assertStarting("start") + if err := r.stage.Start(ctx, r.stageOptions(), r.stdin, r.stdout); err != nil { return nopWait(err), err } @@ -107,6 +114,8 @@ func (r *runner) start(ctx context.Context) (WaitFunc, error) { // wait is the `WaitFunc` that is normally returned by `start()`. func (r *runner) wait() error { + r.oneUse.assertStarted("wait") + err := r.stage.Wait() // Handle errors: @@ -141,6 +150,8 @@ func (r *runner) run(ctx context.Context) error { // output starts `stage`, waits for it to finish, and collects and // returns its stdout. 
func (r *runner) output(ctx context.Context) ([]byte, error) { + r.oneUse.assertNotStarted("get output") + var buf bytes.Buffer r.applyOptions(WithStdout(&buf)) err := r.run(ctx) From f1ca1c055bd29230b69619e4bc5809f0761e4da1 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Tue, 16 Jun 2026 23:54:50 +0200 Subject: [PATCH 14/15] Cancel context on failed pipeline starts Ensure that all error paths out of `Pipe.Start()` call `p.cancel()`. --- pipe/pipe.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/pipe/pipe.go b/pipe/pipe.go index 4530be3..983c1fe 100644 --- a/pipe/pipe.go +++ b/pipe/pipe.go @@ -76,13 +76,20 @@ func (p *Pipe) AddWithIgnoredError(em ErrorMatcher, stages ...Stage) { func (p *Pipe) Start( ctx context.Context, opts StageOptions, stdin *InputStream, stdout *OutputStream, -) error { +) (theErr error) { p.oneUse.assertStarting("start") // We might need to cancel sub-stages if not all of them start up // correctly: ctx, p.cancel = context.WithCancel(ctx) + // Be sure to free resources if startup isn't successful: + defer func() { + if theErr != nil { + p.cancel() + } + }() + if len(p.stages) == 0 { // This is pretty pointless, but handle it by copying stdin to // stdout. @@ -90,7 +97,10 @@ func (p *Pipe) Start( if stdin == nil || stdout == nil { // If `stdin` and `stout` were not both provided, then // there's nothing to do except close the other one if it - // was provided: + // was provided. Note that if both closes are successful, + // then as far as the caller is concerned this counts as a + // successful start, and will therefore call `Wait()`, + // which also does the right thing. return errors.Join( stdin.Close(), stdout.Close(), @@ -219,6 +229,11 @@ func (p *Pipe) Start( func (p *Pipe) Wait() error { p.oneUse.assertStarted("wait") + if len(p.stages) == 0 { + // There was nothing to do, and we did it brilliantly! 
+ return nil + } + // Make sure that all of the cleanup eventually happens: defer p.cancel() From e1cc42c9718177f54be2fd19664e8ad7d355407b Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Wed, 17 Jun 2026 00:51:19 +0200 Subject: [PATCH 15/15] Detect redundant setting of stdin/stdout If stdin or stdout are set multiple times, or if stdout is set then `Output()` is called, report an error. (The latter is consistent with the behavior of `exec.Cmd.Output()`.) This requires the addition of a new field, `runner.configErr`, because `ConfigOption`s have no other way to report errors. --- pipe/options.go | 30 +++++++++++++++++-- pipe/pipeline.go | 2 +- pipe/pipeline_test.go | 69 +++++++++++++++++++++++++++++++++++++++++++ pipe/runner.go | 17 +++++++++++ 4 files changed, 114 insertions(+), 4 deletions(-) diff --git a/pipe/options.go b/pipe/options.go index 95341ec..10e2f9d 100644 --- a/pipe/options.go +++ b/pipe/options.go @@ -2,6 +2,8 @@ package pipe import ( "context" + "errors" + "fmt" "io" ) @@ -116,7 +118,14 @@ func WithDir(dir string) ConfigOption { func WithStdin(stdin io.Reader) Option { return newFuncOption( func(r *runner) { - r.stdin = Input(stdin) + if r.stdin != nil { + r.configErr = errors.Join( + r.configErr, + fmt.Errorf("stdin set multiple times for %q", r.stage.Name()), + ) + } else { + r.stdin = Input(stdin) + } }, ) } @@ -127,7 +136,14 @@ func WithStdin(stdin io.Reader) Option { func WithStdout(stdout io.Writer) Option { return newFuncOption( func(r *runner) { - r.stdout = Output(stdout) + if r.stdout != nil { + r.configErr = errors.Join( + r.configErr, + fmt.Errorf("stdout set multiple times for %q", r.stage.Name()), + ) + } else { + r.stdout = Output(stdout) + } }, ) } @@ -138,7 +154,15 @@ func WithStdout(stdout io.Writer) Option { func WithStdoutCloser(stdout io.WriteCloser) Option { return newFuncOption( func(r *runner) { - r.stdout = ClosingOutput(stdout) + if r.stdout != nil { + r.configErr = errors.Join( + r.configErr, + fmt.Errorf("stdout set 
multiple times for %q", r.stage.Name()), + ) + _ = stdout.Close() + } else { + r.stdout = ClosingOutput(stdout) + } }, ) } diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 6ce3398..a5755ef 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -20,7 +20,7 @@ type Pipeline struct { // `Runner`, it permits any `StartOption`s as options (not only // `RunnerOption`s). func New(options ...Option) *Pipeline { - p := NewPipe("") + p := NewPipe("pipeline") r := newRunner(p, options...) return &Pipeline{ r: r, diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index 7fe7ab5..f0b05f2 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -54,6 +54,75 @@ func TestPipelineEmptyOutput(t *testing.T) { } } +func TestPipelineDuplicateStdin(t *testing.T) { + t.Parallel() + ctx := context.Background() + var stdin1, stdin2 bytes.Buffer + p := pipe.New( + pipe.WithStdin(&stdin1), + pipe.WithStdin(&stdin2), + ) + + err := p.Run(ctx) + if assert.Error(t, err) { + assert.Equal(t, `stdin set multiple times for "pipeline"`, err.Error()) + } +} + +func TestPipelineDuplicateStdout(t *testing.T) { + t.Parallel() + ctx := context.Background() + stdout1 := &closeTrackingWriter{} + stdout2 := &closeTrackingWriter{} + p := pipe.New( + pipe.WithStdout(stdout1), + pipe.WithStdout(stdout2), + ) + + err := p.Run(ctx) + if assert.Error(t, err) { + assert.Equal(t, `stdout set multiple times for "pipeline"`, err.Error()) + assert.False(t, stdout1.closed, "WithStdoutCloser destination should not be closed") + assert.False(t, stdout2.closed, "WithStdoutCloser destination should not be closed") + } +} + +func TestPipelineDuplicateStdoutClosing(t *testing.T) { + t.Parallel() + ctx := context.Background() + stdout1 := &closeTrackingWriter{} + stdout2 := &closeTrackingWriter{} + p := pipe.New( + pipe.WithStdoutCloser(stdout1), + pipe.WithStdoutCloser(stdout2), + ) + + err := p.Run(ctx) + if assert.Error(t, err) { + assert.Equal(t, `stdout set multiple times for "pipeline"`, 
err.Error()) + assert.True(t, stdout1.closed, "WithStdoutCloser destination should be closed") + assert.True(t, stdout2.closed, "WithStdoutCloser destination should be closed") + } +} + +func TestPipelineStdoutAndOutput(t *testing.T) { + t.Parallel() + ctx := context.Background() + stdout := &closeTrackingWriter{} + p := pipe.New( + pipe.WithStdin(strings.NewReader("hello world\n")), + pipe.WithStdoutCloser(stdout), + ) + + out, err := p.Output(ctx) + if assert.Error(t, err) { + assert.Nil(t, out) + assert.Equal(t, `Output() called for "pipeline" but stdout is already set`, err.Error()) + assert.Equal(t, "", stdout.buf.String()) + assert.True(t, stdout.closed, "WithStdoutCloser destination should be closed") + } +} + func TestPipelineEmptyWithStdoutCloser(t *testing.T) { t.Parallel() ctx := context.Background() diff --git a/pipe/runner.go b/pipe/runner.go index 12e8984..0964920 100644 --- a/pipe/runner.go +++ b/pipe/runner.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "errors" + "fmt" ) // Env represents the environment that a pipeline stage should run in. 
@@ -53,6 +54,8 @@ type runner struct { eventHandler EventHandler panicHandler StagePanicHandler + configErr error + oneUse oneUse } @@ -105,6 +108,12 @@ func nopWait(err error) WaitFunc { func (r *runner) start(ctx context.Context) (WaitFunc, error) { r.oneUse.assertStarting("start") + if r.configErr != nil { + _ = r.stdin.Close() + _ = r.stdout.Close() + return nopWait(r.configErr), r.configErr + } + if err := r.stage.Start(ctx, r.stageOptions(), r.stdin, r.stdout); err != nil { return nopWait(err), err } @@ -152,6 +161,14 @@ func (r *runner) run(ctx context.Context) error { func (r *runner) output(ctx context.Context) ([]byte, error) { r.oneUse.assertNotStarted("get output") + if r.stdout != nil { + _ = r.stdin.Close() + _ = r.stdout.Close() + return nil, fmt.Errorf( + "Output() called for %q but stdout is already set", r.stage.Name(), + ) + } + var buf bytes.Buffer r.applyOptions(WithStdout(&buf)) err := r.run(ctx)