bumpver/src/pycalver/vcs.py
2020-09-18 17:50:13 +00:00

240 lines
7.1 KiB
Python

# This file is part of the pycalver project
# https://github.com/mbarkhau/pycalver
#
# Copyright (c) 2018-2020 Manuel Barkhau (mbarkhau@gmail.com) - MIT License
# SPDX-License-Identifier: MIT
#
# pycalver/vcs.py (this file) is based on code from the
# bumpversion project: https://github.com/peritus/bumpversion
# Copyright (c) 2013-2014 Filip Noetzel - MIT License
"""Minimal Git and Mercirial API.
If terminology for similar concepts differs between git and
mercurial, then the git terms are used. For example "fetch"
(git) instead of "pull" (hg) .
"""
import os
import sys
import typing as typ
import logging
import tempfile
import subprocess as sp
from pycalver import config
logger = logging.getLogger("pycalver.vcs")
VCS_SUBCOMMANDS_BY_NAME = {
'git': {
'is_usable' : "git rev-parse --git-dir",
'fetch' : "git fetch",
'ls_tags' : "git tag --list",
'status' : "git status --porcelain",
'add_path' : "git add --update {path}",
'commit' : "git commit --file {path}",
'tag' : "git tag --annotate {tag} --message {tag}",
'push_tag' : "git push origin --follow-tags {tag}",
'show_remotes': "git config --get remote.origin.url",
},
'hg': {
'is_usable' : "hg root",
'fetch' : "hg pull",
'ls_tags' : "hg tags",
'status' : "hg status -umard",
'add_path' : "hg add {path}",
'commit' : "hg commit --logfile {path}",
'tag' : "hg tag {tag} --message {tag}",
'push_tag' : "hg push {tag}",
'show_remotes': "hg paths",
},
}
Env = typ.Dict[str, str]
class VCSAPI:
"""Absraction for git and mercurial."""
def __init__(self, name: str, subcommands: typ.Dict[str, str] = None):
self.name = name
if subcommands is None:
self.subcommands = VCS_SUBCOMMANDS_BY_NAME[name]
else:
self.subcommands = subcommands
def __call__(self, cmd_name: str, env: Env = None, **kwargs: str) -> str:
"""Invoke subcommand and return output."""
cmd_tmpl = self.subcommands[cmd_name]
cmd_str = cmd_tmpl.format(**kwargs)
if cmd_name in ("commit", "tag", "push_tag"):
logger.info(cmd_str)
else:
logger.debug(cmd_str)
output_data: bytes = sp.check_output(cmd_str.split(), env=env, stderr=sp.STDOUT)
# TODO (mb 2018-11-15): Detect encoding of output? Use chardet?
_encoding = "utf-8"
return output_data.decode(_encoding)
@property
def is_usable(self) -> bool:
"""Detect availability of subcommand."""
if not os.path.exists(f".{self.name}"):
return False
cmd = self.subcommands['is_usable'].split()
try:
retcode = sp.call(cmd, stderr=sp.PIPE, stdout=sp.PIPE)
return retcode == 0
except OSError as err:
if err.errno == 2:
# git/mercurial is not installed.
return False
else:
raise
@property
def has_remote(self) -> bool:
# pylint:disable=broad-except; Not sure how to anticipate all cases.
try:
output = self('show_remotes')
if output.strip() == "":
return False
else:
return True
except Exception:
return False
def fetch(self) -> None:
"""Fetch updates from remote origin."""
if self.has_remote:
self('fetch')
def status(self, required_files: typ.Set[str]) -> typ.List[str]:
"""Get status lines."""
status_output = self('status')
status_items = [line.split(" ", 1) for line in status_output.splitlines()]
return [
filepath.strip()
for status, filepath in status_items
if filepath.strip() in required_files or status != "??"
]
def ls_tags(self) -> typ.List[str]:
"""List vcs tags on all branches."""
ls_tag_lines = self('ls_tags').splitlines()
logger.debug(f"ls_tags output {ls_tag_lines}")
return [line.strip().split(" ", 1)[0] for line in ls_tag_lines]
def add(self, path: str) -> None:
"""Add updates to be included in next commit."""
try:
self('add_path', path=path)
except sp.CalledProcessError as ex:
if "already tracked!" in str(ex):
# mercurial
return
else:
raise
def commit(self, message: str) -> None:
"""Commit added files."""
message_data = message.encode("utf-8")
tmp_file = tempfile.NamedTemporaryFile("wb", delete=False)
assert " " not in tmp_file.name
fobj: typ.IO[bytes]
with tmp_file as fobj:
fobj.write(message_data)
env: Env = os.environ.copy()
env['HGENCODING'] = "utf-8"
self('commit', env=env, path=tmp_file.name)
os.unlink(tmp_file.name)
def tag(self, tag_name: str) -> None:
"""Create an annotated tag."""
self('tag', tag=tag_name)
def push(self, tag_name: str) -> None:
"""Push changes to origin."""
if self.has_remote:
self('push_tag', tag=tag_name)
def __repr__(self) -> str:
"""Generate string representation."""
return f"VCSAPI(name='{self.name}')"
def get_vcs_api() -> VCSAPI:
"""Detect the appropriate VCS for a repository.
raises OSError if the directory doesn't use a supported VCS.
"""
for vcs_name in VCS_SUBCOMMANDS_BY_NAME:
vcs_api = VCSAPI(name=vcs_name)
if vcs_api.is_usable:
return vcs_api
raise OSError("No such directory .git/ or .hg/ ")
# cli helper methods
def assert_not_dirty(vcs_api: VCSAPI, filepaths: typ.Set[str], allow_dirty: bool) -> None:
dirty_files = vcs_api.status(required_files=filepaths)
if dirty_files:
logger.warning(f"{vcs_api.name} working directory is not clean. Uncomitted file(s):")
for dirty_file in dirty_files:
logger.warning(" " + dirty_file)
if not allow_dirty and dirty_files:
sys.exit(1)
dirty_pattern_files = set(dirty_files) & filepaths
if dirty_pattern_files:
logger.error("Not commiting when pattern files are dirty:")
for dirty_file in dirty_pattern_files:
logger.warning(" " + dirty_file)
sys.exit(1)
def commit(
cfg : config.Config,
vcs_api : VCSAPI,
filepaths : typ.Set[str],
new_version : str,
commit_message: str,
) -> None:
for filepath in filepaths:
vcs_api.add(filepath)
vcs_api.commit(commit_message)
if cfg.commit and cfg.tag:
vcs_api.tag(new_version)
if cfg.commit and cfg.tag and cfg.push:
vcs_api.push(new_version)
def get_tags(fetch: bool) -> typ.List[str]:
try:
vcs_api = get_vcs_api()
logger.debug(f"vcs found: {vcs_api.name}")
if fetch:
logger.info("fetching tags from remote (to turn off use: -n / --no-fetch)")
vcs_api.fetch()
return vcs_api.ls_tags()
except OSError:
logger.debug("No vcs found")
return []