cursor_auto_register/cursor_pro_keep_alive.py

579 lines
22 KiB
Python
Raw Normal View History

2025-05-11 23:06:57 +08:00
import sys
import psutil
import time
import random
from logger import info, warning, error
import traceback
from config import (
LOGIN_URL,
SIGN_UP_URL,
SETTINGS_URL,
EMAIL_DOMAINS,
REGISTRATION_MAX_RETRIES,
EMAIL_TYPE,
EMAIL_CODE_TYPE
)
import secrets
import hashlib
import base64
import uuid
import requests
from typing import Optional, Tuple
if sys.stdout.encoding != "utf-8":
sys.stdout.reconfigure(encoding="utf-8")
if sys.stderr.encoding != "utf-8":
sys.stderr.reconfigure(encoding="utf-8")
from browser_utils import BrowserManager
from get_email_code import EmailVerificationHandler
from datetime import datetime # 添加这行导入
TOTAL_USAGE = 0
def handle_turnstile(tab, max_retries: int = 5, retry_interval: tuple = (1, 2)) -> bool:
"""
处理Turnstile验证
Args:
tab: 浏览器标签对象
max_retries: 最大重试次数
retry_interval: 重试间隔范围(最小值, 最大值)
Returns:
bool: 验证是否成功
"""
info("=============正在检测 Turnstile 验证=============")
# 初始化重试计数器
retry_count = 0
try:
# 在最大重试次数内循环尝试验证
while retry_count < max_retries:
# 增加重试计数
retry_count += 1
info(f"正在进行 Turnstile 第 {retry_count} 次验证中...")
try:
# 检查页面状态但不直接返回先检查是否有Turnstile验证需要处理
page_ready = False
if tab.ele("@name=password"):
info("检测到密码输入页面,检查是否有验证需要处理...")
page_ready = True
elif tab.ele("@data-index=0"):
info("检测到验证码输入页面,检查是否有验证需要处理...")
page_ready = True
elif tab.ele("Account Settings"):
info("检测到账户设置页面,检查是否有验证需要处理...")
page_ready = True
# 初始化元素变量
element = None
try:
# 尝试通过层级结构定位到Turnstile验证框的容器元素
element = (
tab.ele(".main-content") # 找到 .main-content 元素
.ele("tag:div") # 找到第一个子 div
.ele("tag:div") # 找到第二个子 div
.ele("tag:div") # 找到第三个子 div
)
except Exception as e:
# 如果无法通过第一种方式找到元素,忽略异常继续执行
pass
challenge_check = None
if element:
# 如果找到了容器元素,则在其中定位验证框的输入元素
try:
challenge_check = (
element
.shadow_root.ele("tag:iframe") # 找到shadow DOM中的iframe
.ele("tag:body") # 找到iframe中的body
.sr("tag:input") # 找到body中的input元素
)
except Exception as e:
pass
else:
# 如果没有找到容器元素,则尝试另一种方式定位验证框
try:
challenge_check = (
tab.ele("@id=cf-turnstile", timeout=2) # 通过id直接找到turnstile元素
.child() # 获取其子元素
.shadow_root.ele("tag:iframe") # 找到shadow DOM中的iframe
.ele("tag:body") # 找到iframe中的body
.sr("tag:input") # 找到body中的input元素
)
except Exception as e:
pass
if challenge_check:
# 如果找到了验证输入元素,记录日志
info("检测到 Turnstile 验证,正在处理...")
# 点击前随机延迟,模拟人工操作
time.sleep(random.uniform(1, 3))
# 点击验证元素触发验证
challenge_check.click()
# 等待验证处理
time.sleep(2)
info("Turnstile 验证通过")
return True
else:
info("未检测到 Turnstile 验证")
# 如果页面已准备好且没有验证需要处理,则可以返回
if page_ready:
info("页面已准备好,没有检测到需要处理的验证")
return True
except Exception as e:
# 记录当前尝试失败的详细信息
info(f"当前验证尝试失败: {str(e)}")
# 在下一次尝试前随机延迟
time.sleep(random.uniform(*retry_interval))
# 超过最大重试次数,验证失败
error(f"Turnstile 验证次数超过最大限制 {max_retries},退出")
return False
except Exception as e:
# 捕获整个验证过程中的异常
error_msg = f"Turnstile 验证失败: {str(e)}"
error(error_msg)
return False
def get_cursor_session_token(tab, max_attempts: int = 3, retry_interval: int = 2) -> Optional[Tuple[str, str]]:
"""
获取Cursor会话token
Args:
tab: 浏览器标签页对象
max_attempts: 最大尝试次数
retry_interval: 重试间隔()
Returns:
Tuple[str, str] | None: 成功返回(userId, accessToken)元组失败返回None
"""
info("开始获取会话令牌")
# 首先尝试使用UUID深度登录方式
info("尝试使用深度登录方式获取token")
def _generate_pkce_pair():
"""生成PKCE验证对"""
code_verifier = secrets.token_urlsafe(43)
code_challenge_digest = hashlib.sha256(code_verifier.encode('utf-8')).digest()
code_challenge = base64.urlsafe_b64encode(code_challenge_digest).decode('utf-8').rstrip('=')
return code_verifier, code_challenge
attempts = 0
while attempts < max_attempts:
try:
verifier, challenge = _generate_pkce_pair()
id = uuid.uuid4()
client_login_url = f"https://www.cursor.com/cn/loginDeepControl?challenge={challenge}&uuid={id}&mode=login"
info(f"访问深度登录URL: {client_login_url}")
tab.get(client_login_url)
# save_screenshot(tab, f"deeplogin_attempt_{attempts}")
if tab.ele("xpath=//span[contains(text(), 'Yes, Log In')]", timeout=5):
info("点击确认登录按钮")
tab.ele("xpath=//span[contains(text(), 'Yes, Log In')]").click()
time.sleep(1.5)
auth_poll_url = f"https://api2.cursor.sh/auth/poll?uuid={id}&verifier={verifier}"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Cursor/0.48.6 Chrome/132.0.6834.210 Electron/34.3.4 Safari/537.36",
"Accept": "*/*"
}
info(f"轮询认证状态: {auth_poll_url}")
response = requests.get(auth_poll_url, headers=headers, timeout=5)
if response.status_code == 200:
data = response.json()
accessToken = data.get("accessToken", None)
authId = data.get("authId", "")
if accessToken:
userId = ""
if len(authId.split("|")) > 1:
userId = authId.split("|")[1]
info("成功获取账号token和userId")
return accessToken, userId
else:
error(f"API请求失败状态码: {response.status_code}")
else:
warning("未找到登录确认按钮")
attempts += 1
if attempts < max_attempts:
wait_time = retry_interval * attempts # 逐步增加等待时间
warning(f"{attempts} 次尝试未获取到token{wait_time}秒后重试...")
# save_screenshot(tab, f"token_attempt_{attempts}")
time.sleep(wait_time)
except Exception as e:
error(f"深度登录获取token失败: {str(e)}")
attempts += 1
# save_screenshot(tab, f"token_error_{attempts}")
if attempts < max_attempts:
wait_time = retry_interval * attempts
warning(f"将在 {wait_time} 秒后重试...")
time.sleep(wait_time)
def sign_up_account(browser, tab, account_info):
info("=============开始注册账号=============")
info(
f"账号信息: 邮箱: {account_info['email']}, 密码: {account_info['password']}, 姓名: {account_info['first_name']} {account_info['last_name']}"
)
if EMAIL_TYPE == "zmail":
EmailVerificationHandler.create_zmail_email(account_info)
tab.get(SIGN_UP_URL)
tab.wait(2)
if tab.ele("@name=cf-turnstile-response"):
error("开屏就是检测啊大佬你的IP或UA需要换一下了啊有问题了...要等一下")
try:
if tab.ele("@name=first_name"):
info("=============正在填写个人信息=============")
tab.actions.click("@name=first_name").input(account_info["first_name"])
info(f"已输入名字: {account_info['first_name']}")
time.sleep(random.uniform(1, 3))
tab.actions.click("@name=last_name").input(account_info["last_name"])
info(f"已输入姓氏: {account_info['last_name']}")
time.sleep(random.uniform(1, 3))
tab.actions.click("@name=email").input(account_info["email"])
info(f"已输入邮箱: {account_info['email']}")
time.sleep(random.uniform(1, 3))
info("=============提交个人信息=============")
tab.actions.click("@type=submit")
time.sleep(random.uniform(0.2, 1))
if (
tab.ele("verify the user is human. Please try again.")
or tab.ele("Can't verify the user is human. Please try again.")
or tab.ele("Can't verify the user is human. Please try again.")
):
info("检测到turnstile验证失败IP问题、UA问题、域名问题...正在重试...")
return "EMAIL_USED"
except Exception as e:
info(f"填写个人信息失败: {str(e)}")
return "ERROR"
handle_turnstile(tab)
if tab.ele("verify the user is human. Please try again.") or tab.ele(
"Can't verify the user is human. Please try again."
):
info("检测到turnstile验证失败正在重试...")
return "EMAIL_USED"
try:
if tab.ele("@name=password"):
info(f"设置密码:{account_info['password']}")
tab.ele("@name=password").input(account_info["password"])
time.sleep(random.uniform(1, 2))
info("提交密码...")
tab.ele("@type=submit").click()
info("密码设置成功,等待系统响应....")
except Exception as e:
info(f"密码设置失败: {str(e)}")
return "ERROR"
info("处理最终验证...")
handle_turnstile(tab)
if tab.ele("This email is not available."):
info("邮箱已被使用")
return "EMAIL_USED"
if tab.ele("Sign up is restricted."):
info("注册限制")
return "SIGNUP_RESTRICTED"
# 创建邮件处理器
email_handler = EmailVerificationHandler()
i = 0
while i < 5:
try:
time.sleep(random.uniform(0.2, 1))
if tab.ele("Account Settings"):
info("注册成功,已进入账号设置页面")
break
if tab.ele("@data-index=0"):
info("等待输入验证码...")
# 切换到邮箱标签页
code = email_handler.get_verification_code(
source_email=account_info["email"]
)
if code is None:
info("未获取到验证码...系统异常,正在退出....")
return "EMAIL_GET_CODE_FAILED"
info(f"输入验证码: {code}")
i = 0
for digit in code:
tab.ele(f"@data-index={i}").input(digit)
time.sleep(random.uniform(0.3, 0.6))
i += 1
info("验证码输入完成")
time.sleep(random.uniform(3, 5))
# 在验证码输入完成后检测是否出现了Turnstile验证
info("检查是否出现了Turnstile验证...")
try:
turnstile_element = tab.ele("@id=cf-turnstile", timeout=3)
if turnstile_element:
info("检测到验证码输入后出现Turnstile验证正在处理...")
handle_turnstile(tab)
except:
info("未检测到Turnstile验证继续下一步")
break
except Exception as e:
info(f"验证码处理失败: {str(e)}")
return "ERROR"
info("完成最终验证...")
handle_turnstile(tab)
time.sleep(random.uniform(3, 5))
info("账号注册流程完成")
return "SUCCESS"
class EmailGenerator:
def __init__(
self,
):
# 将密码生成移到这里,避免类定义时执行随机密码生成
self.default_first_name = self.generate_random_name()
self.default_last_name = self.generate_random_name()
# 从配置文件获取域名配置
self.domains = EMAIL_DOMAINS
info(f"当前可用域名: {self.domains}")
self.email = None
self.password = None
def generate_random_password(self, length=12):
"""生成随机密码 - 改进密码生成算法,确保包含各类字符"""
chars = "abcdefghijklmnopqrstuvwxyz"
upper_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
digits = "0123456789"
special = "!@#$%^&*"
# 确保密码包含至少一个大写字母、一个数字和一个特殊字符
password = [
random.choice(chars),
random.choice(upper_chars),
random.choice(digits),
random.choice(special),
]
# 添加剩余随机字符
password.extend(
random.choices(chars + upper_chars + digits + special, k=length - 4)
)
# 打乱密码顺序
random.shuffle(password)
return "".join(password)
def generate_random_name(self, length=6):
"""生成随机用户名"""
first_letter = random.choice("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
rest_letters = "".join(
random.choices("abcdefghijklmnopqrstuvwxyz", k=length - 1)
)
return first_letter + rest_letters
def generate_email(self, length=8):
"""生成随机邮箱地址,使用随机域名"""
random_str = "".join(
random.choices("abcdefghijklmnopqrstuvwxyz1234567890", k=length)
)
timestamp = str(int(time.time()))[-4:] # 使用时间戳后4位
# 随机选择一个域名
domain = random.choice(self.domains)
return f"{random_str}@{domain}"
def get_account_info(self):
"""获取账号信息,确保每次调用都生成新的邮箱和密码"""
self.email = self.generate_email()
self.password = self.generate_random_password()
return {
"email": self.email,
"password": self.password,
"first_name": self.default_first_name.capitalize(),
"last_name": self.default_last_name.capitalize(),
}
def _save_account_info(self, user, token, total_usage):
try:
from database import get_session, AccountModel
import asyncio
import time
async def save_to_db():
info(f"开始保存账号信息: {self.email}")
async with get_session() as session:
# 检查账号是否已存在
from sqlalchemy import select
result = await session.execute(
select(AccountModel).where(AccountModel.email == self.email)
)
existing_account = result.scalar_one_or_none()
if existing_account:
info(f"更新现有账号信息 (ID: {existing_account.id})")
existing_account.token = token
existing_account.user = user
existing_account.password = self.password
existing_account.usage_limit = str(total_usage)
# 如果账号状态是删除,更新为活跃
if existing_account.status == "deleted":
existing_account.status = "active"
# 不更新id保留原始id值
else:
info("创建新账号记录")
# 生成毫秒级时间戳作为id
timestamp_ms = int(time.time() * 1000)
account = AccountModel(
email=self.email,
password=self.password,
token=token,
user=user,
usage_limit=str(total_usage),
created_at=datetime.now().strftime("%Y-%m-%d %H:%M"),
status="active", # 设置默认状态为活跃
id=timestamp_ms, # 设置毫秒时间戳id
)
session.add(account)
await session.commit()
info(f"账号 {self.email} 信息保存成功")
return True
return asyncio.run(save_to_db())
except Exception as e:
info(f"保存账号信息失败: {str(e)}")
return False
def cleanup_and_exit(browser_manager=None, exit_code=0):
"""清理资源并退出程序"""
try:
if browser_manager:
info("正在关闭浏览器")
if hasattr(browser_manager, "browser"):
browser_manager.browser.quit()
current_process = psutil.Process()
children = current_process.children(recursive=True)
for child in children:
try:
child.terminate()
except:
pass
info("程序正常退出")
sys.exit(exit_code)
except Exception as e:
info(f"清理退出时发生错误: {str(e)}")
sys.exit(1)
def main():
browser_manager = None
max_retries = REGISTRATION_MAX_RETRIES # 从配置文件获取
current_retry = 0
try:
email_handler = EmailVerificationHandler()
if email_handler.check():
info('邮箱服务连接正常,开始注册!')
else:
if EMAIL_CODE_TYPE == "API":
error('邮箱服务连接失败并且验证码为API获取结束注册!')
return
else:
info('邮箱服务连接失败,并且验证码为手动输入,等待输入验证码...')
email_generator = EmailGenerator()
browser_manager = BrowserManager()
browser = browser_manager.init_browser()
while current_retry < max_retries:
try:
account_info = email_generator.get_account_info()
info(
f"初始化账号信息成功 => 邮箱: {account_info['email']}, 用户名: {account_info['first_name']}, 密码: {account_info['password']}"
)
signup_tab = browser.new_tab(LOGIN_URL)
browser.activate_tab(signup_tab)
signup_tab.run_js("try { turnstile.reset() } catch(e) { }")
result = sign_up_account(browser, signup_tab, account_info)
if result == "SUCCESS":
token, user = get_cursor_session_token(signup_tab)
info(f"获取到账号Token: {token}, 用户: {user}")
if token:
email_generator._save_account_info(user, token, TOTAL_USAGE)
info("注册流程完成")
cleanup_and_exit(browser_manager, 0)
else:
info("获取Cursor会话Token失败")
current_retry += 1
elif result in [
"EMAIL_USED",
"SIGNUP_RESTRICTED",
"VERIFY_FAILED",
"EMAIL_GET_CODE_FAILED",
]:
info(f"遇到问题: {result},尝试切换邮箱...")
continue # 使用新邮箱重试注册
else: # ERROR
info("遇到错误,准备重试...")
current_retry += 1
# 关闭标签页,准备下一次尝试
signup_tab.close()
time.sleep(2)
except Exception as e:
info(f"当前尝试发生错误: {str(e)}")
current_retry += 1
time.sleep(2)
try:
# 尝试关闭可能存在的标签页
if "signup_tab" in locals():
signup_tab.close()
except:
pass
info(f"达到最大重试次数 {max_retries},注册失败")
except Exception as e:
info(f"主程序错误: {str(e)}")
info(f"错误详情: {traceback.format_exc()}")
cleanup_and_exit(browser_manager, 1)
finally:
cleanup_and_exit(browser_manager, 1)