osmo-dev/src/gits

476 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
#
# (C) 2018 by Neels Hofmeyr <neels@hofmeyr.de>
# All rights reserved.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import subprocess
import argparse
import os
import shlex
doc = '''gits: conveniently manage several git subdirectories.
Instead of doing the 'cd foo; git status; cd ../bar; git status' dance, this
helps to save your time with: status, fetch, rebase, ...
'''
def error(*msgs):
    '''Print an error message to stderr and terminate with exit status 1.

    msgs: message fragments; they are concatenated without a separator and
          followed by a newline. Non-string fragments are converted with
          str() so that callers may pass e.g. return codes directly.

    Raises SystemExit(1); never returns.
    '''
    sys.stderr.write(''.join(str(msg) for msg in msgs))
    sys.stderr.write('\n')
    # sys.exit() rather than the builtin exit(): the latter is injected by the
    # 'site' module for interactive use and is absent under 'python -S'.
    sys.exit(1)
def cmd_to_str(cmd):
    '''Render an argv-style sequence as a single shell-quoted command line.'''
    quoted = map(shlex.quote, cmd)
    return ' '.join(quoted)
def git(git_dir, *args, may_fail=False, section_marker=False, show_cmd=True):
    '''Run 'git -C <git_dir> <args...>' with output going to the terminal.

    git_dir:        directory of the git clone to operate on.
    args:           git sub-command and its arguments.
    may_fail:       if False, a non-zero git exit status aborts via error().
    section_marker: if True, print a '===== <git_dir> =====' header first.
    show_cmd:       if True, echo the shell-quoted command before running it.
    '''
    # Flush our own buffered output so it cannot interleave with git's.
    sys.stdout.flush()
    sys.stderr.flush()

    if section_marker:
        print('\n===== %s =====' % git_dir)
        sys.stdout.flush()

    cmd = ['git', '-C', git_dir]
    cmd.extend(args)

    if show_cmd:
        print('+ %s' % cmd_to_str(cmd))
        sys.stdout.flush()

    failed = subprocess.call(cmd) != 0
    if may_fail or not failed:
        return

    shown_args = ' '.join(map(repr, args))
    error('git returned error! command: git -C %r %s' % (git_dir, shown_args))
def git_output(git_dir, *args):
    '''Run 'git -C <git_dir> <args...>' and return its output as a str.

    stderr is merged into the returned output. A non-zero exit status raises
    subprocess.CalledProcessError.
    '''
    cmd = ['git', '-C', git_dir]
    cmd.extend(args)
    raw = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
    return raw.decode('utf-8')
def git_bool(git_dir, *args):
    '''Run 'git -C <git_dir> <args...>' and report success as a boolean.

    Returns True when git exits with status 0, False when it exits non-zero.
    '''
    cmd = ['git', '-C', git_dir] + list(args)
    try:
        subprocess.check_output(cmd)
    except subprocess.CalledProcessError:
        return False
    return True
def safe_branch_name(branch):
    '''Return a ref name for 'branch': names without a slash get the
    'refs/heads/' prefix, names containing a slash are returned unchanged.'''
    return branch if '/' in branch else 'refs/heads/' + branch
def git_branches(git_dir, obj='refs/heads'):
    '''Return the short names of all refs below 'obj' (by default the local
    branches) of the clone in git_dir, as a list of strings.'''
    return git_output(git_dir, 'for-each-ref', obj,
                      '--format', '%(refname:short)').splitlines()
def git_branch_current(git_dir):
    '''Return the name of the currently checked out branch, or None when
    'git rev-parse --abbrev-ref HEAD' just answers 'HEAD' (detached HEAD).'''
    name = git_output(git_dir, 'rev-parse', '--abbrev-ref', 'HEAD').rstrip()
    return None if name == 'HEAD' else name
def git_branch_upstream(git_dir, branch_name='HEAD'):
    '''Return the upstream branch name of branch_name, or None if git cannot
    resolve '<branch_name>@{u}' (i.e. there is no upstream).'''
    upstream_spec = '%s@{u}' % branch_name
    try:
        resolved = git_output(git_dir, 'rev-parse', '--abbrev-ref', upstream_spec)
    except subprocess.CalledProcessError:
        return None
    return resolved.rstrip()
def git_has_modifications(git_dir):
    '''Return True if 'git diff --quiet HEAD' fails in git_dir, i.e. the
    working tree differs from HEAD.'''
    is_clean = git_bool(git_dir, 'diff', '--quiet', 'HEAD')
    return not is_clean
def git_can_fast_forward(git_dir, branch, branch_upstream):
    '''Return True if 'branch' is an ancestor of 'branch_upstream', so that
    moving branch to branch_upstream needs no merge commit.'''
    ancestor_check = ('merge-base', '--is-ancestor', branch, branch_upstream)
    return git_bool(git_dir, *ancestor_check)
class AheadBehind:
    '''Count revisions a local branch is ahead of / behind another branch.

    Attributes:
      git_dir: directory of the git clone.
      local:   name of the local branch.
      remote:  name of the branch to compare against; may be None/empty, in
               which case ahead and behind are both 0.
      ahead:   number of commits reachable from local but not from remote.
      behind:  number of commits reachable from remote but not from local.
      can_ff:  True if local is behind remote and can be fast-forwarded to it.

    str() yields the local branch name, suffixed with e.g. '[+1|-5]' when the
    branch is ahead by 1 and behind by 5.
    '''

    def __init__(self, git_dir, local, remote):
        self.git_dir = git_dir
        self.local = local
        self.remote = remote
        self.ahead = 0
        self.behind = 0
        self.can_ff = False
        if not remote:
            return

        # Use the unambiguous ref name for every git query about the local
        # branch, not just for the rev-list calls.
        local_ref = safe_branch_name(local)
        behind_str = git_output(git_dir, 'rev-list', '--count',
                                '%s..%s' % (local_ref, remote))
        ahead_str = git_output(git_dir, 'rev-list', '--count',
                               '%s..%s' % (remote, local_ref))
        self.ahead = int(ahead_str.rstrip())
        self.behind = int(behind_str.rstrip())
        # Only ask git about ancestry if there is anything to fast-forward to.
        self.can_ff = (bool(self.behind)
                       and git_can_fast_forward(git_dir, local_ref, remote))

    def is_diverged(self):
        '''True if local has commits missing in remote and vice versa.'''
        return bool(self.ahead and self.behind)

    def is_behind(self):
        '''True if local only lacks commits from remote.'''
        return bool(self.behind and not self.ahead)

    def is_ahead(self):
        '''True if local only has additional commits on top of remote.'''
        return bool(self.ahead and not self.behind)

    def is_sync(self):
        '''True if local and remote have no commits missing in the other.'''
        return self.ahead == 0 and self.behind == 0

    def ff(self):
        '''Check out the local branch if necessary and merge remote into it.
        Intended to be called only when can_ff is True.'''
        print('fast-forwarding %s to %s...' % (self.local, self.remote))
        if git_branch_current(self.git_dir) != self.local:
            git(self.git_dir, 'checkout', self.local)
        git(self.git_dir, 'merge', self.remote)

    def __str__(self):
        # Just the branch
        if not self.ahead and not self.behind:
            return self.local

        # Suffix with ahead/behind
        counts = []
        if self.ahead:
            counts.append('+' + str(self.ahead))
        if self.behind:
            counts.append('-' + str(self.behind))
        return '%s[%s]' % (self.local, '|'.join(counts))
def git_branch_summary(git_dir):
    '''Return a list of strings: [git_dir, info0, info1, ...]

    The infos are 'MODS' if the working tree has local modifications,
    followed by one entry such as "master[-1]" for the current branch and
    for each "interesting" branch that is ahead of or behind its upstream.
    '''
    interesting_branch_names = ('master',)

    summary = [git_dir]
    if git_has_modifications(git_dir):
        summary.append('MODS')

    current = git_branch_current(git_dir)
    for branch in git_branches(git_dir):
        is_current = (branch == current)
        if not (is_current or branch in interesting_branch_names):
            continue

        ab = AheadBehind(git_dir, branch, git_branch_upstream(git_dir, branch))

        # The current branch is always listed; other branches only when they
        # deviate from their upstream ("master[+1|-5]").
        if is_current or ab.ahead or ab.behind:
            summary.append(str(ab))
    return summary
def format_summaries(summaries, sep0=' ', sep1=' '):
    '''Format branch summaries as a text table, one line per row.

    summaries: iterable of rows, each a sequence of strings whose first item
               is the row label (the git dir) and the rest are infos.
    sep0:      separator between the label column and the infos.
    sep1:      separator between the individual infos.

    The label column is right-aligned to the longest label. Returns the lines
    joined by newlines; an empty input yields an empty string.
    '''
    # Materialize once: the rows are walked twice (width, then formatting).
    rows = list(summaries)
    if not rows:
        # max() of an empty sequence would raise ValueError.
        return ''

    label_width = max(len(row[0]) for row in rows)
    return '\n'.join('%*s%s%s' % (label_width, row[0], sep0, sep1.join(row[1:]))
                     for row in rows)
def git_dirs():
    '''Return the sorted names of all entries in the current working
    directory that contain a '.git' directory. Aborts via error() if there
    are none.'''
    clones = sorted(entry for entry in os.listdir()
                    if os.path.isdir(os.path.join(entry, '.git')))
    if not clones:
        error('No subdirectories found that are git clones')
    return clones
def print_status():
    '''Print a table with one branch summary line per git clone found in the
    current directory.'''
    summaries = []
    for git_dir in git_dirs():
        summaries.append(git_branch_summary(git_dir))
    print(format_summaries(summaries))
def cmd_do(argv):
    '''Run 'git <argv...>' in each git clone below the current directory.

    Each clone gets its own section header; a failing git command does not
    stop the iteration.
    '''
    for clone in git_dirs():
        git(clone, *argv, section_marker=True, may_fail=True)
def cmd_sh(cmd):
    '''Run the command 'cmd' (an argv list) once in each git clone, with the
    clone as working directory. The command's exit status is ignored.
    Aborts via error() when cmd is empty.'''
    if not cmd:
        error('which command do you want to run?')

    for clone in git_dirs():
        header = '\n===== %s =====' % clone
        print(header)
        print('+ %s' % cmd_to_str(cmd))
        # Flush before and after, so our output and the child's stay in order.
        sys.stdout.flush()
        subprocess.call(cmd, cwd=clone)
        sys.stdout.flush()
        sys.stderr.flush()
class SkipThisRepo(Exception):
    '''Raised to abandon work on the current git clone and move on to the
    next one.'''
def ask(git_dir, *question, valid_answers=('*',)):
    '''Prompt the user on stdin until a valid answer is given; return it.

    git_dir:       clone the question is about (used to launch tig / gitk).
    question:      lines to print: the question followed by its options.
    valid_answers: accepted answers. Besides literal strings, '*' accepts any
                   answer and '+' accepts any non-empty answer.

    Three answers are always handled here: 's' raises SkipThisRepo, 't' and
    'g' open tig / gitk on the clone and then repeat the question.
    '''
    always_offered = (
        's skip this repo',
        't show in tig',
        'g show in gitk',
    )
    viewers = {
        't': ('tig', '--all'),
        'g': ('gitk', '--all'),
    }

    def is_valid(answer):
        return any(v == answer or v == '*' or (v == '+' and len(answer))
                   for v in valid_answers)

    while True:
        print('\n' + '\n '.join(question))
        print(' ' + '\n '.join(always_offered))

        answer = sys.stdin.readline().strip()
        if answer == 's':
            raise SkipThisRepo()
        if answer in viewers:
            subprocess.call(viewers[answer], cwd=git_dir)
            continue
        if is_valid(answer):
            return answer
def rebase(git_dir):
    '''Interactively bring the current branch of git_dir up to date.

    Steps:
      1. If there are local modifications, offer to commit them as 'wip'
         (on this branch or on a new branch).
      2. If the branch has no upstream, offer to push it or to check out
         master instead.
      3. Repeatedly fast-forward where possible and otherwise ask the user
         how to resolve divergence from upstream / origin/master, until
         everything is in sync or the user gives an empty answer.

    Returns the name of the branch that was checked out on entry.
    Raises SkipThisRepo if HEAD is detached or the user chooses to skip.
    Aborts via error() if a required git command fails.
    '''
    orig_branch = git_branch_current(git_dir)
    if orig_branch is None:
        print('Not on a branch: %s' % git_dir)
        raise SkipThisRepo()

    upstream_branch = git_branch_upstream(git_dir, orig_branch)

    print('Checking for rebase of %r onto %r' % (orig_branch, upstream_branch))

    if git_has_modifications(git_dir):
        do_commit = ask(git_dir, 'Local mods.',
                        'c commit to this branch',
                        '<name> commit to new branch',
                        '<empty> skip this repo')

        if not do_commit:
            raise SkipThisRepo()

        if do_commit == 'c':
            git(git_dir, 'commit', '-am', 'wip', may_fail=True)
        else:
            # Any other answer is taken as the name of a new branch: park the
            # modifications there and return to the original branch.
            git(git_dir, 'checkout', '-b', do_commit)
            git(git_dir, 'commit', '-am', 'wip', may_fail=True)
            git(git_dir, 'checkout', orig_branch)

        if git_has_modifications(git_dir):
            error('There still are local modifications')

    # Missing upstream branch
    if not upstream_branch:
        do_set_upstream = ask(git_dir, 'there is no upstream branch for %r' % orig_branch,
                              '<empty> skip',
                              'p create upstream branch (git push --set-upstream origin %s)' % orig_branch,
                              'm checkout master',
                              valid_answers=('', 'p', 'm'))

        if do_set_upstream == 'p':
            git(git_dir, 'push', '--set-upstream', 'origin', orig_branch)
            upstream_branch = git_branch_upstream(git_dir, orig_branch)
            if not upstream_branch:
                error('There still is no upstream branch')
        elif do_set_upstream == 'm':
            git(git_dir, 'checkout', 'master')
            return orig_branch
        else:
            print('skipping branch, because there is no upstream: %r' % orig_branch)
            return orig_branch

    while True:
        # bu: branch-to-upstream
        # bm: branch-to-master
        # um: upstream-to-master

        upstream_branch = git_branch_upstream(git_dir, orig_branch)
        um = AheadBehind(git_dir, upstream_branch, 'origin/master')

        bm = AheadBehind(git_dir, orig_branch, 'origin/master')

        if bm.can_ff:
            bm.ff()
            continue

        bu = AheadBehind(git_dir, orig_branch, upstream_branch)

        if bu.can_ff:
            bu.ff()
            continue

        if not bu.is_sync():
            print(str(bu))
        if not bm.is_sync():
            print('to master: ' + str(bm))
        if not um.is_sync():
            print('upstream to master: ' + str(um))

        options = ['----- %s' % git_dir,
                   '<empty> skip']
        valid_answers = ['']
        all_good = True

        if um.is_diverged():
            all_good = False
            if bu.is_diverged():
                options.append('rum rebase onto upstream, then onto master')
                valid_answers.append('rum')

            # NOTE: a dedicated 'if bm.is_diverged():' check is disabled in
            # the original, so 'rm' is offered whenever upstream has diverged
            # from master.
            options.append('rm rebase onto master: git rebase -i origin/master')
            valid_answers.append('rm')

        if bu.is_diverged():
            all_good = False
            options.append('ru rebase onto upstream: git rebase -i %s' % upstream_branch)
            valid_answers.append('ru')

        if bu.is_diverged() or bu.is_ahead():
            all_good = False
            options.append('P push to overwrite upstream: git push -f')
            valid_answers.append('P')
            options.append('RU reset to upstream: git reset --hard %s' % upstream_branch)
            valid_answers.append('RU')

        if orig_branch == 'master' and (bm.is_ahead() or bm.is_diverged()):
            all_good = False
            options.append('<name> create new branch')
            # '+' makes ask() accept any non-empty answer as a branch name.
            valid_answers.append('+')

        if all_good:
            break

        do = ask(git_dir, *options, valid_answers=valid_answers)

        if not do:
            break

        if do == 'rum' or do == 'ru':
            git(git_dir, 'rebase', '-i', upstream_branch)

        if do == 'rum' or do == 'rm':
            git(git_dir, 'rebase', '-i', 'origin/master')

        if do == 'RU':
            git(git_dir, 'reset', '--hard', upstream_branch)

        if do == 'P':
            git(git_dir, 'push', '-f')

        if do not in valid_answers:
            new_branch = do
            # create new branch
            # NOTE(review): this only prints the intended command, it does not
            # create the branch -- looks like an unfinished feature; left as is.
            print('''git(git_dir, 'checkout', '-b', new_branch)''')
            #orig_branch = new_branch

    return orig_branch
def cmd_rebase():
    '''Run the interactive rebase on every git clone, then print the status
    table and the list of clones that were skipped.

    For a clone whose current branch is not master, master is rebased as
    well if it is out of sync with origin/master; afterwards the original
    branch is checked out again.
    '''
    skipped_repos = []

    for repo in git_dirs():
        try:
            print('\n\n===== %s =====' % repo)
            sys.stdout.flush()

            current = rebase(repo)
            master_needs_update = (
                current != 'master'
                and not AheadBehind(repo, 'master', 'origin/master').is_sync())
            if master_needs_update:
                git(repo, 'checkout', 'master')
                rebase(repo)
                git(repo, 'checkout', current)

        except SkipThisRepo:
            print('\nSkipping %r' % repo)
            skipped_repos.append(repo)

    print('\n\n==========\nrebase done.\n')
    print_status()
    if skipped_repos:
        print('\nskipped: %s' % ' '.join(skipped_repos))
def parse_args():
    '''Parse the command line and return the argparse namespace.

    The namespace has 'action' (the sub-command as typed, possibly an alias)
    and, for fetch / sh / do, 'remainder' (list of all following arguments).
    '''
    parser = argparse.ArgumentParser(description=doc)
    actions = parser.add_subparsers(title='action', dest='action')
    actions.required = True

    def add_action(name, help_text, remainder_help=None, **parser_kwargs):
        # One sub-command; optionally swallowing all remaining arguments.
        action = actions.add_parser(name, help=help_text, **parser_kwargs)
        if remainder_help is not None:
            action.add_argument('remainder', nargs=argparse.REMAINDER,
                                help=remainder_help)

    add_action('status',
               'show a branch summary and indicate modifications',
               aliases=['st', 's'])

    add_action('fetch',
               "run 'git fetch' in each clone (use before rebase)",
               remainder_help='additional arguments to be passed to git fetch',
               aliases=['f'])

    add_action('rebase',
               'interactively ff-merge master, rebase current branches',
               aliases=['r', 're'])

    add_action('sh',
               'run shell command in each clone (`gits sh echo hi`)',
               remainder_help='command to run in each clone')

    add_action('do',
               'run git command in each clone (`gits do clean -dxf`)',
               remainder_help='git command to run in each clone')

    return parser.parse_args()
if __name__ == '__main__':
    # Dispatch on the sub-command. argparse stores the name exactly as typed,
    # so every alias registered in parse_args() must be listed here.
    args = parse_args()
    if args.action in ('status', 'st', 's'):
        print_status()
    elif args.action in ('fetch', 'f'):
        cmd_do(['fetch'] + args.remainder)
    elif args.action in ('rebase', 'r', 're'):
        # 're' is a registered alias too; without it 'gits re' was a no-op.
        cmd_rebase()
    elif args.action == 'sh':
        cmd_sh(args.remainder)
    elif args.action == 'do':
        cmd_do(args.remainder)
# vim: shiftwidth=4 expandtab tabstop=4