<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Loading

      在 django-ninja 中實現(xiàn)類似騰訊阿里云的應用鑒權(quán)機制

      前言

      本文章介紹如何使用基于 AppClient 模型的 Django-Ninja API 鑒權(quán)機制。

      這也是上次說的中臺項目衍生物

      中臺項目相關(guān)的文章,我大概還會再寫一篇

      這個系列的文章注定是沒什么人看的,畢竟還是小眾了一些

      不過我還是得寫,沒有讀者也要記錄,以后需要的時候就能用上

      PS: 本文基于使用 DjangoStarter 框架,默認讀者具備 DjangoStarter 3.0 以上版本的項目結(jié)構(gòu)、基礎(chǔ)設施

      限于篇幅關(guān)系,本文無法貼出全部代碼,有興趣的同學可以在公眾號后臺回復「API鑒權(quán)」獲取完整代碼

      概述

      這套鑒權(quán)機制提供了以下功能:

      • 多種認證方式:支持查詢參數(shù)、請求頭和 Bearer Token 三種方式
      • IP 白名單:支持單個 IP 和 CIDR 網(wǎng)段限制
      • 權(quán)限范圍控制:基于 scopes 的細粒度權(quán)限管理
      • 狀態(tài)檢查:自動驗證 AppClient 的啟用狀態(tài)
      • 安全性:包含完整的認證和授權(quán)流程

      搭配官方文檔食用更佳哦~

      https://django-ninja.dev/guides/authentication/

      使用方法

      為了能有個直觀的體驗

      我先介紹使用方法

      后面再來講實現(xiàn)

      1. 創(chuàng)建 AppClient

      首先在 Django Admin 或通過代碼創(chuàng)建一個 AppClient:

      from apps.app.models import AppClient
      
      app_client = AppClient.objects.create(
          app_id="my-app",
          app_name="我的應用",
          secret_key="your-secret-key-here",
          allowed_ips="192.168.1.0/24,10.0.0.1",  # 可選:IP 限制
          scopes="project:read,project:write,user:read",  # 可選:權(quán)限范圍
          status=AppClient.Status.ACTIVE
      )
      

      2. 在 API 中使用認證

      from ninja import Router
      from core.authentication import (
         api_key_auth,
         api_key_header_auth,
         api_key_bearer_auth,
         require_app_client_scopes,
         AppClientScopes,
      )
      
      router = Router()
      
      
      # 使用查詢參數(shù)認證
      @router.get("/data", auth=api_key_auth)
      def get_data(request):
         app_client = request.auth
         return {"data": "success", "client": app_client.app_name}
      
      
      # 使用請求頭認證
      @router.get("/info", auth=api_key_header_auth)
      def get_info(request):
         return {"info": "success"}
      
      
      # 使用 Bearer Token 認證
      @router.get("/status", auth=api_key_bearer_auth)
      def get_status(request):
         return {"status": "success"}
      

      3. 添加權(quán)限控制

      # 需要特定權(quán)限
      @router.get("/projects", auth=app_client_api_key)
      @require_app_client_scopes([AppClientScopes.PROJECT_READ])
      def list_projects(request):
          return {"projects": []}
      
      # 需要多個權(quán)限
      @router.post("/projects", auth=app_client_api_key)
      @require_app_client_scopes([AppClientScopes.PROJECT_WRITE, AppClientScopes.USER_READ])
      def create_project(request):
          return {"message": "項目創(chuàng)建成功"}
      
      # 手動檢查權(quán)限
      @router.get("/custom-check", auth=app_client_api_key)
      def custom_permission_check(request):
          from core.authentication import AppClientScopeChecker
          
          if AppClientScopeChecker.has_any_scope(request, [AppClientScopes.PROJECT_READ, AppClientScopes.PROJECT_WRITE]):
              return {"access": "granted"}
          else:
              return {"access": "denied"}
      

      認證方式

      1. 查詢參數(shù)認證 (APIKeyQuery)

      使用方式:

      GET /api/endpoint?api_key=your-secret-key
      

      代碼示例:

      from core.authentication import api_key_auth
      
      
      @router.get("/data", auth=api_key_auth)
      def get_data(request):
         return {"data": "success"}
      

      2. 請求頭認證 (APIKeyHeader)

      使用方式:

      GET /api/endpoint
      X-API-Key: your-secret-key
      

      代碼示例:

      from core.authentication import api_key_header_auth
      
      
      @router.get("/data", auth=api_key_header_auth)
      def get_data(request):
         return {"data": "success"}
      

      3. Bearer Token 認證 (HttpBearer)

      使用方式:

      GET /api/endpoint
      Authorization: Bearer your-secret-key
      

      代碼示例:

      from core.authentication import api_key_bearer_auth
      
      
      @router.get("/data", auth=api_key_bearer_auth)
      def get_data(request):
         return {"data": "success"}
      

      權(quán)限范圍 (Scopes) 系統(tǒng)

      事先說明,scopes 方式只是實現(xiàn)起來最簡單,但不代表最合適

      畢竟在 Django 框架中,本來就有一套權(quán)限體系了,我也有想過接入那套權(quán)限體系,可以很方便地在后臺編輯每個應用的權(quán)限

      不過因為實現(xiàn)起來會稍微麻煩一點點,為了快速上線,我就先自己搞了個 scopes 系統(tǒng)了~

      后續(xù)肯定是要試試 接入 Django 權(quán)限體系 的~!

      權(quán)限格式

      權(quán)限范圍使用 resource:action 格式,例如:

      • project:read - 項目讀取權(quán)限
      • project:write - 項目寫入權(quán)限
      • user:* - 用戶相關(guān)所有權(quán)限
      • * - 超級管理員權(quán)限

      預定義權(quán)限常量

      from core.authentication import AppClientScopes
      
      # 項目權(quán)限
      AppClientScopes.PROJECT_READ     # "project:read"
      AppClientScopes.PROJECT_WRITE    # "project:write"
      AppClientScopes.PROJECT_DELETE   # "project:delete"
      AppClientScopes.PROJECT_ALL      # "project:*"
      
      # 用戶權(quán)限
      AppClientScopes.USER_READ        # "user:read"
      AppClientScopes.USER_WRITE       # "user:write"
      AppClientScopes.USER_DELETE      # "user:delete"
      AppClientScopes.USER_ALL         # "user:*"
      
      # 超級權(quán)限
      AppClientScopes.SUPER_ADMIN      # "*"
      

      權(quán)限檢查方法

      1. 裝飾器方式(推薦)

      from core.authentication import require_app_client_scopes, AppClientScopes
      
      @router.get("/projects", auth=app_client_api_key)
      @require_app_client_scopes([AppClientScopes.PROJECT_READ])
      def list_projects(request):
          return {"projects": []}
      

      2. 手動檢查方式

      from core.authentication import AppClientScopeChecker
      
      @router.get("/data", auth=app_client_api_key)
      def get_data(request):
          # 檢查是否具有所有指定權(quán)限
          if AppClientScopeChecker.check_scopes(request, [AppClientScopes.PROJECT_READ]):
              return {"data": "success"}
          
          # 檢查是否具有任意一個權(quán)限
          if AppClientScopeChecker.has_any_scope(request, [AppClientScopes.PROJECT_READ, AppClientScopes.PROJECT_WRITE]):
              return {"data": "partial access"}
          
          # 獲取客戶端所有權(quán)限
          scopes = AppClientScopeChecker.get_client_scopes(request)
          return {"error": "權(quán)限不足", "your_scopes": scopes}
      

      IP 白名單

      配置 IP 限制

      在 AppClient 的 allowed_ips 字段中配置允許的 IP 地址:

      # 單個 IP
      app_client.allowed_ips = "192.168.1.100"
      
      # 多個 IP
      app_client.allowed_ips = "192.168.1.100,10.0.0.1,203.0.113.5"
      
      # CIDR 網(wǎng)段
      app_client.allowed_ips = "192.168.1.0/24,10.0.0.0/8"
      
      # 混合配置
      app_client.allowed_ips = "192.168.1.100,10.0.0.0/8,203.0.113.0/24"
      
      # 不限制 IP(留空或 None)
      app_client.allowed_ips = None
      

      IP 獲取邏輯

      系統(tǒng)會按以下優(yōu)先級獲取客戶端 IP:

      1. HTTP_X_FORWARDED_FOR 頭部的第一個 IP(適用于代理/負載均衡環(huán)境)
      2. REMOTE_ADDR(直接連接的 IP)

      錯誤處理

      認證失敗

      當認證失敗時,會返回 HTTP 401 錯誤:

      {
          "detail": "Unauthorized"
      }
      

      權(quán)限不足

      當權(quán)限檢查失敗時,會返回 HTTP 403 錯誤:

      {
          "detail": "缺少權(quán)限: project:write"
      }
      

      自定義錯誤處理

      from core.authentication import AppClientPermissionError
      
      @router.get("/custom", auth=app_client_api_key)
      def custom_endpoint(request):
          if not some_condition:
              raise AppClientPermissionError("自定義權(quán)限錯誤信息")
          return {"data": "success"}
      

      測試 API

      項目提供了完整的測試 API,可以用來驗證認證和權(quán)限功能:

      # 測試 API Key 認證
      GET /api/app/test/api-key?api_key=your-secret-key
      
      # 測試請求頭認證
      GET /api/app/test/api-key-header
      X-API-Key: your-secret-key
      
      # 測試 Bearer Token 認證
      GET /api/app/test/bearer
      Authorization: Bearer your-secret-key
      
      # 測試權(quán)限范圍
      GET /api/app/test/scopes/project-read?api_key=your-secret-key
      GET /api/app/test/scopes/project-write?api_key=your-secret-key
      GET /api/app/test/scopes/multiple?api_key=your-secret-key
      

      最佳實踐

      以下是一些最佳實踐的建議

      無論是使用什么技術(shù)、框架,實際開發(fā)中都可以參考一下

      1. 密鑰管理

      • 使用強密鑰(建議 32 字符以上)
      • 定期輪換密鑰
      • 不要在代碼中硬編碼密鑰
      • 使用環(huán)境變量或安全的配置管理系統(tǒng)

      2. 權(quán)限設計

      • 遵循最小權(quán)限原則
      • 使用細粒度的權(quán)限范圍
      • 定期審查和清理不必要的權(quán)限
      • 為不同的應用場景創(chuàng)建不同的 AppClient

      3. IP 限制

      • 在生產(chǎn)環(huán)境中啟用 IP 白名單
      • 使用 CIDR 網(wǎng)段而不是單個 IP(便于擴展)
      • 考慮 CDN 和代理的 IP 轉(zhuǎn)發(fā)

      4. 監(jiān)控和日志

      import logging
      
      logger = logging.getLogger(__name__)
      
      @router.get("/sensitive-data", auth=app_client_api_key)
      @require_app_client_scopes([AppClientScopes.PROJECT_READ])
      def get_sensitive_data(request):
          app_client = request.auth
          logger.info(f"AppClient {app_client.app_id} accessed sensitive data")
          return {"data": "sensitive information"}
      

      集成到現(xiàn)有 API

      添加路由

      config/apis.py 中添加 app 路由:

      from apps.app.apis import router as app_router
      
      api.add_router('app', app_router)
      

      混合認證

      這是 django-ninja 提供的新功能

      可以在同一個 API 中同時支持多種認證方式:

      from ninja.security import django_auth
      from core.authentication import api_key_auth
      
      
      # 同時支持 Django 用戶認證和 AppClient 認證
      @router.get("/user-data", auth=[django_auth, api_key_auth])
      def get_data(request):
         return {"user": request.user.username}
      

      更新主 API 配置

      除此之外,還可以在主 API 配置全局認證和其他功能

      from django.contrib.admin.views.decorators import staff_member_required
      
      api = NinjaAPI(
          title=f'{settings.DJANGO_STARTER["project_info"]["name"]} APIs',
          description=settings.DJANGO_STARTER["project_info"]["description"],
          renderer=JSONRespRenderer(),
          csrf=True,
          auth=[django_auth, api_key_auth, api_key_header_auth, api_key_bearer_auth],
          urls_namespace='api',
          docs=Swagger(settings={"persistAuthorization": True}),
          docs_decorator=staff_member_required,
      )
      

      這里簡單介紹一下

      我這段配置:

      • 使用 auth=[django_auth, api_key_auth, api_key_header_auth, api_key_bearer_auth] 支持多種認證方式(其實還有一個JWT的,不過我這里沒考慮)
      • 使用 docs_decorator=staff_member_required, 配置了訪問swagger文檔必須是已登錄的 staff_member

      實現(xiàn)

      OK,最后再介紹一下具體實現(xiàn) (應該沒人會堅持看到這么后面吧…)

      限于篇幅關(guān)系,本文無法貼出全部代碼,有興趣的同學可以在公眾號后臺回復「API鑒權(quán)」獲取完整代碼

      模型

      首先是 AppClient 模型,代碼位于 src/apps/app/models.py 內(nèi)

      除了字段定義,還添加了一些方法用于 獲取權(quán)限范圍列表獲取允許的 IP 地址列表 之類的功能

      按照DDD思想來講,這個模型算是一個 充血模型 ??

      import uuid
      from django.db import models
      from django.core.validators import RegexValidator
      from django_starter.db.models import ModelExt
      
      class AppClient(ModelExt):
          """應用客戶端模型,用于管理第三方應用的訪問權(quán)限"""
      
          class Status(models.TextChoices):
              ACTIVE = 'active', '啟用'
              INACTIVE = 'inactive', '禁用'
      
          id = models.BigAutoField(
              primary_key=True,
              verbose_name='主鍵ID',
              help_text='系統(tǒng)自動生成的唯一標識符',
              db_comment='物理主鍵,自增ID'
          )
      
          guid = models.UUIDField(
              default=uuid.uuid4,
              unique=True,
              editable=False,
              verbose_name='全局唯一標識',
              help_text='系統(tǒng)自動生成的UUID,用作邏輯主鍵',
              db_comment='邏輯主鍵,GUID'
          )
      
          app_id = models.CharField(
              max_length=100,
              unique=True,
              verbose_name='應用標識',
              help_text='用于標識調(diào)用方系統(tǒng)的唯一ID,建議使用有意義的命名',
              db_comment='應用標識,用于標識調(diào)用方系統(tǒng)',
              validators=[
                  RegexValidator(
                      regex=r'^[a-zA-Z0-9_-]+$',
                      message='應用標識只能包含字母、數(shù)字、下劃線和連字符'
                  )
              ]
          )
      
          app_name = models.CharField(
              max_length=200,
              verbose_name='應用名稱',
              help_text='調(diào)用方應用的顯示名稱或備注信息',
              db_comment='應用名稱,調(diào)用方的名稱或備注信息'
          )
      
          secret_key = models.CharField(
              max_length=100,
              verbose_name='密鑰',
              help_text='用于API簽名驗證的密鑰,請妥善保管',
              db_comment='密鑰,用于簽名驗證',
          )
      
          allowed_ips = models.TextField(
              blank=True,
              null=True,
              verbose_name='允許訪問的IP',
              help_text='允許訪問的IP地址列表,多個IP用逗號分隔,留空表示不限制',
              db_comment='允許訪問的 IP 列表,逗號分隔'
          )
      
          scopes = models.TextField(
              blank=True,
              null=True,
              verbose_name='權(quán)限范圍',
              help_text='應用的權(quán)限范圍,格式如:project:read,project:write',
              db_comment='權(quán)限范圍,如 project:read,project:write'
          )
      
          status = models.CharField(
              max_length=10,
              choices=Status.choices,
              default=Status.ACTIVE,
              verbose_name='狀態(tài)',
              help_text='應用的啟用狀態(tài)',
              db_comment='啟用/禁用狀態(tài)'
          )
      
          class Meta:
              db_table = 'app_client'
              verbose_name = '應用客戶端'
              verbose_name_plural = '應用客戶端'
              ordering = ['-create_time']
              indexes = [
                  models.Index(fields=['app_id']),
                  models.Index(fields=['status']),
                  models.Index(fields=['create_time']),
              ]
      
          def __str__(self):
              return f'{self.app_name} ({self.app_id})'
      
          def is_active(self):
              """檢查應用是否處于活躍狀態(tài)"""
              # todo 為什么這個 is_deleted 會變成 str 啊??
              # return self.status == self.Status.ACTIVE and not self.is_deleted
              return self.status == self.Status.ACTIVE
      
          def get_scopes_list(self):
              """獲取權(quán)限范圍列表"""
              if not self.scopes:
                  return []
              return [scope.strip() for scope in self.scopes.split(',') if scope.strip()]
      
          def get_allowed_ips_list(self):
              """獲取允許的IP地址列表"""
              if not self.allowed_ips:
                  return []
              return [ip.strip() for ip in self.allowed_ips.split(',') if ip.strip()]
      
          def save(self, *args, **kwargs):
              """重寫save方法,添加自定義邏輯"""
              # 確保app_name不為空
              if not self.app_name:
                  self.app_name = self.app_id
      
              super().save(*args, **kwargs)
      

      測試接口

      添加一些測試接口,方便后續(xù)調(diào)試

      代碼位于 src/apps/app/apis/client/apis.py

      from typing import List
      from ninja import Router
      from django.http import HttpRequest
      from core.authentication import (
          api_key_auth,
          api_key_header_auth,
          api_key_bearer_auth,
          require_app_client_scopes,
          AppClientScopes,
          AppClientScopeChecker,
      )
      from apps.app.models import AppClient
      from .schemas import AppClientSchema, AppClientCreateSchema
      
      router = Router(tags=['AppClient API'])
      
      @router.get("/test/api-key", auth=api_key_auth)
      def test_api_key_auth(request: HttpRequest):
          """測試API Key認證(查詢參數(shù)方式)
          
          使用方式: GET /api/app/test/api-key?api_key=your_secret_key
          """
          app_client = request.auth
          return {
              "message": "API Key認證成功",
              "app_id": app_client.app_id,
              "app_name": app_client.app_name,
              "scopes": app_client.get_scopes_list(),
          }
      
      
      @router.get("/test/api-key-header", auth=api_key_header_auth)
      def test_api_key_header_auth(request: HttpRequest):
          """測試API Key認證(請求頭方式)
          
          使用方式: GET /api/app/test/api-key-header
          Headers: X-API-Key: your_secret_key
          """
          app_client = request.auth
          return {
              "message": "API Key Header認證成功",
              "app_id": app_client.app_id,
              "app_name": app_client.app_name,
              "scopes": app_client.get_scopes_list(),
          }
      
      
      @router.get("/test/bearer", auth=api_key_bearer_auth)
      def test_bearer_auth(request: HttpRequest):
          """測試Bearer Token認證
          
          使用方式: GET /api/app/test/bearer
          Headers: Authorization: Bearer your_secret_key
          """
          app_client = request.auth
          return {
              "message": "Bearer Token認證成功",
              "app_id": app_client.app_id,
              "app_name": app_client.app_name,
              "scopes": app_client.get_scopes_list(),
          }
      
      
      @router.get("/test/scopes/project-read", auth=api_key_auth)
      @require_app_client_scopes([AppClientScopes.PROJECT_READ])
      def test_project_read_scope(request: HttpRequest):
          """測試項目讀取權(quán)限
          
          需要權(quán)限: project:read
          """
          return {
              "message": "項目讀取權(quán)限驗證成功",
              "required_scope": AppClientScopes.PROJECT_READ,
              "client_scopes": AppClientScopeChecker.get_client_scopes(request),
          }
      
      
      @router.get("/test/scopes/project-write", auth=api_key_auth)
      @require_app_client_scopes([AppClientScopes.PROJECT_WRITE])
      def test_project_write_scope(request: HttpRequest):
          """測試項目寫入權(quán)限
          
          需要權(quán)限: project:write
          """
          return {
              "message": "項目寫入權(quán)限驗證成功",
              "required_scope": AppClientScopes.PROJECT_WRITE,
              "client_scopes": AppClientScopeChecker.get_client_scopes(request),
          }
      
      
      @router.get("/test/scopes/multiple", auth=api_key_auth)
      @require_app_client_scopes([AppClientScopes.PROJECT_READ, AppClientScopes.USER_READ])
      def test_multiple_scopes(request: HttpRequest):
          """測試多個權(quán)限要求
          
          需要權(quán)限: project:read AND user:read
          """
          return {
              "message": "多權(quán)限驗證成功",
              "required_scopes": [AppClientScopes.PROJECT_READ, AppClientScopes.USER_READ],
              "client_scopes": AppClientScopeChecker.get_client_scopes(request),
          }
      
      
      @router.get("/test/scopes/any", auth=api_key_auth)
      def test_any_scope(request: HttpRequest):
          """測試任意權(quán)限檢查
          
          檢查是否具有項目或用戶相關(guān)的任意權(quán)限
          """
          has_project_access = AppClientScopeChecker.has_any_scope(
              request, [AppClientScopes.PROJECT_READ, AppClientScopes.PROJECT_WRITE]
          )
          has_user_access = AppClientScopeChecker.has_any_scope(
              request, [AppClientScopes.USER_READ, AppClientScopes.USER_WRITE]
          )
          
          return {
              "message": "權(quán)限檢查完成",
              "has_project_access": has_project_access,
              "has_user_access": has_user_access,
              "client_scopes": AppClientScopeChecker.get_client_scopes(request),
          }
      
      
      @router.get("/info", auth=api_key_auth)
      def get_client_info(request: HttpRequest):
          """獲取當前認證的AppClient信息"""
          app_client = request.auth
          return {
              "app_id": app_client.app_id,
              "app_name": app_client.app_name,
              "status": app_client.status,
              "scopes": app_client.get_scopes_list(),
              "allowed_ips": app_client.get_allowed_ips_list(),
              "create_time": app_client.create_time,
          }
      
      
      @router.get("/clients", auth=api_key_auth)
      @require_app_client_scopes([AppClientScopes.SUPER_ADMIN])
      def list_app_clients(request: HttpRequest) -> List[dict]:
          """列出所有AppClient(需要超級管理員權(quán)限)
          
          需要權(quán)限: *
          """
          clients = AppClient.objects.filter(is_deleted=False)
          return [
              {
                  "app_id": client.app_id,
                  "app_name": client.app_name,
                  "status": client.status,
                  "scopes": client.get_scopes_list(),
                  "create_time": client.create_time,
              }
              for client in clients
          ]
      

      管理命令

      為了方便使用,我又添加了倆命令

      顧名思義,創(chuàng)建新的 AppClient列出所有

      • src/apps/app/management/commands/create_app_client.py
      • src/apps/app/management/commands/list_app_clients.py

      限于篇幅原因,無法貼出全部代碼,有興趣的同學可以在公眾號后臺回復「API鑒權(quán)」獲取完整代碼

      核心邏輯

      關(guān)于鑒權(quán)的核心邏輯,我單獨創(chuàng)建一個 core package 來存放

      目錄結(jié)構(gòu)如下

       core
       ├─ exceptions
       │  ├─ __init__.py
       │  └─ permissions.py
       ├─ authentication
       │  ├─ __init__.py
       │  ├─ README.md
       │  ├─ permissions.py
       │  ├─ app.py
       │  └─ api_key.py
       └─ __init__.py
      

      最關(guān)鍵的代碼就是 src/core/authentication/app.py 中的這幾個認證實例

      代碼中有詳細注釋,一看就能理解。

      如果覺得這個還比較復雜,后續(xù)我再介紹一個極簡的實現(xiàn)方式。

      class AppClientAuthMixin:
          """AppClient認證混入類,提供通用的認證邏輯"""
          
          def _get_client_ip(self, request: HttpRequest) -> str:
              """獲取客戶端真實IP地址"""
              x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
              if x_forwarded_for:
                  ip = x_forwarded_for.split(',')[0].strip()
              else:
                  ip = request.META.get('REMOTE_ADDR')
              return ip
          
          def _validate_ip_access(self, app_client: AppClient, request: HttpRequest) -> bool:
              """驗證IP訪問權(quán)限"""
              allowed_ips = app_client.get_allowed_ips_list()
              if not allowed_ips:  # 如果沒有設置IP限制,則允許所有IP
                  return True
              
              client_ip = self._get_client_ip(request)
              
              for allowed_ip in allowed_ips:
                  try:
                      # 支持單個IP和CIDR網(wǎng)段
                      if '/' in allowed_ip:
                          network = ipaddress.ip_network(allowed_ip, strict=False)
                          if ipaddress.ip_address(client_ip) in network:
                              return True
                      else:
                          if client_ip == allowed_ip:
                              return True
                  except (ipaddress.AddressValueError, ValueError):
                      # 忽略無效的IP格式
                      continue
              
              return False
          
          def _authenticate_app_client(self, request: HttpRequest, key: str) -> Optional[AppClient]:
              """認證AppClient的通用邏輯"""
              try:
                  app_client = AppClient.objects.get(secret_key=key)
                  
                  # 檢查應用是否處于活躍狀態(tài)
                  if not app_client.is_active():
                      return None
                  
                  # 驗證IP訪問權(quán)限
                  if not self._validate_ip_access(app_client, request):
                      return None
                  
                  # 將權(quán)限范圍添加到request中,供后續(xù)權(quán)限檢查使用
                  request.app_client_scopes = app_client.get_scopes_list()
                  
                  return app_client
                  
              except AppClient.DoesNotExist:
                  return None
      
      
      class AppClientApiKey(AppClientAuthMixin, APIKeyQuery):
          """基于查詢參數(shù)的AppClient API Key認證"""
          param_name = "api_key"
      
          def authenticate(self, request: HttpRequest, key: str) -> Optional[AppClient]:
              return self._authenticate_app_client(request, key)
      
      
      class AppClientApiKeyHeader(AppClientAuthMixin, APIKeyHeader):
          """基于請求頭的AppClient API Key認證"""
          param_name = "X-API-Key"
      
          def authenticate(self, request: HttpRequest, key: str) -> Optional[AppClient]:
              return self._authenticate_app_client(request, key)
      
      
      class AppClientBearer(AppClientAuthMixin, HttpBearer):
          """基于Bearer Token的AppClient認證"""
      
          def authenticate(self, request: HttpRequest, token: str) -> Optional[AppClient]:
              return self._authenticate_app_client(request, token)
      
      
      # 創(chuàng)建認證實例,可以在API中直接使用
      api_key_auth = AppClientApiKey()
      api_key_header_auth = AppClientApiKeyHeader()
      api_key_bearer_auth = AppClientBearer()
      

      極簡實現(xiàn)方式

      這套完善的鑒權(quán)邏輯,如果你覺得太復雜了

      沒關(guān)系,我這還做了個極簡版本,去掉了權(quán)限、IP限制等亂七八糟的東西,只校驗 api-key 是否有效

      實現(xiàn)代碼在 src/core/authentication/api_key.py

      自定義的 ApiKey 類繼承自 ninja.security.APIKeyQuery 類,表明 api_key 參數(shù)需要放在 GET query params

      from ninja.security import APIKeyQuery
      from apps.app.models import AppClient
      
      
      class ApiKey(APIKeyQuery):
          param_name = "api_key"
      
          def authenticate(self, request, key):
              try:
                  c = AppClient.objects.get(secret_key=key)
                  print(f'api key authenticated: {c.secret_key}, app id: {c.app_id}')
                  return c
              except AppClient.DoesNotExist:
                  pass
      

      使用也很簡單

      from apps.app.models import AppClient
      from core.authentication.api_key import ApiKey
      
      @router.get("/data", auth=ApiKey())
      def get_data(request):
         assert isinstance(request.auth, AppClient)
         return {"data": "success"}
      

      客戶端請求的時候需要在 GET query params 里帶上 AppClientsecret_key

      校驗通過的話,ninja 會自動在 request.auth 添加 AppClient 實例,在接口代碼里可以直接獲取這個實例。

      小結(jié)

      這篇文章居然寫了兩萬多字??

      這還是在刪減了很多代碼的情況下…

      再啰嗦一次,有興趣的同學可以在公眾號后臺回復「API鑒權(quán)」獲取完整代碼

      這么長的文章,大概不會有人有耐心看到最后吧~

      溜了,溜了~

      posted @ 2025-07-10 12:22  程序設計實驗室  閱讀(257)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 亚洲精品欧美综合二区| 99久久亚洲综合精品成人网| 老色批国产在线观看精品| 2020国产欧洲精品网站| 国产精品va在线观看无码不卡| 亚洲精品有码在线观看| 在线观看成人av天堂不卡| 免费观看的AV毛片的网站不卡| 国产首页一区二区不卡| 风流老熟女一区二区三区| 国产美女自慰在线观看| 免费的特黄特色大片| 尹人香蕉久久99天天拍| 国产精品推荐手机在线| 亚洲中文字幕日产无码成人片| 高清中文字幕国产精品| 国产精品天干天干综合网| 欧美xxxxx在线观看| 亚洲av高清一区二区三| 亚洲区一区二区三区亚洲| 精品人妻免费看一区二区三区| 性视频一区| 四虎成人精品在永久在线| 久久精品国产6699国产精| 亚洲国产中文字幕精品| 亚洲超碰97无码中文字幕| 日韩放荡少妇无码视频| 亚洲日本精品一区二区| 人人妻人人做人人爽夜欢视频| 国产呦交精品免费视频| 韩国美女福利视频一区二区 | 国产精品久久久久影院色| 在线中文字幕亚洲日韩2020| 日土县| 国产99精品成人午夜在线| 久久亚洲精品无码播放| 久久99精品久久久久麻豆| 国产精品粉嫩嫩在线观看| 色偷偷www.8888在线观看| 日本强好片久久久久久aaa| 日本免费人成视频在线观看|