67 lines
2.1 KiB
Python
67 lines
2.1 KiB
Python
|
import requests
|
|||
|
from fastapi import FastAPI
|
|||
|
from linxyun.utils.result import Result, SysCodes
|
|||
|
from linxyun.utils.logger import get_logger
|
|||
|
from linxyun.config import Config
|
|||
|
from linxyun.sso.middleware.security import SecurityMiddleware
|
|||
|
from expiringdict import ExpiringDict
|
|||
|
import json
|
|||
|
|
|||
|
logger = get_logger(__name__)
|
|||
|
|
|||
|
class Linxyun:
|
|||
|
def __init__(self, config: Config):
|
|||
|
logger.info(f"Linxyun init:{config}")
|
|||
|
self.user_auth_dict = ExpiringDict(max_len=10000, max_age_seconds=60*60*24) # 24小时
|
|||
|
self.config = config
|
|||
|
|
|||
|
def add_security_middleware(self, app: FastAPI):
|
|||
|
app.add_middleware(SecurityMiddleware, linxyun=self)
|
|||
|
|
|||
|
def get_api_url(self, path: str) -> str:
|
|||
|
# 基础 URL 和项目名称
|
|||
|
base_url = f"{self.config.url}/{self.config.project}"
|
|||
|
|
|||
|
# 如果 path 为空,返回 base_url
|
|||
|
if not path:
|
|||
|
return base_url
|
|||
|
|
|||
|
# 如果 path 不以 / 开头,添加 /
|
|||
|
if not path.startswith("/"):
|
|||
|
path = "/" + path
|
|||
|
|
|||
|
# 如果 path 不以 .action 结尾,添加 .action
|
|||
|
if not path.endswith(".action"):
|
|||
|
path += ".action"
|
|||
|
|
|||
|
# 返回完整的 URL
|
|||
|
return base_url + path
|
|||
|
|
|||
|
def user_login_auth(self, token: str) -> Result:
|
|||
|
if not token:
|
|||
|
return Result.error(SysCodes.USER_NOT_LOGIN)
|
|||
|
url = self.get_api_url("userLoginAuth")
|
|||
|
resp = requests.post(url, json={"LoginID": token})
|
|||
|
if resp.status_code != 200:
|
|||
|
return Result.error(SysCodes.REQUEST_FAILED)
|
|||
|
json_resp = resp.json()
|
|||
|
if json_resp.get("success") is False:
|
|||
|
return Result.error(SysCodes.LOGIN_FAIL)
|
|||
|
data = json_resp.get("data")
|
|||
|
if not data:
|
|||
|
return Result.error(SysCodes.USER_NOT_LOGIN)
|
|||
|
data = json.loads(data)
|
|||
|
logger.info(f"登录成功:{data}")
|
|||
|
self.user_auth_dict[token] = data
|
|||
|
return Result.ok(data)
|
|||
|
|
|||
|
|
|||
|
def get_user_auth(self, token: str) -> Result:
|
|||
|
if token in self.user_auth_dict:
|
|||
|
return Result.ok(self.user_auth_dict[token])
|
|||
|
return self.user_login_auth(token)
|
|||
|
|
|||
|
|
|||
|
|
|||
|
|