-
Notifications
You must be signed in to change notification settings - Fork 9.9k
Expand file tree
/
Copy path__init__.py
More file actions
99 lines (88 loc) · 3.71 KB
/
Copy path__init__.py
File metadata and controls
99 lines (88 loc) · 3.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"""Shell step — run a local shell command."""
from __future__ import annotations
import json
import subprocess
from typing import Any
from specify_cli.workflows.base import StepBase, StepContext, StepResult, StepStatus
from specify_cli.workflows.expressions import evaluate_expression
class ShellStep(StepBase):
    """Run a local shell command (non-agent).

    Captures the exit code and stdout/stderr of the command.  When the step
    config sets ``output_format: json``, stdout is additionally parsed and
    exposed under the ``data`` key of the step output.
    """

    type_key = "shell"

    # Wall-clock limit, in seconds, passed to ``subprocess.run``.  Kept in one
    # place so the limit and the timeout error message cannot drift apart.
    _TIMEOUT_SECONDS = 300

    def execute(self, config: dict[str, Any], context: StepContext) -> StepResult:
        """Run the configured command and report its outcome.

        Args:
            config: Step configuration.  Reads ``run`` (the command; passed
                through ``evaluate_expression`` when it is a string
                containing ``{{``), the optional ``output_format`` and the
                optional ``id`` (used only in error messages).
            context: Step context.  ``context.project_root`` is used as the
                working directory, falling back to ``"."`` when falsy.

        Returns:
            A ``StepResult`` whose ``output`` holds ``exit_code``, ``stdout``
            and ``stderr`` (plus ``data`` for successfully parsed JSON).
            The status is ``FAILED`` when the command exits non-zero, times
            out, cannot be launched, or declares ``output_format: json`` but
            prints invalid JSON; otherwise it is ``COMPLETED``.
        """
        run_cmd = config.get("run", "")
        if isinstance(run_cmd, str) and "{{" in run_cmd:
            run_cmd = evaluate_expression(run_cmd, context)
        run_cmd = str(run_cmd)

        cwd = context.project_root or "."

        # NOTE: shell=True is required to support pipes, redirects, and
        # multi-command expressions in workflow YAML.  Workflow authors
        # control commands; catalog-installed workflows should be reviewed
        # before use (see PUBLISHING.md for security guidance).
        try:
            proc = subprocess.run(
                run_cmd,
                shell=True,
                capture_output=True,
                text=True,
                # Commands may emit bytes that are not valid in the locale
                # encoding.  Strict decoding would raise UnicodeDecodeError
                # out of this method; substitute U+FFFD instead so the step
                # still yields a result with the captured output.
                errors="replace",
                cwd=cwd,
                timeout=self._TIMEOUT_SECONDS,
            )
        except subprocess.TimeoutExpired:
            return StepResult(
                status=StepStatus.FAILED,
                error=(
                    f"Shell command timed out after "
                    f"{self._TIMEOUT_SECONDS} seconds."
                ),
                output={"exit_code": -1, "stdout": "", "stderr": "timeout"},
            )
        except (OSError, ValueError) as exc:
            # OSError: the command could not be launched (e.g. bad cwd).
            # ValueError: subprocess rejected the arguments (e.g. an embedded
            # NUL byte in the command).  Both are reported as a failed step
            # rather than propagating to the caller.
            return StepResult(
                status=StepStatus.FAILED,
                error=f"Shell command failed: {exc}",
                output={"exit_code": -1, "stdout": "", "stderr": str(exc)},
            )

        output = {
            "exit_code": proc.returncode,
            "stdout": proc.stdout,
            "stderr": proc.stderr,
        }

        if proc.returncode != 0:
            return StepResult(
                status=StepStatus.FAILED,
                error=f"Shell command exited with code {proc.returncode}.",
                output=output,
            )

        if config.get("output_format") == "json":
            # Opt-in structured output: expose the parsed stdout under
            # ``output.data`` so later steps can consume typed values
            # (e.g. a fan-out's ``items:``).  A parse failure fails the
            # step — declaring ``output_format: json`` is a contract.
            try:
                output["data"] = json.loads(proc.stdout)
            except json.JSONDecodeError as exc:
                return StepResult(
                    status=StepStatus.FAILED,
                    error=(
                        f"Shell step {config.get('id', '?')!r} declared "
                        f"output_format: json but stdout is not valid "
                        f"JSON: {exc}"
                    ),
                    output=output,
                )

        return StepResult(
            status=StepStatus.COMPLETED,
            output=output,
        )

    def validate(self, config: dict[str, Any]) -> list[str]:
        """Return validation errors for a shell step configuration.

        Extends the base-class checks with two rules: the ``run`` key must be
        present, and ``output_format``, when given, must equal ``"json"``.

        Args:
            config: Step configuration to check.

        Returns:
            The list of error messages; no entries are added here when the
            shell-specific rules are satisfied.
        """
        errors = super().validate(config)
        if "run" not in config:
            errors.append(
                f"Shell step {config.get('id', '?')!r} is missing 'run' field."
            )
        output_format = config.get("output_format")
        if output_format is not None and output_format != "json":
            errors.append(
                f"Shell step {config.get('id', '?')!r}: 'output_format' must "
                f"be 'json' when present, got {output_format!r}."
            )
        return errors