88 lines
3.1 KiB
Python
88 lines
3.1 KiB
Python
import sqlite3
|
||
import os
|
||
import sys
|
||
|
||
|
||
class CursorAuthManager:
|
||
"""Cursor认证信息管理器"""
|
||
|
||
def __init__(self):
|
||
# 判断操作系统
|
||
if sys.platform == "win32": # Windows
|
||
appdata = os.getenv("APPDATA")
|
||
if appdata is None:
|
||
raise EnvironmentError("APPDATA 环境变量未设置")
|
||
self.db_path = os.path.join(
|
||
appdata, "Cursor", "User", "globalStorage", "state.vscdb"
|
||
)
|
||
elif sys.platform == "darwin": # macOS
|
||
self.db_path = os.path.abspath(
|
||
os.path.expanduser(
|
||
"~/Library/Application Support/Cursor/User/globalStorage/state.vscdb"
|
||
)
|
||
)
|
||
elif sys.platform == "linux": # Linux 和其他类Unix系统
|
||
self.db_path = os.path.abspath(
|
||
os.path.expanduser("~/.config/Cursor/User/globalStorage/state.vscdb")
|
||
)
|
||
else:
|
||
raise NotImplementedError(f"不支持的操作系统: {sys.platform}")
|
||
|
||
def update_auth(self, email=None, access_token=None, refresh_token=None):
|
||
"""
|
||
更新Cursor的认证信息
|
||
:param email: 新的邮箱地址
|
||
:param access_token: 新的访问令牌
|
||
:param refresh_token: 新的刷新令牌
|
||
:return: bool 是否成功更新
|
||
"""
|
||
updates = []
|
||
# 登录状态
|
||
updates.append(("cursorAuth/cachedSignUpType", "Auth_0"))
|
||
|
||
if email is not None:
|
||
updates.append(("cursorAuth/cachedEmail", email))
|
||
if access_token is not None:
|
||
updates.append(("cursorAuth/accessToken", access_token))
|
||
if refresh_token is not None:
|
||
updates.append(("cursorAuth/refreshToken", refresh_token))
|
||
|
||
if not updates:
|
||
print("没有提供任何要更新的值")
|
||
return False
|
||
|
||
conn = None
|
||
try:
|
||
conn = sqlite3.connect(self.db_path)
|
||
cursor = conn.cursor()
|
||
|
||
for key, value in updates:
|
||
# 如果没有更新任何行,说明key不存在,执行插入
|
||
# 检查 accessToken 是否存在
|
||
check_query = "SELECT COUNT(*) FROM itemTable WHERE key = ?"
|
||
cursor.execute(check_query, (key,))
|
||
if cursor.fetchone()[0] == 0:
|
||
insert_query = "INSERT INTO itemTable (key, value) VALUES (?, ?)"
|
||
cursor.execute(insert_query, (key, value))
|
||
else:
|
||
update_query = "UPDATE itemTable SET value = ? WHERE key = ?"
|
||
cursor.execute(update_query, (value, key))
|
||
|
||
if cursor.rowcount > 0:
|
||
print(f"成功更新 {key.split('/')[-1]} : {value}")
|
||
else:
|
||
print(f"未找到 {key.split('/')[-1]} 或值未变化")
|
||
|
||
conn.commit()
|
||
return True
|
||
|
||
except sqlite3.Error as e:
|
||
print("数据库错误:", str(e))
|
||
return False
|
||
except Exception as e:
|
||
print("发生错误:", str(e))
|
||
return False
|
||
finally:
|
||
if conn:
|
||
conn.close()
|