git_updater.py 6.27 KB
"""
Git自动更新模块
检查远程仓库更新并自动拉取
"""

import os
import sys
import subprocess
import git
from git import Repo
from typing import Optional
import tempfile
import shutil


class GitUpdater:
    """Git自动更新管理器"""

    def __init__(self):
        self.repo_url = "http://gitlab.zb100.com:10080/chaijin/EtsyCustomerNotify.git"
        self.ssh_key = "XKc2v_hs8-qkougieWvx"
        self.current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
        self.repo = None
        self._init_repo()

    def _init_repo(self):
        """初始化Git仓库"""
        try:
            if os.path.exists(os.path.join(self.current_dir, '.git')):
                self.repo = Repo(self.current_dir)
            else:
                print("当前目录不是Git仓库,将初始化...")
                self._clone_repository()
        except Exception as e:
            print(f"Git仓库初始化失败: {e}")

    def _clone_repository(self):
        """克隆远程仓库"""
        try:
            # 如果当前目录有文件,先备份
            if os.listdir(self.current_dir):
                backup_dir = f"{self.current_dir}_backup"
                if os.path.exists(backup_dir):
                    shutil.rmtree(backup_dir)
                shutil.copytree(self.current_dir, backup_dir)

            # 克隆仓库到临时目录
            temp_dir = tempfile.mkdtemp()
            self.repo = Repo.clone_from(
                self.repo_url,
                temp_dir,
                env={"GIT_SSH_COMMAND": f"ssh -i {self.ssh_key}"}
            )

            # 移动文件到当前目录
            for item in os.listdir(temp_dir):
                src = os.path.join(temp_dir, item)
                dst = os.path.join(self.current_dir, item)
                if os.path.exists(dst):
                    if os.path.isdir(dst):
                        shutil.rmtree(dst)
                    else:
                        os.remove(dst)
                shutil.move(src, dst)

            # 清理临时目录
            shutil.rmtree(temp_dir)

            # 重新初始化repo对象
            self.repo = Repo(self.current_dir)
            print("仓库克隆成功")

        except Exception as e:
            print(f"仓库克隆失败: {e}")

    def get_current_commit(self) -> Optional[str]:
        """获取当前commit ID"""
        try:
            if self.repo:
                return self.repo.head.commit.hexsha
        except Exception as e:
            print(f"获取当前commit失败: {e}")
        return None

    def get_remote_commit(self) -> Optional[str]:
        """获取远程最新commit ID"""
        try:
            if self.repo:
                # 设置SSH密钥环境变量
                env = os.environ.copy()
                env["GIT_SSH_COMMAND"] = f"ssh -i {self.ssh_key}"

                # 获取远程更新
                origin = self.repo.remote('origin')
                origin.fetch(env=env)

                # 获取远程主分支的最新commit
                remote_ref = self.repo.refs['origin/main']  # 或 origin/master
                return remote_ref.commit.hexsha

        except Exception as e:
            print(f"获取远程commit失败: {e}")
            # 尝试master分支
            try:
                remote_ref = self.repo.refs['origin/master']
                return remote_ref.commit.hexsha
            except:
                pass

        return None

    def has_updates(self) -> bool:
        """检查是否有更新"""
        current = self.get_current_commit()
        remote = self.get_remote_commit()

        if current and remote:
            return current != remote

        return False

    def pull_updates(self) -> bool:
        """拉取更新"""
        try:
            if self.repo:
                # 设置SSH密钥环境变量
                env = os.environ.copy()
                env["GIT_SSH_COMMAND"] = f"ssh -i {self.ssh_key}"

                origin = self.repo.remote('origin')
                origin.pull(env=env)

                print("更新拉取成功")
                return True

        except Exception as e:
            print(f"更新拉取失败: {e}")

        return False

    def install_dependencies(self) -> bool:
        """安装依赖"""
        try:
            # 检查requirements.txt是否存在
            requirements_file = os.path.join(self.current_dir, 'requirements.txt')
            if os.path.exists(requirements_file):
                result = subprocess.run([
                    sys.executable, '-m', 'pip', 'install', '-r', requirements_file
                ], capture_output=True, text=True)

                if result.returncode == 0:
                    print("依赖安装成功")
                    return True
                else:
                    print(f"依赖安装失败: {result.stderr}")

        except Exception as e:
            print(f"依赖安装异常: {e}")

        return False

    def restart_application(self):
        """重启应用程序"""
        try:
            # 获取当前Python解释器和脚本路径
            python_exe = sys.executable
            script_path = os.path.join(self.current_dir, 'main.py')

            # 启动新进程
            subprocess.Popen([python_exe, script_path])

            # 退出当前进程
            sys.exit(0)

        except Exception as e:
            print(f"重启应用失败: {e}")

    def check_and_update(self) -> bool:
        """检查并执行更新(主要方法)"""
        try:
            print("检查更新...")

            if not self.repo:
                print("Git仓库未初始化")
                return False

            if self.has_updates():
                print("发现新版本,开始更新...")

                if self.pull_updates():
                    if self.install_dependencies():
                        print("更新完成,需要重启应用")
                        return True
                    else:
                        print("依赖安装失败,继续使用当前版本")
                else:
                    print("更新拉取失败,继续使用当前版本")
            else:
                print("当前已是最新版本")

        except Exception as e:
            print(f"更新检查异常: {e}")

        return False