<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      kafka消息發送以及接收

            需求說明:消息發送到kafka,對消息進行處理。使用springboot-kafka自帶的組件,使用kafkaTemple進行發送和消費。

          

      package com.gwm.marketing.kafka.product;
      
      import com.alibaba.fastjson.JSONObject;
      import com.google.gson.Gson;
      import com.gwm.marketing.dto.user.UserSyncIdentificationDto;
      import org.apache.kafka.common.protocol.Message;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.kafka.core.KafkaTemplate;
      import org.springframework.stereotype.Component;
      
      import javax.annotation.Resource;
      
      /**
       * @author fanht
       * @descrpiton
       * @date 2022/10/21 15:23:02
       * @versio 1.0
       */
      @Component
      public class KafkaProductDemo {
      
      
          private Logger logger = LoggerFactory.getLogger(this.getClass());
          @Value("${spring.kafka.consumer.syncUserIdentify}")
          private String kafkaTopic;
      
          @Resource
          private KafkaTemplate kafkaTemplate;
      
          public void sendKafkaMessage(){
              Gson gson = new Gson();
              UserSyncIdentificationDto dto = UserSyncIdentificationDto.builder().name("官方")
                      .beanId("3032395126028730368").identifyId("3032395126028730368").typeCode("OFFICIAL").updateTime(System.currentTimeMillis()).build();
              UserSyncIdentificationDto dto1 = UserSyncIdentificationDto.builder().identifyId("3032395126028730368").updateTime(System.currentTimeMillis())
                              .beanId("3032395126028730368").typeCode("OFFICIAL").build();
              System.out.println("入參:" + gson.toJson(dto));
              logger.info("請求入參:" + gson.toJson(dto));
              kafkaTemplate.send(kafkaTopic,null, "1",gson.toJson(dto));
              kafkaTemplate.send(kafkaTopic,null, "2", gson.toJson(dto));
          }
      
      }
      package com.gwm.marketing.kafka.consumer;
      
      import com.alibaba.fastjson.JSON;
      import com.alibaba.fastjson.JSONObject;
      import com.google.gson.Gson;
      import com.google.gson.GsonBuilder;
      import com.gwm.marketing.common.constants.UserConstants;
      import com.gwm.marketing.common.dto.user.CancleUser;
      import com.gwm.marketing.common.enums.TypeCodeEnum;
      import com.gwm.marketing.constants.CommonConstants;
      import com.gwm.marketing.dao.user.UserSyncIdentificationDao;
      import com.gwm.marketing.dto.user.UserSyncIdentificationDto;
      import com.gwm.marketing.entity.user.UserSyncIdentification;
      import com.gwm.marketing.feign.community.FeignCommunityClient;
      import com.gwm.marketing.service.user.UserDetailService;
      import org.apache.commons.lang3.StringUtils;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      import org.springframework.kafka.annotation.KafkaListener;
      import org.springframework.stereotype.Component;
      
      import javax.annotation.Resource;
      import java.util.Date;
      import java.util.Optional;
      
      /**
       * @author fanht
       * @descrpiton 消費仙豆kafka消息
       * @date 2022/10/21 14:41:59
       * @versio 1.0
       */
      @Component
      public class UserSyncIdentificationConsumer {
      
          private Logger logger = LoggerFactory.getLogger(this.getClass());
      
          @Resource
          private UserDetailService userDetailService;
      
          @Resource
          private UserSyncIdentificationDao userSyncIdentificationDao;
      
          @Resource
          FeignCommunityClient feignCommunityClient;
      
      
      
          Gson gson = new GsonBuilder().create();
      
          @KafkaListener(topics = "${spring.kafka.consumer.syncUserIdentify}", topicPartitions = {}, groupId = "test")
          public void onMessage(ConsumerRecord<?, ?> record) {
              String key = (String) record.key();
              Optional<?> kafkaMessage = Optional.ofNullable(record.value());
              if (kafkaMessage.isPresent() && StringUtils.isNotEmpty(key)) {
                  try {
                      logger.info("===仙豆入參==" +JSONObject.toJSONString(kafkaMessage));
                      Object message = kafkaMessage.get();
      
                      switch (key) {
                          case CommonConstants.SAVE:
                              //導入、添加、or修改
                              updateOrCreateIdentify(message);
                              break;
                          case CommonConstants.CANCEL:
                              //撤銷認證
                              cancleUserIdentification(message);
                              break;
                          default:
                              break;
                      }
      
                  } catch (Exception e) {
                      logger.error("=====創建異常======", e);
                  }
              }
          }
      
      
          private void updateOrCreateIdentify(Object message) {
      
              logger.debug("====仙豆入參====" + JSONObject.toJSONString(message));
              //UserSyncIdentificationDto dto = gson.fromJson(message.toString(), UserSyncIdentificationDto.class);
              UserSyncIdentificationDto dto = JSONObject.toJavaObject(JSON.parseObject(message.toString()), UserSyncIdentificationDto.class);
              logger.info("======javabean轉化對象成功======" + JSONObject.toJSONString(dto));
              String userId = userDetailService.getUserIdByBeanId(dto.getBeanId(), CommonConstants.ORA);
              UserSyncIdentification usi = UserSyncIdentification.builder().userId(userId).typeCode(dto.getTypeCode())
                      .createTime(new Date()).beanId(dto.getBeanId()).extra1(dto.getIdentifyId())
                      .name(dto.getName()).sourceApp(CommonConstants.ORA).extra2(dto.getUpdateTime()==null?null:dto.getUpdateTime().toString()).build();
              if (dto.getId() != null && dto.getId() > 0) {
                  userSyncIdentificationDao.updateByPrimaryKeySelective(usi);
              } else {
                  //todo 此處可能會出現多次插入,是否需要做限制
                 int count = userSyncIdentificationDao.countByBeanIdAndExtra1(usi.getBeanId(),usi.getExtra1());
                 if(count <= 0){
                     userSyncIdentificationDao.insertSelective(usi);
                     try {
                         logger.info("==========更新同步es===start" + JSONObject.toJSONString(dto));
                         this.syncEs(dto);
                         logger.info("==========更新同步es===end");
                     } catch (Exception e) {
                         logger.error("同步es異常",e);
                     }
                 }else {
                     logger.info("========數據重復==,入參:" +JSONObject.toJSONString(usi));
                 }
              }
          }
      
          private void cancleUserIdentification(Object message) {
              logger.debug("====仙豆取消入參====" + JSONObject.toJSONString(message));
              //UserSyncIdentificationDto dto = gson.fromJson(message.toString(), UserSyncIdentificationDto.class);
              UserSyncIdentificationDto dto = JSONObject.toJavaObject(JSON.parseObject(message.toString()), UserSyncIdentificationDto.class);
              logger.info("======javabean轉化對象成功======" + JSONObject.toJSONString(dto));
              int cancleResult = userSyncIdentificationDao.cancleIdentifies(dto.getUpdateTime().toString(),dto.getBeanId(), dto.getIdentifyId());
              if(cancleResult <= 0){
                  logger.info("====用戶撤銷認證同步失敗======" +JSONObject.toJSONString(dto));
              }else {
                  //操作es,若當前撤銷認證的用戶是歐拉用戶,且身份是官方且已佩戴,則更改es中的帖子對應的官方身份狀態(佩戴或者不佩戴都更改es)
                  try {
                      logger.info("==========取消同步es===start" + JSONObject.toJSONString(dto));
                      this.syncEs(dto);
                      logger.info("==========取消同步es===end");
                  } catch (Exception e) {
                      logger.error("更新es官方數據失敗",  e);
                  }
              }
          }
      
          public void syncEs(UserSyncIdentificationDto dto){
              if(TypeCodeEnum.OFFICIAL.getCode().equals(dto.getTypeCode())){
                  String userId = userDetailService.getUserIdByBeanId(dto.getBeanId(), CommonConstants.ORA);
                  CancleUser cancleUser = new CancleUser();
                  cancleUser.setIsOfficial(UserConstants.IS_OFFICIAL_NO);
                  cancleUser.setCreateBy(userId);
                  cancleUser.setSourceApp(CommonConstants.ORA);
                  feignCommunityClient.updateEsUserStatus(cancleUser);
                  logger.debug("========更改es成功======");
              }
          }
      }

       

      posted @ 2022-11-10 13:58  Doyourself!  閱讀(2051)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 少妇人妻偷人精品系列| 免费观看全黄做爰大片| 亚洲精品综合网中文字幕| 中文字幕人成无码免费视频 | 久久日产一线二线三线| 成人免费区一区二区三区| 国内综合精品午夜久久资源| 亚洲高潮喷水无码AV电影| 一区二区三区不卡国产| 国产精品亚洲а∨天堂2021| 免费费很色大片欧一二区| 久久久亚洲欧洲日产国码αv| 国产精品一品二区三四区| 91福利视频一区二区| 亚洲色大成网站www永久男同| 国产精品午夜福利精品| 欧美人成精品网站播放| 国产精品免费中文字幕| 亚洲最大日韩精品一区| 亚洲中文字幕日产无码成人片 | 久久久久香蕉国产线看观看伊| 久久久亚洲精品无码| 国产精品小粉嫩在线观看| 精品免费看国产一区二区| 国产v综合v亚洲欧美久久| 国产寡妇偷人在线观看| 91中文字幕一区在线| 贞丰县| 久久亚洲中文无码咪咪爱| 中文字幕日韩精品有码| 日韩人妻熟女中文字幕a美景之屋| 国产精品无码一区二区在线观一| 五月丁香综合缴情六月小说| 久久99精品国产麻豆婷婷| 国产午夜亚洲精品不卡网站| 在线播放国产精品三级网| 欧美乱大交aaaa片if| 国产老熟女视频一区二区| 亚洲精品一区二区动漫| 久久精品国产福利一区二区 | 国产午夜福利精品视频|