# SPDX-License-Identifier: LGPL-2.1-or-later # *************************************************************************** # * * # * Copyright (c) 2022 FreeCAD Project Association * # * * # * This file is part of FreeCAD. * # * * # * FreeCAD is free software: you can redistribute it and/or modify it * # * under the terms of the GNU Lesser General Public License as * # * published by the Free Software Foundation, either version 2.1 of the * # * License, or (at your option) any later version. * # * * # * FreeCAD is distributed in the hope that it will be useful, but * # * WITHOUT ANY WARRANTY; without even the implied warranty of * # * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * # * Lesser General Public License for more details. * # * * # * You should have received a copy of the GNU Lesser General Public * # * License along with FreeCAD. If not, see * # * . * # * * # *************************************************************************** """ Wrapper around git executable to simplify calling git commands from Python. """ # pylint: disable=too-few-public-methods import os import platform import shutil import subprocess from typing import List, Dict, Optional import time import addonmanager_utilities as utils import addonmanager_freecad_interface as fci translate = fci.translate class NoGitFound(RuntimeError): """Could not locate the git executable on this system.""" class GitFailed(RuntimeError): """The call to git returned an error of some kind""" def _ref_format_string() -> str: return ( "--format=%(refname:lstrip=2)\t%(upstream:lstrip=2)\t%(authordate:rfc)\t%(" "authorname)\t%(subject)" ) def _parse_ref_table(text: str): rows = text.splitlines() result = [] for row in rows: columns = row.split("\t") result.append( { "ref_name": columns[0], "upstream": columns[1], "date": columns[2], "author": columns[3], "subject": columns[4], } ) return result class GitManager: """A class to manage access to git: mostly just provides a simple wrapper around the basic command-line calls. Provides optional asynchronous access to clone and update.""" def __init__(self): self.git_exe = None self._find_git() if not self.git_exe: raise NoGitFound() def clone(self, remote, local_path, args: List[str] = None): """Clones the remote to the local path""" final_args = ["clone", "--recurse-submodules"] if args: final_args.extend(args) final_args.extend([remote, local_path]) self._synchronous_call_git(final_args) def async_clone(self, remote, local_path, progress_monitor, args: List[str] = None): """Clones the remote to the local path, sending periodic progress updates to the passed progress_monitor. Returns a handle that can be used to cancel the job.""" def checkout(self, local_path, spec, args: List[str] = None): """Checks out a specific git revision, tag, or branch. Any valid argument to git checkout can be submitted.""" old_dir = os.getcwd() os.chdir(local_path) final_args = ["checkout"] if args: final_args.extend(args) final_args.append(spec) self._synchronous_call_git(final_args) os.chdir(old_dir) def dirty(self, local_path: str) -> bool: """Check for local changes""" old_dir = os.getcwd() os.chdir(local_path) result = False final_args = ["diff-index", "HEAD"] try: stdout = self._synchronous_call_git(final_args) if stdout: result = True except GitFailed: result = False os.chdir(old_dir) return result def detached_head(self, local_path: str) -> bool: """Check for detached head state""" old_dir = os.getcwd() os.chdir(local_path) result = False final_args = ["rev-parse", "--abbrev-ref", "--symbolic-full-name", "HEAD"] try: stdout = self._synchronous_call_git(final_args) if stdout == "HEAD": result = True except GitFailed: result = False os.chdir(old_dir) return result def update(self, local_path): """Fetches and pulls the local_path from its remote""" old_dir = os.getcwd() os.chdir(local_path) try: self._synchronous_call_git(["fetch"]) self._synchronous_call_git(["pull"]) self._synchronous_call_git(["submodule", "update", "--init", "--recursive"]) except GitFailed as e: fci.Console.PrintWarning( translate( "AddonsInstaller", "Basic git update failed with the following message:", ) + str(e) + "\n" ) fci.Console.PrintWarning( translate( "AddonsInstaller", "Backing up the original directory and re-cloning", ) + "...\n" ) remote = self.get_remote(local_path) with open(os.path.join(local_path, "ADDON_DISABLED"), "w", encoding="utf-8") as f: f.write( "This is a backup of an addon that failed to update cleanly so " "was re-cloned. It was disabled by the Addon Manager's git update " "facility and can be safely deleted if the addon is working " "properly." ) os.chdir("..") os.rename(local_path, local_path + ".backup" + str(time.time())) self.clone(remote, local_path) os.chdir(old_dir) def status(self, local_path) -> str: """Gets the v1 porcelain status""" old_dir = os.getcwd() os.chdir(local_path) try: status = self._synchronous_call_git(["status", "-sb", "--porcelain"]) except GitFailed as e: os.chdir(old_dir) raise e os.chdir(old_dir) return status def reset(self, local_path, args: List[str] = None): """Executes the git reset command""" old_dir = os.getcwd() os.chdir(local_path) final_args = ["reset"] if args: final_args.extend(args) try: self._synchronous_call_git(final_args) except GitFailed as e: os.chdir(old_dir) raise e os.chdir(old_dir) def async_fetch_and_update(self, local_path, progress_monitor, args=None): """Same as fetch_and_update, but asynchronous""" def update_available(self, local_path) -> bool: """Returns True if an update is available from the remote, or false if not""" old_dir = os.getcwd() os.chdir(local_path) try: self._synchronous_call_git(["fetch"]) status = self._synchronous_call_git(["status", "-sb", "--porcelain"]) except GitFailed as e: os.chdir(old_dir) raise e os.chdir(old_dir) return "behind" in status def current_tag(self, local_path) -> str: """Get the name of the currently checked-out tag if HEAD is detached""" old_dir = os.getcwd() os.chdir(local_path) try: tag = self._synchronous_call_git(["describe", "--tags"]).strip() except GitFailed as e: os.chdir(old_dir) raise e os.chdir(old_dir) return tag def current_branch(self, local_path) -> str: """Get the name of the current branch""" old_dir = os.getcwd() os.chdir(local_path) try: # This only works with git 2.22 and later (June 2019) # branch = self._synchronous_call_git(["branch", "--show-current"]).strip() # This is more universal (albeit more opaque to the reader): branch = self._synchronous_call_git(["rev-parse", "--abbrev-ref", "HEAD"]).strip() except GitFailed as e: os.chdir(old_dir) raise e os.chdir(old_dir) return branch def repair(self, remote, local_path): """Assumes that local_path is supposed to be a local clone of the given remote, and ensures that it is. Note that any local changes in local_path will be destroyed. This is achieved by archiving the old path, cloning an entirely new copy, and then deleting the old directory.""" original_cwd = os.getcwd() # Make sure we are not currently in that directory, otherwise on Windows the # "rename" will fail. To guarantee we aren't in it, change to it, then shift # up one. os.chdir(local_path) os.chdir("..") backup_path = local_path + ".backup" + str(time.time()) os.rename(local_path, backup_path) try: self.clone(remote, local_path) except GitFailed as e: fci.Console.PrintError( translate("AddonsInstaller", "Failed to clone {} into {} using git").format( remote, local_path ) ) os.chdir(original_cwd) raise e os.chdir(original_cwd) shutil.rmtree(backup_path, ignore_errors=True) def get_remote(self, local_path) -> str: """Get the repository that this local path is set to fetch from""" old_dir = os.getcwd() os.chdir(local_path) try: response = self._synchronous_call_git(["remote", "-v", "show"]) except GitFailed as e: os.chdir(old_dir) raise e lines = response.split("\n") result = "(unknown remote)" for line in lines: if line.endswith("(fetch)"): # The line looks like: # origin https://some/sort/of/path (fetch) segments = line.split() if len(segments) == 3: result = segments[1] break fci.Console.PrintWarning("Error parsing the results from git remote -v show:\n") fci.Console.PrintWarning(line + "\n") os.chdir(old_dir) return result def get_branches(self, local_path) -> List[str]: """Get a list of all available branches (local and remote)""" old_dir = os.getcwd() os.chdir(local_path) try: stdout = self._synchronous_call_git(["branch", "-a", "--format=%(refname:lstrip=2)"]) except GitFailed as e: os.chdir(old_dir) raise e os.chdir(old_dir) branches = [] for branch in stdout.split("\n"): branches.append(branch) return branches def get_branches_with_info(self, local_path) -> List[Dict[str, str]]: """Get a list of branches, where each entry is a dictionary with status information about the branch.""" old_dir = os.getcwd() os.chdir(local_path) try: stdout = self._synchronous_call_git(["branch", "-a", _ref_format_string()]) return _parse_ref_table(stdout) except GitFailed as e: os.chdir(old_dir) raise e def get_tags_with_info(self, local_path) -> List[Dict[str, str]]: """Get a list of branches, where each entry is a dictionary with status information about the branch.""" old_dir = os.getcwd() os.chdir(local_path) try: stdout = self._synchronous_call_git(["tag", "-l", _ref_format_string()]) return _parse_ref_table(stdout) except GitFailed as e: os.chdir(old_dir) raise e def get_last_committers(self, local_path, n=10): """Examine the last n entries of the commit history, and return a list of all the committers, their email addresses, and how many commits each one is responsible for. """ old_dir = os.getcwd() os.chdir(local_path) authors = self._synchronous_call_git(["log", f"-{n}", "--format=%cN"]).split("\n") emails = self._synchronous_call_git(["log", f"-{n}", "--format=%cE"]).split("\n") os.chdir(old_dir) result_dict = {} for author, email in zip(authors, emails): if not author or not email: continue if author not in result_dict: result_dict[author] = {} result_dict[author]["email"] = [email] result_dict[author]["count"] = 1 else: if email not in result_dict[author]["email"]: # Same author name, new email address -- treat it as the same # person with a second email, instead of as a whole new person result_dict[author]["email"].append(email) result_dict[author]["count"] += 1 return result_dict def get_last_authors(self, local_path, n=10): """Examine the last n entries of the commit history, and return a list of all the authors, their email addresses, and how many commits each one is responsible for. """ old_dir = os.getcwd() os.chdir(local_path) authors = self._synchronous_call_git(["log", f"-{n}", "--format=%aN"]) emails = self._synchronous_call_git(["log", f"-{n}", "--format=%aE"]) os.chdir(old_dir) result_dict = {} for author, email in zip(authors, emails): if author not in result_dict: result_dict[author]["email"] = [email] result_dict[author]["count"] = 1 else: if email not in result_dict[author]["email"]: # Same author name, new email address -- treat it as the same # person with a second email, instead of as a whole new person result_dict[author]["email"].append(email) result_dict[author]["count"] += 1 return result_dict def migrate_branch(self, local_path: str, old_branch: str, new_branch: str) -> None: """Rename a branch (used when the remote branch name changed). Assumes that "origin" exists.""" old_dir = os.getcwd() os.chdir(local_path) try: self._synchronous_call_git(["branch", "-m", old_branch, new_branch]) self._synchronous_call_git(["fetch", "origin"]) self._synchronous_call_git(["branch", "--unset-upstream"]) self._synchronous_call_git(["branch", f"--set-upstream-to=origin/{new_branch}"]) self._synchronous_call_git(["pull"]) except GitFailed as e: fci.Console.PrintWarning( translate( "AddonsInstaller", "Git branch rename failed with the following message:", ) + str(e) + "\n" ) os.chdir(old_dir) raise e os.chdir(old_dir) def _find_git(self): # Find git. In preference order # A) The value of the GitExecutable user preference # B) The executable located in the same directory as FreeCAD and called "git" # C) The result of a shutil search for your system's "git" executable prefs = fci.ParamGet("User parameter:BaseApp/Preferences/Addons") git_exe = prefs.GetString("GitExecutable", "Not set") if not git_exe or git_exe == "Not set" or not os.path.exists(git_exe): fc_dir = fci.DataPaths().home_dir git_exe = os.path.join(fc_dir, "bin", "git") if "Windows" in platform.system(): git_exe += ".exe" if platform.system() == "Darwin" and not self._xcode_command_line_tools_are_installed(): return if not git_exe or not os.path.exists(git_exe): git_exe = shutil.which("git") if not git_exe or not os.path.exists(git_exe): return prefs.SetString("GitExecutable", git_exe) self.git_exe = git_exe def _xcode_command_line_tools_are_installed(self) -> bool: """On Macs, there is *always* an executable called "git", but sometimes it's just a script that tells the user to install XCode's Command Line tools. So the existence of git on the Mac actually requires us to check for that installation.""" try: subprocess.check_output(["xcode-select", "-p"]) return True except subprocess.CalledProcessError: return False def _synchronous_call_git(self, args: List[str]) -> str: """Calls git and returns its output.""" final_args = [self.git_exe] final_args.extend(args) try: proc = utils.run_interruptable_subprocess(final_args) except subprocess.CalledProcessError as e: raise GitFailed( f"Git returned a non-zero exit status: {e.returncode}\n" + f"Called with: {' '.join(final_args)}\n\n" + f"Returned stderr:\n{e.stderr}" ) from e return proc.stdout def initialize_git() -> Optional[GitManager]: """If git is enabled, locate the git executable if necessary and return a new GitManager object. The executable location is saved in user preferences for reuse, and git can be disabled by setting the disableGit parameter in the Addons preference group. Returns None if for any of those reasons we aren't using git.""" git_manager = None pref = fci.ParamGet("User parameter:BaseApp/Preferences/Addons") disable_git = pref.GetBool("disableGit", False) if not disable_git: try: git_manager = GitManager() except NoGitFound: pass return git_manager