在Python后端項目FastApi中使用MongoDB進行數據處理
我在前面隨筆《在SqlSugar的開發框架中增加對低代碼EAV模型(實體-屬性-值)的WebAPI實現支持》中介紹了對于EAV數據存儲的時候,我們把變化字段的數據記錄存儲在MongoDB數據庫里面,這樣除了支持動態化字段變化外,也更好的支持對字段不同類型的查詢處理,之前隨筆介紹的是基于C#操作MongoDB數據庫的處理,由于Python后端FastApi項目的設計初衷是可以平滑更換 SqlSugar項目的Web API的,因此會涉及到在Python項目中對MongoDB的相關操作。本篇隨筆先對Python環境中操作MongoDB數據庫進行相關的介紹。
1、Python環境中操作MongoDB數據庫的準備
如果需要了解相關MongoDB數據庫的相關信息和C#開發MongoDB的內容,可以參考我的隨筆《MongoDB數據庫內容》,里面包含我對這個主題的相關介紹。
具體可以進一步參考官網里面的介紹。
https://www.mongodb.com/zh-cn/docs/
https://www.mongodb.com/zh-cn/docs/drivers/csharp/current/
https://www.mongodb.com/zh-cn/docs/languages/python/
https://www.mongodb.com/zh-cn/docs/drivers/motor/
如果我們需要下載安裝,可以根據不同的操作系統下載對應的社區版本安裝文件即可。
https://www.mongodb.com/zh-cn/products/self-managed/community-edition
安裝完成后,可以使用管理工具進行MongoDB數據庫的連接和創建,如下所示可以根據不同的需要創建不同的集合。

關系型數據庫和MongoDB的數據庫,它們的相關概念,對比關系圖如下所示。

我們安裝MongoDB數據庫后,它是駐留在Window的服務里面,創建服務并順利啟動成功后,然后就可以在系統的服務列表里查看到了,我們確認把它設置為自動啟動的Windows服務即可。

由于我們希望使用異步來操作MongoDB數據庫,推薦使用motor驅動來操作它。Motor是一個異步mongodb driver,支持異步讀寫mongodb。
具體使用我們可以參考官網的介紹:https://www.mongodb.com/zh-cn/docs/drivers/motor/
使用前,我們需要再我們FastAPI項目中安裝MongoDB和Motor的依賴模塊。就是pymonogo和motor
我的requirement.txt文件中包含下面兩個

我們如果沒有初始化安裝,通過pip進行安裝即可。
pip install pymongo motor
使用它的基礎代碼如下所示。
import motor.motor_asyncio client = motor.motor_asyncio.AsyncIOMotorClient() 或者 client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://localhost:27017") 或者 client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://username:password@localhost:27017/dbname')
根據MongoDB官網的說明,MongoDB的適用場景如下:
1)網站實時數據:MongoDB非常適合實時的插入,更新與查詢,并具備網站實時數據存儲所需的復制及高度伸縮性。
2)數據緩存:由于性能很高,MongoDB也適合作為信息基礎設施的緩存層。在系統重啟之后,由MongoDB搭建的持久化緩存層可以避免下層的數據源過載。
3)大尺寸、低價值數據存儲:使用傳統的關系型數據庫存儲一些數據時可能會比較昂貴,在此之前,很多時候程序員往往會選擇傳統的文件進行存儲。
4)高伸縮性場景:MongoDB非常適合由數十或數百臺服務器組成的數據庫。MongoDB的路線圖中已經包含對MapReduce引擎的內置支持。
5)對象或JSON數據存儲:MongoDB的BSON數據格式非常適合文檔化格式的存儲及查詢。
MongoDB數據庫支持常規的增刪改查等操作,其中它的 find方法很強大,可以組合很多條件查詢的方式,如下所示:
db.collection.find({ "key" : value }) 查找key=value的數據 db.collection.find({ "key" : { $gt: value } }) key > value db.collection.find({ "key" : { $lt: value } }) key < value db.collection.find({ "key" : { $gte: value } }) key >= value db.collection.find({ "key" : { $lte: value } }) key <= value db.collection.find({ "key" : { $gt: value1 , $lt: value2 } }) value1 < key <value2 db.collection.find({ "key" : { $ne: value } }) key <> value db.collection.find({ "key" : { $mod : [ 10 , 1 ] } }) 取模運算,條件相當于key % 10 == 1 即key除以10余數為1的 db.collection.find({ "key" : { $nin: [ 1, 2, 3 ] } }) 不屬于,條件相當于key的值不屬于[ 1, 2, 3 ]中任何一個 db.collection.find({ "key" : { $in: [ 1, 2, 3 ] } }) 屬于,條件相當于key等于[ 1, 2, 3 ]中任何一個 db.collection.find({ "key" : { $size: 1 } }) $size 數量、尺寸,條件相當于key的值的數量是1(key必須是數組,一個值的情況不能算是數量為1的數組) db.collection.find({ "key" : { $exists : true|false } }) $exists 字段存在,true返回存在字段key的數據,false返回不存在字度key的數據 db.collection.find({ "key": /^val.*val$/i }) 正則,類似like;“i”忽略大小寫,“m”支持多行 db.collection.find({ $or : [{a : 1}, {b : 2} ] }) $or或 (注意:MongoDB 1.5.3后版本可用),符合條件a=1的或者符合條件b=2的數據都會查詢出來 db.collection.find({ "key": value , $or : [{ a : 1 } , { b : 2 }] }) 符合條件key=value ,同時符合其他兩個條件中任意一個的數據 db.collection.find({ "key.subkey" :value }) 內嵌對象中的值匹配,注意:"key.subkey"必須加引號 db.collection.find({ "key": { $not : /^val.*val$/i } }) 這是一個與其他查詢條件組合使用的操作符,不會單獨使用。上述查詢條件得到的結果集加上$not之后就能獲得相反的集合。
比較符號說明如下:
| 符 號 | 含 義 | 示 例 |
|---|---|---|
$lt |
小于 | {'age': {'$lt': 20}} |
$gt |
大于 | {'age': {'$gt': 20}} |
$lte |
小于等于 | {'age': {'$lte': 20}} |
$gte |
大于等于 | {'age': {'$gte': 20}} |
$ne |
不等于 | {'age': {'$ne': 20}} |
$in |
在范圍內 | {'age': {'$in': [20, 23]}} |
$nin |
不在范圍內 | {'age': {'$nin': [20, 23]}} |
另外,還可以進行正則匹配查詢。例如,查詢名字以 M 開頭的學生數據,示例如下:
results = collection.find({'name': {'$regex': '^M.*'}})
這里使用 $regex 來指定正則匹配,^M.* 代表以 M 開頭的正則表達式。
多條件查詢 $and $or
# and查詢 db.collection.find({ $and : [ { "age" : {$gt : 10 }} , { "gender" : "man" } ] }) #or查詢 db.collection.find({ $or : [ {"age" : {$gt : 10 }}, { "gender" : "man"} ] }) #and查詢 和 or查詢 db.inventory.find( { $and : [ { $or : [ { price : 0.99 }, { price : 1.99 } ] }, { $or : [ { sale : true }, { qty : { $lt : 20 } } ] } ] } )
關于這些操作的更詳細用法,可以在 MongoDB 官方文檔找到: https://docs.mongodb.com/manual/reference/operator/query/。
當然還有插入更新的處理語句也是很特別的。
db.student.insert({name:'student1',subject:['arts','music']})
特別是更新操作需要說明一下,支持常規的$set方法(修改)、$unset方法(移除指定的鍵),還有原子級的$inc方法(數值增減),$rename方法(重命名字段名稱)等等,
db.users.update({"_id" : ObjectId("51826852c75fdd1d8b805801")}, {"$set" : {"hobby" :["swimming","basketball"]}} )
db.users.update({"_id" : ObjectId("51826852c75fdd1d8b805801")},{"$unset" : {"hobby" :1 }} )
db.posts.update({"_id" : ObjectId("5180f1a991c22a72028238e4")}, {"$inc":{"pageviews":1}})
db.students.update( { _id: 1 }, { $rename: { 'nickname': 'alias', 'cell': 'mobile' } }
upsert是一種特殊的更新操作,不是一個操作符。(upsert = up[date]+[in]sert),也就是如果存在則更新,否則就寫入一條新的記錄操作。這個參數是個布爾類型,默認是false。
db.users.update({age :25}, {$inc :{"age" :3}}, true)
另外,Update可以對Json的集合進行處理,如果對于subject對象是一個集合的話,插入或更新其中的字段使用下面的語句
db.student.update({name:'student5'},{$set:{subject:['music']}},{upsert:true});
如果是記錄已經存在,我們可以使用索引數值進行更新其中集合里面的數據,如下所示。
db.student.update({name:'student3'},{$set:{'subject.0':'arts'}});
2、在FastAPI項目中使用pymongo 和motor 操作MongoDB
上面我們大致介紹了一些基礎的MongoDB數據庫信息。
之前隨筆《Python 開發環境的準備以及一些常用類庫模塊的安裝》介紹過我們的FastAPI配置信息,存儲在.env文件中的,在FastAPI項目啟動的時候,根據需要進行讀取加載到對象里面。
如對于配置信息的處理,我們還可以引入 python-dotenv 和 pydantic_settings 來統一管理配置參數。我們的項目.env文件部分配置如下。

最后我們通過pydantic_settings的配置處理,獲得相關的配置對象信息,如下。

其中的MONGO_URL 就是我們具體使用的MogoDB數據庫連接信息了。
為了和數據庫連接一樣方便使用MongoDB的連接,我們可以通過在一個獨立的文件中聲明獲得MongoDB的數據庫和集合對象,并且通過緩存的方式提高使用效率。
from motor.motor_asyncio import AsyncIOMotorClient from motor.motor_asyncio import AsyncIOMotorCollection from functools import lru_cache from core.config import settings class MongoClientManager: def __init__(self, uri: str = settings.MONGO_URL): """初始化 MongoDB 客戶端管理器 Args: uri = mongodb://{MONGO_HOST}:{MONGO_PORT}/{MONGO_DB_NAME}""" self._client = AsyncIOMotorClient(uri) self._db = self._client.get_default_database() # 使用 URI 中指定的 dbname @property def db(self): return self._db def close(self): self._client.close() # 創建全局 Mongo 管理器實例 @lru_cache() def get_mongo_client() -> MongoClientManager: return MongoClientManager() @lru_cache() def get_collection(entity_model: str) -> AsyncIOMotorCollection: """ 獲取指定名稱的集合(collection),類型是 BsonDocument(等價于 Python dict) MongoDB 的集合通常在首次寫入數據時自動創建,因此你只需要獲取對應集合即可,不需要手動“創建” """ db = get_mongo_client().db return db[entity_model]
這樣,我們項目啟動的時候,自動加載配置文件,并在這里初始化MongoDBClient的異步對象和集合緩存對象。
有了這些,我們為了方便,還需要對MongoDB數據庫的操作進行一些的封裝處理,以提高我們對接口的使用遍歷,畢竟我們前面介紹到了MongoDB支持非常復雜的查詢和處理,我們往往只需要一些特殊的接口即可,因此封裝接口有利于我們對接口的使用便利性。
我們在項目的utils目錄中增加一個輔助類mongo_helper.py,用來封裝MongoDB的相關操作的。
我們截取部分代碼,如下所示。
class MongoAsyncHelper: """基于 motor 實現MongoDb異步操作,支持: insert_one / insert_many find_one / find_many update_one / update_many delete_one / delete_many 復雜條件、分頁、排序、模糊查詢等""" def __init__(self, collection: AsyncIOMotorCollection): self.collection = collection async def insert_one(self, data: Dict[str, Any]): """插入單條數據 examples: helper = MongoAsyncHelper(collection) await helper.insert_one({ "name": "John", "age": 25, "created_at": datetime.utcnow()} ) args: data: 待插入的數據 """ return await self.collection.insert_one(data) async def insert_many(self, data_list: List[Dict[str, Any]]): """插入多條數據 examples: helper = MongoAsyncHelper(collection) await helper.insert_many([ {"name": "John", "age": 25, "created_at": datetime.utcnow()}, {"name": "Mary", "age": 30, "created_at": datetime.utcnow()} ]) args: data_list: 待插入的數據列表 """ return await self.collection.insert_many(data_list) async def find_by_id(self, id: str) -> Optional[Dict[str, Any]]: """根據 _id 查詢單條數據""" return await self.find_one({"_id": id}) async def update_by_id(self, id: str, update_data: Dict[str, Any]): """根據 _id 更新單條數據""" return await self.update_one({"_id": id}, {"$set": update_data}) async def delete_by_id(self, id: str): """根據 _id 刪除單條數據""" return await self.delete_one({"_id": id}) ...............
例如,我們在前面介紹的EAV處理中,獲取MongoDB數據庫的指定實體類型(對應表)的所有集合處理,在Python的數據處理層代碼中如下所示。
async def mongo_get_all( self, db: AsyncSession, entitytype_id: str, parentid: str = None, sorting: str = None, ) -> List[dict]: """獲取實體類型的所有記錄 :param db: 數據庫會話 :param entitytype_id: 實體類型ID :param parentid: 父ID :param sorting: 排序條件,格式:name asc 或 name asc,age desc """ items = [] try: entityType: EntityType = await self.__get_entity_type(db, entitytype_id) self.logger.info(f"獲取實體類型:{entitytype_id}, {entityType.name}") if not entityType: return items # 連接 MongoDB collection = get_collection(entityType.entitymodel) mongo_helper = MongoAsyncHelper(collection) # 構建查詢條件 filter = {} if parentid is not None: filter["ParentId"] = parentid sort_by = parse_sort_string(sorting) # print(sort_by) items = await mongo_helper.find_all(filter, sort_by) return items except Exception as e: self.logger.error(f"處理失敗:{str(e)}") return items
首先通過緩存接口get_collection獲得對應實體類型的集合,然后傳遞集合到輔助類 MongoAsyncHelper后構建MongoDB輔助類,就可以利用其接口進行相關的MongoDB數據庫操作處理了。
最后通過類似的處理,結合數據庫操作和MongoDB數據庫操作,我們把EAV的接口服務遷移到了Python中FastAPI項目中了。

最后完成的FastAPI項目的EAV相關接口如下所示。

折疊相關模塊顯示如下所示。

最后切換到Winform項目上,調整接入的Web API數據源,同樣獲得一樣的界面效果即可。
產品數據表。

訂單數據表

專注于代碼生成工具、.Net/Python 框架架構及軟件開發,以及各種Vue.js的前端技術應用。著有Winform開發框架/混合式開發框架、微信開發框架、Bootstrap開發框架、ABP開發框架、SqlSugar開發框架、Python開發框架等框架產品。
??轉載請注明出處:撰寫人:伍華聰??http://www.iqidi.com?
????
浙公網安備 33010602011771號