linxyun-sso/linxyun/core.py

67 lines
2.1 KiB
Python
Raw Normal View History

2024-12-16 14:12:07 +08:00
import requests
from fastapi import FastAPI
from linxyun.utils.result import Result, SysCodes
from linxyun.utils.logger import get_logger
from linxyun.config import Config
from linxyun.sso.middleware.security import SecurityMiddleware
from expiringdict import ExpiringDict
import json
logger = get_logger(__name__)
class Linxyun:
    """Client for the Linxyun SSO backend with an in-memory auth cache.

    Successful ``userLoginAuth`` responses are cached per token in an
    ``ExpiringDict`` so repeated lookups for the same token do not hit the
    backend again until the entry expires or is evicted.
    """

    # Seconds to wait for the SSO backend before giving up. Without a timeout
    # ``requests`` waits indefinitely, which would stall every caller behind a
    # hung backend.
    REQUEST_TIMEOUT = 10

    def __init__(self, config: Config) -> None:
        """Store *config* and create the empty token -> auth-data cache."""
        logger.info(f"Linxyun init{config}")
        # At most 10000 entries, each kept for 24 hours.
        self.user_auth_dict = ExpiringDict(max_len=10000, max_age_seconds=60*60*24)
        self.config = config

    def add_security_middleware(self, app: FastAPI) -> None:
        """Register ``SecurityMiddleware`` on *app*, passing this instance as ``linxyun``."""
        app.add_middleware(SecurityMiddleware, linxyun=self)

    def get_api_url(self, path: str) -> str:
        """Build the full URL for an API action.

        Args:
            path: Action path, with or without a leading ``/`` and with or
                without the ``.action`` suffix. An empty path yields the
                base URL.

        Returns:
            ``<config.url>/<config.project>`` for an empty *path*, otherwise
            that base followed by ``/<path>`` ending in ``.action``.
        """
        # Base URL plus project name. Trailing slashes on the configured URL
        # are dropped so the result never contains "//" before the project.
        root = str(self.config.url).rstrip("/")
        base_url = f"{root}/{self.config.project}"
        if not path:
            return base_url
        # Normalise the path: ensure a leading "/" and a ".action" suffix.
        if not path.startswith("/"):
            path = "/" + path
        if not path.endswith(".action"):
            path += ".action"
        return base_url + path

    def user_login_auth(self, token: str) -> Result:
        """Validate *token* against the ``userLoginAuth`` action and cache the result.

        Args:
            token: Login token, sent to the backend as ``LoginID``.

        Returns:
            ``Result.ok(data)`` with the decoded ``data`` payload on success.
            ``Result.error(...)`` with ``USER_NOT_LOGIN`` for an empty token
            or empty payload, ``LOGIN_FAIL`` when the backend reports
            ``success`` as ``False``, and ``REQUEST_FAILED`` for transport
            errors, non-200 responses or malformed response bodies.
        """
        if not token:
            return Result.error(SysCodes.USER_NOT_LOGIN)

        url = self.get_api_url("userLoginAuth")
        try:
            resp = requests.post(
                url,
                json={"LoginID": token},
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.RequestException as exc:
            # Connection errors and timeouts are reported the same way as a
            # bad HTTP status instead of propagating out of the caller.
            logger.error(f"userLoginAuth request failed: {exc}")
            return Result.error(SysCodes.REQUEST_FAILED)

        if resp.status_code != 200:
            return Result.error(SysCodes.REQUEST_FAILED)

        try:
            json_resp = resp.json()
        except ValueError:
            # The body was not valid JSON.
            logger.error("userLoginAuth returned a non-JSON body")
            return Result.error(SysCodes.REQUEST_FAILED)
        if not isinstance(json_resp, dict):
            logger.error("userLoginAuth returned an unexpected JSON structure")
            return Result.error(SysCodes.REQUEST_FAILED)

        if json_resp.get("success") is False:
            return Result.error(SysCodes.LOGIN_FAIL)

        data = json_resp.get("data")
        if not data:
            return Result.error(SysCodes.USER_NOT_LOGIN)

        # "data" is itself a JSON-encoded string and needs a second decode.
        try:
            data = json.loads(data)
        except (TypeError, ValueError):
            logger.error("userLoginAuth returned an undecodable data payload")
            return Result.error(SysCodes.REQUEST_FAILED)

        # NOTE(review): this logs the full auth payload, which may contain
        # sensitive user details - confirm this is acceptable at INFO level.
        logger.info(f"登录成功:{data}")
        self.user_auth_dict[token] = data
        return Result.ok(data)

    def get_user_auth(self, token: str) -> Result:
        """Return the auth data for *token*, from the cache when available.

        Falls back to :meth:`user_login_auth` when the token is not cached.
        """
        # Single lookup instead of "in" followed by indexing: a cache entry
        # can expire or be evicted between the two operations, which would
        # surface as an unexpected KeyError.
        try:
            cached = self.user_auth_dict[token]
        except KeyError:
            return self.user_login_auth(token)
        return Result.ok(cached)