Python異步操作MongoDB --Motor的使用
安裝#
python3 -m pip install motor
創建客戶端#
指定主機和端口號
import motor.motor_asyncio client = motor.motor_asyncio.AsyncIOMotorClient('localhost', 27017)
使用用戶名和密碼
motor.motor_asyncio.AsyncIOMotorClient('mongodb://root:123456@localhost:27017')
獲取數據庫#
MongoDB的單個實例可以支持多個獨立的數據庫。在開放式客戶端中,您可以使用點表示法或括號表示法來獲取對特定數據庫的引用:
db = client.test_database db = client['test_database']
創建對數據庫的引用不會執行I / O,也不需要 await 表達式。
獲取集合#
一個集合是一組存儲在MongoDB中的文檔,并且可以被認為是大致在關系數據庫中的表的當量。獲取Motor中的集合與獲取數據庫的工作方式相同:
collection = db.test_collection collection = db['test_collection']
就像獲取對數據庫的引用一樣,獲取對集合的引用不會產生I / O并且不需要 await 表達式。
插入文檔(insert_one)#
與pymongo一樣,Motor使用Python字典表示MongoDB文檔。要存儲在MongoDB中的文檔,在 await 表達式中調用 insert_one() :
async def do_insert(): document = {'key': 'value'} result = await db.test_collection.insert_one(document) # insert_one只能插入一條數據 print('result %s' % repr(result.inserted_id)) loop = asyncio.get_event_loop() loop.run_until_complete(do_insert()) # result ObjectId('...')
批量插入文檔(insert_many)#
async def do_insert(): result = await db.test_collection.insert_many( [{'i': i} for i in range(2000)]) # insert_many可以插入一條或多條數據,但是必須以列表(list)的形式組織數據 print('inserted %d docs' % (len(result.inserted_ids),)) loop = asyncio.get_event_loop() loop.run_until_complete(do_insert()) # inserted 2000 docs
查詢一個文檔(find_one)#
使用 find_one() 得到匹配查詢的第一個文檔。例如,要獲取密鑰“i”的值小于1的文檔:
async def do_find_one(): document = await db.test_collection.find_one({'i': {'$lt': 1}}) # find_one只能查詢一條數據 pprint.pprint(document) loop = asyncio.get_event_loop() loop.run_until_complete(do_find_one()) # {'_id': ObjectId('...'), 'i': 0}
注意:結果是一個字典匹配我們之前插入的字典。
查詢多個文檔(find)#
使用 find() 要查詢的一組文檔。 find() 沒有I / O,也不需要 await 表達式。它只是創建一個 AsyncIOMotorCursor 實例。當您調用 to_list() 或為循環執行異步時 (async for) ,查詢實際上是在服務器上執行的。
查詢 “ i ” 小于5的所有文檔:
async def do_find(): cursor = db.test_collection.find({'i': {'$lt': 5}}).sort('i') for document in await cursor.to_list(length=100): pprint.pprint(document) loop = asyncio.get_event_loop() loop.run_until_complete(do_find()) # {'_id': ObjectId('...'), 'i': 0} # {'_id': ObjectId('...'), 'i': 1} # {'_id': ObjectId('...'), 'i': 2} # {'_id': ObjectId('...'), 'i': 3} # {'_id': ObjectId('...'), 'i': 4}
一length ,調用方法 to_list 的必傳參數,防止 Motor 從緩沖中獲取的文檔數量不受限制,此處限制為100。
使用 async for 查詢所有文檔#
您可以在循環中一次處理一個文檔:async for
async def do_find(): c = db.test_collection async for document in c.find({}): # 查詢所有文檔 pprint.pprint(document) loop = asyncio.get_event_loop() loop.run_until_complete(do_find()) # {'_id': ObjectId('...'), 'i': 0} # {'_id': ObjectId('...'), 'i': 1}
您可以在開始迭代之前對查詢應用排序,跳過或限制:
async def do_find(): cursor = db.test_collection.find({'i': {'$lt': 4}}) # Modify the query before iterating cursor.sort('i', -1).skip(1).limit(2) # 對查詢應用排序(sort),跳過(skip)或限制(limit) async for document in cursor: pprint.pprint(document) loop = asyncio.get_event_loop() loop.run_until_complete(do_find()) # {'_id': ObjectId('...'), 'i': 2} # {'_id': ObjectId('...'), 'i': 1}
游標 (cursor) 實際上并不是單獨從服務器檢索每個文檔; 它可以大批量地獲取文檔。
使用count_documents()查詢集合中的文檔數量#
async def do_count(): n = await db.test_collection.count_documents({}) # 查詢集合內所有文檔數量 print('%s documents in collection' % n) n = await db.test_collection.count_documents({'i': {'$gt': 1000}}) # 按條件查詢集合內文檔數量 print('%s documents where i > 1000' % n) loop = asyncio.get_event_loop() loop.run_until_complete(do_count()) # 2000 documents in collection # 999 documents where i > 1000
更新文檔(推薦使用update_one或update_many)#
replace_one() 更改文檔。它需要兩個參數:一個指定要替換哪個文檔的查詢和一個替換文檔。該查詢遵循與 find() 或 相同的語法 find_one() 。替換一個文檔的示例:
async def do_replace(): coll = db.test_collection old_document = await coll.find_one({'i': 50}) print('found document: %s' % pprint.pformat(old_document)) _id = old_document['_id'] old_document['i'] = -1 # 修改文檔(dict)的key, value old_document['new'] = 'new' # 增加文檔(dict)的key, value del old_document['i'] # 刪除文檔(dict)的key, value result = await coll.replace_one( {'_id': _id}, old_document) # replace_one第一個參數為查詢條件, 第二個參數為更新后的文檔 print('replaced %s document' % result.modified_count) new_document = await coll.find_one({'_id': _id}) print('document is now %s' % pprint.pformat(new_document)) loop = asyncio.get_event_loop() loop.run_until_complete(do_replace()) # found document: {'_id': ObjectId('...'), 'i': 50} # replaced 1 document # document is now {'_id': ObjectId('...'), 'key': 'value'}
除了 _id 不變,replace_one() 會對修改后文檔的所有字段做更新操作, 慎用 。
update_one() 與MongoDB的修飾符運算符一起使用來更新文檔的一部分并保持其余部分不變。我們將找到“i”為51的文檔,并使用 $set 運算符將“key”設置為“value”:
async def do_update(): coll = db.test_collection result = await coll.update_one({'i': 51}, {'$set': {'key': 'value'}}) # 僅新增或更改該文檔的某個key print('updated %s document' % result.modified_count) new_document = await coll.find_one({'i': 51}) print('document is now %s' % pprint.pformat(new_document)) loop = asyncio.get_event_loop() loop.run_until_complete(do_update()) # updated 1 document # document is now {'_id': ObjectId('...'), 'i': 51, 'key': 'value'}
“key”設置為“value”,“i”仍為51。
update_one() 只影響它找到的第一個文檔,您可以使用 update_many() 更新所有文檔:
await coll.update_many({'i': {'$gt': 100}}, {'$set': {'key': 'value'}})
刪除文檔#
delete_many() 采用與語法相同的 find() 查詢。delete_many() 立即刪除所有匹配的文檔。
async def do_delete_many(): coll = db.test_collection n = await coll.count_documents({}) print('%s documents before calling delete_many()' % n) result = await db.test_collection.delete_many({'i': {'$gte': 1000}}) print('%s documents after' % (await coll.count_documents({}))) loop = asyncio.get_event_loop() loop.run_until_complete(do_delete_many()) # 2000 documents before calling delete_many() # 1000 documents after

浙公網安備 33010602011771號