diff --git a/git/cmd.py b/git/cmd.py index 92ca09c2a..866de76bc 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -649,6 +649,11 @@ class Git(metaclass=_GitMeta): re_unsafe_protocol = re.compile(r"(.+)::.+") + unsafe_git_ls_remote_options = [ + # This option allows arbitrary command execution in git-ls-remote. + "--upload-pack", + ] + def __getstate__(self) -> Dict[str, Any]: return slots_to_dict(self, exclude=self._excluded_) @@ -973,6 +978,16 @@ def check_unsafe_options(cls, options: List[str], unsafe_options: List[str]) -> if unsafe_option is not None: raise UnsafeOptionError(f"{unsafe_option} is not allowed, use `allow_unsafe_options=True` to allow it.") + @classmethod + def _option_candidates(cls, args: Sequence[Any] = (), kwargs: Optional[Mapping[str, Any]] = None) -> List[str]: + """Collect possible option spellings before command-line transformation.""" + options = [ + option for option in cls._unpack_args([arg for arg in args if arg is not None]) if option.startswith("-") + ] + if kwargs: + options.extend(str(key) for key in kwargs) + return options + AutoInterrupt: TypeAlias = _AutoInterrupt CatFileContentStream: TypeAlias = _CatFileContentStream @@ -1030,6 +1045,22 @@ def set_persistent_git_options(self, **kwargs: Any) -> None: self._persistent_git_options = self.transform_kwargs(split_single_char_options=True, **kwargs) + def ls_remote( + self, + *args: Any, + allow_unsafe_options: bool = False, + **kwargs: Any, + ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], "Git.AutoInterrupt"]: + """List references in a remote repository. + + :param allow_unsafe_options: + Allow unsafe options, like ``--upload-pack``. + """ + if not allow_unsafe_options: + candidate_options = self._option_candidates(args, kwargs) + Git.check_unsafe_options(options=candidate_options, unsafe_options=self.unsafe_git_ls_remote_options) + return self._call_process("ls_remote", *args, **kwargs) + @property def working_dir(self) -> Union[None, PathLike]: """:return: Git directory we are working on""" @@ -1536,7 +1567,7 @@ def transform_kwargs(self, split_single_char_options: bool = True, **kwargs: Any return args @classmethod - def _unpack_args(cls, arg_list: Sequence[str]) -> List[str]: + def _unpack_args(cls, arg_list: Sequence[Any]) -> List[str]: outlist = [] if isinstance(arg_list, (list, tuple)): for arg in arg_list: diff --git a/git/objects/commit.py b/git/objects/commit.py index da7677ee0..eae019ef9 100644 --- a/git/objects/commit.py +++ b/git/objects/commit.py @@ -86,6 +86,13 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): # INVARIANTS default_encoding = "UTF-8" + unsafe_git_rev_options = [ + # This option can truncate or write arbitrary files before revision parsing. + "--output", + "-o", + ] + """Options to :manpage:`git-rev-list(1)` that can overwrite files.""" + type: Literal["commit"] = "commit" __slots__ = ( @@ -302,6 +309,7 @@ def iter_items( repo: "Repo", rev: Union[str, "Commit", "SymbolicReference"], paths: Union[PathLike, Sequence[PathLike]] = "", + allow_unsafe_options: bool = False, **kwargs: Any, ) -> Iterator["Commit"]: R"""Find all commits matching the given criteria. @@ -330,6 +338,11 @@ def iter_items( raise ValueError("--pretty cannot be used as parsing expects single sha's only") # END handle pretty + if not allow_unsafe_options: + Git.check_unsafe_options( + options=Git._option_candidates([rev], kwargs), unsafe_options=cls.unsafe_git_rev_options + ) + # Use -- in all cases, to prevent possibility of ambiguous arguments. # See https://github.com/gitpython-developers/GitPython/issues/264. diff --git a/git/remote.py b/git/remote.py index 18d4829af..0c3dbfe15 100644 --- a/git/remote.py +++ b/git/remote.py @@ -1071,7 +1071,10 @@ def fetch( Git.check_unsafe_protocols(ref) if not allow_unsafe_options: - Git.check_unsafe_options(options=list(kwargs.keys()), unsafe_options=self.unsafe_git_fetch_options) + Git.check_unsafe_options( + options=Git._option_candidates([], kwargs), + unsafe_options=self.unsafe_git_fetch_options, + ) proc = self.repo.git.fetch( "--", self, *args, as_process=True, with_stdout=False, universal_newlines=True, v=verbose, **kwargs @@ -1125,7 +1128,10 @@ def pull( Git.check_unsafe_protocols(ref) if not allow_unsafe_options: - Git.check_unsafe_options(options=list(kwargs.keys()), unsafe_options=self.unsafe_git_pull_options) + Git.check_unsafe_options( + options=Git._option_candidates([], kwargs), + unsafe_options=self.unsafe_git_pull_options, + ) proc = self.repo.git.pull( "--", self, refspec, with_stdout=False, as_process=True, universal_newlines=True, v=True, **kwargs @@ -1198,7 +1204,10 @@ def push( Git.check_unsafe_protocols(ref) if not allow_unsafe_options: - Git.check_unsafe_options(options=list(kwargs.keys()), unsafe_options=self.unsafe_git_push_options) + Git.check_unsafe_options( + options=Git._option_candidates([], kwargs), + unsafe_options=self.unsafe_git_push_options, + ) proc = self.repo.git.push( "--", diff --git a/git/repo/base.py b/git/repo/base.py index 2d3cf24f0..58d894988 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -161,6 +161,20 @@ class Repo: https://git-scm.com/docs/git-clone#Documentation/git-clone.txt---configltkeygtltvaluegt """ + unsafe_git_archive_options = [ + # Allows arbitrary command execution through the remote git-upload-archive command. + "--exec", + # Writes output to a caller-controlled filesystem path. + "--output", + "-o", + ] + + unsafe_git_revision_options = [ + # This option allows output to be written to arbitrary files before revision parsing. + "--output", + "-o", + ] + # Invariants config_level: ConfigLevels_Tup = ("system", "user", "global", "repository") """Represents the configuration level of a configuration file.""" @@ -775,6 +789,7 @@ def iter_commits( self, rev: Union[str, Commit, "SymbolicReference", None] = None, paths: Union[PathLike, Sequence[PathLike]] = "", + allow_unsafe_options: bool = False, **kwargs: Any, ) -> Iterator[Commit]: """An iterator of :class:`~git.objects.commit.Commit` objects representing the @@ -792,6 +807,9 @@ def iter_commits( Arguments to be passed to :manpage:`git-rev-list(1)`. Common ones are ``max_count`` and ``skip``. + :param allow_unsafe_options: + Allow unsafe options in the revision argument, like ``--output``. + :note: To receive only commits between two named revisions, use the ``"revA...revB"`` revision specifier. @@ -802,7 +820,18 @@ def iter_commits( if rev is None: rev = self.head.commit - return Commit.iter_items(self, rev, paths, **kwargs) + if not allow_unsafe_options: + Git.check_unsafe_options( + options=Git._option_candidates([rev], kwargs), unsafe_options=self.unsafe_git_revision_options + ) + + return Commit.iter_items( + self, + rev, + paths, + allow_unsafe_options=allow_unsafe_options, + **kwargs, + ) def merge_base(self, *rev: TBD, **kwargs: Any) -> List[Commit]: R"""Find the closest common ancestor for the given revision @@ -1079,7 +1108,9 @@ def active_branch(self) -> Head: ) return active_branch - def blame_incremental(self, rev: str | HEAD | None, file: str, **kwargs: Any) -> Iterator["BlameEntry"]: + def blame_incremental( + self, rev: str | HEAD | None, file: str, allow_unsafe_options: bool = False, **kwargs: Any + ) -> Iterator["BlameEntry"]: """Iterator for blame information for the given file at the given revision. Unlike :meth:`blame`, this does not return the actual file's contents, only a @@ -1090,6 +1121,9 @@ def blame_incremental(self, rev: str | HEAD | None, file: str, **kwargs: Any) -> uncommitted changes. Otherwise, anything successfully parsed by :manpage:`git-rev-parse(1)` is a valid option. + :param allow_unsafe_options: + Allow unsafe options in revision argument, like ``--output``. + :return: Lazy iterator of :class:`BlameEntry` tuples, where the commit indicates the commit to blame for the line, and range indicates a span of line numbers in @@ -1098,6 +1132,10 @@ def blame_incremental(self, rev: str | HEAD | None, file: str, **kwargs: Any) -> If you combine all line number ranges outputted by this command, you should get a continuous range spanning all line numbers in the file. """ + if not allow_unsafe_options: + Git.check_unsafe_options( + options=Git._option_candidates([rev], kwargs), unsafe_options=self.unsafe_git_revision_options + ) data: bytes = self.git.blame(rev, "--", file, p=True, incremental=True, stdout_as_string=False, **kwargs) commits: Dict[bytes, Commit] = {} @@ -1176,7 +1214,8 @@ def blame( rev: Union[str, HEAD, None], file: str, incremental: bool = False, - rev_opts: Optional[List[str]] = None, + rev_opts: Optional[Sequence[str]] = None, + allow_unsafe_options: bool = False, **kwargs: Any, ) -> List[List[Commit | List[str | bytes] | None]] | Iterator[BlameEntry] | None: """The blame information for the given file at the given revision. @@ -1186,6 +1225,9 @@ def blame( uncommitted changes. Otherwise, anything successfully parsed by :manpage:`git-rev-parse(1)` is a valid option. + :param allow_unsafe_options: + Allow unsafe options in revision argument, like ``--output``. + :return: list: [git.Commit, list: []] @@ -1195,9 +1237,14 @@ def blame( appearance. """ if incremental: - return self.blame_incremental(rev, file, **kwargs) - rev_opts = rev_opts or [] - data: bytes = self.git.blame(rev, *rev_opts, "--", file, p=True, stdout_as_string=False, **kwargs) + return self.blame_incremental(rev, file, allow_unsafe_options=allow_unsafe_options, **kwargs) + rev_opts_list = list(rev_opts or []) + if not allow_unsafe_options: + Git.check_unsafe_options( + options=Git._option_candidates([rev, rev_opts_list], kwargs), + unsafe_options=self.unsafe_git_revision_options, + ) + data: bytes = self.git.blame(rev, *rev_opts_list, "--", file, p=True, stdout_as_string=False, **kwargs) commits: Dict[str, Commit] = {} blames: List[List[Commit | List[str | bytes] | None]] = [] @@ -1408,7 +1455,10 @@ def _clone( if not allow_unsafe_protocols: Git.check_unsafe_protocols(url) if not allow_unsafe_options: - Git.check_unsafe_options(options=list(kwargs.keys()), unsafe_options=cls.unsafe_git_clone_options) + Git.check_unsafe_options( + options=Git._option_candidates([], kwargs), + unsafe_options=cls.unsafe_git_clone_options, + ) if not allow_unsafe_options and multi: Git.check_unsafe_options(options=multi, unsafe_options=cls.unsafe_git_clone_options) @@ -1583,6 +1633,8 @@ def archive( ostream: Union[TextIO, BinaryIO], treeish: Optional[str] = None, prefix: Optional[str] = None, + allow_unsafe_options: bool = False, + allow_unsafe_protocols: bool = False, **kwargs: Any, ) -> Repo: """Archive the tree at the given revision. @@ -1605,6 +1657,12 @@ def archive( repository-relative path to a directory or file to place into the archive, or a list or tuple of multiple paths. + :param allow_unsafe_options: + Allow unsafe options, like ``--exec`` or ``--output``. + + :param allow_unsafe_protocols: + Allow unsafe protocols to be used in ``remote``, like ``ext``. + :raise git.exc.GitCommandError: If something went wrong. @@ -1615,6 +1673,14 @@ def archive( treeish = self.head.commit if prefix and "prefix" not in kwargs: kwargs["prefix"] = prefix + remote = kwargs.get("remote") + if not allow_unsafe_protocols and remote: + Git.check_unsafe_protocols(str(remote)) + if not allow_unsafe_options: + Git.check_unsafe_options( + options=Git._option_candidates([], kwargs), + unsafe_options=self.unsafe_git_archive_options, + ) kwargs["output_stream"] = ostream path = kwargs.pop("path", []) path = cast(Union[PathLike, List[PathLike], Tuple[PathLike, ...]], path) diff --git a/test/test_commit.py b/test/test_commit.py index b56ad3a18..74b7078f5 100644 --- a/test/test_commit.py +++ b/test/test_commit.py @@ -6,6 +6,7 @@ import copy from datetime import datetime from io import BytesIO +import tempfile import os.path as osp import re import sys @@ -15,6 +16,7 @@ from gitdb import IStream from git import Actor, Commit, Repo +from git.exc import UnsafeOptionError from git.objects.util import tzoffset, utc from git.repo.fun import touch @@ -288,6 +290,17 @@ def test_iter_items(self): # pretty not allowed. self.assertRaises(ValueError, Commit.iter_items, self.rorepo, "master", pretty="raw") + def test_iter_items_rejects_unsafe_revision(self): + with tempfile.TemporaryDirectory() as tdir: + marker = osp.join(tdir, "pwn") + self.assertRaises(UnsafeOptionError, Commit.iter_items, self.rorepo, f"--output={marker}") + + def test_iter_items_rejects_unsafe_options(self): + with tempfile.TemporaryDirectory() as tdir: + marker = osp.join(tdir, "pwn") + with self.assertRaises(UnsafeOptionError): + list(Commit.iter_items(self.rorepo, "HEAD", output=marker)) + def test_rev_list_bisect_all(self): """ 'git rev-list --bisect-all' returns additional information diff --git a/test/test_remote.py b/test/test_remote.py index 2230c8df4..379e1e2ac 100644 --- a/test/test_remote.py +++ b/test/test_remote.py @@ -9,7 +9,7 @@ import random import sys import tempfile -from unittest import skipIf +from unittest import mock, skipIf import pytest @@ -1007,6 +1007,37 @@ def test_push_unsafe_options_allowed(self, rw_repo): assert tmp_file.exists() tmp_file.unlink() + @with_rw_repo("HEAD") + def test_ls_remote_unsafe_options(self, rw_repo): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [{"upload-pack": f"touch {tmp_file}"}, {"upload_pack": f"touch {tmp_file}"}] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + rw_repo.git.ls_remote(".", **unsafe_option) + with self.assertRaises(UnsafeOptionError): + rw_repo.git.ls_remote([f"--upload-pack={tmp_file}"], ".") + with self.assertRaises(UnsafeOptionError): + rw_repo.git.ls_remote(f"--upload-pack={tmp_file}", ".") + with self.assertRaises(UnsafeOptionError): + rw_repo.git.ls_remote("--upload-pack", "touch", ".") + + def test_ls_remote_allows_operand_named_like_unsafe_option(self): + with mock.patch.object(Git, "_call_process") as git: + Git().ls_remote("upload-pack") + git.assert_called_once() + + @with_rw_repo("HEAD") + def test_ls_remote_unsafe_options_allowed(self, rw_repo): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] + for unsafe_option in unsafe_options: + with self.assertRaises(GitCommandError): + rw_repo.git.ls_remote(".", **unsafe_option, allow_unsafe_options=True) + @with_rw_and_rw_remote_repo("0.1.6") def test_fetch_unsafe_branch_name(self, rw_repo, remote_repo): # Create branch with a name containing a NBSP diff --git a/test/test_repo.py b/test/test_repo.py index d2dd1ea5d..0820d6eab 100644 --- a/test/test_repo.py +++ b/test/test_repo.py @@ -37,6 +37,8 @@ Submodule, Tree, ) +from git.exc import UnsafeOptionError +from git.exc import UnsafeProtocolError from git.exc import BadObject from git.repo.fun import touch from git.util import bin_to_hex, cwd, cygpath, join_path_native, rmfile, rmtree @@ -422,6 +424,43 @@ def test_archive(self): assert stream.tell() os.remove(stream.name) # Do it this way so we can inspect the file on failure. + def test_archive_rejects_unsafe_options(self): + with tempfile.TemporaryDirectory() as tdir: + output_marker = osp.join(tdir, "pwn") + with self.assertRaises(UnsafeOptionError): + self.rorepo.archive(io.BytesIO(), "0.1.6", exec=f"touch {output_marker}") + assert not osp.exists(output_marker) + with self.assertRaises(UnsafeOptionError): + self.rorepo.archive(io.BytesIO(), "0.1.6", output=output_marker) + assert not osp.exists(output_marker) + + def test_archive_rejects_unsafe_remote_protocol(self): + with tempfile.TemporaryDirectory() as tdir: + output_marker = osp.join(tdir, "pwn") + with self.assertRaises(UnsafeProtocolError): + self.rorepo.archive(io.BytesIO(), "HEAD", remote=f"ext::sh -c touch% {output_marker}") + assert not osp.exists(output_marker) + + def test_archive_preserves_positional_allow_unsafe_options(self): + with mock.patch.object(Git, "_call_process") as git: + self.rorepo.archive(io.BytesIO(), "HEAD", None, True, exec="git-upload-archive") + git.assert_called_once() + + def test_archive_accepts_stringifiable_remote(self): + class StringifiableRemote: + def __str__(self): + return "origin" + + with mock.patch.object(Git, "_call_process") as git: + self.rorepo.archive(io.BytesIO(), "HEAD", remote=StringifiableRemote()) + git.assert_called_once() + + def test_iter_commits_rejects_unsafe_revision(self): + with tempfile.TemporaryDirectory() as tdir: + target = osp.join(tdir, "pwn") + with self.assertRaises(UnsafeOptionError): + list(self.rorepo.iter_commits(f"--output={target}", max_count=1)) + @mock.patch.object(Git, "_call_process") def test_should_display_blame_information(self, git): git.return_value = fixture("blame") @@ -471,6 +510,27 @@ def test_blame_real(self): assert c, "Should have executed at least one blame command" assert nml, "There should at least be one blame commit that contains multiple lines" + def test_blame_rejects_unsafe_revision(self): + with tempfile.TemporaryDirectory() as tdir: + output_marker = osp.join(tdir, "pwn") + with self.assertRaises(UnsafeOptionError): + self.rorepo.blame(f"--output={output_marker}", "README.md") + assert not osp.exists(output_marker) + + def test_blame_rejects_unsafe_options(self): + with tempfile.TemporaryDirectory() as tdir: + output_marker = osp.join(tdir, "pwn") + with self.assertRaises(UnsafeOptionError): + self.rorepo.blame("HEAD", "README.md", output=output_marker) + assert not osp.exists(output_marker) + + def test_blame_rejects_unsafe_rev_opts(self): + with tempfile.TemporaryDirectory() as tdir: + output_marker = osp.join(tdir, "pwn") + with self.assertRaises(UnsafeOptionError): + self.rorepo.blame("HEAD", "README.md", rev_opts=(f"--output={output_marker}",)) + assert not osp.exists(output_marker) + @mock.patch.object(Git, "_call_process") def test_blame_incremental(self, git): # Loop over two fixtures, create a test fixture for 2.11.1+ syntax.