JWT
HTTP協議是無狀態協議,為了解決這個問題產生了cookie和session技術。
傳統的session-cookie機制
瀏覽器發起第一次請求到服務器,服務器發現瀏覽器沒有提供session id,就認為這是第一次請求,會返回一個新的session id給瀏覽器端。
瀏覽器只要不關閉,這個session id就會隨著每一次請求重新發給服務器端,服務器端查找這個session id,如果查到,就認為是同一個會話。如果沒有查到,就認為是新的請求。
session是會話級的,可以在一個session中創建很多數據,連接斷開session清除,包括session id。
session還得有過期的機制,一段時間如果沒有發起請求,就清除session。瀏覽器端也會清除響應的cookie信息。
服務器端保存著大量session信息,很消耗服務器內存,而且如果多服務器部署,還要考慮session共享的問題,比如redis、memcached等方案。
無session方案
既然服務端就是需要一個ID來表示身份,那么不使用session也可以創建一個ID返回給客戶端。
服務端生成一個標識,并使用某種算法對標識簽名。
服務端收到客戶端發來的標識,需要檢查簽名。
這種方案的缺點是,加密、解密需要消耗CPU計算資源,無法讓瀏覽器自己主動檢查過期的數據以清除。
這種技術稱作JWT(Json WEB Token)
jwt中所有數據都是明文傳輸的,只是做了base64,如果是敏感信息,請不要使用jwt。
數據簽名的目的不是為了隱藏數據,而是保證數據不被篡改,確認唯一身份。如果數據篡改了,發回到服務端,服務端使用自己的key再計算一遍,然后進行簽名校驗,一定對不上簽名。
認證是jwt最常用的場景,一旦用戶登錄成功,就會得到token,然后請求中就可以帶上這個token。
服務端校驗通過,就可以被允許訪問資源。甚至可以在不同域名中傳遞,在單點登錄(Single Sign On)中應用廣泛。
PyJWT is a Python library which allows you to encode and decode JSON Web Tokens (JWT).
命令行操作
pip3.7 install pyjwt
pyjwt --key 'secret' encode 'some'='payload'
token=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzb21lIjoicGF5bG9hZCJ9.Joh1R2dYzkRvDkqv3sygm5YyK8Gi4ShZqbhK2gxcs2U
pyjwt --key 'secret' decode $token
pyjwt decode --no-verify $token
編碼
import jwt
jwt_encode = jwt.encode({'some': 'payload'}, key='secret', algorithm='HS256')
print(jwt_encode)
# b'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzb21lIjoicGF5bG9hZCJ9.Joh1R2dYzkRvDkqv3sygm5YyK8Gi4ShZqbhK2gxcs2U'
TOKEN由三部分拼接而成
- HEADER: ALGORITHM & TOKEN TYPE
- PAYLOAD: DATA
- VERIFY SIGNATURE

源碼實現過程
- 先用json.dumps對payload進行json格式轉換,并encode()轉換成bytes
- 將header轉換成bytes
- 分別使用base64對json_header和payload編碼
- 將key轉換成bytes
- 使用加密算法和key生成校驗簽名
- 使用base64對簽名編碼
- 將三部分使用b'.'拼接在一起
解碼
jwt_decode = jwt.decode(jwt_encode, key='secret', algorithms=['HS256'])
print(jwt_decode)
# {'some': 'payload'}
不校驗獲取數據
payload_no_verify = jwt.decode(jwt_encode, verify=False)
print(payload_no_verify)
# {'some': 'payload'}
不校驗獲取header
header_no_verify = jwt.get_unverified_header(jwt_encode)
print(header_no_verify)
# {'typ': 'JWT', 'alg': 'HS256'}
過期時間聲明
The “exp” (expiration time) claim identifies the expiration time on or after which the JWT MUST NOT be accepted for processing.
The processing of the “exp” claim requires that the current date/time MUST be before the expiration date/time listed in the “exp” claim.
Implementers MAY provide for some small leeway, usually no more than a few minutes, to account for clock skew.
Its value MUST be a number containing a NumericDate value.
You can pass the expiration time as a UTC UNIX timestamp (an int) or as a datetime, which will be converted into an int.
Expiration time is automatically verified in jwt.decode() and raises jwt.ExpiredSignatureError if the expiration time is in the past.
Expiration time will be compared to the current UTC time, so be sure to use a UTC timestamp or datetime.
You can turn off expiration time verification with the verify_exp parameter in the options argument.
import time
from datetime import datetime, timedelta
d1 = jwt.encode({'exp': int(time.time()) + 5}, key='secret')
d2 = jwt.encode({'exp': datetime.utcnow() + timedelta(seconds=5)}, key='secret')
# time.sleep(10)
print(jwt.decode(d1, key='secret'))
print(jwt.decode(d2, key='secret'))
# Signature has expired
手動 base64 解碼
from base64 import urlsafe_b64decode
data = {'name': 'tom', 'email': '123@qq.com'}
key = 'secret'
token = jwt.encode(payload=data, key=key)
print(token)
# b'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJuYW1lIjoidG9tIiwiZW1haWwiOiIxMjNAcXEuY29tIn0.wB5vRj2K2-_ZPMRJIrdRL4HIPpByZuNpgvLTVbNeSV4'
print(jwt.decode(token, key=key))
# {'name': 'tom', 'email': '123@qq.com'}
header, payload, signature = token.split(b'.')
print(header)
print(payload)
print(signature)
# b'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9'
# b'eyJuYW1lIjoidG9tIiwiZW1haWwiOiIxMjNAcXEuY29tIn0'
# b'wB5vRj2K2-_ZPMRJIrdRL4HIPpByZuNpgvLTVbNeSV4'
def addequal(b: bytes):
# 補齊base64編碼的等號
equals = 4 - len(b) % 4
return b + b'=' * equals
print('header =', urlsafe_b64decode(addequal(header)))
print('payload =', urlsafe_b64decode(addequal(payload)))
print('signature =', urlsafe_b64decode(addequal(signature)))
# header = b'{"typ":"JWT","alg":"HS256"}'
# payload = b'{"name":"tom","email":"123@qq.com"}'
# signature = b'\xc0\x1eoF=\x8a\xdb\xef\xd9<\xc4I"\xb7Q/\x81\xc8>\x90rf\xe3i\x82\xf2\xd3U\xb3^I^'
實際使用示例
from django.http import JsonResponse, HttpRequest, HttpResponseBadRequest
import json
from .models import User
import jwt
import bcrypt
from django.conf import settings
import time
from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
# Create your views here.
key = settings.SECRET_KEY
print(settings, settings.SECRET_KEY)
expire = 24 * 60 * 60 * 365 # 過期時間
def gen_token(user_id):
data = {
'user_id': user_id,
'exp': int(time.time()) + expire,
}
return jwt.encode(payload=data, key=key).decode()
# 注冊
def reg(request: HttpRequest):
print(request, type(request))
# <WSGIRequest: POST '/user/reg/'> <class 'django.core.handlers.wsgi.WSGIRequest'>
print(request.GET)
print(request.POST)
print(request.body)
# b'{\n\t"email": "b@qq.com",\n\t"name": "b",\n\t"password": "123"\n}'
payload = json.loads(request.body) # 請求數據在request body中
print(payload)
try:
email = payload['email']
query = User.manager.filter(email=email)
# <QuerySet []> <class 'django.db.models.query.QuerySet'>
print(query.query) # 查看SQL語句
# SELECT `user`.`id`, `user`.`name`, `user`.`email`, `user`.`password` FROM `user` WHERE `user`.`email` = 1234@qq.com
if query.first():
return HttpResponseBadRequest('該郵箱已注冊')
name = payload['name']
password = payload['password']
confirm = payload['confirm']
print(password, confirm)
if password != confirm:
return HttpResponseBadRequest('兩次輸入密碼不一致')
# 添加一個user
user = User()
user.email = email
user.name = name
user.password = bcrypt.hashpw(password.encode(), bcrypt.gensalt())
user.save()
return JsonResponse({'user_id': user.id}, status=201)
except Exception as e:
return HttpResponseBadRequest(e)
# 認證
def auth(view):
def wrapper(request: HttpRequest):
# print(request.META)
token = request.META.get('HTTP_JWT')
print(token)
if not token:
return HttpResponseBadRequest('請先登陸')
try:
data = jwt.decode(token, key=key)
print(data)
user_id = data.get('user_id', -1)
user = User.manager.filter(pk=user_id).get()
request.user = user
except jwt.ExpiredSignatureError as e:
return HttpResponseBadRequest(e)
except ObjectDoesNotExist:
return HttpResponseBadRequest("user does't exist")
except MultipleObjectsReturned:
return HttpResponseBadRequest("more than one user was found")
ret = view(request)
return ret
return wrapper
# @auth
def login(request: HttpRequest):
payload = json.loads(request.body)
try:
email = payload['email']
password = payload['password']
print(email, password)
user = User.manager.filter(email=email).first()
# print(user.password)
matched = bcrypt.checkpw(password.encode(), user.password.encode())
if user and matched:
token = gen_token(user.id)
res = JsonResponse({
"user": {
"user_id": user.id,
"name": user.name,
"email": user.email
}, "token": token
})
res.set_cookie('jwt', value=token)
return res
else:
return HttpResponseBadRequest('用戶名或密碼錯誤')
# raise Exception
except:
return HttpResponseBadRequest('登錄失敗')
參考:
https://jwt.io/
https://pyjwt.readthedocs.io/en/latest/
https://github.com/jpadilla/pyjwt
https://zhuanlan.zhihu.com/p/38942172
https://auth0.com/learn/json-web-tokens/
https://developer.atlassian.com/cloud/jira/software/understanding-jwt/
https://segmentfault.com/a/1190000018058541
浙公網安備 33010602011771號