cursor_auto_register/cursor_pro_keep_alive.py
2025-05-11 23:06:57 +08:00

579 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import psutil
import time
import random
from logger import info, warning, error
import traceback
from config import (
LOGIN_URL,
SIGN_UP_URL,
SETTINGS_URL,
EMAIL_DOMAINS,
REGISTRATION_MAX_RETRIES,
EMAIL_TYPE,
EMAIL_CODE_TYPE
)
import secrets
import hashlib
import base64
import uuid
import requests
from typing import Optional, Tuple
if sys.stdout.encoding != "utf-8":
sys.stdout.reconfigure(encoding="utf-8")
if sys.stderr.encoding != "utf-8":
sys.stderr.reconfigure(encoding="utf-8")
from browser_utils import BrowserManager
from get_email_code import EmailVerificationHandler
from datetime import datetime # 添加这行导入
TOTAL_USAGE = 0
def handle_turnstile(tab, max_retries: int = 5, retry_interval: tuple = (1, 2)) -> bool:
"""
处理Turnstile验证
Args:
tab: 浏览器标签对象
max_retries: 最大重试次数
retry_interval: 重试间隔范围(最小值, 最大值)
Returns:
bool: 验证是否成功
"""
info("=============正在检测 Turnstile 验证=============")
# 初始化重试计数器
retry_count = 0
try:
# 在最大重试次数内循环尝试验证
while retry_count < max_retries:
# 增加重试计数
retry_count += 1
info(f"正在进行 Turnstile 第 {retry_count} 次验证中...")
try:
# 检查页面状态但不直接返回先检查是否有Turnstile验证需要处理
page_ready = False
if tab.ele("@name=password"):
info("检测到密码输入页面,检查是否有验证需要处理...")
page_ready = True
elif tab.ele("@data-index=0"):
info("检测到验证码输入页面,检查是否有验证需要处理...")
page_ready = True
elif tab.ele("Account Settings"):
info("检测到账户设置页面,检查是否有验证需要处理...")
page_ready = True
# 初始化元素变量
element = None
try:
# 尝试通过层级结构定位到Turnstile验证框的容器元素
element = (
tab.ele(".main-content") # 找到 .main-content 元素
.ele("tag:div") # 找到第一个子 div
.ele("tag:div") # 找到第二个子 div
.ele("tag:div") # 找到第三个子 div
)
except Exception as e:
# 如果无法通过第一种方式找到元素,忽略异常继续执行
pass
challenge_check = None
if element:
# 如果找到了容器元素,则在其中定位验证框的输入元素
try:
challenge_check = (
element
.shadow_root.ele("tag:iframe") # 找到shadow DOM中的iframe
.ele("tag:body") # 找到iframe中的body
.sr("tag:input") # 找到body中的input元素
)
except Exception as e:
pass
else:
# 如果没有找到容器元素,则尝试另一种方式定位验证框
try:
challenge_check = (
tab.ele("@id=cf-turnstile", timeout=2) # 通过id直接找到turnstile元素
.child() # 获取其子元素
.shadow_root.ele("tag:iframe") # 找到shadow DOM中的iframe
.ele("tag:body") # 找到iframe中的body
.sr("tag:input") # 找到body中的input元素
)
except Exception as e:
pass
if challenge_check:
# 如果找到了验证输入元素,记录日志
info("检测到 Turnstile 验证,正在处理...")
# 点击前随机延迟,模拟人工操作
time.sleep(random.uniform(1, 3))
# 点击验证元素触发验证
challenge_check.click()
# 等待验证处理
time.sleep(2)
info("Turnstile 验证通过")
return True
else:
info("未检测到 Turnstile 验证")
# 如果页面已准备好且没有验证需要处理,则可以返回
if page_ready:
info("页面已准备好,没有检测到需要处理的验证")
return True
except Exception as e:
# 记录当前尝试失败的详细信息
info(f"当前验证尝试失败: {str(e)}")
# 在下一次尝试前随机延迟
time.sleep(random.uniform(*retry_interval))
# 超过最大重试次数,验证失败
error(f"Turnstile 验证次数超过最大限制 {max_retries},退出")
return False
except Exception as e:
# 捕获整个验证过程中的异常
error_msg = f"Turnstile 验证失败: {str(e)}"
error(error_msg)
return False
def get_cursor_session_token(tab, max_attempts: int = 3, retry_interval: int = 2) -> Optional[Tuple[str, str]]:
"""
获取Cursor会话token
Args:
tab: 浏览器标签页对象
max_attempts: 最大尝试次数
retry_interval: 重试间隔(秒)
Returns:
Tuple[str, str] | None: 成功返回(userId, accessToken)元组失败返回None
"""
info("开始获取会话令牌")
# 首先尝试使用UUID深度登录方式
info("尝试使用深度登录方式获取token")
def _generate_pkce_pair():
"""生成PKCE验证对"""
code_verifier = secrets.token_urlsafe(43)
code_challenge_digest = hashlib.sha256(code_verifier.encode('utf-8')).digest()
code_challenge = base64.urlsafe_b64encode(code_challenge_digest).decode('utf-8').rstrip('=')
return code_verifier, code_challenge
attempts = 0
while attempts < max_attempts:
try:
verifier, challenge = _generate_pkce_pair()
id = uuid.uuid4()
client_login_url = f"https://www.cursor.com/cn/loginDeepControl?challenge={challenge}&uuid={id}&mode=login"
info(f"访问深度登录URL: {client_login_url}")
tab.get(client_login_url)
# save_screenshot(tab, f"deeplogin_attempt_{attempts}")
if tab.ele("xpath=//span[contains(text(), 'Yes, Log In')]", timeout=5):
info("点击确认登录按钮")
tab.ele("xpath=//span[contains(text(), 'Yes, Log In')]").click()
time.sleep(1.5)
auth_poll_url = f"https://api2.cursor.sh/auth/poll?uuid={id}&verifier={verifier}"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Cursor/0.48.6 Chrome/132.0.6834.210 Electron/34.3.4 Safari/537.36",
"Accept": "*/*"
}
info(f"轮询认证状态: {auth_poll_url}")
response = requests.get(auth_poll_url, headers=headers, timeout=5)
if response.status_code == 200:
data = response.json()
accessToken = data.get("accessToken", None)
authId = data.get("authId", "")
if accessToken:
userId = ""
if len(authId.split("|")) > 1:
userId = authId.split("|")[1]
info("成功获取账号token和userId")
return accessToken, userId
else:
error(f"API请求失败状态码: {response.status_code}")
else:
warning("未找到登录确认按钮")
attempts += 1
if attempts < max_attempts:
wait_time = retry_interval * attempts # 逐步增加等待时间
warning(f"{attempts} 次尝试未获取到token{wait_time}秒后重试...")
# save_screenshot(tab, f"token_attempt_{attempts}")
time.sleep(wait_time)
except Exception as e:
error(f"深度登录获取token失败: {str(e)}")
attempts += 1
# save_screenshot(tab, f"token_error_{attempts}")
if attempts < max_attempts:
wait_time = retry_interval * attempts
warning(f"将在 {wait_time} 秒后重试...")
time.sleep(wait_time)
def sign_up_account(browser, tab, account_info):
info("=============开始注册账号=============")
info(
f"账号信息: 邮箱: {account_info['email']}, 密码: {account_info['password']}, 姓名: {account_info['first_name']} {account_info['last_name']}"
)
if EMAIL_TYPE == "zmail":
EmailVerificationHandler.create_zmail_email(account_info)
tab.get(SIGN_UP_URL)
tab.wait(2)
if tab.ele("@name=cf-turnstile-response"):
error("开屏就是检测啊大佬你的IP或UA需要换一下了啊有问题了...要等一下")
try:
if tab.ele("@name=first_name"):
info("=============正在填写个人信息=============")
tab.actions.click("@name=first_name").input(account_info["first_name"])
info(f"已输入名字: {account_info['first_name']}")
time.sleep(random.uniform(1, 3))
tab.actions.click("@name=last_name").input(account_info["last_name"])
info(f"已输入姓氏: {account_info['last_name']}")
time.sleep(random.uniform(1, 3))
tab.actions.click("@name=email").input(account_info["email"])
info(f"已输入邮箱: {account_info['email']}")
time.sleep(random.uniform(1, 3))
info("=============提交个人信息=============")
tab.actions.click("@type=submit")
time.sleep(random.uniform(0.2, 1))
if (
tab.ele("verify the user is human. Please try again.")
or tab.ele("Can't verify the user is human. Please try again.")
or tab.ele("Can't verify the user is human. Please try again.")
):
info("检测到turnstile验证失败IP问题、UA问题、域名问题...正在重试...")
return "EMAIL_USED"
except Exception as e:
info(f"填写个人信息失败: {str(e)}")
return "ERROR"
handle_turnstile(tab)
if tab.ele("verify the user is human. Please try again.") or tab.ele(
"Can't verify the user is human. Please try again."
):
info("检测到turnstile验证失败正在重试...")
return "EMAIL_USED"
try:
if tab.ele("@name=password"):
info(f"设置密码:{account_info['password']}")
tab.ele("@name=password").input(account_info["password"])
time.sleep(random.uniform(1, 2))
info("提交密码...")
tab.ele("@type=submit").click()
info("密码设置成功,等待系统响应....")
except Exception as e:
info(f"密码设置失败: {str(e)}")
return "ERROR"
info("处理最终验证...")
handle_turnstile(tab)
if tab.ele("This email is not available."):
info("邮箱已被使用")
return "EMAIL_USED"
if tab.ele("Sign up is restricted."):
info("注册限制")
return "SIGNUP_RESTRICTED"
# 创建邮件处理器
email_handler = EmailVerificationHandler()
i = 0
while i < 5:
try:
time.sleep(random.uniform(0.2, 1))
if tab.ele("Account Settings"):
info("注册成功,已进入账号设置页面")
break
if tab.ele("@data-index=0"):
info("等待输入验证码...")
# 切换到邮箱标签页
code = email_handler.get_verification_code(
source_email=account_info["email"]
)
if code is None:
info("未获取到验证码...系统异常,正在退出....")
return "EMAIL_GET_CODE_FAILED"
info(f"输入验证码: {code}")
i = 0
for digit in code:
tab.ele(f"@data-index={i}").input(digit)
time.sleep(random.uniform(0.3, 0.6))
i += 1
info("验证码输入完成")
time.sleep(random.uniform(3, 5))
# 在验证码输入完成后检测是否出现了Turnstile验证
info("检查是否出现了Turnstile验证...")
try:
turnstile_element = tab.ele("@id=cf-turnstile", timeout=3)
if turnstile_element:
info("检测到验证码输入后出现Turnstile验证正在处理...")
handle_turnstile(tab)
except:
info("未检测到Turnstile验证继续下一步")
break
except Exception as e:
info(f"验证码处理失败: {str(e)}")
return "ERROR"
info("完成最终验证...")
handle_turnstile(tab)
time.sleep(random.uniform(3, 5))
info("账号注册流程完成")
return "SUCCESS"
class EmailGenerator:
def __init__(
self,
):
# 将密码生成移到这里,避免类定义时执行随机密码生成
self.default_first_name = self.generate_random_name()
self.default_last_name = self.generate_random_name()
# 从配置文件获取域名配置
self.domains = EMAIL_DOMAINS
info(f"当前可用域名: {self.domains}")
self.email = None
self.password = None
def generate_random_password(self, length=12):
"""生成随机密码 - 改进密码生成算法,确保包含各类字符"""
chars = "abcdefghijklmnopqrstuvwxyz"
upper_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
digits = "0123456789"
special = "!@#$%^&*"
# 确保密码包含至少一个大写字母、一个数字和一个特殊字符
password = [
random.choice(chars),
random.choice(upper_chars),
random.choice(digits),
random.choice(special),
]
# 添加剩余随机字符
password.extend(
random.choices(chars + upper_chars + digits + special, k=length - 4)
)
# 打乱密码顺序
random.shuffle(password)
return "".join(password)
def generate_random_name(self, length=6):
"""生成随机用户名"""
first_letter = random.choice("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
rest_letters = "".join(
random.choices("abcdefghijklmnopqrstuvwxyz", k=length - 1)
)
return first_letter + rest_letters
def generate_email(self, length=8):
"""生成随机邮箱地址,使用随机域名"""
random_str = "".join(
random.choices("abcdefghijklmnopqrstuvwxyz1234567890", k=length)
)
timestamp = str(int(time.time()))[-4:] # 使用时间戳后4位
# 随机选择一个域名
domain = random.choice(self.domains)
return f"{random_str}@{domain}"
def get_account_info(self):
"""获取账号信息,确保每次调用都生成新的邮箱和密码"""
self.email = self.generate_email()
self.password = self.generate_random_password()
return {
"email": self.email,
"password": self.password,
"first_name": self.default_first_name.capitalize(),
"last_name": self.default_last_name.capitalize(),
}
def _save_account_info(self, user, token, total_usage):
try:
from database import get_session, AccountModel
import asyncio
import time
async def save_to_db():
info(f"开始保存账号信息: {self.email}")
async with get_session() as session:
# 检查账号是否已存在
from sqlalchemy import select
result = await session.execute(
select(AccountModel).where(AccountModel.email == self.email)
)
existing_account = result.scalar_one_or_none()
if existing_account:
info(f"更新现有账号信息 (ID: {existing_account.id})")
existing_account.token = token
existing_account.user = user
existing_account.password = self.password
existing_account.usage_limit = str(total_usage)
# 如果账号状态是删除,更新为活跃
if existing_account.status == "deleted":
existing_account.status = "active"
# 不更新id保留原始id值
else:
info("创建新账号记录")
# 生成毫秒级时间戳作为id
timestamp_ms = int(time.time() * 1000)
account = AccountModel(
email=self.email,
password=self.password,
token=token,
user=user,
usage_limit=str(total_usage),
created_at=datetime.now().strftime("%Y-%m-%d %H:%M"),
status="active", # 设置默认状态为活跃
id=timestamp_ms, # 设置毫秒时间戳id
)
session.add(account)
await session.commit()
info(f"账号 {self.email} 信息保存成功")
return True
return asyncio.run(save_to_db())
except Exception as e:
info(f"保存账号信息失败: {str(e)}")
return False
def cleanup_and_exit(browser_manager=None, exit_code=0):
"""清理资源并退出程序"""
try:
if browser_manager:
info("正在关闭浏览器")
if hasattr(browser_manager, "browser"):
browser_manager.browser.quit()
current_process = psutil.Process()
children = current_process.children(recursive=True)
for child in children:
try:
child.terminate()
except:
pass
info("程序正常退出")
sys.exit(exit_code)
except Exception as e:
info(f"清理退出时发生错误: {str(e)}")
sys.exit(1)
def main():
browser_manager = None
max_retries = REGISTRATION_MAX_RETRIES # 从配置文件获取
current_retry = 0
try:
email_handler = EmailVerificationHandler()
if email_handler.check():
info('邮箱服务连接正常,开始注册!')
else:
if EMAIL_CODE_TYPE == "API":
error('邮箱服务连接失败并且验证码为API获取结束注册!')
return
else:
info('邮箱服务连接失败,并且验证码为手动输入,等待输入验证码...')
email_generator = EmailGenerator()
browser_manager = BrowserManager()
browser = browser_manager.init_browser()
while current_retry < max_retries:
try:
account_info = email_generator.get_account_info()
info(
f"初始化账号信息成功 => 邮箱: {account_info['email']}, 用户名: {account_info['first_name']}, 密码: {account_info['password']}"
)
signup_tab = browser.new_tab(LOGIN_URL)
browser.activate_tab(signup_tab)
signup_tab.run_js("try { turnstile.reset() } catch(e) { }")
result = sign_up_account(browser, signup_tab, account_info)
if result == "SUCCESS":
token, user = get_cursor_session_token(signup_tab)
info(f"获取到账号Token: {token}, 用户: {user}")
if token:
email_generator._save_account_info(user, token, TOTAL_USAGE)
info("注册流程完成")
cleanup_and_exit(browser_manager, 0)
else:
info("获取Cursor会话Token失败")
current_retry += 1
elif result in [
"EMAIL_USED",
"SIGNUP_RESTRICTED",
"VERIFY_FAILED",
"EMAIL_GET_CODE_FAILED",
]:
info(f"遇到问题: {result},尝试切换邮箱...")
continue # 使用新邮箱重试注册
else: # ERROR
info("遇到错误,准备重试...")
current_retry += 1
# 关闭标签页,准备下一次尝试
signup_tab.close()
time.sleep(2)
except Exception as e:
info(f"当前尝试发生错误: {str(e)}")
current_retry += 1
time.sleep(2)
try:
# 尝试关闭可能存在的标签页
if "signup_tab" in locals():
signup_tab.close()
except:
pass
info(f"达到最大重试次数 {max_retries},注册失败")
except Exception as e:
info(f"主程序错误: {str(e)}")
info(f"错误详情: {traceback.format_exc()}")
cleanup_and_exit(browser_manager, 1)
finally:
cleanup_and_exit(browser_manager, 1)