diff --git a/pipe/command.go b/pipe/command.go index 27d34c6..874f76c 100644 --- a/pipe/command.go +++ b/pipe/command.go @@ -32,6 +32,8 @@ type commandStage struct { // If the context expired, and we attempted to kill the command, // `ctx.Err()` is stored here. ctxErr atomic.Value + + oneUse oneUse } var ( @@ -65,9 +67,10 @@ func Command(command string, args ...string) Stage { // the command might emit. func CommandStage(name string, cmd *exec.Cmd) Stage { return &commandStage{ - name: name, - cmd: cmd, - done: make(chan struct{}), + name: name, + cmd: cmd, + done: make(chan struct{}), + oneUse: oneUse{thing: "command " + name}, } } @@ -90,6 +93,8 @@ func (s *commandStage) Start( ctx context.Context, opts StageOptions, stdin *InputStream, stdout *OutputStream, ) error { + s.oneUse.assertStarting("start") + r := stdin.Reader() w := stdout.Writer() @@ -307,6 +312,8 @@ func (s *commandStage) filterCmdError(err error) error { } func (s *commandStage) Wait() error { + s.oneUse.assertStarted("wait") + defer close(s.done) // Make sure that any stderr is copied before `s.cmd.Wait()` diff --git a/pipe/config.go b/pipe/config.go new file mode 100644 index 0000000..75470a7 --- /dev/null +++ b/pipe/config.go @@ -0,0 +1,79 @@ +package pipe + +import "context" + +// Config represents a configuration for running pipes. A config +// instance is immutable. If you want a new configuration, either use +// `NewConfig()` to create one from scratch, or call +// `cfg.WithOption()` or `cfg.WithOptions()` to derive a new +// configuration from an existing one by adding more options. `Config` +// itself implements `Option` and `ConfigOption`, so it can be used +// when constructing a `Pipeline`. Or the Pipeline can be created +// using the helper method `Config.NewPipeline()`. +type Config struct { + options sliceConfigOption +} + +func newConfig(options []ConfigOption, extraSlots int) *Config { + cfg := Config{ + options: make(sliceConfigOption, 0, len(options)+extraSlots), + } + cfg.options = append(cfg.options, options...) + return &cfg +} + +// NewConfig returns a `Config` with the specified `options`. +func NewConfig(options ...ConfigOption) *Config { + return newConfig(options, 0) +} + +// WithOption returns a new `Config` with the same options as `cfg` +// plus the additional `option`. +func (cfg *Config) WithOption(option ConfigOption) *Config { + newCfg := newConfig(cfg.options, 1) + newCfg.options = append(newCfg.options, option) + return newCfg +} + +// WithOption returns a new `Config` with the same options as `cfg` +// plus the additional `options`. +func (cfg *Config) WithOptions(options ...ConfigOption) *Config { + newCfg := newConfig(cfg.options, len(options)) + newCfg.options = append(newCfg.options, options...) + return newCfg +} + +func (cfg *Config) newRunner(stage Stage, options ...Option) *runner { + r := newRunner(stage, cfg.options) + r.applyOptions(options...) + return r +} + +func (cfg *Config) NewPipeline(options ...Option) *Pipeline { + return New(cfg.options, sliceOption(options)) +} + +// Start starts `stage` using a runner created using `cfg` plus any +// additional start options from `options`. If `Start()` exits without +// an error, the returned `Waiter` must also be called, to learn about +// any errors and to ensure that all resources are freed. See +// [runner.Start] for more information. +func (cfg *Config) Start( + ctx context.Context, stage Stage, options ...Option, +) (WaitFunc, error) { + return cfg.newRunner(stage, options...).start(ctx) +} + +// Start starts `stage` using a runner created using `cfg` plus any +// additional start options from `options`. +func (cfg *Config) Run( + ctx context.Context, stage Stage, options ...Option, +) error { + return cfg.newRunner(stage, options...).run(ctx) +} + +func (cfg *Config) Output( + ctx context.Context, stage Stage, options ...Option, +) ([]byte, error) { + return cfg.newRunner(stage, options...).output(ctx) +} diff --git a/pipe/env_stage_test.go b/pipe/env_stage_test.go index e33c6b7..ccad81a 100644 --- a/pipe/env_stage_test.go +++ b/pipe/env_stage_test.go @@ -112,9 +112,13 @@ func TestWithExtraEnvDoesNotShareVarsBackingArray(t *testing.T) { }) } - p := New(func(p *Pipeline) { - p.env.Vars = baseVars - }) + p := New( + newConfigOption( + func(r *runner) { + r.env.Vars = baseVars + }, + ), + ) p.Add( WithExtraEnv( Function("first", func(ctx context.Context, env Env, _ io.Reader, _ io.Writer) error { diff --git a/pipe/event_error.go b/pipe/event_error.go new file mode 100644 index 0000000..6d24f39 --- /dev/null +++ b/pipe/event_error.go @@ -0,0 +1,29 @@ +package pipe + +import "fmt" + +// EventError represents an error that could happen during the +// pipeline execution that we might want to report as an event. +type EventError struct { + Command string + Msg string + Err error + Context map[string]interface{} +} + +type EventHandler func(err *EventError) + +func (err *EventError) Error() string { + if err.Msg == "" { + return fmt.Sprintf("%s: %v", err.Command, err.Err) + } + return fmt.Sprintf("%s in stage %q: %v", err.Msg, err.Command, err.Err) +} + +func (err *EventError) Unwrap() error { + return err.Err +} + +// Event is an alias for `EventError`, for reasons of backwards +// compatibility. +type Event = EventError diff --git a/pipe/function.go b/pipe/function.go index a80947f..a4c89c9 100644 --- a/pipe/function.go +++ b/pipe/function.go @@ -33,20 +33,34 @@ type StageFunc func(ctx context.Context, env Env, stdin io.Reader, stdout io.Wri // FunctionOption configures a Function stage. type FunctionOption func(*goStage) +// WithStdinRequirement returns a FunctionOption declaring the stage's stdin +// requirement. +func WithStdinRequirement(requirement StreamRequirement) FunctionOption { + return func(s *goStage) { + s.oneUse.assertNotStarted("set stdin requirement") + s.requirements.Stdin = requirement + } +} + +// WithStdoutRequirement returns a FunctionOption declaring the stage's stdout +// requirement. +func WithStdoutRequirement(requirement StreamRequirement) FunctionOption { + return func(s *goStage) { + s.oneUse.assertNotStarted("set stdout requirement") + s.requirements.Stdout = requirement + } +} + // ForbidStdin returns a FunctionOption declaring that the stage must not be // connected to stdin. func ForbidStdin() FunctionOption { - return func(s *goStage) { - s.requirements.Stdin = StreamForbidden - } + return WithStdinRequirement(StreamForbidden) } // ForbidStdout returns a FunctionOption declaring that the stage must not be // connected to stdout. func ForbidStdout() FunctionOption { - return func(s *goStage) { - s.requirements.Stdout = StreamForbidden - } + return WithStdoutRequirement(StreamForbidden) } // Function returns a pipeline `Stage` that will run a `StageFunc` in @@ -58,6 +72,7 @@ func Function(name string, f StageFunc, opts ...FunctionOption) Stage { f: f, done: make(chan struct{}), requirements: StageRequirements{}, + oneUse: oneUse{thing: "function " + name}, } for _, opt := range opts { opt(stage) @@ -73,6 +88,8 @@ type goStage struct { done chan struct{} requirements StageRequirements err error + + oneUse oneUse } var _ Stage = (*goStage)(nil) @@ -89,6 +106,8 @@ func (s *goStage) Start( ctx context.Context, opts StageOptions, stdin *InputStream, stdout *OutputStream, ) error { + s.oneUse.assertStarting("start") + r := stdin.Reader() if r == nil { // treat nil as empty input. @@ -123,6 +142,7 @@ func (s *goStage) Start( } func (s *goStage) Wait() error { + s.oneUse.assertStarted("wait") <-s.done return s.err } diff --git a/pipe/one_use.go b/pipe/one_use.go new file mode 100644 index 0000000..3ce012a --- /dev/null +++ b/pipe/one_use.go @@ -0,0 +1,47 @@ +package pipe + +import ( + "fmt" + "sync/atomic" +) + +// oneUse is a helper class that can be used to enforce consistency +// checks that some `thing` is only started once, and can also check +// that it is in the correct state (either not started yet, or already +// started). Any violations of the assertions cause panics; this is +// only meant as a sanity consistency check. +type oneUse struct { + // thing is a description of the thing that should only be started + // once. + thing string + + // started records whether the thing has been started yet. + started atomic.Bool +} + +// assertNotStarted panics if the thing has already been started. +// `action` is used in the `panic` message to describe the action that +// is being attempted. +func (ou *oneUse) assertNotStarted(action string) { + if ou.started.Load() { + panic(fmt.Sprintf("tried to %s after %s was already started", action, ou.thing)) + } +} + +// assertStarting panics if the thing has already been started, and +// otherwise records that it has now been started. `action` is used in +// the `panic` message to describe the action that is being attempted. +func (ou *oneUse) assertStarting(action string) { + if !ou.started.CompareAndSwap(false, true) { + panic(fmt.Sprintf("tried to %s after %s was already started", action, ou.thing)) + } +} + +// assertStarted panics if the thing has not yet been started. +// `action` is used in the `panic` message to describe the action that +// is being attempted. +func (ou *oneUse) assertStarted(action string) { + if !ou.started.Load() { + panic(fmt.Sprintf("tried to %s but %s hasn't been started", action, ou.thing)) + } +} diff --git a/pipe/options.go b/pipe/options.go new file mode 100644 index 0000000..10e2f9d --- /dev/null +++ b/pipe/options.go @@ -0,0 +1,263 @@ +package pipe + +import ( + "context" + "errors" + "fmt" + "io" +) + +// ConfigOption is an option that can be applied to a `Config` or at +// `Start()` time. Options that are not reusable for multiple +// pipelines should implement only `StartOption`, not this interface. +type ConfigOption interface { + apply(*runner) + Option +} + +// sliceConfigOption is a `ConfigOption` based on a slice of other +// `ConfigOption`s. +type sliceConfigOption []ConfigOption + +var _ ConfigOption = (sliceConfigOption)(nil) + +func (sco sliceConfigOption) apply(r *runner) { + for _, opt := range sco { + opt.apply(r) + } +} + +func (sco sliceConfigOption) applyAtStart(r *runner) { + for _, opt := range sco { + opt.applyAtStart(r) + } +} + +// funcConfigOption implements `ConfigOption` by calling a function. +type funcConfigOption struct { + fn func(r *runner) +} + +var _ ConfigOption = funcConfigOption{} + +func (opt funcConfigOption) apply(r *runner) { + r.oneUse.assertNotStarted("apply option") + opt.fn(r) +} + +func (opt funcConfigOption) applyAtStart(r *runner) { + r.oneUse.assertNotStarted("apply option at start") + opt.fn(r) +} + +// newConfigOption returns a `configOption` that does its work by +// calling `fn`. +func newConfigOption(fn func(r *runner)) funcConfigOption { + return funcConfigOption{fn} +} + +// Option is an option that can be applied only at `Start()` time +// but is not suitable to be applied to a `Config` (e.g., because the +// option can only be used once, like `WithStdin()`). +type Option interface { + applyAtStart(*runner) +} + +// sliceOption is an `Option` based on a slice of other `Option`s. +type sliceOption []Option + +var _ ConfigOption = (sliceConfigOption)(nil) + +func (so sliceOption) applyAtStart(r *runner) { + for _, opt := range so { + opt.applyAtStart(r) + } +} + +// funcOption implements `StartOption` by calling a function. +type funcOption struct { + fn func(r *runner) +} + +var _ Option = funcOption{} + +func (opt funcOption) applyAtStart(r *runner) { + r.oneUse.assertNotStarted("apply option at start") + + opt.fn(r) +} + +// newFuncOption returns a `startOption` that does its work by +// calling `fn`. +func newFuncOption(fn func(r *runner)) funcOption { + return funcOption{fn} +} + +// WithDir sets the default directory for running external commands. +func WithDir(dir string) ConfigOption { + return newConfigOption( + func(r *runner) { + r.env.Dir = dir + }, + ) +} + +// WithStdin assigns stdin for the runner. The caller retains +// ownership of stdin; the runner will not close it, even if `Start()` +// returns an error. +// +// If this stdin is connected to a `Command` stage and is not an +// `*os.File`, `exec.Cmd` has to copy stdin through an internal +// goroutine, and `Cmd.Wait()` waits for that copy to finish. This is +// fine for bounded readers such as `strings.Reader` and +// `bytes.Reader`, and for `*os.File` values, which are passed to the +// command directly. But a borrowed, non-file reader that can block +// forever can also block the runner forever if the command exits +// without consuming all of its stdin. See +// `TestPipelineIOPipeStdinThatIsNeverClosed` for the known limitation. +func WithStdin(stdin io.Reader) Option { + return newFuncOption( + func(r *runner) { + if r.stdin != nil { + r.configErr = errors.Join( + r.configErr, + fmt.Errorf("stdin set multiple times for %q", r.stage.Name()), + ) + } else { + r.stdin = Input(stdin) + } + }, + ) +} + +// WithStdout assigns stdout for the runner. The caller retains +// ownership of stdout; the runner will not close it, even if +// `Start()` returns an error. +func WithStdout(stdout io.Writer) Option { + return newFuncOption( + func(r *runner) { + if r.stdout != nil { + r.configErr = errors.Join( + r.configErr, + fmt.Errorf("stdout set multiple times for %q", r.stage.Name()), + ) + } else { + r.stdout = Output(stdout) + } + }, + ) +} + +// WithStdoutCloser assigns stdout for the runner, arranging to close +// stdout when the runner is done with it. The runner is responsible +// for closing stdout even if `Start()` returns an error. +func WithStdoutCloser(stdout io.WriteCloser) Option { + return newFuncOption( + func(r *runner) { + if r.stdout != nil { + r.configErr = errors.Join( + r.configErr, + fmt.Errorf("stdout set multiple times for %q", r.stage.Name()), + ) + _ = stdout.Close() + } else { + r.stdout = ClosingOutput(stdout) + } + }, + ) +} + +// WithEnvVar appends an environment variable for commands run within +// the runner. +func WithEnvVar(key, value string) ConfigOption { + return newConfigOption( + func(r *runner) { + r.env.Vars = append( + r.env.Vars, + func(_ context.Context, vars []EnvVar) []EnvVar { + return append(vars, EnvVar{Key: key, Value: value}) + }, + ) + }, + ) +} + +// WithEnvVars appends several environment variable for commands run +// within the runner. +func WithEnvVars(b []EnvVar) ConfigOption { + return newConfigOption( + func(r *runner) { + r.env.Vars = append( + r.env.Vars, + func(_ context.Context, a []EnvVar) []EnvVar { + return append(a, b...) + }, + ) + }, + ) +} + +type ContextValueFunc func(context.Context) (string, bool) + +// WithEnvVarFunc appends a context-based environment variable to be +// passed to commands run within the runner. +func WithEnvVarFunc(key string, valueFunc ContextValueFunc) ConfigOption { + return newConfigOption( + func(r *runner) { + r.env.Vars = append( + r.env.Vars, + func(ctx context.Context, vars []EnvVar) []EnvVar { + if val, ok := valueFunc(ctx); ok { + return append(vars, EnvVar{Key: key, Value: val}) + } + return vars + }, + ) + }, + ) +} + +type ContextValuesFunc func(context.Context) []EnvVar + +// WithEnvVarsFunc appends several context-based environment variables +// for a runner. +func WithEnvVarsFunc(valuesFunc ContextValuesFunc) ConfigOption { + return newConfigOption( + func(r *runner) { + r.env.Vars = append( + r.env.Vars, + func(ctx context.Context, vars []EnvVar) []EnvVar { + return append(vars, valuesFunc(ctx)...) + }, + ) + }, + ) +} + +// WithEventHandler sets an event handler for the runner. Setting one +// will emit an event for each process. +func WithEventHandler(handler EventHandler) ConfigOption { + return newConfigOption( + func(r *runner) { + r.eventHandler = handler + }, + ) +} + +// WithStagePanicHandler sets a panic handler for the runner. When the +// code being run panics, the provided handler will be invoked, +// allowing the client to handle the panic in whatever way they see +// fit. +// +// Note: +// - The client is responsible for deciding whether to recover from +// the panic or panicking again. +// - If a panic handler is not set, the panic will be propagated +// normally. +func WithStagePanicHandler(ph StagePanicHandler) ConfigOption { + return newConfigOption( + func(r *runner) { + r.panicHandler = ph + }, + ) +} diff --git a/pipe/pipe.go b/pipe/pipe.go new file mode 100644 index 0000000..983c1fe --- /dev/null +++ b/pipe/pipe.go @@ -0,0 +1,327 @@ +package pipe + +import ( + "context" + "errors" + "fmt" + "io" +) + +// Pipe is a `Stage` that consists of a bunch of other `Stage`s that +// are piped together and run in parallel. +type Pipe struct { + name string + + stages []Stage + + cancel func() + + oneUse oneUse +} + +var ( + _ Stage = (*Pipe)(nil) +) + +// NewPipe returns an initialized `*Pipe` with the specified `name`. +func NewPipe(name string) *Pipe { + return &Pipe{ + name: name, + oneUse: oneUse{thing: "Pipe"}, + } +} + +func (p *Pipe) Name() string { + return p.name +} + +func (p *Pipe) Requirements() StageRequirements { + if len(p.stages) == 0 { + return StageRequirements{} + } + + return StageRequirements{ + Stdin: p.stages[0].Requirements().Stdin, + Stdout: p.stages[len(p.stages)-1].Requirements().Stdout, + } +} + +// Add appends one or more stages to the pipe. +func (p *Pipe) Add(stages ...Stage) { + p.oneUse.assertNotStarted("modify a pipe") + + p.stages = append(p.stages, stages...) +} + +// AddWithIgnoredError appends one or more stages, suppressing any +// errors from those stages that match `em`. +func (p *Pipe) AddWithIgnoredError(em ErrorMatcher, stages ...Stage) { + p.oneUse.assertNotStarted("modify a pipe") + + for _, stage := range stages { + p.stages = append(p.stages, IgnoreError(stage, em)) + } +} + +// Start starts the commands in the pipe. If `Start()` exits without +// an error, `Wait()` must also be called, to allow all resources to +// be freed. +// +// If `Start()` returns an error, `Wait()` must not be called. Before +// returning an error, `Start()` cancels and waits for any stages that +// were started, closes any inter-stage pipes that the pipe owns, and +// closes stdout if it was supplied with `WithStdoutCloser()`. Streams +// supplied with `WithStdin()` or `WithStdout()` remain owned by the +// caller and are not closed by the pipe. +func (p *Pipe) Start( + ctx context.Context, opts StageOptions, + stdin *InputStream, stdout *OutputStream, +) (theErr error) { + p.oneUse.assertStarting("start") + + // We might need to cancel sub-stages if not all of them start up + // correctly: + ctx, p.cancel = context.WithCancel(ctx) + + // Be sure to free resources if startup isn't successful: + defer func() { + if theErr != nil { + p.cancel() + } + }() + + if len(p.stages) == 0 { + // This is pretty pointless, but handle it by copying stdin to + // stdout. + + if stdin == nil || stdout == nil { + // If `stdin` and `stout` were not both provided, then + // there's nothing to do except close the other one if it + // was provided. Note that if both closes are successful, + // then as far as the caller is concerned this counts as a + // successful start, and will therefore call `Wait()`, + // which also does the right thing. + return errors.Join( + stdin.Close(), + stdout.Close(), + ) + } + + // Add a stage to do the copying, then proceed as usual: + p.stages = append(p.stages, Function( + "identity", + func(_ context.Context, _ Env, stdin io.Reader, stdout io.Writer) error { + if stdin == nil { + return nil + } + _, err := io.Copy(stdout, stdin) + return err + }, + )) + } + + // We need to decide how to start the stages, especially what + // pipes to use to connect adjacent stages (`os.Pipe()` vs. + // `io.Pipe()`) based on the two stages' requirements. + stageJoiners := make([]stageJoiner, len(p.stages)+1) + + // Arrange for the input of the 0th stage to come from `stdin`: + stageJoiners[0].nextStdin = stdin + + // Arrange for the output of the last stage to go to `stdout`: + stageJoiners[len(p.stages)].prevStdout = stdout + + // closePipes closes all of the streams that are currently stored + // in the joiners. This should be called if startup fails. As we + // call `Stage.Start()` and pass that method streams, we clear + // them from the corresponding joiners to avoid closing them + // twice. + closePipes := func() { + for _, sj := range stageJoiners { + _ = sj.closePipe() + } + } + + // Store the stages in the joiners, and verify that the stages' + // requirements are well-formed: + for i, s := range p.stages { + // Make sure that the stage's requirements are well-formed: + requirements := s.Requirements() + if err := requirements.Stdin.Validate(); err != nil { + closePipes() + return fmt.Errorf( + "stage %q has invalid stdin requirement: %w", s.Name(), err, + ) + } + if err := requirements.Stdout.Validate(); err != nil { + closePipes() + return fmt.Errorf( + "stage %q has invalid stdout requirement: %w", s.Name(), err, + ) + } + + stageJoiners[i].nextStage = s + stageJoiners[i].nextStageReq = requirements + stageJoiners[i+1].prevStage = s + stageJoiners[i+1].prevStageReq = requirements + } + + // Check that each of the stages' requirements are satisfiable: + for i := range stageJoiners { + if err := stageJoiners[i].validate(); err != nil { + closePipes() + return err + } + } + + // Create the "inner" pipes (i.e, all but the first and last + // `stageJoiners`): + for i := 1; i < len(stageJoiners)-1; i++ { + if err := stageJoiners[i].createPipe(); err != nil { + closePipes() + return err + } + } + + // We're about to start up the stages, one by one. If something + // goes wrong during that process, `abort` should be called to + // kill any stages that have already been started and to close any + // pipes that have not yet been passed to a stage. `i` is the + // index of the stage that failed to start. If the stage already + // received its streams, it is responsible for closing them. + abort := func(i int, err error) error { + closePipes() + + // Kill and wait for any stages that have been started + // already to finish: + p.cancel() + for _, s := range p.stages[:i] { + _ = s.Wait() + } + return &EventError{ + Command: p.stages[i].Name(), + Msg: "failed to start pipe stage", + Err: err, + } + } + + // Loop over all of the stages, starting them in order. + for i, s := range p.stages { + prevSJ := &stageJoiners[i] + nextSJ := &stageJoiners[i+1] + + err := s.Start(ctx, opts, prevSJ.nextStdin, nextSJ.prevStdout) + + // Even if that stage failed to start, we are no longer + // responsible for closing its streams: + prevSJ.nextStdin = nil + nextSJ.prevStdout = nil + + if err != nil { + return abort(i, err) + } + } + + return nil +} + +// Wait waits for each stage in the pipe to exit. +func (p *Pipe) Wait() error { + p.oneUse.assertStarted("wait") + + if len(p.stages) == 0 { + // There was nothing to do, and we did it brilliantly! + return nil + } + + // Make sure that all of the cleanup eventually happens: + defer p.cancel() + + var earliestStageErr error + var earliestFailedStage Stage + + finishedEarly := false + for i := len(p.stages) - 1; i >= 0; i-- { + s := p.stages[i] + err := s.Wait() + + // Handle errors: + switch { + case err == nil: + // No error to handle. But unset the `finishedEarly` flag, + // because earlier stages shouldn't be affected by the + // later stage that finished early. + finishedEarly = false + continue + + case errors.Is(err, FinishEarly): + // Note `FinishEarly` errors, because that is how a stage + // informs us that it intentionally finished early. + // Moreover, if we see a `FinishEarly` error, ignore any + // pipe error from the immediately preceding stage, + // because it probably came from trying to write to this + // stage after this stage closed its stdin. + finishedEarly = true + continue + + case IsPipeError(err): + switch { + case finishedEarly: + // A successor stage finished early. It is common for + // this to cause earlier stages to fail with pipe + // errors. Such errors are uninteresting, so ignore + // them. Leave the `finishedEarly` flag set, because + // the preceding stage might get a pipe error from + // trying to write to this one. + case earliestStageErr != nil: + // A later stage has already reported an error. This + // means that we don't want to report the error from + // this stage: + // + // * If the later error was also a pipe error: we want + // to report the _last_ pipe error seen, which would + // be the one already recorded. + // + // * If the later error was not a pipe error: non-pipe + // errors are always considered more important than + // pipe errors, so again we would want to keep the + // error that is already recorded. + default: + // In this case, the pipe error from this stage is the + // most important error that we have seen so far, so + // remember it: + earliestFailedStage, earliestStageErr = s, err + } + + default: + // This stage exited with a non-pipe error. If multiple + // stages exited with such errors, we want to report the + // one that is most informative. We take that to be the + // error from the earliest failing stage. Since we are + // iterating through stages in reverse order, overwrite + // any existing remembered errors (which would have come + // from a later stage): + earliestFailedStage, earliestStageErr = s, err + finishedEarly = false + } + } + + if earliestStageErr != nil { + return &EventError{ + Command: earliestFailedStage.Name(), + Msg: "command failed", + Err: earliestStageErr, + } + } + + if finishedEarly { + // The earliest stage finished early, so report that as our + // final error. This might cause pipe errors from earlier + // stages to get suppressed. If this is the top-level stage, + // then this `FinishEarly` error will get ignored by + // `Pipeline.Wait()`. + return FinishEarly + } + + return nil +} diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 878cdc6..a5755ef 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -1,474 +1,64 @@ package pipe import ( - "bytes" "context" - "errors" - "fmt" - "io" - "sync/atomic" ) -// Env represents the environment that a pipeline stage should run in. -// It is passed to `Stage.Start()`. -type Env struct { - // The directory in which external commands should be executed by - // default. - Dir string - - // Vars are extra environment variables. These will override any - // environment variables that would be inherited from the current - // process. - Vars []AppendVars -} - -// FinishEarly is an error that can be returned by a `Stage` to -// request that the iteration be ended early (possibly without reading -// all of its input). This "error" is considered a successful return, -// and is not reported to the caller. -// -//revive:disable:error-naming -//nolint:staticcheck // ST1012: FinishEarly is the intentional name for this sentinel error -var FinishEarly = errors.New("finish stage early") - -//revive:enable:error-naming - -type AppendVars func(context.Context, []EnvVar) []EnvVar - -// EnvVar represents an environment variable that will be provided to any child -// process spawned in this pipeline. -type EnvVar struct { - // The name of the environment variable. - Key string - // The value. - Value string -} - -type ContextValueFunc func(context.Context) (string, bool) - -type ContextValuesFunc func(context.Context) []EnvVar - // Pipeline represents a Unix-like pipe that can include multiple // stages, including external processes but also and stages written in // Go. type Pipeline struct { - env Env + r *runner - stdin *InputStream - stdout *OutputStream - stages []Stage - cancel func() + p *Pipe - // Atomically written and read value, nonzero if the pipeline has - // been started. This is only used for lifecycle sanity checks but - // does not guarantee that clients are using the class correctly. - started uint32 - - eventHandler func(e *Event) - panicHandler StagePanicHandler + wait WaitFunc } -var emptyEventHandler = func(_ *Event) {} - -type NewPipeFn func(opts ...Option) *Pipeline - // NewPipeline returns a Pipeline struct with all of the `options` -// applied. +// applied. Since `Pipeline` doesn't allow external access to its +// `Runner`, it permits any `StartOption`s as options (not only +// `RunnerOption`s). func New(options ...Option) *Pipeline { - p := &Pipeline{ - eventHandler: emptyEventHandler, - } - - for _, option := range options { - option(p) - } - - return p -} - -// Option is a type alias for Pipeline functional options. -type Option func(*Pipeline) - -// WithDir sets the default directory for running external commands. -func WithDir(dir string) Option { - return func(p *Pipeline) { - p.env.Dir = dir - } -} - -// WithStdin assigns stdin to the first command in the pipeline. The -// caller retains ownership of stdin; the pipeline will not close it, -// even if `Start()` returns an error. -func WithStdin(stdin io.Reader) Option { - return func(p *Pipeline) { - p.stdin = Input(stdin) - } -} - -// WithStdout assigns stdout to the last command in the pipeline. The -// caller retains ownership of stdout; the pipeline will not close it, -// even if `Start()` returns an error. -func WithStdout(stdout io.Writer) Option { - return func(p *Pipeline) { - p.stdout = Output(stdout) + p := NewPipe("pipeline") + r := newRunner(p, options...) + return &Pipeline{ + r: r, + p: p, } } -// WithStdoutCloser assigns stdout to the last command in the -// pipeline, and closes stdout when the pipeline is done with it. The -// pipeline is responsible for closing stdout even if `Start()` returns -// an error. -func WithStdoutCloser(stdout io.WriteCloser) Option { - return func(p *Pipeline) { - p.stdout = ClosingOutput(stdout) - } -} - -// WithEnvVar appends an environment variable for the pipeline. -func WithEnvVar(key, value string) Option { - return func(p *Pipeline) { - p.env.Vars = append(p.env.Vars, func(_ context.Context, vars []EnvVar) []EnvVar { - return append(vars, EnvVar{Key: key, Value: value}) - }) - } -} - -// WithEnvVars appends several environment variable for the pipeline. -func WithEnvVars(b []EnvVar) Option { - return func(p *Pipeline) { - p.env.Vars = append(p.env.Vars, func(_ context.Context, a []EnvVar) []EnvVar { - return append(a, b...) - }) - } -} - -// WithEnvVarFunc appends a context-based environment variable for the pipeline. -func WithEnvVarFunc(key string, valueFunc ContextValueFunc) Option { - return func(p *Pipeline) { - p.env.Vars = append(p.env.Vars, func(ctx context.Context, vars []EnvVar) []EnvVar { - if val, ok := valueFunc(ctx); ok { - return append(vars, EnvVar{Key: key, Value: val}) - } - return vars - }) - } -} - -// WithEnvVarsFunc appends several context-based environment variables for the pipeline. -func WithEnvVarsFunc(valuesFunc ContextValuesFunc) Option { - return func(p *Pipeline) { - p.env.Vars = append(p.env.Vars, func(ctx context.Context, vars []EnvVar) []EnvVar { - return append(vars, valuesFunc(ctx)...) - }) - } -} - -// Event represents anything that could happen during the pipeline execution -type Event struct { - Command string - Msg string - Err error - Context map[string]interface{} -} - -// WithEventHandler sets a handler for the pipeline. Setting one will emit -// and event for each process. -func WithEventHandler(handler func(e *Event)) Option { - return func(p *Pipeline) { - p.eventHandler = handler - } -} - -// WithStagePanicHandler sets a panic handler for the stages within a pipeline. -// When a pipeline stage panics, the provided handler will be invoked, allowing -// the client to handle the panic in whatever way they see fit. -// -// Note: -// - The client is responsible for deciding whether to recover from the panic or panicking again. -// - If a panic handler is not set, the panic will be propagated normally. -func WithStagePanicHandler(ph StagePanicHandler) Option { - return func(p *Pipeline) { - p.panicHandler = ph - } -} - -func (p *Pipeline) hasStarted() bool { - return atomic.LoadUint32(&p.started) != 0 -} - // Add appends one or more stages to the pipeline. func (p *Pipeline) Add(stages ...Stage) { - if p.hasStarted() { - panic("attempt to modify a pipeline that has already started") - } - - p.stages = append(p.stages, stages...) + p.p.Add(stages...) } // AddWithIgnoredError appends one or more stages that are ignoring // the passed in error to the pipeline. func (p *Pipeline) AddWithIgnoredError(em ErrorMatcher, stages ...Stage) { - if p.hasStarted() { - panic("attempt to modify a pipeline that has already started") - } - - for _, stage := range stages { - p.stages = append(p.stages, IgnoreError(stage, em)) - } -} - -func (p *Pipeline) stageOptions() StageOptions { - return StageOptions{Env: p.env, PanicHandler: p.panicHandler} + p.p.AddWithIgnoredError(em, stages...) } -// Start starts the commands in the pipeline. If `Start()` exits -// without an error, `Wait()` must also be called, to allow all -// resources to be freed. -// -// If `Start()` returns an error, `Wait()` must not be called. Before -// returning an error, `Start()` cancels and waits for any stages that -// were started, closes any inter-stage pipes that the pipeline owns, -// and closes stdout if it was supplied with `WithStdoutCloser()`. -// Streams supplied with `WithStdin()` or `WithStdout()` remain owned by -// the caller and are not closed by the pipeline. -func (p *Pipeline) Start(ctx context.Context) error { - if p.hasStarted() { - panic("attempt to start a pipeline that has already started") - } - - atomic.StoreUint32(&p.started, 1) - ctx, p.cancel = context.WithCancel(ctx) - - if len(p.stages) == 0 { - if p.stdout == nil { - // No stages and no destination: there is nothing to do - // and nowhere to put `p.stdin` even if it was set. - return nil - } - // No stages but a destination was configured: synthesize an - // identity-copy stage so that `WithStdin()` is drained into - // `WithStdout()`/`WithStdoutCloser()` and the destination - // closer (if any) is invoked. - p.stages = append(p.stages, Function( - "identity", - func(_ context.Context, _ Env, stdin io.Reader, stdout io.Writer) error { - if stdin == nil { - return nil - } - _, err := io.Copy(stdout, stdin) - return err - }, - )) - } - - // We need to decide how to start the stages, especially what - // pipes to use to connect adjacent stages (`os.Pipe()` vs. - // `io.Pipe()`) based on the two stages' requirements. - stageJoiners := make([]stageJoiner, len(p.stages)+1) - - // Arrange for the input of the 0th stage to come from `p.stdin`: - stageJoiners[0].nextStdin = p.stdin - - // Arrange for the output of the last stage to go to `p.stdout`: - stageJoiners[len(p.stages)].prevStdout = p.stdout - - // closePipes closes all of the streams that are currently stored - // in the joiners. This should be called if startup fails. As we - // call `Stage.Start()` and pass that method streams, we clear - // them from the corresponding joiners to avoid closing them - // twice. - closePipes := func() { - for _, sj := range stageJoiners { - _ = sj.closePipe() - } - } - - // Store the stages in the joiners, and verify that the stages' - // requirements are well-formed: - for i, s := range p.stages { - // Make sure that the stage's requirements are well-formed: - requirements := s.Requirements() - if err := requirements.Stdin.Validate(); err != nil { - closePipes() - return fmt.Errorf("stdin: %w", err) - } - if err := requirements.Stdout.Validate(); err != nil { - closePipes() - return fmt.Errorf("stdout: %w", err) - } - - stageJoiners[i].nextStage = s - stageJoiners[i].nextStageReq = requirements - stageJoiners[i+1].prevStage = s - stageJoiners[i+1].prevStageReq = requirements - } - - // Check that each of the stages' requirements are satisfiable: - for i := range stageJoiners { - if err := stageJoiners[i].validate(); err != nil { - closePipes() - return err - } - } - - // Create the "inner" pipes (i.e, all but the first and last - // `stageJoiners`): - for i := 1; i < len(stageJoiners)-1; i++ { - if err := stageJoiners[i].createPipe(); err != nil { - closePipes() - return err - } - } - - // We're about to start up the stages, one by one. If something - // goes wrong during that process, this function should be called - // to kill any stages that have already been started and to close - // any pipes that have not yet been passed to a stage. `i` is the - // index of the stage that failed to start. If the stage already - // received its streams, it is responsible for closing them. - abort := func(i int, err error) error { - closePipes() - - // Kill and wait for any stages that have been started - // already to finish: - p.cancel() - for _, s := range p.stages[:i] { - _ = s.Wait() - } - p.eventHandler(&Event{ - Command: p.stages[i].Name(), - Msg: "failed to start pipeline stage", - Err: err, - }) - return fmt.Errorf( - "starting pipeline stage %q: %w", p.stages[i].Name(), err, - ) - } - - // Loop over all of the stages, starting them in order. - for i, s := range p.stages { - prevSJ := &stageJoiners[i] - nextSJ := &stageJoiners[i+1] - - err := s.Start(ctx, p.stageOptions(), prevSJ.nextStdin, nextSJ.prevStdout) - - // Even if that stage failed to start, we are no longer - // responsible for closing its streams: - prevSJ.nextStdin = nil - nextSJ.prevStdout = nil - - if err != nil { - return abort(i, err) - } +func (p *Pipeline) Start(ctx context.Context, options ...Option) error { + p.r.applyOptions(options...) + wait, err := p.r.start(ctx) + if err != nil { + return err } - + p.wait = wait return nil } -func (p *Pipeline) Output(ctx context.Context) ([]byte, error) { - var buf bytes.Buffer - p.stdout = Output(&buf) - err := p.Run(ctx) - return buf.Bytes(), err -} - -// Wait waits for each stage in the pipeline to exit. func (p *Pipeline) Wait() error { - if !p.hasStarted() { - panic("unable to wait on a pipeline that has not started") - } - - // Make sure that all of the cleanup eventually happens: - defer p.cancel() - - var earliestStageErr error - var earliestFailedStage Stage - - finishedEarly := false - for i := len(p.stages) - 1; i >= 0; i-- { - s := p.stages[i] - err := s.Wait() - - // Handle errors: - switch { - case err == nil: - // No error to handle. But unset the `finishedEarly` flag, - // because earlier stages shouldn't be affected by the - // later stage that finished early. - finishedEarly = false - continue - - case errors.Is(err, FinishEarly): - // We ignore `FinishEarly` errors because that is how a - // stage informs us that it intentionally finished early. - // Moreover, if we see a `FinishEarly` error, ignore any - // pipe error from the immediately preceding stage, - // because it probably came from trying to write to this - // stage after this stage closed its stdin. - finishedEarly = true - continue - - case IsPipeError(err): - switch { - case finishedEarly: - // A successor stage finished early. It is common for - // this to cause earlier stages to fail with pipe - // errors. Such errors are uninteresting, so ignore - // them. Leave the `finishedEarly` flag set, because - // the preceding stage might get a pipe error from - // trying to write to this one. - case earliestStageErr != nil: - // A later stage has already reported an error. This - // means that we don't want to report the error from - // this stage: - // - // * If the later error was also a pipe error: we want - // to report the _last_ pipe error seen, which would - // be the one already recorded. - // - // * If the later error was not a pipe error: non-pipe - // errors are always considered more important than - // pipe errors, so again we would want to keep the - // error that is already recorded. - default: - // In this case, the pipe error from this stage is the - // most important error that we have seen so far, so - // remember it: - earliestFailedStage, earliestStageErr = s, err - } - - default: - // This stage exited with a non-pipe error. If multiple - // stages exited with such errors, we want to report the - // one that is most informative. We take that to be the - // error from the earliest failing stage. Since we are - // iterating through stages in reverse order, overwrite - // any existing remembered errors (which would have come - // from a later stage): - earliestFailedStage, earliestStageErr = s, err - finishedEarly = false - } - } - - if earliestStageErr != nil { - p.eventHandler(&Event{ - Command: earliestFailedStage.Name(), - Msg: "command failed", - Err: earliestStageErr, - }) - return fmt.Errorf("%s: %w", earliestFailedStage.Name(), earliestStageErr) - } - - return nil + return p.wait() } -// Run starts and waits for the commands in the pipeline. If startup -// fails, it returns the `Start()` error after `Start()` has performed -// its failure cleanup. -func (p *Pipeline) Run(ctx context.Context) error { - if err := p.Start(ctx); err != nil { - return err - } +func (p *Pipeline) Run(ctx context.Context, options ...Option) error { + p.r.applyOptions(options...) + return p.r.run(ctx) +} - return p.Wait() +func (p *Pipeline) Output(ctx context.Context, options ...Option) ([]byte, error) { + p.r.applyOptions(options...) + return p.r.output(ctx) } diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index e956964..f0b05f2 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -54,6 +54,75 @@ func TestPipelineEmptyOutput(t *testing.T) { } } +func TestPipelineDuplicateStdin(t *testing.T) { + t.Parallel() + ctx := context.Background() + var stdin1, stdin2 bytes.Buffer + p := pipe.New( + pipe.WithStdin(&stdin1), + pipe.WithStdin(&stdin2), + ) + + err := p.Run(ctx) + if assert.Error(t, err) { + assert.Equal(t, `stdin set multiple times for "pipeline"`, err.Error()) + } +} + +func TestPipelineDuplicateStdout(t *testing.T) { + t.Parallel() + ctx := context.Background() + stdout1 := &closeTrackingWriter{} + stdout2 := &closeTrackingWriter{} + p := pipe.New( + pipe.WithStdout(stdout1), + pipe.WithStdout(stdout2), + ) + + err := p.Run(ctx) + if assert.Error(t, err) { + assert.Equal(t, `stdout set multiple times for "pipeline"`, err.Error()) + assert.False(t, stdout1.closed, "WithStdoutCloser destination should not be closed") + assert.False(t, stdout2.closed, "WithStdoutCloser destination should not be closed") + } +} + +func TestPipelineDuplicateStdoutClosing(t *testing.T) { + t.Parallel() + ctx := context.Background() + stdout1 := &closeTrackingWriter{} + stdout2 := &closeTrackingWriter{} + p := pipe.New( + pipe.WithStdoutCloser(stdout1), + pipe.WithStdoutCloser(stdout2), + ) + + err := p.Run(ctx) + if assert.Error(t, err) { + assert.Equal(t, `stdout set multiple times for "pipeline"`, err.Error()) + assert.True(t, stdout1.closed, "WithStdoutCloser destination should be closed") + assert.True(t, stdout2.closed, "WithStdoutCloser destination should be closed") + } +} + +func TestPipelineStdoutAndOutput(t *testing.T) { + t.Parallel() + ctx := context.Background() + stdout := &closeTrackingWriter{} + p := pipe.New( + pipe.WithStdin(strings.NewReader("hello world\n")), + pipe.WithStdoutCloser(stdout), + ) + + out, err := p.Output(ctx) + if assert.Error(t, err) { + assert.Nil(t, out) + assert.Equal(t, `Output() called for "pipeline" but stdout is already set`, err.Error()) + assert.Equal(t, "", stdout.buf.String()) + assert.True(t, stdout.closed, "WithStdoutCloser destination should be closed") + } +} + func TestPipelineEmptyWithStdoutCloser(t *testing.T) { t.Parallel() ctx := context.Background() @@ -377,7 +446,7 @@ func TestPipelineExit(t *testing.T) { pipe.Command("false"), pipe.Command("true"), ) - assert.EqualError(t, p.Run(ctx), "false: exit status 1") + assert.EqualError(t, p.Run(ctx), `command failed in stage "false": exit status 1`) } func TestPipelineStderr(t *testing.T) { @@ -393,7 +462,7 @@ func TestPipelineStderr(t *testing.T) { _, err = p.Output(ctx) if assert.Error(t, err) { - assert.Contains(t, err.Error(), "ls: exit status") + assert.Contains(t, err.Error(), `command failed in stage "ls": exit status`) } } @@ -450,7 +519,7 @@ func TestLittleEPIPE(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() err := p.Run(ctx) - assert.EqualError(t, err, "sh: signal: broken pipe") + assert.EqualError(t, err, `command failed in stage "sh": signal: broken pipe`) } // Verify the correct error if one command in the pipeline exits @@ -469,7 +538,7 @@ func TestBigEPIPE(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() err := p.Run(ctx) - assert.EqualError(t, err, "seq: signal: broken pipe") + assert.EqualError(t, err, `command failed in stage "seq": signal: broken pipe`) } // Verify the correct error if one command in the pipeline exits @@ -951,6 +1020,24 @@ func TestFunctionOptionsForbidStreams(t *testing.T) { }) } +func TestFunctionOptionsSetStreamRequirements(t *testing.T) { + t.Parallel() + + stage := pipe.Function( + "file-preferring", + func(_ context.Context, _ pipe.Env, _ io.Reader, _ io.Writer) error { + return nil + }, + pipe.WithStdinRequirement(pipe.StreamPreferFile), + pipe.WithStdoutRequirement(pipe.StreamPreferFile), + ) + + assert.Equal(t, pipe.StageRequirements{ + Stdin: pipe.StreamPreferFile, + Stdout: pipe.StreamPreferFile, + }, stage.Requirements()) +} + func TestStreamForbiddenStdin(t *testing.T) { t.Parallel() ctx := context.Background() @@ -1024,7 +1111,10 @@ func TestInvalidStreamRequirements(t *testing.T) { Stdin: pipe.StreamRequirement(123), }, }) - require.ErrorContains(t, p.Run(ctx), `stdin: invalid stream requirement 123`) + require.ErrorContains( + t, p.Run(ctx), + `stage "source" has invalid stdin requirement: invalid stream requirement 123`, + ) assert.True(t, stdout.closed, "WithStdoutCloser destination should be closed") }) @@ -1038,7 +1128,10 @@ func TestInvalidStreamRequirements(t *testing.T) { Stdout: pipe.StreamRequirement(123), }, }) - require.ErrorContains(t, p.Run(ctx), `stdout: invalid stream requirement 123`) + require.ErrorContains( + t, p.Run(ctx), + `stage "sink" has invalid stdout requirement: invalid stream requirement 123`, + ) assert.True(t, stdout.closed, "WithStdoutCloser destination should be closed") }) } @@ -1071,7 +1164,10 @@ func TestInvalidStreamRequirement(t *testing.T) { Stdin: pipe.StreamRequirement(99), }, }) - require.ErrorContains(t, p.Run(ctx), `stdin: invalid stream requirement 99`) + require.ErrorContains( + t, p.Run(ctx), + `stage "invalid" has invalid stdin requirement: invalid stream requirement 99`, + ) } func TestFunctionNoInput(t *testing.T) { diff --git a/pipe/runner.go b/pipe/runner.go new file mode 100644 index 0000000..0964920 --- /dev/null +++ b/pipe/runner.go @@ -0,0 +1,176 @@ +package pipe + +import ( + "bytes" + "context" + "errors" + "fmt" +) + +// Env represents the environment that a pipeline stage should run in. +// It is passed to `Stage.Start()`. +type Env struct { + // The directory in which external commands should be executed by + // default. + Dir string + + // Vars are extra environment variables. These will override any + // environment variables that would be inherited from the current + // process. + Vars []AppendVars +} + +// FinishEarly is an error that can be returned by a `Stage` to +// request that the iteration be ended early (possibly without reading +// all of its input). This "error" is considered a successful return, +// and is not reported to the caller. +// +//revive:disable:error-naming +//nolint:staticcheck // ST1012: FinishEarly is the intentional name for this sentinel error +var FinishEarly = errors.New("finish stage early") + +//revive:enable:error-naming + +type AppendVars func(context.Context, []EnvVar) []EnvVar + +// EnvVar represents an environment variable that will be provided to +// any child process. +type EnvVar struct { + // The name of the environment variable. + Key string + // The value. + Value string +} + +// runner runs a `Stage`. A `runner` can only be used once. +type runner struct { + env Env + + stdin *InputStream + stdout *OutputStream + + stage Stage + + eventHandler EventHandler + panicHandler StagePanicHandler + + configErr error + + oneUse oneUse +} + +var emptyEventHandler = func(_ *EventError) {} + +// newRunner returns a Runner with all of the `options` applied. +func newRunner(stage Stage, options ...Option) *runner { + r := &runner{ + stage: stage, + eventHandler: emptyEventHandler, + oneUse: oneUse{thing: "runner for " + stage.Name()}, + } + + r.applyOptions(options...) + + return r +} + +// applyOptions applies `options` to `r` in place (in addition to any +// options that have already been applied). +func (r *runner) applyOptions(options ...Option) { + r.oneUse.assertNotStarted("apply options") + + for _, option := range options { + option.applyAtStart(r) + } +} + +func (r *runner) stageOptions() StageOptions { + return StageOptions{Env: r.env, PanicHandler: r.panicHandler} +} + +type WaitFunc func() error + +func nopWait(err error) WaitFunc { + return func() error { return err } +} + +// start starts the stage. If `start()` exits without an error, the +// returned `Waiter` must also be called exactly once, to learn about +// any errors and to allow all resources to be freed. +// +// If `start()` returns an error, the returned `Waiter` is a NOP that +// returns the same error. Before returning an error, `start()` +// cancels and waits for any stages that were started, closes any +// inter-stage pipes that the pipeline owns, and closes stdin/stdout +// if required. Streams that were supplied with `WithStdin()` or +// `WithStdout()` remain owned by the caller and are never closed by +// `runner`. +func (r *runner) start(ctx context.Context) (WaitFunc, error) { + r.oneUse.assertStarting("start") + + if r.configErr != nil { + _ = r.stdin.Close() + _ = r.stdout.Close() + return nopWait(r.configErr), r.configErr + } + + if err := r.stage.Start(ctx, r.stageOptions(), r.stdin, r.stdout); err != nil { + return nopWait(err), err + } + + return r.wait, nil +} + +// wait is the `WaitFunc` that is normally returned by `start()`. +func (r *runner) wait() error { + r.oneUse.assertStarted("wait") + + err := r.stage.Wait() + + // Handle errors: + switch { + case err == nil: + // No error to handle. + + case errors.Is(err, FinishEarly): + // We ignore `FinishEarly` errors because that is how a + // stage informs us that it intentionally finished early. + + default: + var eventErr *EventError + if errors.As(err, &eventErr) { + r.eventHandler(eventErr) + } + + return err + } + + return nil +} + +// run starts `stage` and waits for it to finish. +func (r *runner) run(ctx context.Context) error { + // If start returns an error, the same error is returned by + // `wait`. + wait, _ := r.start(ctx) + return wait() +} + +// output starts `stage`, waits for it to finish, and collects and +// returns its stdout. +func (r *runner) output(ctx context.Context) ([]byte, error) { + r.oneUse.assertNotStarted("get output") + + if r.stdout != nil { + _ = r.stdin.Close() + _ = r.stdout.Close() + return nil, fmt.Errorf( + "Output() called for %q but stdout is already set", r.stage.Name(), + ) + } + + var buf bytes.Buffer + r.applyOptions(WithStdout(&buf)) + err := r.run(ctx) + return buf.Bytes(), err +}