81 lines
4.5 KiB
Python
81 lines
4.5 KiB
Python
def get_cursor_session_token(tab, max_attempts: int = 3, retry_interval: int = 2) -> Optional[Tuple[str, str]]:
|
||
"""
|
||
获取Cursor会话token
|
||
|
||
Args:
|
||
tab: 浏览器标签页对象
|
||
max_attempts: 最大尝试次数
|
||
retry_interval: 重试间隔(秒)
|
||
|
||
Returns:
|
||
Tuple[str, str] | None: 成功返回(userId, accessToken)元组,失败返回None
|
||
"""
|
||
logging.info("开始获取会话令牌")
|
||
|
||
# 首先尝试使用UUID深度登录方式
|
||
logging.info("尝试使用深度登录方式获取token")
|
||
|
||
def _generate_pkce_pair():
|
||
"""生成PKCE验证对"""
|
||
code_verifier = secrets.token_urlsafe(43) # 生成随机的code_verifier
|
||
code_challenge_digest = hashlib.sha256(code_verifier.encode('utf-8')).digest() # 对verifier进行SHA256哈希
|
||
code_challenge = base64.urlsafe_b64encode(code_challenge_digest).decode('utf-8').rstrip('=') # Base64编码并移除填充字符
|
||
return code_verifier, code_challenge
|
||
|
||
attempts = 0
|
||
while attempts < max_attempts:
|
||
try:
|
||
verifier, challenge = _generate_pkce_pair() # 生成PKCE验证对
|
||
id = uuid.uuid4() # 生成唯一的UUID
|
||
client_login_url = f"https://www.cursor.com/cn/loginDeepControl?challenge={challenge}&uuid={id}&mode=login" # 构建深度登录URL
|
||
|
||
logging.info(f"访问深度登录URL: {client_login_url}")
|
||
tab.get(client_login_url) # 访问登录页面
|
||
save_screenshot(tab, f"deeplogin_attempt_{attempts}") # 保存登录页面截图
|
||
|
||
if tab.ele("xpath=//span[contains(text(), 'Yes, Log In')]", timeout=5): # 查找登录确认按钮
|
||
logging.info("点击确认登录按钮")
|
||
tab.ele("xpath=//span[contains(text(), 'Yes, Log In')]").click() # 点击确认登录
|
||
time.sleep(1.5) # 等待页面响应
|
||
|
||
auth_poll_url = f"https://api2.cursor.sh/auth/poll?uuid={id}&verifier={verifier}" # 构建认证轮询URL
|
||
headers = {
|
||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Cursor/0.48.6 Chrome/132.0.6834.210 Electron/34.3.4 Safari/537.36",
|
||
"Accept": "*/*"
|
||
} # 设置请求头
|
||
|
||
logging.info(f"轮询认证状态: {auth_poll_url}")
|
||
response = requests.get(auth_poll_url, headers=headers, timeout=5) # 发送认证状态轮询请求
|
||
|
||
if response.status_code == 200: # 请求成功
|
||
data = response.json() # 解析JSON响应
|
||
accessToken = data.get("accessToken", None) # 获取访问令牌
|
||
authId = data.get("authId", "") # 获取认证ID
|
||
|
||
if accessToken: # 如果成功获取到访问令牌
|
||
userId = ""
|
||
if len(authId.split("|")) > 1: # 从authId中提取userId
|
||
userId = authId.split("|")[1]
|
||
|
||
logging.info("成功获取账号token和userId")
|
||
return userId, accessToken # 返回用户ID和访问令牌
|
||
else:
|
||
logging.error(f"API请求失败,状态码: {response.status_code}") # 记录API请求失败
|
||
else:
|
||
logging.warning("未找到登录确认按钮") # 记录未找到登录按钮
|
||
|
||
attempts += 1 # 增加尝试次数
|
||
if attempts < max_attempts: # 如果还有重试机会
|
||
wait_time = retry_interval * attempts # 计算等待时间,逐步增加
|
||
logging.warning(f"第 {attempts} 次尝试未获取到token,{wait_time}秒后重试...")
|
||
save_screenshot(tab, f"token_attempt_{attempts}") # 保存失败截图
|
||
time.sleep(wait_time) # 等待指定时间后重试
|
||
|
||
except Exception as e: # 捕获所有异常
|
||
logging.error(f"深度登录获取token失败: {str(e)}") # 记录异常信息
|
||
attempts += 1 # 增加尝试次数
|
||
save_screenshot(tab, f"token_error_{attempts}") # 保存错误截图
|
||
if attempts < max_attempts: # 如果还有重试机会
|
||
wait_time = retry_interval * attempts # 计算等待时间
|
||
logging.warning(f"将在 {wait_time} 秒后重试...")
|
||
time.sleep(wait_time) # 等待指定时间后重试 |