# chrome_controller.py
"""
Chrome浏览器控制模块
使用Selenium控制Chrome并监控网络请求
"""

import json
import time
import random
import numpy as np
from datetime import datetime
from typing import Dict, List, Optional, Callable

# 使用undetected-chromedriver替代标准Selenium
try:
    import undetected_chromedriver as uc
    USING_UNDETECTED = True
except ImportError:
    # 如果未安装undetected-chromedriver,回退到标准Selenium
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options
    USING_UNDETECTED = False
    print("警告: 未安装undetected-chromedriver,使用标准Selenium(检测风险较高)")
    print("建议安装: pip install undetected-chromedriver")

from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException


class ChromeController:
    """Chrome浏览器控制器"""

    def __init__(self):
        self.driver = None
        self.wait = None
        self.network_logs = []
        self.is_running = False

    def start_browser(self) -> bool:
        """Launch Chrome and attach it to this controller.

        Uses undetected-chromedriver when ``USING_UNDETECTED`` is true and
        standard Selenium otherwise. Performance logging is enabled in both
        cases so that ``get_network_logs`` has something to read.

        Controller state (``driver``, ``wait``, ``is_running``) is only
        updated once every start-up step has succeeded. If a step fails
        after the browser process was created, that browser is shut down
        again so a failed start does not leave a stray Chrome running.

        Returns:
            True when the browser is up and the controller is ready,
            False otherwise (the error is printed).
        """
        driver = None
        try:
            if USING_UNDETECTED:
                chrome_options = uc.ChromeOptions()

                # Baseline process flags.
                chrome_options.add_argument("--disable-extensions")
                chrome_options.add_argument("--no-sandbox")
                chrome_options.add_argument("--disable-dev-shm-usage")

                # Turn on the performance log used for network monitoring.
                chrome_options.set_capability('goog:loggingPrefs', {'performance': 'ALL'})

                # version_main=None lets the library detect the Chrome version.
                driver = uc.Chrome(options=chrome_options, version_main=None)

                print("Chrome浏览器启动成功 (undetected-chromedriver)")

            else:
                # Fallback path: standard Selenium.
                chrome_options = Options()
                chrome_options.add_argument("--disable-blink-features=AutomationControlled")
                chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
                chrome_options.add_experimental_option('useAutomationExtension', False)
                chrome_options.add_argument("--disable-extensions")
                chrome_options.add_argument("--no-sandbox")
                chrome_options.add_argument("--disable-dev-shm-usage")
                chrome_options.set_capability('goog:loggingPrefs', {'performance': 'ALL'})

                driver = webdriver.Chrome(options=chrome_options)
                driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")

                print("Chrome浏览器启动成功 (标准Selenium)")

            wait = WebDriverWait(driver, 10)

        except Exception as e:
            print(f"Chrome浏览器启动失败: {e}")
            if driver is not None:
                # The process exists but start-up did not complete: clean up.
                try:
                    driver.quit()
                except Exception as quit_error:
                    print(f"关闭浏览器异常: {quit_error}")
            return False

        # Commit state only after everything above succeeded.
        self.driver = driver
        self.wait = wait
        self.is_running = True
        return True

    def stop_browser(self):
        """关闭浏览器"""
        try:
            if self.driver:
                self.driver.quit()
                self.is_running = False
                print("Chrome浏览器已关闭")
        except Exception as e:
            print(f"关闭浏览器异常: {e}")

    def navigate_to(self, url: str) -> bool:
        """导航到指定URL"""
        try:
            if not self.driver:
                return False

            self.driver.get(url)
            time.sleep(2)  # 等待页面加载
            return True

        except Exception as e:
            print(f"导航失败: {e}")
            return False

    def wait_for_element(self, by: By, value: str, timeout: int = 10):
        """等待元素出现"""
        try:
            wait = WebDriverWait(self.driver, timeout)
            return wait.until(EC.presence_of_element_located((by, value)))
        except TimeoutException:
            return None

    def find_element_safe(self, by: By, value: str):
        """安全查找元素"""
        try:
            return self.driver.find_element(by, value)
        except NoSuchElementException:
            return None

    def find_elements_safe(self, by: By, value: str) -> List:
        """安全查找多个元素"""
        try:
            return self.driver.find_elements(by, value)
        except NoSuchElementException:
            return []

    def click_element_safe(self, element) -> bool:
        """安全点击元素"""
        try:
            if element:
                # 滚动到元素可见
                self.driver.execute_script("arguments[0].scrollIntoView(true);", element)
                time.sleep(0.5)

                # 点击元素
                element.click()
                return True
        except Exception as e:
            print(f"点击元素失败: {e}")

        return False

    def send_text_safe(self, element, text: str) -> bool:
        """安全输入文本 - 模拟真实人类输入行为"""
        try:
            if not element:
                return False

            element.clear()
            time.sleep(random.uniform(0.3, 0.8))  # 思考时间

            i = 0
            while i < len(text):
                # 随机决定单次输入长度(1-5个字符)
                chunk_size = min(
                    random.choices([1, 2, 3, 4, 5], weights=[20, 30, 25, 15, 10])[0],
                    len(text) - i
                )
                element.send_keys(text[i:i+chunk_size])
                i += chunk_size

                # 停顿时间:长尾分布(大部分0.05-0.3秒,偶尔1秒+)
                pause = np.random.lognormal(mean=-2, sigma=0.8)
                time.sleep(max(0.03, min(pause, 2.0)))  # 限制在0.03-2秒之间

                # 5%概率模拟打错字并退格
                if random.random() < 0.05 and i < len(text):
                    element.send_keys(random.choice(['a', 's', 'd', 'f']))
                    time.sleep(random.uniform(0.1, 0.3))
                    element.send_keys(Keys.BACKSPACE)
                    time.sleep(random.uniform(0.05, 0.15))

                # 10%概率模拟停顿思考(较长暂停)
                if random.random() < 0.1:
                    time.sleep(random.uniform(0.5, 1.5))

            return True

        except Exception as e:
            print(f"输入文本失败: {e}")
            return False

    def get_network_logs(self) -> List[Dict]:
        """获取网络请求日志"""
        try:
            if not self.driver:
                return []

            logs = self.driver.get_log('performance')
            network_logs = []

            for log in logs:
                message = json.loads(log['message'])
                if message['message']['method'] in ['Network.responseReceived', 'Network.requestWillBeSent']:
                    network_logs.append(message)

            return network_logs

        except Exception as e:
            print(f"获取网络日志失败: {e}")
            return []

    def find_api_response(self, url_pattern: str, timeout: int = 30) -> Optional[Dict]:
        """查找特定API响应"""
        start_time = time.time()

        while time.time() - start_time < timeout:
            logs = self.get_network_logs()

            for log in logs:
                message = log['message']

                if message['method'] == 'Network.responseReceived':
                    response = message['params']['response']
                    request_url = response.get('url', '')

                    if url_pattern in request_url:
                        # 尝试获取响应体
                        request_id = message['params']['requestId']
                        try:
                            response_body = self.driver.execute_cdp_cmd(
                                'Network.getResponseBody',
                                {'requestId': request_id}
                            )

                            if response_body and 'body' in response_body:
                                try:
                                    return json.loads(response_body['body'])
                                except json.JSONDecodeError:
                                    pass

                        except Exception:
                            pass

            time.sleep(1)

        return None

    def random_delay(self, min_seconds: float = 1.0, max_seconds: float = 3.0):
        """随机延迟(已废弃,保留向后兼容)"""
        delay = random.uniform(min_seconds, max_seconds)
        time.sleep(delay)

    def smart_delay(self, base_seconds: int = 15) -> float:
        """Sleep for a randomised, time-of-day-dependent interval.

        The pause is drawn from a gamma distribution (shape 2, scale
        ``base_seconds / 2``), scaled by a multiplier chosen from the
        current local hour, optionally extended by a long interruption, and
        finally clamped to the range 5-300 seconds.

        Args:
            base_seconds: Base delay in seconds (default 15).

        Returns:
            The number of seconds actually slept.
        """
        hour = datetime.now().hour

        # Hour-of-day multiplier: 00-05 -> x3, 06-08 and 19-23 -> x1.5,
        # 09-18 -> x1.
        if hour < 6:
            multiplier = 3.0
        elif hour < 9 or hour > 18:
            multiplier = 1.5
        else:
            multiplier = 1.0

        # Right-skewed base delay.
        sample = np.random.gamma(shape=2, scale=base_seconds/2)
        delay = sample * multiplier

        # One time in ten, add a long interruption of 30-120 seconds.
        interrupted = random.random() < 0.1
        if interrupted:
            delay += random.uniform(30, 120)

        # Clamp to between 5 seconds and 5 minutes.
        capped = min(delay, 300)
        delay = max(5, capped)

        time.sleep(delay)
        return delay

    def should_take_break(self, messages_sent: int) -> bool:
        """判断是否需要休息 - 基于已发送消息数的概率

        Args:
            messages_sent: 已发送消息数

        Returns:
            是否需要休息
        """
        # 发送越多,休息概率越高
        break_probability = min(0.8, messages_sent * 0.02)
        return random.random() < break_probability

    def take_random_break(self) -> float:
        """Sleep for a randomly chosen short, medium or long break.

        Break types and their weights:
            short  (1-3 minutes):   70
            medium (5-15 minutes):  25
            long   (30-60 minutes):  5

        The chosen duration is announced on stdout before sleeping.

        Returns:
            The number of seconds actually slept.
        """
        kind = random.choices(['short', 'medium', 'long'], weights=[70, 25, 5])[0]

        # Duration bounds in seconds; anything not listed is a long break.
        bounds = {'short': (60, 180), 'medium': (300, 900)}
        low, high = bounds.get(kind, (1800, 3600))
        duration = random.uniform(low, high)

        if kind == 'short':
            print(f"短暂休息 {duration/60:.1f} 分钟...")
        elif kind == 'medium':
            print(f"中等休息 {duration/60:.1f} 分钟...")
        else:
            print(f"长时间休息 {duration/60:.1f} 分钟...")

        time.sleep(duration)
        return duration

    def human_like_scroll(self):
        """人性化滚动"""
        try:
            # 随机滚动距离
            scroll_distance = random.randint(100, 500)
            self.driver.execute_script(f"window.scrollBy(0, {scroll_distance});")
            time.sleep(random.uniform(0.5, 1.5))

            # 有时向上滚动一点
            if random.random() < 0.3:
                scroll_back = random.randint(50, 200)
                self.driver.execute_script(f"window.scrollBy(0, -{scroll_back});")
                time.sleep(random.uniform(0.3, 0.8))

        except Exception as e:
            print(f"滚动异常: {e}")

    def get_page_source(self) -> str:
        """获取页面源代码"""
        try:
            if self.driver:
                return self.driver.page_source
        except Exception as e:
            print(f"获取页面源码失败: {e}")

        return ""

    def execute_script(self, script: str, *args):
        """执行JavaScript脚本"""
        try:
            if self.driver:
                return self.driver.execute_script(script, *args)
        except Exception as e:
            print(f"执行脚本失败: {e}")

        return None

    def take_screenshot(self, filename: str) -> bool:
        """截图"""
        try:
            if self.driver:
                return self.driver.save_screenshot(filename)
        except Exception as e:
            print(f"截图失败: {e}")

        return False

    def get_current_url(self) -> str:
        """获取当前URL"""
        try:
            if self.driver:
                return self.driver.current_url
        except Exception:
            pass

        return ""

    def refresh_page(self):
        """刷新页面"""
        try:
            if self.driver:
                self.driver.refresh()
                time.sleep(2)
        except Exception as e:
            print(f"刷新页面失败: {e}")

    def is_element_visible(self, by: By, value: str) -> bool:
        """检查元素是否可见"""
        try:
            element = self.find_element_safe(by, value)
            return element and element.is_displayed()
        except Exception:
            return False

    def wait_for_page_load(self, timeout: int = 30):
        """等待页面加载完成"""
        try:
            WebDriverWait(self.driver, timeout).until(
                lambda driver: driver.execute_script("return document.readyState") == "complete"
            )
        except TimeoutException:
            print("页面加载超时")

    def __enter__(self):
        """Context-manager entry: start the browser and return this controller."""
        # NOTE(review): the boolean result of start_browser is not checked
        # here, so a failed start is not signalled to the ``with`` statement.
        self.start_browser()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context-manager exit: stop the browser.

        The exception arguments are ignored and nothing is returned, so an
        exception raised inside the ``with`` body is not suppressed.
        """
        self.stop_browser()