【解決方案】Java 互聯(lián)網(wǎng)項(xiàng)目中消息通知系統(tǒng)的設(shè)計(jì)與實(shí)現(xiàn)(下)
前言
書接上回,消息通知系統(tǒng)(notification-system)作為一個(gè)獨(dú)立的微服務(wù),完整地負(fù)責(zé)了 App 端內(nèi)所有消息通知相關(guān)的后端功能實(shí)現(xiàn)。該系統(tǒng)既需要與文章系統(tǒng)、訂單系統(tǒng)、會(huì)員系統(tǒng)等相關(guān)聯(lián),也需要和其它業(yè)務(wù)系統(tǒng)相關(guān)聯(lián),是一個(gè)偏底層的通用服務(wù)系統(tǒng)。
App 端內(nèi)的消息通知類型常見有這幾項(xiàng):評(píng)論通知、點(diǎn)贊通知、收藏通知、訂單通知、活動(dòng)通知、個(gè)人中心相關(guān)通知等。該系統(tǒng)在可拓展性、高性能、較高可用性、數(shù)據(jù)一致性等方面有較高要求,最終目的是提升用戶粘性、加強(qiáng) App 與用戶的互動(dòng)、支撐核心業(yè)務(wù)的發(fā)展。
文章的(上)篇將從需求分析、數(shù)據(jù)模型設(shè)計(jì)、關(guān)鍵流程設(shè)計(jì)這 3 部分來(lái)說(shuō)明,(下)篇將從技術(shù)選型、后端接口設(shè)計(jì)、關(guān)鍵邏輯實(shí)現(xiàn)這 3 部分來(lái)進(jìn)行說(shuō)明。
四、技術(shù)選型
我將該系統(tǒng)需要使用到的關(guān)鍵技術(shù)選型做成表格,方便梳理:
- 可以用 Spirng Cloud 或者 Spirng Cloud Alibaba,哪個(gè)習(xí)慣用哪個(gè),只要是能打包成一個(gè)可運(yùn)行的微服務(wù)即可;
- 也可以用非關(guān)系型數(shù)據(jù)庫(kù)如 MongoDB 來(lái)代替 MySQL,表與表之間的關(guān)系不密切的前提下,性能會(huì)更高;
- Redis 拿來(lái)做緩存中間件去存儲(chǔ)非結(jié)構(gòu)化的一些數(shù)據(jù)是非常合適的,很多場(chǎng)景下,突出的性能和便捷的 API 是它的優(yōu)勢(shì);
- MQ 其實(shí)是選用的,適合較為復(fù)雜的項(xiàng)目拿來(lái)異步/解耦,既可以 kafka 也可以 RabbitMQ,RocketMQ 是阿里親生的,控制臺(tái)用起來(lái)也方便;
- 其它開源依賴最好使用 apache 的頂級(jí)項(xiàng)目或者 Spring 官方的,像 hutool 這種第三方的包其實(shí)不太推薦,安全風(fēng)險(xiǎn)可能會(huì)比較高。
五、后端接口設(shè)計(jì)
作為一個(gè)偏底層的公共服務(wù),基本上都會(huì)先由上游的業(yè)務(wù)系統(tǒng)進(jìn)行調(diào)用,再服務(wù)于用戶(即 App 端)。下面設(shè)計(jì)兩個(gè) Controller 分別針對(duì)業(yè)務(wù)端和 App 端,大家可以先參考一下接口規(guī)范,也寫了總體的思路注釋,關(guān)鍵邏輯會(huì)在下一節(jié)再展開講。
5.1業(yè)務(wù)系統(tǒng)接口
暴露給業(yè)務(wù)系統(tǒng)的有 3 個(gè)接口:
- 獲取通知配置
- 發(fā)送通知
- 撤回通知
@RestController
@RequestMapping("notice/api")
public class NoticeApiController {
@Resource
private NotificationService notificationService;
/**
* 新增通知,業(yè)務(wù)系統(tǒng)用
* @param dto
* @return 消息系統(tǒng)唯一 id
*/
@PostMapping("/add")
public Response<Long> addNotice(@Valid @RequestBody AddNoticeDTO dto){
//業(yè)務(wù)方調(diào)用該接口前需要先根據(jù) sourceId 確認(rèn)來(lái)源,實(shí)現(xiàn)就是先入數(shù)據(jù)庫(kù),再入 Redis
return ResponseBuilder.buildSuccess(this.notificationService.addNotice(dto));
}
/**
* 撤回通知(同批量撤回),業(yè)務(wù)系統(tǒng)用
* @param idList,需要撤回的消息主鍵 id 集合
* @return 是否成功:true-成功,false-失敗
*/
@PostMapping("/recall")
public Response<Boolean> recallNotice(@RequestBody List<Long> idList){
//撤回只需要考慮先更新數(shù)據(jù)庫(kù),后更新 Redis
return ResponseBuilder.buildSuccess(this.notificationService.recallNotice(idList));
}
/**
* 獲取通知配置
* @param sourceId 業(yè)務(wù)系統(tǒng)標(biāo)識(shí)
* @return 配置詳情信息
*/
@GetMapping("/getNoticeConfig")
public Response<NotificationConfig> getNoticeConfig(@RequestParam(value = "noticeId") String sourceId){
//每個(gè)業(yè)務(wù)系統(tǒng)調(diào)用前需要校驗(yàn)通知配置,以防非法調(diào)用
return ResponseBuilder.buildSuccess(this.notificationService.getNoticeConfig(sourceId));
}
}
5.2App 端接口
開放給 App 端使用的有 2 個(gè)接口:
- 獲取用戶未讀消息總數(shù)
- 獲取用戶消息列表
@RestController
@RequestMapping("notice/app")
public class NoticeAppController {
@Resource
private NotificationService notificationService;
/**
* 獲取用戶未讀消息總數(shù)
*/
@Auth
@GetMapping("/num")
public Response<NoticeNumVO> getMsgNum() {
//App 端的用戶唯一 uuid
String userUuid = "";
return ResponseBuilder.buildSuccess(this.notificationService.getMsgNum(userUuid));
}
/**
* 獲取用戶消息列表
*
* @param queryDate:查詢時(shí)間 queryDate
* @param pageIndex:頁(yè)碼,1開始
* @param pageSize:每頁(yè)大小
* @param superType:消息父類型,1-評(píng)論、點(diǎn)贊、系統(tǒng)消息,2-通知,3-私信,4-客服消息
*/
@Auth
@GetMapping("/list/{queryDate}/{pageIndex}/{pageSize}/{superType}")
public Response<List<Notification>> getNoticeList(@PathVariable String queryDate, @PathVariable Integer pageIndex,
@PathVariable Integer pageSize, @PathVariable Integer superType) throws ParseException {
//App 端的用戶唯一 uuid
String userUuid = "";
Date dateStr = DateUtils.parseDate(queryDate, new String[]{"yyyyMMddHHmmss"});
return ResponseBuilder.buildSuccess(this.notificationService.getNoticeList(userUuid, dateStr, pageIndex, pageSize, superType));
}
}
六、關(guān)鍵邏輯實(shí)現(xiàn)
本小節(jié)會(huì)針對(duì) APP 端的兩個(gè)接口進(jìn)行詳細(xì)講解,未讀消息數(shù)和消息列表的實(shí)現(xiàn)需要 Redis + MySQL 的緊密配合。
6.1Redis存儲(chǔ)結(jié)構(gòu)
下面先著重介紹一下本系統(tǒng)的 Redis 緩存結(jié)構(gòu)設(shè)計(jì),全局只使用 Hash 結(jié)構(gòu),新增消息時(shí)+1,撤回消息時(shí)-1,已讀消息時(shí)做算術(shù)更新:
說(shuō)明:
-
Redis-key 是固定 String 常量 "sysName.notice.num.key";
-
Hash-key 為 App 端用戶唯一的 userUuid;
-
Hash-value 為該用戶接收的消息總數(shù),新增 +1,撤回 -1。
如果大家對(duì)于 Redis 的基本結(jié)構(gòu)還不太了解,參考下我的這篇博客:http://www.rzrgm.cn/CodeBlogMan/p/17816699.html
下面是關(guān)鍵實(shí)現(xiàn)步驟的代碼示例:
-
新增消息
//先入 MySQL Notification notification = this.insertNotice(dto); //再入 Redis redisTemplate.opsForHash().increment(RedisKey, dto.getTargetUserUuid(), 1); -
撤回消息
//先更新 MySQL this.updateById(notification); //再更新 Redis redisTemplate.opsForHash().increment(RedisKey, userUuid, -1);
注意:
寫操作和更新操作都是先操作數(shù)據(jù)庫(kù),然后再同步入 Redis。原因:數(shù)據(jù)庫(kù)里的數(shù)據(jù)是源頭,且存的是結(jié)構(gòu)化的持久性數(shù)據(jù);Redis 只是作為緩存,發(fā)揮 Redis 讀取速度快的優(yōu)點(diǎn),存儲(chǔ)的是一些 size 不大的熱點(diǎn)數(shù)據(jù)。
6.2已讀消息處理
已讀和未讀其實(shí)就是兩種狀態(tài),Redis 里一開始存儲(chǔ)的都是未讀數(shù),當(dāng)用戶點(diǎn)擊查看列表時(shí),前端會(huì)調(diào)用后端的消息列表接口,消息列表直接查數(shù)據(jù)庫(kù)(記錄了已讀和未讀狀態(tài)),此時(shí)同步更新 Redis 里的未讀消息數(shù),那么此時(shí):未讀消息數(shù) = Redis總數(shù) - MySQL已讀消息數(shù)。
下面的代碼說(shuō)得比較清楚了:
-
查詢未讀消息數(shù)
Integer num; //先讀 redis,沒有再讀數(shù)據(jù)庫(kù),最后再把數(shù)據(jù)庫(kù)讀出的放回 redis num = (Integer) redisTemplate.opsForHash().get(RedisKey, userUuid); //防止一開始新增通知的時(shí)候沒放進(jìn) redis 里,null 表示什么都沒有,而不是 0 if (Objects.nonNull(num)) { msgNumVO.setMsgNum(num); }else { num = this.getNoticeNum(userUuid, queryDate); log.info("緩存中沒有未讀消息總數(shù),查數(shù)據(jù)庫(kù):{}", num); msgNumVO.setMsgNum(num); //放入緩存,取出什么放什么 redisTemplate.opsForHash().put(RedisKey, userUuid, num); } return num; -
查詢消息列表
wrapper.eq(Notification::getTargetUserUuid, userUuid) .eq(Notification::getSuperType, superType) .eq(Notification::getMsgStatus, StatusEnum.TRUE.getType()) .le(Notification::getCreateTime, dateTime) .orderByDesc(Notification::getCreateTime); List<Notification> queryList = pageInfo.getResult(); //查詢后即要同步去更新數(shù)據(jù)庫(kù)中該類型下的消息為已讀 this.updateListBySuperType(wrapper); long isReadNum; isReadNum = queryList.stream().filter(val -> NumberUtils.INTEGER_ZERO.equals(val.getIsRead())).count(); //關(guān)鍵的一步,同步更新 redis 里的未讀消息數(shù) Integer redisNum = (Integer) redisTemplate.opsForHash().get(RedisKey.INITIAL_NOTICE_NUM_PERFIX, userUuid); //要先判斷 redis 里是否為 null,和 0 不一樣 int hv = Objects.isNull(redisNum) ? 0 : (int) (redisNum - isReadNum); redisTemplate.opsForHash().put(RedisKey, userUuid, Math.max(hv, 0)); return queryList;
6.3緩存定時(shí)清除
由于在上述的 redis-hash 結(jié)構(gòu)中并沒有加入 expire 過(guò)期時(shí)間,那么顯而易見的是這個(gè)結(jié)構(gòu)隨著時(shí)間增加會(huì)越來(lái)越大,最終導(dǎo)致形成一個(gè)大key,給 redis 的讀/寫性能帶來(lái)影響。
所以這里需要給出一個(gè)方案來(lái)解決這個(gè)問(wèn)題,我的核心思路是:
- 每當(dāng)寫redis計(jì)數(shù)的時(shí)候同時(shí)用另一個(gè) key 記操作時(shí)間,每10分鐘執(zhí)行一次定時(shí)任務(wù);
- 逐一將時(shí)間 key 的 value (即操作時(shí)間)根據(jù) uuid 拿出來(lái),如果當(dāng)前系統(tǒng)時(shí)間 - 該uuid的操作時(shí)間>3600ms(即一個(gè)小時(shí))那么就將該 uuid 的數(shù)據(jù)刪除;
- 下次調(diào)接口先讀數(shù)據(jù)庫(kù),再寫進(jìn) redis 里面,具體看代碼。
@Component
@Slf4j
public class HandleNoticeCache {
private static final Long FLAG_TIME = 3600L;
@Resource
private RedisTemplate redisTemplate;
@Scheduled(cron = " * 0/10 * * * ? ")
public void deleteNoticeCache(){
HashOperations<String, String, Integer> hashOperations = redisTemplate.opsForHash();
//通知操作的全部 uuid,數(shù)據(jù)量一大可能導(dǎo)致 OOM
Set<String> uuidList = hashOperations.keys(RedisKey.NOTICE_NUM_TIME);
if (CollectionUtils.isNotEmpty(uuidList)){
uuidList.forEach(val -> {
Integer operateTime = hashOperations.get(RedisKey.NOTICE_NUM_TIME, val);
if (Objects.nonNull(operateTime)){
//當(dāng)前系統(tǒng)時(shí)間-操作的記錄時(shí)間
long resultTime = System.currentTimeMillis() - operateTime;
if (resultTime > FLAG_TIME){
hashOperations.delete(RedisKey.NOTICE_NUM_PERFIX, val);
log.info("刪除通知的 uuid 為:{}", val);
hashOperations.delete(RedisKey.COMMENT_NUM_PERFIX, val);
log.info("刪除評(píng)論通知的 uuid 為:{}", val);
}
}
});
}
}
}
本篇小結(jié)
到這里關(guān)于互聯(lián)網(wǎng)消息通知系統(tǒng)的設(shè)計(jì)與實(shí)現(xiàn)就分享完了,至于源碼我看在周末或者假期有沒有時(shí)間發(fā)出來(lái),之后自己的個(gè)人 git 開源倉(cāng)庫(kù)應(yīng)該已經(jīng)建設(shè)好了。
文章如有錯(cuò)誤和不足,還望指正,同時(shí)也歡迎大家在評(píng)論區(qū)說(shuō)出自己的想法!

書接上回,消息通知系統(tǒng)(notification-system)作為一個(gè)獨(dú)立的微服務(wù),完整地負(fù)責(zé)了 App 端內(nèi)所有消息通知相關(guān)的后端功能實(shí)現(xiàn)。該系統(tǒng)既需要與文章系統(tǒng)、訂單系統(tǒng)、會(huì)員系統(tǒng)等相關(guān)聯(lián),也需要和其它業(yè)務(wù)系統(tǒng)相關(guān)聯(lián),是一個(gè)偏底層的通用服務(wù)系統(tǒng)。
浙公網(wǎng)安備 33010602011771號(hào)