Skip to content
79 changes: 79 additions & 0 deletions pipe/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package pipe

import "context"

// Config represents a configuration for running pipes. A config
// instance is immutable. If you want a new configuration, either use
// `NewConfig()` to create one from scratch, or call
// `cfg.WithOption()` or `cfg.WithOptions()` to derive a new
// configuration from an existing one by adding more options. `Config`
// itself implements `Option` and `ConfigOption`, so it can be used
// when constructing a `Pipeline`. Or the Pipeline can be created
// using the helper method `Config.NewPipeline()`.
type Config struct {
options sliceConfigOption
}

func newConfig(options []ConfigOption, extraSlots int) *Config {
cfg := Config{
options: make(sliceConfigOption, 0, len(options)+extraSlots),
}
cfg.options = append(cfg.options, options...)
return &cfg
}

// NewConfig returns a `Config` with the specified `options`.
func NewConfig(options ...ConfigOption) *Config {
return newConfig(options, 0)
}

// WithOption returns a new `Config` with the same options as `cfg`
// plus the additional `option`.
func (cfg *Config) WithOption(option ConfigOption) *Config {
newCfg := newConfig(cfg.options, 1)
newCfg.options = append(newCfg.options, option)
return newCfg
}

// WithOption returns a new `Config` with the same options as `cfg`
// plus the additional `options`.
func (cfg *Config) WithOptions(options ...ConfigOption) *Config {
newCfg := newConfig(cfg.options, len(options))
newCfg.options = append(newCfg.options, options...)
return newCfg
}

func (cfg *Config) newRunner(stage Stage, options ...Option) *runner {
r := newRunner(stage, cfg.options)
r.applyOptions(options...)
return r
}

func (cfg *Config) NewPipeline(options ...Option) *Pipeline {
return New(cfg.options, sliceOption(options))
}

// Start starts `stage` using a runner created using `cfg` plus any
// additional start options from `options`. If `Start()` exits without
// an error, the returned `Waiter` must also be called, to learn about
// any errors and to ensure that all resources are freed. See
// [runner.Start] for more information.
func (cfg *Config) Start(
ctx context.Context, stage Stage, options ...Option,
) (WaitFunc, error) {
return cfg.newRunner(stage, options...).start(ctx)
}

// Start starts `stage` using a runner created using `cfg` plus any
// additional start options from `options`.
func (cfg *Config) Run(
ctx context.Context, stage Stage, options ...Option,
) error {
return cfg.newRunner(stage, options...).run(ctx)
}

func (cfg *Config) Output(
ctx context.Context, stage Stage, options ...Option,
) ([]byte, error) {
return cfg.newRunner(stage, options...).output(ctx)
}
10 changes: 7 additions & 3 deletions pipe/env_stage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,13 @@ func TestWithExtraEnvDoesNotShareVarsBackingArray(t *testing.T) {
})
}

p := New(func(p *Pipeline) {
p.env.Vars = baseVars
})
p := New(
newConfigOption(
func(r *runner) {
r.env.Vars = baseVars
},
),
)
p.Add(
WithExtraEnv(
Function("first", func(ctx context.Context, env Env, _ io.Reader, _ io.Writer) error {
Expand Down
29 changes: 29 additions & 0 deletions pipe/event_error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package pipe

import "fmt"

// EventError represents an error that could happen during the
// pipeline execution that we might want to report as an event.
type EventError struct {
Command string
Msg string
Err error
Context map[string]interface{}
}

type EventHandler func(err *EventError)

func (err *EventError) Error() string {
if err.Msg == "" {
return fmt.Sprintf("%s: %v", err.Command, err.Err)
}
return fmt.Sprintf("%s in stage %q: %v", err.Msg, err.Command, err.Err)
}

func (err *EventError) Unwrap() error {
return err.Err
}

// Event is an alias for `EventError`, for reasons of backwards
// compatibility.
type Event = EventError
24 changes: 18 additions & 6 deletions pipe/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,32 @@ type StageFunc func(ctx context.Context, env Env, stdin io.Reader, stdout io.Wri
// FunctionOption configures a Function stage.
type FunctionOption func(*goStage)

// WithStdinRequirement returns a FunctionOption declaring the stage's stdin
// requirement.
func WithStdinRequirement(requirement StreamRequirement) FunctionOption {
return func(s *goStage) {
s.requirements.Stdin = requirement
}
}

// WithStdoutRequirement returns a FunctionOption declaring the stage's stdout
// requirement.
func WithStdoutRequirement(requirement StreamRequirement) FunctionOption {
return func(s *goStage) {
s.requirements.Stdout = requirement
}
}

// ForbidStdin returns a FunctionOption declaring that the stage must not be
// connected to stdin.
func ForbidStdin() FunctionOption {
return func(s *goStage) {
s.requirements.Stdin = StreamForbidden
}
return WithStdinRequirement(StreamForbidden)
}

// ForbidStdout returns a FunctionOption declaring that the stage must not be
// connected to stdout.
func ForbidStdout() FunctionOption {
return func(s *goStage) {
s.requirements.Stdout = StreamForbidden
}
return WithStdoutRequirement(StreamForbidden)
}

// Function returns a pipeline `Stage` that will run a `StageFunc` in
Expand Down
Loading
Loading