cursor_auto_register/cursor_pro_keep_alive_backup.py

496 lines
18 KiB
Python
Raw Permalink Normal View History

2025-05-11 23:06:57 +08:00
import sys
import psutil
import time
import random
from logger import info, warning, error
import traceback
from config import (
LOGIN_URL,
SIGN_UP_URL,
SETTINGS_URL,
EMAIL_DOMAINS,
REGISTRATION_MAX_RETRIES,
EMAIL_TYPE,
EMAIL_CODE_TYPE
)
if sys.stdout.encoding != "utf-8":
sys.stdout.reconfigure(encoding="utf-8")
if sys.stderr.encoding != "utf-8":
sys.stderr.reconfigure(encoding="utf-8")
from browser_utils import BrowserManager
from get_email_code import EmailVerificationHandler
from datetime import datetime # 添加这行导入
TOTAL_USAGE = 0
def handle_turnstile(tab):
info("=============正在检测 Turnstile 验证=============")
max_count = 5
try:
count = 1
while True:
if count > max_count:
error("Turnstile 验证次数超过最大限制,退出")
return False
info(f"正在进行 Turnstile 第 {count} 次验证中...")
try:
# 检查页面状态但不直接返回先检查是否有Turnstile验证需要处理
page_ready = False
if tab.ele("@name=password"):
info("检测到密码输入页面,检查是否有验证需要处理...")
page_ready = True
elif tab.ele("@data-index=0"):
info("检测到验证码输入页面,检查是否有验证需要处理...")
page_ready = True
elif tab.ele("Account Settings"):
info("检测到账户设置页面,检查是否有验证需要处理...")
page_ready = True
# 即使页面已经准备好也检查是否有Turnstile验证需要处理
info("检测 Turnstile 验证...")
try:
challengeCheck = (
tab.ele("@id=cf-turnstile", timeout=2)
.child()
.shadow_root.ele("tag:iframe")
.ele("tag:body")
.sr("tag:input")
)
if challengeCheck:
info("检测到 Turnstile 验证,正在处理...")
challengeCheck.click()
time.sleep(2)
info("Turnstile 验证通过")
return True
else:
info("未检测到 Turnstile 验证")
except:
pass
# 如果页面已准备好且没有验证需要处理,则可以返回
if page_ready:
info("页面已准备好,没有检测到需要处理的验证")
break
except:
pass
time.sleep(random.uniform(1, 2))
count += 1
return True # 返回True表示页面已准备好
except Exception as e:
info(f"Turnstile 验证失败: {str(e)}")
return False
def get_cursor_session_token(tab, max_attempts=5, retry_interval=3):
try:
tab.get(SETTINGS_URL)
time.sleep(5)
try:
usage_selector = (
"css:div.col-span-2 > div > div > div > div > "
"div:nth-child(1) > div.flex.items-center.justify-between.gap-2 > "
"span.font-mono.text-sm\\/\\[0\\.875rem\\]"
)
usage_ele = tab.ele(usage_selector)
total_usage = "unknown"
if usage_ele:
total_usage = usage_ele.text.split("/")[-1].strip()
global TOTAL_USAGE
TOTAL_USAGE = total_usage
info(f"使用限制: {total_usage}")
else:
warning("未能找到使用量元素")
except Exception as e:
warning(f"获取使用量信息失败: {str(e)}")
# 继续执行,不要因为获取使用量失败而中断整个流程
info("获取Cookie中...")
attempts = 0
while attempts < max_attempts:
try:
cookies = tab.cookies()
for cookie in cookies:
if cookie.get("name") == "WorkosCursorSessionToken":
user = cookie["value"].split("%3A%3A")[0]
token = cookie["value"].split("%3A%3A")[1]
info(f"获取到账号Token: {token}, 用户: {user}")
return token, user
attempts += 1
if attempts < max_attempts:
warning(
f"未找到Cursor会话Token重试中... ({attempts}/{max_attempts})"
)
time.sleep(retry_interval)
else:
info("未找到Cursor会话Token已达到最大尝试次数")
except Exception as e:
info(f"获取Token出错: {str(e)}")
attempts += 1
if attempts < max_attempts:
info(
f"重试获取Token等待时间: {retry_interval}秒,尝试次数: {attempts}/{max_attempts}"
)
time.sleep(retry_interval)
return False
except Exception as e:
warning(f"获取Token过程出错: {str(e)}")
return False
def sign_up_account(browser, tab, account_info):
info("=============开始注册账号=============")
info(
f"账号信息: 邮箱: {account_info['email']}, 密码: {account_info['password']}, 姓名: {account_info['first_name']} {account_info['last_name']}"
)
if EMAIL_TYPE == "zmail":
EmailVerificationHandler.create_zmail_email(account_info)
tab.get(SIGN_UP_URL)
tab.wait(2)
if tab.ele("@name=cf-turnstile-response"):
error("开屏就是检测啊大佬你的IP或UA需要换一下了啊有问题了...要等一下")
try:
if tab.ele("@name=first_name"):
info("=============正在填写个人信息=============")
tab.actions.click("@name=first_name").input(account_info["first_name"])
info(f"已输入名字: {account_info['first_name']}")
time.sleep(random.uniform(1, 3))
tab.actions.click("@name=last_name").input(account_info["last_name"])
info(f"已输入姓氏: {account_info['last_name']}")
time.sleep(random.uniform(1, 3))
tab.actions.click("@name=email").input(account_info["email"])
info(f"已输入邮箱: {account_info['email']}")
time.sleep(random.uniform(1, 3))
info("=============提交个人信息=============")
tab.actions.click("@type=submit")
time.sleep(random.uniform(0.2, 1))
if (
tab.ele("verify the user is human. Please try again.")
or tab.ele("Can't verify the user is human. Please try again.")
or tab.ele("Cant verify the user is human. Please try again.")
):
info("检测到turnstile验证失败IP问题、UA问题、域名问题...正在重试...")
return "EMAIL_USED"
except Exception as e:
info(f"填写个人信息失败: {str(e)}")
return "ERROR"
handle_turnstile(tab)
if tab.ele("verify the user is human. Please try again.") or tab.ele(
"Can't verify the user is human. Please try again."
):
info("检测到turnstile验证失败正在重试...")
return "EMAIL_USED"
try:
if tab.ele("@name=password"):
info(f"设置密码:{account_info['password']}")
tab.ele("@name=password").input(account_info["password"])
time.sleep(random.uniform(1, 2))
info("提交密码...")
tab.ele("@type=submit").click()
info("密码设置成功,等待系统响应....")
except Exception as e:
info(f"密码设置失败: {str(e)}")
return "ERROR"
info("处理最终验证...")
handle_turnstile(tab)
if tab.ele("This email is not available."):
info("邮箱已被使用")
return "EMAIL_USED"
if tab.ele("Sign up is restricted."):
info("注册限制")
return "SIGNUP_RESTRICTED"
# 创建邮件处理器
email_handler = EmailVerificationHandler()
i = 0
while i < 5:
try:
time.sleep(random.uniform(0.2, 1))
if tab.ele("Account Settings"):
info("注册成功,已进入账号设置页面")
break
if tab.ele("@data-index=0"):
info("等待输入验证码...")
# 切换到邮箱标签页
code = email_handler.get_verification_code(
source_email=account_info["email"]
)
if code is None:
info("未获取到验证码...系统异常,正在退出....")
return "EMAIL_GET_CODE_FAILED"
info(f"输入验证码: {code}")
i = 0
for digit in code:
tab.ele(f"@data-index={i}").input(digit)
time.sleep(random.uniform(0.3, 0.6))
i += 1
info("验证码输入完成")
time.sleep(random.uniform(3, 5))
# 在验证码输入完成后检测是否出现了Turnstile验证
info("检查是否出现了Turnstile验证...")
try:
turnstile_element = tab.ele("@id=cf-turnstile", timeout=3)
if turnstile_element:
info("检测到验证码输入后出现Turnstile验证正在处理...")
handle_turnstile(tab)
except:
info("未检测到Turnstile验证继续下一步")
break
except Exception as e:
info(f"验证码处理失败: {str(e)}")
return "ERROR"
info("完成最终验证...")
handle_turnstile(tab)
time.sleep(random.uniform(3, 5))
info("账号注册流程完成")
return "SUCCESS"
class EmailGenerator:
def __init__(
self,
):
# 将密码生成移到这里,避免类定义时执行随机密码生成
self.default_first_name = self.generate_random_name()
self.default_last_name = self.generate_random_name()
# 从配置文件获取域名配置
self.domains = EMAIL_DOMAINS
info(f"当前可用域名: {self.domains}")
self.email = None
self.password = None
def generate_random_password(self, length=12):
"""生成随机密码 - 改进密码生成算法,确保包含各类字符"""
chars = "abcdefghijklmnopqrstuvwxyz"
upper_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
digits = "0123456789"
special = "!@#$%^&*"
# 确保密码包含至少一个大写字母、一个数字和一个特殊字符
password = [
random.choice(chars),
random.choice(upper_chars),
random.choice(digits),
random.choice(special),
]
# 添加剩余随机字符
password.extend(
random.choices(chars + upper_chars + digits + special, k=length - 4)
)
# 打乱密码顺序
random.shuffle(password)
return "".join(password)
def generate_random_name(self, length=6):
"""生成随机用户名"""
first_letter = random.choice("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
rest_letters = "".join(
random.choices("abcdefghijklmnopqrstuvwxyz", k=length - 1)
)
return first_letter + rest_letters
def generate_email(self, length=8):
"""生成随机邮箱地址,使用随机域名"""
random_str = "".join(
random.choices("abcdefghijklmnopqrstuvwxyz1234567890", k=length)
)
timestamp = str(int(time.time()))[-4:] # 使用时间戳后4位
# 随机选择一个域名
domain = random.choice(self.domains)
return f"{random_str}@{domain}"
def get_account_info(self):
"""获取账号信息,确保每次调用都生成新的邮箱和密码"""
self.email = self.generate_email()
self.password = self.generate_random_password()
return {
"email": self.email,
"password": self.password,
"first_name": self.default_first_name.capitalize(),
"last_name": self.default_last_name.capitalize(),
}
def _save_account_info(self, user, token, total_usage):
try:
from database import get_session, AccountModel
import asyncio
import time
async def save_to_db():
info(f"开始保存账号信息: {self.email}")
async with get_session() as session:
# 检查账号是否已存在
from sqlalchemy import select
result = await session.execute(
select(AccountModel).where(AccountModel.email == self.email)
)
existing_account = result.scalar_one_or_none()
if existing_account:
info(f"更新现有账号信息 (ID: {existing_account.id})")
existing_account.token = token
existing_account.user = user
existing_account.password = self.password
existing_account.usage_limit = str(total_usage)
# 如果账号状态是删除,更新为活跃
if existing_account.status == "deleted":
existing_account.status = "active"
# 不更新id保留原始id值
else:
info("创建新账号记录")
# 生成毫秒级时间戳作为id
timestamp_ms = int(time.time() * 1000)
account = AccountModel(
email=self.email,
password=self.password,
token=token,
user=user,
usage_limit=str(total_usage),
created_at=datetime.now().strftime("%Y-%m-%d %H:%M"),
status="active", # 设置默认状态为活跃
id=timestamp_ms, # 设置毫秒时间戳id
)
session.add(account)
await session.commit()
info(f"账号 {self.email} 信息保存成功")
return True
return asyncio.run(save_to_db())
except Exception as e:
info(f"保存账号信息失败: {str(e)}")
return False
def cleanup_and_exit(browser_manager=None, exit_code=0):
"""清理资源并退出程序"""
try:
if browser_manager:
info("正在关闭浏览器")
if hasattr(browser_manager, "browser"):
browser_manager.browser.quit()
current_process = psutil.Process()
children = current_process.children(recursive=True)
for child in children:
try:
child.terminate()
except:
pass
info("程序正常退出")
sys.exit(exit_code)
except Exception as e:
info(f"清理退出时发生错误: {str(e)}")
sys.exit(1)
def main():
browser_manager = None
max_retries = REGISTRATION_MAX_RETRIES # 从配置文件获取
current_retry = 0
try:
email_handler = EmailVerificationHandler()
if email_handler.check():
info('邮箱服务连接正常,开始注册!')
else:
if EMAIL_CODE_TYPE == "API":
error('邮箱服务连接失败并且验证码为API获取结束注册!')
return
else:
info('邮箱服务连接失败,并且验证码为手动输入,等待输入验证码...')
email_generator = EmailGenerator()
browser_manager = BrowserManager()
browser = browser_manager.init_browser()
while current_retry < max_retries:
try:
account_info = email_generator.get_account_info()
info(
f"初始化账号信息成功 => 邮箱: {account_info['email']}, 用户名: {account_info['first_name']}, 密码: {account_info['password']}"
)
signup_tab = browser.new_tab(LOGIN_URL)
browser.activate_tab(signup_tab)
signup_tab.run_js("try { turnstile.reset() } catch(e) { }")
result = sign_up_account(browser, signup_tab, account_info)
if result == "SUCCESS":
token, user = get_cursor_session_token(signup_tab)
info(f"获取到账号Token: {token}, 用户: {user}")
if token:
email_generator._save_account_info(user, token, TOTAL_USAGE)
info("注册流程完成")
cleanup_and_exit(browser_manager, 0)
else:
info("获取Cursor会话Token失败")
current_retry += 1
elif result in [
"EMAIL_USED",
"SIGNUP_RESTRICTED",
"VERIFY_FAILED",
"EMAIL_GET_CODE_FAILED",
]:
info(f"遇到问题: {result},尝试切换邮箱...")
continue # 使用新邮箱重试注册
else: # ERROR
info("遇到错误,准备重试...")
current_retry += 1
# 关闭标签页,准备下一次尝试
signup_tab.close()
time.sleep(2)
except Exception as e:
info(f"当前尝试发生错误: {str(e)}")
current_retry += 1
time.sleep(2)
try:
# 尝试关闭可能存在的标签页
if "signup_tab" in locals():
signup_tab.close()
except:
pass
info(f"达到最大重试次数 {max_retries},注册失败")
except Exception as e:
info(f"主程序错误: {str(e)}")
info(f"错误详情: {traceback.format_exc()}")
cleanup_and_exit(browser_manager, 1)
finally:
cleanup_and_exit(browser_manager, 1)