import sys import psutil import time import random from logger import info, warning, error import traceback from config import ( LOGIN_URL, SIGN_UP_URL, SETTINGS_URL, EMAIL_DOMAINS, REGISTRATION_MAX_RETRIES, EMAIL_TYPE, EMAIL_CODE_TYPE ) import secrets import hashlib import base64 import uuid import requests from typing import Optional, Tuple if sys.stdout.encoding != "utf-8": sys.stdout.reconfigure(encoding="utf-8") if sys.stderr.encoding != "utf-8": sys.stderr.reconfigure(encoding="utf-8") from browser_utils import BrowserManager from get_email_code import EmailVerificationHandler from datetime import datetime # 添加这行导入 TOTAL_USAGE = 0 def handle_turnstile(tab, max_retries: int = 5, retry_interval: tuple = (1, 2)) -> bool: """ 处理Turnstile验证 Args: tab: 浏览器标签对象 max_retries: 最大重试次数 retry_interval: 重试间隔范围(最小值, 最大值) Returns: bool: 验证是否成功 """ info("=============正在检测 Turnstile 验证=============") # 初始化重试计数器 retry_count = 0 try: # 在最大重试次数内循环尝试验证 while retry_count < max_retries: # 增加重试计数 retry_count += 1 info(f"正在进行 Turnstile 第 {retry_count} 次验证中...") try: # 检查页面状态,但不直接返回,先检查是否有Turnstile验证需要处理 page_ready = False if tab.ele("@name=password"): info("检测到密码输入页面,检查是否有验证需要处理...") page_ready = True elif tab.ele("@data-index=0"): info("检测到验证码输入页面,检查是否有验证需要处理...") page_ready = True elif tab.ele("Account Settings"): info("检测到账户设置页面,检查是否有验证需要处理...") page_ready = True # 初始化元素变量 element = None try: # 尝试通过层级结构定位到Turnstile验证框的容器元素 element = ( tab.ele(".main-content") # 找到 .main-content 元素 .ele("tag:div") # 找到第一个子 div .ele("tag:div") # 找到第二个子 div .ele("tag:div") # 找到第三个子 div ) except Exception as e: # 如果无法通过第一种方式找到元素,忽略异常继续执行 pass challenge_check = None if element: # 如果找到了容器元素,则在其中定位验证框的输入元素 try: challenge_check = ( element .shadow_root.ele("tag:iframe") # 找到shadow DOM中的iframe .ele("tag:body") # 找到iframe中的body .sr("tag:input") # 找到body中的input元素 ) except Exception as e: pass else: # 如果没有找到容器元素,则尝试另一种方式定位验证框 try: challenge_check = ( tab.ele("@id=cf-turnstile", timeout=2) # 通过id直接找到turnstile元素 .child() # 获取其子元素 .shadow_root.ele("tag:iframe") # 找到shadow DOM中的iframe .ele("tag:body") # 找到iframe中的body .sr("tag:input") # 找到body中的input元素 ) except Exception as e: pass if challenge_check: # 如果找到了验证输入元素,记录日志 info("检测到 Turnstile 验证,正在处理...") # 点击前随机延迟,模拟人工操作 time.sleep(random.uniform(1, 3)) # 点击验证元素触发验证 challenge_check.click() # 等待验证处理 time.sleep(2) info("Turnstile 验证通过") return True else: info("未检测到 Turnstile 验证") # 如果页面已准备好且没有验证需要处理,则可以返回 if page_ready: info("页面已准备好,没有检测到需要处理的验证") return True except Exception as e: # 记录当前尝试失败的详细信息 info(f"当前验证尝试失败: {str(e)}") # 在下一次尝试前随机延迟 time.sleep(random.uniform(*retry_interval)) # 超过最大重试次数,验证失败 error(f"Turnstile 验证次数超过最大限制 {max_retries},退出") return False except Exception as e: # 捕获整个验证过程中的异常 error_msg = f"Turnstile 验证失败: {str(e)}" error(error_msg) return False def get_cursor_session_token(tab, max_attempts: int = 3, retry_interval: int = 2) -> Optional[Tuple[str, str]]: """ 获取Cursor会话token Args: tab: 浏览器标签页对象 max_attempts: 最大尝试次数 retry_interval: 重试间隔(秒) Returns: Tuple[str, str] | None: 成功返回(userId, accessToken)元组,失败返回None """ info("开始获取会话令牌") # 首先尝试使用UUID深度登录方式 info("尝试使用深度登录方式获取token") def _generate_pkce_pair(): """生成PKCE验证对""" code_verifier = secrets.token_urlsafe(43) code_challenge_digest = hashlib.sha256(code_verifier.encode('utf-8')).digest() code_challenge = base64.urlsafe_b64encode(code_challenge_digest).decode('utf-8').rstrip('=') return code_verifier, code_challenge attempts = 0 while attempts < max_attempts: try: verifier, challenge = _generate_pkce_pair() id = uuid.uuid4() client_login_url = f"https://www.cursor.com/cn/loginDeepControl?challenge={challenge}&uuid={id}&mode=login" info(f"访问深度登录URL: {client_login_url}") tab.get(client_login_url) # save_screenshot(tab, f"deeplogin_attempt_{attempts}") if tab.ele("xpath=//span[contains(text(), 'Yes, Log In')]", timeout=5): info("点击确认登录按钮") tab.ele("xpath=//span[contains(text(), 'Yes, Log In')]").click() time.sleep(1.5) auth_poll_url = f"https://api2.cursor.sh/auth/poll?uuid={id}&verifier={verifier}" headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Cursor/0.48.6 Chrome/132.0.6834.210 Electron/34.3.4 Safari/537.36", "Accept": "*/*" } info(f"轮询认证状态: {auth_poll_url}") response = requests.get(auth_poll_url, headers=headers, timeout=5) if response.status_code == 200: data = response.json() accessToken = data.get("accessToken", None) authId = data.get("authId", "") if accessToken: userId = "" if len(authId.split("|")) > 1: userId = authId.split("|")[1] info("成功获取账号token和userId") return accessToken, userId else: error(f"API请求失败,状态码: {response.status_code}") else: warning("未找到登录确认按钮") attempts += 1 if attempts < max_attempts: wait_time = retry_interval * attempts # 逐步增加等待时间 warning(f"第 {attempts} 次尝试未获取到token,{wait_time}秒后重试...") # save_screenshot(tab, f"token_attempt_{attempts}") time.sleep(wait_time) except Exception as e: error(f"深度登录获取token失败: {str(e)}") attempts += 1 # save_screenshot(tab, f"token_error_{attempts}") if attempts < max_attempts: wait_time = retry_interval * attempts warning(f"将在 {wait_time} 秒后重试...") time.sleep(wait_time) def sign_up_account(browser, tab, account_info): info("=============开始注册账号=============") info( f"账号信息: 邮箱: {account_info['email']}, 密码: {account_info['password']}, 姓名: {account_info['first_name']} {account_info['last_name']}" ) if EMAIL_TYPE == "zmail": EmailVerificationHandler.create_zmail_email(account_info) tab.get(SIGN_UP_URL) tab.wait(2) if tab.ele("@name=cf-turnstile-response"): error("开屏就是检测啊,大佬你的IP或UA需要换一下了啊,有问题了...要等一下") try: if tab.ele("@name=first_name"): info("=============正在填写个人信息=============") tab.actions.click("@name=first_name").input(account_info["first_name"]) info(f"已输入名字: {account_info['first_name']}") time.sleep(random.uniform(1, 3)) tab.actions.click("@name=last_name").input(account_info["last_name"]) info(f"已输入姓氏: {account_info['last_name']}") time.sleep(random.uniform(1, 3)) tab.actions.click("@name=email").input(account_info["email"]) info(f"已输入邮箱: {account_info['email']}") time.sleep(random.uniform(1, 3)) info("=============提交个人信息=============") tab.actions.click("@type=submit") time.sleep(random.uniform(0.2, 1)) if ( tab.ele("verify the user is human. Please try again.") or tab.ele("Can't verify the user is human. Please try again.") or tab.ele("Can't verify the user is human. Please try again.") ): info("检测到turnstile验证失败,(IP问题、UA问题、域名问题)...正在重试...") return "EMAIL_USED" except Exception as e: info(f"填写个人信息失败: {str(e)}") return "ERROR" handle_turnstile(tab) if tab.ele("verify the user is human. Please try again.") or tab.ele( "Can't verify the user is human. Please try again." ): info("检测到turnstile验证失败,正在重试...") return "EMAIL_USED" try: if tab.ele("@name=password"): info(f"设置密码:{account_info['password']}") tab.ele("@name=password").input(account_info["password"]) time.sleep(random.uniform(1, 2)) info("提交密码...") tab.ele("@type=submit").click() info("密码设置成功,等待系统响应....") except Exception as e: info(f"密码设置失败: {str(e)}") return "ERROR" info("处理最终验证...") handle_turnstile(tab) if tab.ele("This email is not available."): info("邮箱已被使用") return "EMAIL_USED" if tab.ele("Sign up is restricted."): info("注册限制") return "SIGNUP_RESTRICTED" # 创建邮件处理器 email_handler = EmailVerificationHandler() i = 0 while i < 5: try: time.sleep(random.uniform(0.2, 1)) if tab.ele("Account Settings"): info("注册成功,已进入账号设置页面") break if tab.ele("@data-index=0"): info("等待输入验证码...") # 切换到邮箱标签页 code = email_handler.get_verification_code( source_email=account_info["email"] ) if code is None: info("未获取到验证码...系统异常,正在退出....") return "EMAIL_GET_CODE_FAILED" info(f"输入验证码: {code}") i = 0 for digit in code: tab.ele(f"@data-index={i}").input(digit) time.sleep(random.uniform(0.3, 0.6)) i += 1 info("验证码输入完成") time.sleep(random.uniform(3, 5)) # 在验证码输入完成后检测是否出现了Turnstile验证 info("检查是否出现了Turnstile验证...") try: turnstile_element = tab.ele("@id=cf-turnstile", timeout=3) if turnstile_element: info("检测到验证码输入后出现Turnstile验证,正在处理...") handle_turnstile(tab) except: info("未检测到Turnstile验证,继续下一步") break except Exception as e: info(f"验证码处理失败: {str(e)}") return "ERROR" info("完成最终验证...") handle_turnstile(tab) time.sleep(random.uniform(3, 5)) info("账号注册流程完成") return "SUCCESS" class EmailGenerator: def __init__( self, ): # 将密码生成移到这里,避免类定义时执行随机密码生成 self.default_first_name = self.generate_random_name() self.default_last_name = self.generate_random_name() # 从配置文件获取域名配置 self.domains = EMAIL_DOMAINS info(f"当前可用域名: {self.domains}") self.email = None self.password = None def generate_random_password(self, length=12): """生成随机密码 - 改进密码生成算法,确保包含各类字符""" chars = "abcdefghijklmnopqrstuvwxyz" upper_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" digits = "0123456789" special = "!@#$%^&*" # 确保密码包含至少一个大写字母、一个数字和一个特殊字符 password = [ random.choice(chars), random.choice(upper_chars), random.choice(digits), random.choice(special), ] # 添加剩余随机字符 password.extend( random.choices(chars + upper_chars + digits + special, k=length - 4) ) # 打乱密码顺序 random.shuffle(password) return "".join(password) def generate_random_name(self, length=6): """生成随机用户名""" first_letter = random.choice("ABCDEFGHIJKLMNOPQRSTUVWXYZ") rest_letters = "".join( random.choices("abcdefghijklmnopqrstuvwxyz", k=length - 1) ) return first_letter + rest_letters def generate_email(self, length=8): """生成随机邮箱地址,使用随机域名""" random_str = "".join( random.choices("abcdefghijklmnopqrstuvwxyz1234567890", k=length) ) timestamp = str(int(time.time()))[-4:] # 使用时间戳后4位 # 随机选择一个域名 domain = random.choice(self.domains) return f"{random_str}@{domain}" def get_account_info(self): """获取账号信息,确保每次调用都生成新的邮箱和密码""" self.email = self.generate_email() self.password = self.generate_random_password() return { "email": self.email, "password": self.password, "first_name": self.default_first_name.capitalize(), "last_name": self.default_last_name.capitalize(), } def _save_account_info(self, user, token, total_usage): try: from database import get_session, AccountModel import asyncio import time async def save_to_db(): info(f"开始保存账号信息: {self.email}") async with get_session() as session: # 检查账号是否已存在 from sqlalchemy import select result = await session.execute( select(AccountModel).where(AccountModel.email == self.email) ) existing_account = result.scalar_one_or_none() if existing_account: info(f"更新现有账号信息 (ID: {existing_account.id})") existing_account.token = token existing_account.user = user existing_account.password = self.password existing_account.usage_limit = str(total_usage) # 如果账号状态是删除,更新为活跃 if existing_account.status == "deleted": existing_account.status = "active" # 不更新id,保留原始id值 else: info("创建新账号记录") # 生成毫秒级时间戳作为id timestamp_ms = int(time.time() * 1000) account = AccountModel( email=self.email, password=self.password, token=token, user=user, usage_limit=str(total_usage), created_at=datetime.now().strftime("%Y-%m-%d %H:%M"), status="active", # 设置默认状态为活跃 id=timestamp_ms, # 设置毫秒时间戳id ) session.add(account) await session.commit() info(f"账号 {self.email} 信息保存成功") return True return asyncio.run(save_to_db()) except Exception as e: info(f"保存账号信息失败: {str(e)}") return False def cleanup_and_exit(browser_manager=None, exit_code=0): """清理资源并退出程序""" try: if browser_manager: info("正在关闭浏览器") if hasattr(browser_manager, "browser"): browser_manager.browser.quit() current_process = psutil.Process() children = current_process.children(recursive=True) for child in children: try: child.terminate() except: pass info("程序正常退出") sys.exit(exit_code) except Exception as e: info(f"清理退出时发生错误: {str(e)}") sys.exit(1) def main(): browser_manager = None max_retries = REGISTRATION_MAX_RETRIES # 从配置文件获取 current_retry = 0 try: email_handler = EmailVerificationHandler() if email_handler.check(): info('邮箱服务连接正常,开始注册!') else: if EMAIL_CODE_TYPE == "API": error('邮箱服务连接失败,并且验证码为API获取,结束注册!') return else: info('邮箱服务连接失败,并且验证码为手动输入,等待输入验证码...') email_generator = EmailGenerator() browser_manager = BrowserManager() browser = browser_manager.init_browser() while current_retry < max_retries: try: account_info = email_generator.get_account_info() info( f"初始化账号信息成功 => 邮箱: {account_info['email']}, 用户名: {account_info['first_name']}, 密码: {account_info['password']}" ) signup_tab = browser.new_tab(LOGIN_URL) browser.activate_tab(signup_tab) signup_tab.run_js("try { turnstile.reset() } catch(e) { }") result = sign_up_account(browser, signup_tab, account_info) if result == "SUCCESS": token, user = get_cursor_session_token(signup_tab) info(f"获取到账号Token: {token}, 用户: {user}") if token: email_generator._save_account_info(user, token, TOTAL_USAGE) info("注册流程完成") cleanup_and_exit(browser_manager, 0) else: info("获取Cursor会话Token失败") current_retry += 1 elif result in [ "EMAIL_USED", "SIGNUP_RESTRICTED", "VERIFY_FAILED", "EMAIL_GET_CODE_FAILED", ]: info(f"遇到问题: {result},尝试切换邮箱...") continue # 使用新邮箱重试注册 else: # ERROR info("遇到错误,准备重试...") current_retry += 1 # 关闭标签页,准备下一次尝试 signup_tab.close() time.sleep(2) except Exception as e: info(f"当前尝试发生错误: {str(e)}") current_retry += 1 time.sleep(2) try: # 尝试关闭可能存在的标签页 if "signup_tab" in locals(): signup_tab.close() except: pass info(f"达到最大重试次数 {max_retries},注册失败") except Exception as e: info(f"主程序错误: {str(e)}") info(f"错误详情: {traceback.format_exc()}") cleanup_and_exit(browser_manager, 1) finally: cleanup_and_exit(browser_manager, 1)