diff --git a/src/pycalver/vcs.py b/src/pycalver/vcs.py index 23e208a..df12ca9 100644 --- a/src/pycalver/vcs.py +++ b/src/pycalver/vcs.py @@ -9,7 +9,6 @@ # Copyright (c) 2013-2014 Filip Noetzel - MIT License import os -import sys import logging import tempfile import typing as typ @@ -18,106 +17,108 @@ import subprocess as sp log = logging.getLogger("pycalver.vcs") +VCS_SUBCOMMANDS_BY_NAME = { + 'git': { + 'is_usable': "git rev-parse --git-dir", + 'fetch' : "git fetch", + 'push_tag' : "git push {tag}", + 'commit' : "git commit --file {path}", + 'status' : "git status --porcelain", + 'tag' : "git tag --annotate {version} --message '{version}'", + 'add_path' : "git add --update {path}", + 'ls_tags' : "git tag --list v*", + }, + 'hg': { + 'is_usable': "hg root", + 'fetch' : "hg pull", + 'push_tag' : "hg push {tag}", + 'commit' : "hg commit --logfile", + 'status' : "hg status -mard", + 'tag' : "hg tag {version} --message '{version}'", + 'add_path' : "hg add {path}", + 'ls_tags' : "hg tags", + }, +} -class BaseVCS: - _TEST_USABLE_COMMAND: typ.List[str] - _COMMIT_COMMAND : typ.List[str] - _STATUS_COMMAND : typ.List[str] +class VCS: + """Version Control System absraction for git and mercurial""" - @classmethod - def commit(cls, message: str) -> None: - message_data = message.encode("utf-8") + def __init__(self, name: str, subcommands: typ.Dict[str, str] = None): + self.name = name + if subcommands is None: + self.subcommands = VCS_SUBCOMMANDS_BY_NAME[name] + else: + self.subcommands = subcommands - tmp_file = tempfile.NamedTemporaryFile("wb", delete=False) + def __call__(self, cmd_name: str, env=None, **kwargs: str) -> bytes: + cmd_str = self.subcommands[cmd_name] + cmd_parts = cmd_str.format(**kwargs).split() + return sp.check_output(cmd_parts, env=env) - with tmp_file as fh: - fh.write(message_data) + @property + def is_usable(self) -> bool: + cmd = self.subcommands['is_usable'].split() - cmd = cls._COMMIT_COMMAND + [tmp_file.name] - env = os.environ.copy() - # TODO (mb 2018-09-04): check that this works on py27, - # might need to be bytes there, idk. - env['HGENCODING'] = "utf-8" - sp.check_output(cmd, env=env) - os.unlink(tmp_file.name) - - @classmethod - def is_usable(cls) -> bool: try: - return sp.call(cls._TEST_USABLE_COMMAND, stderr=sp.PIPE, stdout=sp.PIPE) == 0 + retcode = sp.call(cmd, stderr=sp.PIPE, stdout=sp.PIPE) + return retcode == 0 except OSError as e: if e.errno == 2: - # mercurial is not installed then, ok. + # git/mercurial is not installed. return False raise - @classmethod - def dirty_files(cls) -> typ.List[str]: - status_output = sp.check_output(cls._STATUS_COMMAND) + def status(self) -> typ.List[str]: + status_output = self('status') return [ line.decode("utf-8")[2:].strip() for line in status_output.splitlines() if not line.strip().startswith(b"??") ] - @classmethod - def assert_not_dirty(cls, filepaths: typ.Set[str], allow_dirty=False) -> None: - dirty_files = cls.dirty_files() + def fetch(self) -> None: + self('fetch') - if dirty_files: - log.warn(f"{cls.__name__} working directory is not clean:") - for dirty_file in dirty_files: - log.warn(" " + dirty_file) + def add(self, path) -> None: + log.info(f"{self.name} add {path}") + self('add', path=path) - if not allow_dirty and dirty_files: - sys.exit(1) + def commit(self, message: str) -> None: + log.info(f"{self.name} commit -m '{message}'") + message_data = message.encode("utf-8") - dirty_pattern_files = set(dirty_files) & filepaths - if dirty_pattern_files: - log.error("Not commiting when pattern files are dirty:") - for dirty_file in dirty_pattern_files: - log.warn(" " + dirty_file) - sys.exit(1) + tmp_file = tempfile.NamedTemporaryFile("wb", delete=False) + assert " " not in tmp_file + + with tmp_file as fh: + fh.write(message_data) + + env = os.environ.copy() + # TODO (mb 2018-09-04): check that this works on py27, + # might need to be bytes there, idk. + env['HGENCODING'] = "utf-8" + self('commit', env=env, path=tmp_file.name) + os.unlink(tmp_file.name) + + def tag(self, name) -> None: + self('tag', name=name) + + def ls_tags(self) -> typ.List[str]: + ls_tag_lines = self('ls_tags').splitlines() + log.debug(f"ls_tags output {ls_tag_lines}") + return [ + line.decode("utf-8").strip() for line in ls_tag_lines if line.strip().startswith(b"v") + ] + + def __repr__(self) -> str: + return f"VCS(name='{self.name}')" -class Git(BaseVCS): - - _TEST_USABLE_COMMAND = ["git", "rev-parse", '--git-dir'] - _COMMIT_COMMAND = ["git", "commit" , '-F'] - _STATUS_COMMAND = ["git", "status" , '--porcelain'] - - @classmethod - def tag(cls, name): - sp.check_output(["git", "tag", name]) - - @classmethod - def add_path(cls, path): - sp.check_output(["git", "add", "--update", path]) - - -class Mercurial(BaseVCS): - - _TEST_USABLE_COMMAND = ["hg", 'root'] - _COMMIT_COMMAND = ["hg", "commit", '--logfile'] - _STATUS_COMMAND = ["hg", "status", '-mard'] - - @classmethod - def tag(cls, name): - sp.check_output(["hg", "tag", name]) - - @classmethod - def add_path(cls, path): - pass - - -VCS = [Git, Mercurial] - - -def get_vcs() -> typ.Type[BaseVCS]: - """Get appropriate sub""" - for vcs in VCS: - if vcs.is_usable(): +def get_vcs() -> VCS: + for vcs_name in VCS_SUBCOMMANDS_BY_NAME.keys(): + vcs = VCS(name=vcs_name) + if vcs.is_usable: return vcs - raise OSError("No such directory .hg/ or .git/ ") + raise OSError("No such directory .git/ or .hg/ ")