refactor vcs

This commit is contained in:
Manuel Barkhau 2018-11-11 15:07:46 +01:00
parent ba46c151e6
commit 120f08d83b

View file

@ -9,7 +9,6 @@
# Copyright (c) 2013-2014 Filip Noetzel - MIT License
import os
import sys
import logging
import tempfile
import typing as typ
@ -18,106 +17,108 @@ import subprocess as sp
log = logging.getLogger("pycalver.vcs")
VCS_SUBCOMMANDS_BY_NAME = {
'git': {
'is_usable': "git rev-parse --git-dir",
'fetch' : "git fetch",
'push_tag' : "git push {tag}",
'commit' : "git commit --file {path}",
'status' : "git status --porcelain",
'tag' : "git tag --annotate {version} --message '{version}'",
'add_path' : "git add --update {path}",
'ls_tags' : "git tag --list v*",
},
'hg': {
'is_usable': "hg root",
'fetch' : "hg pull",
'push_tag' : "hg push {tag}",
'commit' : "hg commit --logfile",
'status' : "hg status -mard",
'tag' : "hg tag {version} --message '{version}'",
'add_path' : "hg add {path}",
'ls_tags' : "hg tags",
},
}
class BaseVCS:
_TEST_USABLE_COMMAND: typ.List[str]
_COMMIT_COMMAND : typ.List[str]
_STATUS_COMMAND : typ.List[str]
class VCS:
"""Version Control System absraction for git and mercurial"""
@classmethod
def commit(cls, message: str) -> None:
message_data = message.encode("utf-8")
def __init__(self, name: str, subcommands: typ.Dict[str, str] = None):
self.name = name
if subcommands is None:
self.subcommands = VCS_SUBCOMMANDS_BY_NAME[name]
else:
self.subcommands = subcommands
tmp_file = tempfile.NamedTemporaryFile("wb", delete=False)
def __call__(self, cmd_name: str, env=None, **kwargs: str) -> bytes:
cmd_str = self.subcommands[cmd_name]
cmd_parts = cmd_str.format(**kwargs).split()
return sp.check_output(cmd_parts, env=env)
with tmp_file as fh:
fh.write(message_data)
@property
def is_usable(self) -> bool:
cmd = self.subcommands['is_usable'].split()
cmd = cls._COMMIT_COMMAND + [tmp_file.name]
env = os.environ.copy()
# TODO (mb 2018-09-04): check that this works on py27,
# might need to be bytes there, idk.
env['HGENCODING'] = "utf-8"
sp.check_output(cmd, env=env)
os.unlink(tmp_file.name)
@classmethod
def is_usable(cls) -> bool:
try:
return sp.call(cls._TEST_USABLE_COMMAND, stderr=sp.PIPE, stdout=sp.PIPE) == 0
retcode = sp.call(cmd, stderr=sp.PIPE, stdout=sp.PIPE)
return retcode == 0
except OSError as e:
if e.errno == 2:
# mercurial is not installed then, ok.
# git/mercurial is not installed.
return False
raise
@classmethod
def dirty_files(cls) -> typ.List[str]:
status_output = sp.check_output(cls._STATUS_COMMAND)
def status(self) -> typ.List[str]:
status_output = self('status')
return [
line.decode("utf-8")[2:].strip()
for line in status_output.splitlines()
if not line.strip().startswith(b"??")
]
@classmethod
def assert_not_dirty(cls, filepaths: typ.Set[str], allow_dirty=False) -> None:
dirty_files = cls.dirty_files()
def fetch(self) -> None:
self('fetch')
if dirty_files:
log.warn(f"{cls.__name__} working directory is not clean:")
for dirty_file in dirty_files:
log.warn(" " + dirty_file)
def add(self, path) -> None:
log.info(f"{self.name} add {path}")
self('add', path=path)
if not allow_dirty and dirty_files:
sys.exit(1)
def commit(self, message: str) -> None:
log.info(f"{self.name} commit -m '{message}'")
message_data = message.encode("utf-8")
dirty_pattern_files = set(dirty_files) & filepaths
if dirty_pattern_files:
log.error("Not commiting when pattern files are dirty:")
for dirty_file in dirty_pattern_files:
log.warn(" " + dirty_file)
sys.exit(1)
tmp_file = tempfile.NamedTemporaryFile("wb", delete=False)
assert " " not in tmp_file
with tmp_file as fh:
fh.write(message_data)
env = os.environ.copy()
# TODO (mb 2018-09-04): check that this works on py27,
# might need to be bytes there, idk.
env['HGENCODING'] = "utf-8"
self('commit', env=env, path=tmp_file.name)
os.unlink(tmp_file.name)
def tag(self, name) -> None:
self('tag', name=name)
def ls_tags(self) -> typ.List[str]:
ls_tag_lines = self('ls_tags').splitlines()
log.debug(f"ls_tags output {ls_tag_lines}")
return [
line.decode("utf-8").strip() for line in ls_tag_lines if line.strip().startswith(b"v")
]
def __repr__(self) -> str:
return f"VCS(name='{self.name}')"
class Git(BaseVCS):
_TEST_USABLE_COMMAND = ["git", "rev-parse", '--git-dir']
_COMMIT_COMMAND = ["git", "commit" , '-F']
_STATUS_COMMAND = ["git", "status" , '--porcelain']
@classmethod
def tag(cls, name):
sp.check_output(["git", "tag", name])
@classmethod
def add_path(cls, path):
sp.check_output(["git", "add", "--update", path])
class Mercurial(BaseVCS):
_TEST_USABLE_COMMAND = ["hg", 'root']
_COMMIT_COMMAND = ["hg", "commit", '--logfile']
_STATUS_COMMAND = ["hg", "status", '-mard']
@classmethod
def tag(cls, name):
sp.check_output(["hg", "tag", name])
@classmethod
def add_path(cls, path):
pass
VCS = [Git, Mercurial]
def get_vcs() -> typ.Type[BaseVCS]:
"""Get appropriate sub"""
for vcs in VCS:
if vcs.is_usable():
def get_vcs() -> VCS:
for vcs_name in VCS_SUBCOMMANDS_BY_NAME.keys():
vcs = VCS(name=vcs_name)
if vcs.is_usable:
return vcs
raise OSError("No such directory .hg/ or .git/ ")
raise OSError("No such directory .git/ or .hg/ ")