Skip to content

Add some ergonomic features: Pipe, runner, and Config#61

Draft
mhagger wants to merge 9 commits into
znull/stage-v2-cleanfrom
pipe-and-runner-and-config
Draft

Add some ergonomic features: Pipe, runner, and Config#61
mhagger wants to merge 9 commits into
znull/stage-v2-cleanfrom
pipe-and-runner-and-config

Conversation

@mhagger

@mhagger mhagger commented Jun 16, 2026

Copy link
Copy Markdown
Member

Some abstractions were struggling desperately to get out. Here they are.

EventError

The Event type is a little bit weird. It's construed as an event, but really it represents errors, and its actual raison d'être is for logging. I'm still not happy about the whole idea, but for now I've changed Event to implement error and renamed it to EventError (also aliased to Event). This means that the eventHandler doesn't have to be handed down through the stages when running a pipeline, which is useful since we now support compound stages (for more about that, read on…).

Pipe

The only thing that could be run by this package was a Pipeline, which was a chain of Stages strung together with pipes. But "a chain of Stages strung together with pipes" is itself a thing that behaves like a Stage. So add a new type, Pipe, that represents exactly that and implements the Stage interface. A Pipe can be used wherever any other Stage can be used. In particular, somebody could hand you a Pipe as a sort of prefabricated part of a pipeline, and you could insert it into your own Pipeline as a component, along with other stages that feed it inputs or process its outputs.

But then once we have the concept of a Pipe, which knows how to manage sub-stages and run them as a unit, Pipeline doesn't need to duplicate that knowledge. So change the implementation of Pipeline to use a Pipe internally. When it comes time to run a Pipeline, it only has to start that single Pipe stage, and the latter does all of the work.

runner

Now you notice that, aside from helper methods to assemble the Pipe, a Pipeline doesn't really care that the (single!) stage that it runs has to be a Pipe. It could be any kind of single Stage!

So extract the code for starting up a stage into a new private type, runner. A runner knows how to set up the environment as well as the stdin and stdout for any Stage and then run the stage using methods start(), run(), or output(). Change Pipeline into a thin wrapper around a runner and a Pipe (while preserving its same external interface).

Config

Finally, client code has shown a need to use a NewPipeFn interface to create Pipelines. The reason is that often a whole app wants to add the same options uniformly to all pipelines that are created within that app. This is something that the library can make easier.

So add a Config type, which allows options to be stored in it. This type is immutable, but one Config can be derived from another with added options:

  • func (cfg *Config) WithOption(option ConfigOption) *Config
  • func (cfg *Config) WithOptions(options ...ConfigOption) *Config

Config has a method that allow a pipeline to be created using it:

  • func (cfg *Config) NewPipeline(options ...Option) *Pipeline

It also has methods that allow stages (i.e., including compound stages like Pipe) to be run directly, allowing additional options to be specified at runtime:

  • func (cfg *Config) Start(ctx context.Context, stage Stage, options ...Option) (WaitFunc, error)
  • func (cfg *Config) Run(ctx context.Context, stage Stage, options ...Option) error
  • func (cfg *Config) Output(ctx context.Context, stage Stage, options ...Option) ([]byte, error)

I anticipate that this new style will be more natural and will gradually replace the old. Instead of

p := newPipeFn(pipe.WithStdin(f), pipe.WithStdout(os.Stdout))
p.Add(
    pipe.Command(...),
    pipe.LinewiseFunction(...),
)
err := p.Run(ctx)

the new alternative looks like

p := pipe.NewPipe(
    pipe.Command(...),
    pipe.LinewiseFunction(...),
)
err := pipeCfg.Run(ctx, p, pipe.WithStdin(f), pipe.WithStdout(os.Stdout))

Moreover, Config is a place that we can add more configurability in the future; for example, we could add a NewCommand() method that by default calls exec.Command(), but that allows the user to configure it to do something different, like running the command with reduced permissions or with resource limitations.

Conclusion

The changes in this PR didn't require any changes to the test suite aside from a couple trivial changes to tests that used private methods. Well, actually it still needs some tests that use the new features, but at least we know that Pipeline still works as it did before.

It does, however, change some types in ways that would not strictly be backwards compatible (especially, the Options type is now an interface, not a function type). That is why I would like to get these changes in before v2 is officially released.

/cc @znull

mhagger added 9 commits June 16, 2026 12:43
The reason for the filename will be apparent in a moment.
Rename `Event` to `EventError`, and make it satisfy the `error`
interface and also implement `Unwrap() error`. Add an alias `Event`
for backwards-compatibility.
When there is an `EventError`, return it directly rather than passing
the `EventError` to the `ErrorHandler` and returning a slightly
differently-formatted message as the `error` result. This slightly
changes the error's messages, so adjust the tests accordingly.
Add `Pipe`, a `Stage` that itself runs multiple other stages with
their stdouts and stdins piped to one another. This is a useful
concept, because it allows pipelines to be created in modular form
more easily.

Also, now that we have this type, we can is it in the implementation
of `Pipeline`. Specifically, `Pipeline` now embeds a single `Pipe`,
which takes care of managing multiple stages and piping them together.
This means that `Pipeline` only needs to know how to run a single
stage.

More changes are still to come…
`runner` is an object that can run a single `Stage` of any type.

Use `runner` in the implementation of `Pipeline`. Now `Pipeline` is
just a thin wrapper around `Pipe` and `runner`.
Add `Config`, a new type to hold a shared pipe configuration that can
be used when creating multiple pipes. `Config` can run a `Stage`
directly, in the usual ways:

* Start(ctx, stage, options...)
* Run(ctx, stage, options...)
* Output(ctx, stage, options...)

It can also create a pre-configured `Pipeline` for you:

* NewPipeline(options...)

But we don't want to allow just any `Option` to be included in a
`Config`. For example, it makes no sense to use `WithStdin()` for a
`Config`, because stdin can't be reused for multiple pipes. So a big
change to make this possible is splitting up the concept of `Option`
into two concepts:

* `ConfigOption`: — an option that can be applied to a `Config` _or_
  at start time.

* `Option`: — an option that can be applied _only_ at start time.
  Currently these are `WithStdin()`, `WithStdout()`, and
  `WithStdoutCloser()`.

This involves some boilerplate, but means that the client code to use
a `Pipeline` doesn't have to change unless/until it wants to use the
new features.
Consumers, not producers, should define interfaces. This one is not
used anywhere in the library so it shouldn't be defined here.

But you might notice that `Config.NewPipeline()` satisfies what used
to be called the `NewPipeFn` interface. This is no accident; `Config`
is meant to be a better replacement for `NewPipeFn`.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant