/ python 生产实战 60 秒系统安全认证实战 /
上节主要讲解了目前主流的认证规范/协议以及对 JWT 进行了深入的研究和分析并在最后给出了在生产环境中如何去生成一个有效的 Token,基于 Python 语言那在生产环境中是如何进行有效的安全认证的呢?上节我们也基于 JWT 的 Token 的认证过程进行了登陆认证、请求认证的理论分析以及用图示的方式给出了数据的流向,本节我们再带大家从代码层面走一次流程,一方面加深大家对上节理论部分的理解,另一方面也是给大家在做工程的过程中提供一套"模版"快速应用在项目中
1
准备工具
1.1
密码安全
为了数据安全,我们利用 PassLib 对入库的用户密码进行加密处理,推荐的加密算法是"Bcrypt"。我们需要安装依赖包:
pip install passlib pip install bcrypt
简单的介绍一下这两个库: 1.passlib 是 python2&3 的密码散列库,它提供超过 30 种密码散列算法的跨平台实现,以及作为管理现有密码哈希的框架。它被设计成有用的 对于范围广泛的任务,从验证/etc/shadow 中找到的散列到 为多用户应用程序提供全强度密码哈希。2.bcrypt 模块是一个用于在 Python 中生成强哈希值的库。 注意:若对于以上两个库的详细内容比较感兴趣的小伙伴可以自行到 python 官网查看相关资料,我们本节的核心是做整个流程的实现这个具体的库内容就先不做详细介绍了。
2
用户登陆流程
用户通过终端发送 username 和 password 到后端。后端收到数据后,进行一下操作:1.用户信息校验:查询当前系统是否存在该用户,以及密码是否正确 2.如果用户存在,则生成 JWT token 并返回,JWT payload 中可以携带自定义数据
# -*- encoding: utf-8 -*- from datetime import datetime, timedelta from typing import Optional from fastapi import Depends, FastAPI, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm import jwt from pydantic import BaseModel from passlib.context import CryptContext # to get a string like this run: # openssl rand -hex 32 SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 25 # 模拟数据库数据 fake_users_db = { "hiashiniu": { "username": "hiashiniu", "full_name": "HaiShiNiu", "email": "haishiniu@bbs.com", "hashed_password": "$2b$12$uG4n5CK4.kCid/MfQ3ns9.uRslWuWr9EL5FR4HYNg6SNo6wEn8OPC", "disabled": False, } } class Token(BaseModel): access_token: str token_type: str class User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = None class UserInDB(User): hashed_password: str pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token") app = FastAPI() # 校验密码 def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) # 密码哈希 def get_password_hash(password): # pwd_context.hash('123456789') # '$2b$12$uG4n5CK4.kCid/MfQ3ns9.uRslWuWr9EL5FR4HYNg6SNo6wEn8OPC' return pwd_context.hash(password) # 模拟从数据库读取用户信息 def get_user(db, username: str): user_dict = db.get(username,None) if user_dict: return UserInDB(**user_dict) # 用户信息校验:username和password分别校验 def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) print(user) if not user: return False if not verify_password(password, user.hashed_password): return False return user # 生成token,带有过期时间 def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() # 有一个实效性 默认的时间25分钟 if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt @app.post("/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): # 首先校验用户信息 print(form_data.password) print(form_data.username) user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) # 生成并返回token信息 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}
3
请求数据流
终端获取到 token 信息后,必须在后续请求的 Authorization 头信息中带有 Bearer token,才能被允许访问。我们添加一个校验函数,对请求的合法性进行校验,读取 token 内容解析并进行验证。
# -*- encoding: utf-8 -*- async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception except jwt.PyJWTError as ex: print(ex) raise credentials_exception user = get_user(fake_users_db, username=username) if user is None: raise credentials_exception return user @app.get("/users/me/", response_model=User) async def read_users_me(current_user: User = Depends(get_current_user)): return current_user
4
效果展示
本小节我们整合上述代码进行一下效果展示
# -*- encoding: utf-8 -*- from datetime import datetime, timedelta from typing import Optional from fastapi import Depends, FastAPI, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm import jwt from pydantic import BaseModel from passlib.context import CryptContext # to get a string like this run: # openssl rand -hex 32 SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 25 # 模拟数据库数据 fake_users_db = { "hiashiniu": { "username": "hiashiniu", "full_name": "HaiShiNiu", "email": "haishiniu@bbs.com", "hashed_password": "$2b$12$uG4n5CK4.kCid/MfQ3ns9.uRslWuWr9EL5FR4HYNg6SNo6wEn8OPC", "disabled": False, } } class Token(BaseModel): access_token: str token_type: str class User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = None class UserInDB(User): hashed_password: str pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token") app = FastAPI() # 校验密码 def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) # 密码哈希 def get_password_hash(password): # pwd_context.hash('123456789') # '$2b$12$uG4n5CK4.kCid/MfQ3ns9.uRslWuWr9EL5FR4HYNg6SNo6wEn8OPC' return pwd_context.hash(password) # 模拟从数据库读取用户信息 def get_user(db, username: str): user_dict = db.get(username,None) if user_dict: return UserInDB(**user_dict) # 用户信息校验:username和password分别校验 def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return user # 生成token,带有过期时间 def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() # 有一个实效性 默认的时间25分钟 if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt @app.post("/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): # 首先校验用户信息 print(form_data.password) print(form_data.username) user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) # 生成并返回token信息 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"} async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception except jwt.PyJWTError as ex: print(ex) raise credentials_exception user = get_user(fake_users_db, username=username) if user is None: raise credentials_exception return user @app.get("/users/me/", response_model=User) async def read_users_me(current_user: User = Depends(get_current_user)): return current_user
1.使用命令启动项目:
uvicorn main:app --reload
2.使用 Postman 模拟登陆获取验证 Token
3.使用 Postman 模拟携带 Token 获取正常逻辑信息
5
总结
本节核心:从代码层面实战了 登陆认证、请求认证的数据流转,让我们对数据安全有了新的认识。
原创不易,只愿能帮助那些需要这些内容的同行或刚入行的小伙伴,你的每次 点赞、分享 都是我继续创作下去的动力,我希望能在推广 python 技术的道路上尽我一份力量,欢迎在评论区向我提问,我都会一一解答,记得一键三连支持一下哦!
AnalyticDB for MySQL是云端托管的PB级高并发低延时数据仓库 通过AnalyticDB for...
云计算服务正在以前所未有的速度在各行各业快速普及,成为IT应用的最主流实现形...
场景描述 最近使用 Redis 遇到了一个类似分布式锁的场景,跟 Redis 实现分布式锁...
日前 阿里云云效联合阿里云大学团队 面向全国高校学子正式启动了83行代码重构大...
客户介绍 闲鱼是依托阿里电商体系的前台型业务,有非常独特的业务特点和用户诉求...
前言 语言的内存管理是语言设计的一个重要方面。它是决定语言性能的重要因素。无...
本文转载自微信公众号「见贤思编程」,作者泰斗贤若如 。转载本文请联系见贤思编...
文本作者:刘晓国,Elastic 公司社区布道师。新加坡国立大学硕士,西北工业大学...
鉴于近期加密货币大涨,导致很多小(韭)白(菜)纷纷入场,然后很多人都在问显卡挖...
近期进展 在 ffmpeg-go init 之后,项目也收到了一些关注,还有几个同学发邮件探...