簡單文件存儲服務 去重存儲
此篇基于之前寫過的一個文件存儲的函數,最初版本是只接收數據存儲后返回文件id(這個文件id是隨機生成的),在寫完之后就有考慮加一個去重功能,因為實際用在項目里這代碼看來很呆,測試時也看到存儲了大量重復文件在系統(tǒng)中.也是最近突然有興致想到重寫一下,當然我不保證這段代碼一定正確,同時我的代碼也只是理論上的實現,實際使用也不一定適合.如果有問題歡迎您的指正,我先提供修改后的代碼,最后再是最初版本.
邏輯簡單說就是 獲取文件的SHA256值來判斷是否已經存儲過 ==> 存儲文件.
然后實際編寫下來就會感受到幾種特殊情況:
多個線程同時存儲相同文件,檢查到沒有存儲過,都一起存儲了一遍,然后u_map也插入key沖突了.
然后就使用到提前存儲到u_map中防止另一線程拿到鎖發(fā)現沒存儲過也來存儲,這時候就又出現新問題那出現存儲失敗怎么辦?另一線程已經判斷了存在直接返回了fileId了.
這時候我就再添加一個u_map用于保存文件存儲狀態(tài),在判斷是否已經有存儲的時候,再去判斷這個存儲是否完畢,如果未完畢就等待存儲完畢,再返回這里的等待就不考慮重試了.
20250827
bthread | bRPC
使用其他的mutex是否會有問題 · Issue #114 · apache/brpc
bthread 中使用 std::mutex和std::condition_variable導致線程丟失 · Issue #2590 · apache/brpc
Apache BRPC項目中跨版本鎖機制的兼容性設計解析 - GitCode博客
今天看了下文檔看到了bthread是brpc使用的M:N線程庫,然后就在想我原先在brpc服務中直接使用標準庫的鎖會不會造成一些印象,只找到一些使用標準庫出現了問題,想了想和參看倆個issue后畢竟使用的是brpc那么還是使用配套的bthread的鎖更好點,這篇的代碼我就不改了,參看下面?zhèn)z個test
brpc/test/bthread_cond_unittest.cpp at master · apache/brpc
brpc/test/bthread_mutex_unittest.cpp at master · apache/brpc
//
#include <brpc/server.h>
#include <butil/logging.h>
#include <openssl/sha.h>
//我寫的一些庫文件就不貼出來獻丑了
class FileServiceImpl : public FileService{
public:
FileServiceImpl(const std::string& base_path)
:__base_filepath(base_path){
umask(0);
mkdir(base_path.c_str(), 0775);
DEBUG("文件根目錄{}",base_path);
if (__base_filepath.back() != '/') __base_filepath.push_back('/');
}
~FileServiceImpl(){}
//
//上傳單個文件
void PutSingleFile(::google::protobuf::RpcController *controller,
const ::PutSingleFileReq *request,
::PutSingleFileRsp *response,
::google::protobuf::Closure *done){
brpc::ClosureGuard guard(done);
response->set_request_id(request->request_id());
response->mutable_file_info()->set_file_size(request->file_data().file_size());
response->mutable_file_info()->set_file_name(request->file_data().file_name());
StoreResult r = StoreFile(request->file_data().file_content());
response->set_success(r.success);
//存儲失敗
if(!r.success){
response->set_errmsg(r.errmsg);
return;
}
response->mutable_file_info()->set_file_id(r.file_id);
return ;
}
private:
/**
* 用于文件存儲狀態(tài)
*/
struct FileState
{
bool finished = false;
bool success = false;
std::string errmsg;
std::condition_variable cv;
std::mutex mtx;
};
/**
* 存儲返回,用于復用的
*/
struct StoreResult
{
bool success;
std::string file_id;
std::string errmsg;
};
// 計算文件內容的 SHA256 哈希
std::string calculateFileHash(const std::string& content) {
unsigned char hash[SHA256_DIGEST_LENGTH];
SHA256((const unsigned char*)content.data(), content.size(), hash);
return std::string(reinterpret_cast<char*>(hash), SHA256_DIGEST_LENGTH);
}
/**
* 存儲文件,包含哈希去重、并發(fā)控制、寫入邏輯
*/
StoreResult StoreFile(const std::string& file_content) {
StoreResult result;
std::string hash_file = calculateFileHash(file_content);
std::string file_id = chat_im::util::uuid();
std::shared_ptr<FileState> file_state;
bool need_wait = false;
{
std::lock_guard lock(__file_mutex);
auto ite = __file_hashMap.find(hash_file);
/**
* 可能已經存儲過,檢查是否是其他插入key值,但還沒存儲完畢
*/
if(ite != __file_hashMap.end()){
file_id = ite->second;
//判斷是否提取插入的值
auto s_ite = __writing_files.find(file_id);
if(s_ite == __writing_files.end()){
/**
* 直接返回
*/
result.success = true;
result.file_id = file_id;
return result;
}else{
/**
* 需要等待
*/
file_state = s_ite->second;
need_wait = true;
}
}else{
/**
* 沒有存儲過,提前插入map中
* 避免多個線程同時進行寫入同一文件
*/
__file_hashMap[hash_file] = file_id;
file_state = std::make_shared<FileState>();
__writing_files[file_id] = file_state;
}
}
/**
* 等到另一線程寫入完畢
*/
if (need_wait){
std::unique_lock<std::mutex> lock(file_state->mtx);
if(!file_state->cv.wait_for(lock,std::chrono::seconds(3),[&](){return file_state->finished;})){
// 等待超時處理
result.success = false;
result.errmsg = "文件處理超時";
return result;
}
result.success = file_state->success;
result.errmsg = file_state->errmsg;
result.file_id = file_id;
return result;
}
/**
* 存儲文件,
*/
std::string file_name = __base_filepath+file_id;
bool status = chat_im::util::writeFile(file_name,file_content);
{
std::lock_guard lock(file_state->mtx);
file_state->finished = true;
file_state->success = status;
if(!status){
file_state->errmsg = "寫入文件失敗";
}
}
/**
* 通知等待的線程
*/
file_state->cv.notify_all();
{
std::lock_guard lock(__file_mutex);
__writing_files.erase(file_id);
}
if(status == false){
/**
* 取消原先插入的map值
*/
std::lock_guard lock(__file_mutex);
__file_hashMap.erase(hash_file);
result.success = false;
result.errmsg = "寫入文件數據失??!";
return result;
}
result.success = true;
result.file_id = file_id;
return result;
}
private:
std::string __base_filepath;
std::unordered_map<std::string,std::string> __file_hashMap;
std::unordered_map<std::string, std::shared_ptr<FileState>> __writing_files;
std::mutex __file_mutex;
};
//
//最初版本
void PutSingleFile(::google::protobuf::RpcController *controller,
const ::PutSingleFileReq *request,
::PutSingleFileRsp *response,
::google::protobuf::Closure *done){
brpc::ClosureGuard guard(done);
response->set_request_id(request->request_id());
std::string file_id = chat_im::util::uuid();
std::string file_name = __base_filepath+file_id;
bool status = util::writeFile(file_name,request->file_data().file_content());
if(status == false){
ERROR("上傳文件請求{}:寫入失敗",request->request_id());
response->set_success(false);
response->set_errmsg("寫入文件數據失??!");
return;
}
response->set_success(true);
response->mutable_file_info()->set_file_id(file_id);
response->mutable_file_info()->set_file_size(request->file_data().file_size());
response->mutable_file_info()->set_file_name(request->file_data().file_name());
return ;
}

浙公網安備 33010602011771號