<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      RocketMQ消息過濾機制源碼詳解

      #RocketMQ提供了2種消息過濾的方式:

      • TAG 過濾

      • SQL92 過濾

      SQL過濾默認是沒有打開的,如果想要支持,必須在broker的配置文件中設置:enablePropertyFilter = true

      一. 示例代碼

      1.1 producer 代碼

      public class Producer {
      
          public static void main(String[] args) throws Exception {
      
              // 實例化消息生產者Producer
              DefaultMQProducer producer = new DefaultMQProducer("tag_p_g");
              // 設置NameServer的地址
              producer.setNamesrvAddr("127.0.0.1:9876");
      
              producer.start();
      
              String[] tags = {"TAG_A", "TAG_B", "TAG_C"};
      
              for (int i = 0; i < 10 ; i++) {
      
                  byte[] body = ("Hi filter message," + i).getBytes();
                  String tag = tags[i % tags.length];
      
                  //同一個topic下,會發(fā)送多種tag消息
                  Message msg = new Message("MY_topic", tag, body);
                  
                  //設置一些屬性,消費者SQL過濾時可以使用
                  msg.putUserProperty("age", String.valueOf(i));
                  msg.putUserProperty("name", "name" + (i + 1));
                  msg.putUserProperty("isGender", String.valueOf(new Random().nextBoolean()));
      
                  SendResult sendResult = producer.send(msg);
      
                  System.out.println("sendResult = " + sendResult);
              }
      
      
              producer.shutdown();
          }
      }
      

      1.2 consumer 代碼

      1.2.1 TAG過濾
      public class Consumer {
      
          public static void main(String[] args) throws Exception {
      
              DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("c_tag_group");
      
              consumer.setNamesrvAddr("127.0.0.1:9876");
      
              /**
               * 訂閱消息過濾
               * 只訂閱 topic = MY_topic 下
               * tag = TAG_A 或者 tag = TAG_C 的消息,不要 tag = TAG_B 的消息
               * 訂閱多個tag使用 || 分開
               */
              consumer.subscribe("MY_topic", "TAG_A || TAG_C");
      
              consumer.registerMessageListener(new MessageListenerConcurrently() {
                  @Override
                  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
      
                      for (MessageExt msg : msgs) {
                          System.out.println(msg);
                      }
      
                      //消費成功時返回
                      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                  }
              });
      
              consumer.start();
      
              System.out.println("Filter Tag Consumer Started");
          }
      }
      
      1.2.2 SQL92過濾
      public class Consumer {
      
          public static void main(String[] args) throws Exception {
      
              DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
      
              consumer.setNamesrvAddr(MQConstant.NAME_SERVER_ADDR);
      
              /**
               * 訂閱消息過濾: 根據消息生產者指定的用戶屬性進行過濾
               * 支持的常量類型:
               *   數值:比如:123,3.1415
               *   字符:必須用單引號包裹起來,比如:'abc'
               *   布爾:TRUE 或 FALSE
               *   NULL:特殊的常量,表示空
               *
               * 支持的運算符有:
               *   數值比較:>,>=,<,<=,BETWEEN,=
               *   字符比較:=,<>,IN
               *   邏輯運算 :AND,OR,NOT
               *   NULL判斷:IS NULL 或者 IS NOT NULL
               *
               *   // (age between 6 and 9) AND (name IS NOT NULL) AND (isGender = TRUE)
               */
              consumer.subscribe(MQConstant.FILTER_SQL_TOPIC, MessageSelector.bySql("(age between 6 and 9) AND (name IS NOT NULL) AND (isGender = TRUE)"));
      
              consumer.registerMessageListener(new MessageListenerConcurrently() {
                  @Override
                  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
      
                      for (MessageExt msg : msgs) {
                          System.out.println(msg);
                      }
      
                      //消費成功時返回
                      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                  }
              });
      
              consumer.start();
      
              System.out.println("Filter SQL Consumer Started");
          }
      }
      

      二. 說明

      消費者去broker拉取消息時,先經過broker過濾一次,在經過消費者過濾一次

      1. 如果是 TAG 過濾。broker要先根據ConsumeQueue 中 Tag HashCode過濾一次,消費者在根據 Tag 值過濾一次。因為 ConsumeQueue 為了便于檢索,文件中每一個條目都是定長20字節(jié),所以條目在最后八個字節(jié)存儲的是消息 Tag 的 HashCode,而不是TAG值。這樣broker在拉取磁盤中的消息時,只需要對比 ConsumeQueue中 的Tag HashCode,而不需要解析 CommitLog 中的 Tag 值,如果發(fā)生Hash沖突,則交給消費者客戶端過濾消息中的Tag值。
      2. 如果是 SQL92 過濾。則全部由 broker 過濾。因為 SQL 過濾的是消息中的屬性值,所以必須反序列化 CommitLog 中的屬性值,既然在broker已經進行了精確匹配,那么客戶端自然可以省去這個步驟了。

      三. 消費者啟動注冊訂閱信息到broker

      consumer訂閱信息會保存到SubscriptionData中,當consumer啟動后,會通過心跳先將訂閱信息發(fā)送到broker。broker主要是構建2部分:

      1. 保存consumer發(fā)送的訂閱信息SubscriptionData對象。
      2. 構建SQL過濾的ConsumerFilterData對象。

      那么我們看下consumer構建訂閱數據以及發(fā)送到broker的過程:

      // org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#subscribe(java.lang.String, org.apache.rocketmq.client.consumer.MessageSelector)
      public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {
          try {
              if (messageSelector == null) {
                  subscribe(topic, SubscriptionData.SUB_ALL);
                  return;
              }
              
              //核心就是創(chuàng)建SubscriptionData
              SubscriptionData subscriptionData = FilterAPI.build(topic,
                  messageSelector.getExpression(), messageSelector.getExpressionType());
      
              this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
              if (this.mQClientFactory != null) {
                  this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
              }
          } catch (Exception e) {
              throw new MQClientException("subscription exception", e);
          }
      }
      

      繼續(xù)看FilterAPI.build(...)方法:

      // org.apache.rocketmq.common.filter.FilterAPI#build
      public static SubscriptionData build(final String topic, final String subString,
              final String type) throws Exception {
              // 如果是TAG過濾,則執(zhí)行這里
              if (ExpressionType.TAG.equals(type) || type == null) {
                  return buildSubscriptionData(topic, subString);
              }
      
              if (subString == null || subString.length() < 1) {
                  throw new IllegalArgumentException("Expression can't be null! " + type);
              }
      
              // 如果是SQL過濾,則執(zhí)行這里,相對簡單,直接原樣發(fā)送給broker
              SubscriptionData subscriptionData = new SubscriptionData();
              subscriptionData.setTopic(topic);
              subscriptionData.setSubString(subString);
              subscriptionData.setExpressionType(type);
      
              return subscriptionData;
          }
      }
      

      如果是TAG過濾,consumer會做些額外的處理:

      // org.apache.rocketmq.common.filter.FilterAPI#buildSubscriptionData 
      public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
              String subString) throws Exception {
              SubscriptionData subscriptionData = new SubscriptionData();
              subscriptionData.setTopic(topic);
              subscriptionData.setSubString(subString);
      
              if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
                  // 訂閱所有消息
                  subscriptionData.setSubString(SubscriptionData.SUB_ALL);
              } else {
                  // 如果訂閱的不是*,則通過 || 分割
                  String[] tags = subString.split("\\|\\|");
                  if (tags.length > 0) {
                      for (String tag : tags) {
                          if (tag.length() > 0) {
                              String trimString = tag.trim();
                              if (trimString.length() > 0) {
                                  // 保存分割后的TAG值
                                  subscriptionData.getTagsSet().add(trimString);
                                  // 保存分割后的TAG HashCode
                                  subscriptionData.getCodeSet().add(trimString.hashCode());
                              }
                          }
                      }
                  } else {
                      throw new Exception("subString split error");
                  }
              }
      
              return subscriptionData;
          }
      

      這樣consumer的訂閱信息就準備好了,然后consumer啟動,發(fā)送心跳數據:

      //org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
      
      public synchronized void start() throws MQClientException {
          //......代碼省略.......
          
          // 發(fā)送心跳
          this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
          
          //......代碼省略.......
      }
      

      我們再看下broker是如何處理心跳數據的:

      public class ClientManageProcessor implements NettyRequestProcessor {
      
          @Override
          public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
              throws RemotingCommandException {
              switch (request.getCode()) {
      			// 接收客戶端心跳指令,保存客戶端信息
                  case RequestCode.HEART_BEAT:
                      return this.heartBeat(ctx, request);
                  case RequestCode.UNREGISTER_CLIENT:
                      return this.unregisterClient(ctx, request);
                  case RequestCode.CHECK_CLIENT_CONFIG:
                      return this.checkClientConfig(ctx, request);
                  default:
                      break;
              }
              return null;
          }
      }
      

      heartBeat方法:

      // org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat 
      public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
      
              // 處理消費者心跳
              for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
                  SubscriptionGroupConfig subscriptionGroupConfig =
                      this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                          data.getGroupName());
                  //...
      
                  // 注冊消費者信息
                  boolean changed = this.brokerController.getConsumerManager().registerConsumer(
                      data.getGroupName(),
                      clientChannelInfo,
                      data.getConsumeType(),
                      data.getMessageModel(),
                      data.getConsumeFromWhere(),
                      data.getSubscriptionDataSet(),
                      isNotifyConsumerIdsChangedEnable
                  );
      
                  // ...
              }
      
           	// ...
             
              return response;
          }
      

      繼續(xù)往下走:

      // org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer
      public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
          ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
          final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
      
          //...
          
          // 更新topic下消費組信息
          boolean r2 = consumerGroupInfo.updateSubscription(subList);
      
          //...
          
          
          this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
      
          //...
      }
      

      繼續(xù)往里走:

      // org.apache.rocketmq.broker.filter.ConsumerFilterManager#register(java.lang.String, java.lang.String, java.lang.String, java.lang.String, long)
      public boolean register(final String topic, final String consumerGroup, final String expression,
          final String type, final long clientVersion) {
          // 如果是TAG 過濾,則退出
          if (ExpressionType.isTagType(type)) {
              return false;
          }
      
          // 如果是SQL過濾,但沒有指定過濾規(guī)則,則退出
          if (expression == null || expression.length() == 0) {
              return false;
          }
      
          FilterDataMapByTopic filterDataMapByTopic = this.filterDataByTopic.get(topic);
      
          if (filterDataMapByTopic == null) {
              FilterDataMapByTopic temp = new FilterDataMapByTopic(topic);
              FilterDataMapByTopic prev = this.filterDataByTopic.putIfAbsent(topic, temp);
              filterDataMapByTopic = prev != null ? prev : temp;
          }
      
          BloomFilterData bloomFilterData = bloomFilter.generate(consumerGroup + "#" + topic);
      
          // 構建SQL過濾的ConsumerFilterData
          return filterDataMapByTopic.register(consumerGroup, expression, type, bloomFilterData, clientVersion);
      }
      
      

      注冊方法內部主要就是構建ConsumerFilterData對象:

      // org.apache.rocketmq.broker.filter.ConsumerFilterManager#build
      public static ConsumerFilterData build(final String topic, final String consumerGroup,
          final String expression, final String type,
          final long clientVersion) {
          if (ExpressionType.isTagType(type)) {
              return null;
          }
      
          ConsumerFilterData consumerFilterData = new ConsumerFilterData();
          consumerFilterData.setTopic(topic);
          consumerFilterData.setConsumerGroup(consumerGroup);
          consumerFilterData.setBornTime(System.currentTimeMillis());
          consumerFilterData.setDeadTime(0);
          consumerFilterData.setExpression(expression);
          consumerFilterData.setExpressionType(type);
          consumerFilterData.setClientVersion(clientVersion);
          try {
              consumerFilterData.setCompiledExpression(
                  FilterFactory.INSTANCE.get(type).compile(expression)
              );
          } catch (Throwable e) {
              log.error("parse error: expr={}, topic={}, group={}, error={}", expression, topic, consumerGroup, e.getMessage());
              return null;
          }
      
          return consumerFilterData;
      }
      

      最終工作的就是:

      public class SqlFilter implements FilterSpi {
      
          @Override
          public Expression compile(final String expr) throws MQFilterException {
              return SelectorParser.parse(expr);
          }
      
          @Override
          public String ofType() {
              return ExpressionType.SQL92;
          }
      }
      

      好了,到這里就鋪墊好了,接下來我們繼續(xù)看消息過濾的過程,這個過程中,上面的2個對象將會工作。

      四. 拉取消息

      broker處理拉取請求的處理器:PullMessageProcessor 方法內容比較多,還是關注和過濾相關的部分

      // org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
      private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
          throws RemotingCommandException {
          RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
          final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
          final PullMessageRequestHeader requestHeader =
              (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
      
         
          // .......省略諸多代碼........
      
          SubscriptionData subscriptionData = null;
          ConsumerFilterData consumerFilterData = null;
          // 這里是false, consumer啟動時已經將訂閱信息發(fā)送到了broker,拿來即用即可
          if (hasSubscriptionFlag) {
              try {
                  subscriptionData = FilterAPI.build(
                      requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()
                  );
                  if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                      consumerFilterData = ConsumerFilterManager.build(
                          requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
                          requestHeader.getExpressionType(), requestHeader.getSubVersion()
                      );
                      assert consumerFilterData != null;
                  }
              } catch (Exception e) {
                  log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
                      requestHeader.getConsumerGroup());
                  response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
                  response.setRemark("parse the consumer's subscription failed");
                  return response;
              }
          } else {
              ConsumerGroupInfo consumerGroupInfo =
                  this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
              
              // ....省略判斷.......
      
              // 獲取訂閱數據,這個就是consumer啟動時發(fā)送給broker的
              subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
               
              // .....省略判斷.......
               
              // SQL過濾 
              if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                  //TODO:前面分析consumer心跳時看到了它,SQL過濾時會創(chuàng)建
                  consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),
                      requestHeader.getConsumerGroup());
                  
                  // ....省略判斷......
              }
          }
      
          // .....省略判斷.......
      
          MessageFilter messageFilter;
          if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
              messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
                  this.brokerController.getConsumerFilterManager());
          } else {
              // 創(chuàng)建MessageFilter
              messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
                  this.brokerController.getConsumerFilterManager());
          }
      
      
          // 從broker 拉取消息
          final GetMessageResult getMessageResult =
              this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                  requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
                  
                  
         //....省略大量代碼.....和過濾無關        
      }      
      

      接下來我們就看下從 CommitLog 讀取消息并過濾的過程:

      // org.apache.rocketmq.store.DefaultMessageStore#getMessage
      public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
          final int maxMsgNums,
          final MessageFilter messageFilter) {
          
             // .....省略大篇幅代碼.......
          
                  // 在去commitlog讀取消息之前,對ConsumeQueue條目進行 tag hashcode 過濾
                  if (messageFilter != null
                      && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                      if (getResult.getBufferTotalSize() == 0) {
                          status = GetMessageStatus.NO_MATCHED_MESSAGE;
                      }
      
                      continue;
                  }
      
                  // 從CommitLog 讀取消息
                  SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                  if (null == selectResult) {
                      if (getResult.getBufferTotalSize() == 0) {
                          status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                      }
      
                      nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                      continue;
                  }
      
                  // 在從commitlog讀取消息之后,進行 SQL 過濾
                  if (messageFilter != null
                      && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
                      if (getResult.getBufferTotalSize() == 0) {
                          status = GetMessageStatus.NO_MATCHED_MESSAGE;
                      }
                      // release...
                      selectResult.release();
                      continue;
                  }
      
                              
      }
      

      主要就是做3件事:

      1. 在去 CommitLog 讀取消息之前,先根據 TAG hashcode 過濾一次 ConsumeQueue 中的條目,如果ConsumeQueue中保存Tag HashCode與消費組需要消費Tag HashCode不一致,則不會讀取CommitLog中的消息了。

      broker先完成tag hashcode 過濾,consumer進一步完成tag 值過濾。

      1. 去 CommitLog 讀取消息
      2. 從 CommitLog 讀取出消息之后,如果是SQL過濾,則在broker完成過濾。

      4.1 Broker完成 TAG HashCode 過濾

      TAG 過濾就是ExpressionMessageFilter#isMatchedByConsumeQueue(..)方法:

      @Override
      public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
          if (null == subscriptionData) {
              return true;
          }
      
          if (subscriptionData.isClassFilterMode()) {
              return true;
          }
      
          // by tags code.
          if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
      
              if (tagsCode == null) {
                  return true;
              }
      
              if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
                  return true;
              }
      
              // 根據tag hashcode 過濾
              return subscriptionData.getCodeSet().contains(tagsCode.intValue());
          } else {
          
             // ....省略else.....
          }
      
          return true;
      }
      

      這個方法內部會完成TAG 的hashcode 過濾,不過這里只是TAG的初步過濾,因為兩個不同TAG也可能有相同的hashcode,所以這里過濾并不完善,真正的TAG過濾是交給消費者來完成的。

      4.2 Broker完成 SQL 過濾

      SQL的過濾是在ExpressionMessageFilter#isMatchedByCommitLog(..)方法中:

      @Override
      public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
          if (subscriptionData == null) {
              return true;
          }
      
          if (subscriptionData.isClassFilterMode()) {
              return true;
          }
      
          // 如果是TAG過濾,則直接退出
          if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
              return true;
          }
      
          // SQL過濾的數據(sql表達式等等)
          ConsumerFilterData realFilterData = this.consumerFilterData;
          Map<String, String> tempProperties = properties;
      
          // .....校驗code.....
      
          Object ret = null;
          try {
              MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
      
              ret = realFilterData.getCompiledExpression().evaluate(context);
          } catch (Throwable e) {
              log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
          }
      
          log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);
      
          if (ret == null || !(ret instanceof Boolean)) {
              return false;
          }
      
          return (Boolean) ret;
      }
      

      這里會根據SQL進行過濾,如果該條消息是消費者想要的,則將其放入容器中,返回給消費者,如果不是消費者想要的,則直接丟棄,繼續(xù)查詢下一條消息。

      這里的丟棄只是不返回給消費者,在清除 CommitLog 文件之前,這條消息都是在的。

      五. 消費消息

      前面說了,如果是TAG 過濾,則Broker會率先完成一次TAG Hashcode過濾,但是這樣過濾并不完全,因為不同TAG可能有相同Hashcode,所以消費者要根據TAG 值完成最后的過濾。

      如果是SQL過濾,只能由Broker完成,消費者不做其他任何操作。

      那么我們還是看消費者消費消息時的過濾邏輯:

      // org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
      public void pullMessage(final PullRequest pullRequest) {
         
          //......
      
          PullCallback pullCallback = new PullCallback() {
              @Override
              public void onSuccess(PullResult pullResult) {
                  if (pullResult != null) {
                      // 處理拉取結果,這里將會完成TAG的值過濾
                      pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                          subscriptionData);
                  }
                  
              //.......
          }
          
          //.......
      }
      

      那么我們繼續(xù)看下它的內部實現:

      // org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#processPullResult
      public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
          final SubscriptionData subscriptionData) {
          PullResultExt pullResultExt = (PullResultExt) pullResult;
      
          this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
          if (PullStatus.FOUND == pullResult.getPullStatus()) {
              ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
              List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
      
              List<MessageExt> msgListFilterAgain = msgList;
              // 根據TAG 值過濾
              if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
                  msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
                  for (MessageExt msg : msgList) {
                      if (msg.getTags() != null) {
                          if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                              msgListFilterAgain.add(msg);
                          }
                      }
                  }
              }
              
              // 將過濾后的消息給消費者消費
              pullResultExt.setMsgFoundList(msgListFilterAgain);
      
              //........
          }
      
          return pullResult;
      }
      

      六. 總結

      1. RocketMQ支持兩種方式的消息過濾:TAG/SQL
      2. 要想使用SQL過濾,必須要在broker中配置:enablePropertyFilter = true
      3. TAG 過濾分兩個階段完成:
      • 第一階段:broker率先根據tag的hashcode完成過濾
      • 第二階段:consumer根據tag值完成最后的過濾
      1. SQL過濾只能在Broker中完成
      posted @ 2023-12-02 09:55  聽到微笑  閱讀(280)  評論(0)    收藏  舉報  來源
      主站蜘蛛池模板: 日韩国产精品一区二区av| 久久久久久久久久久国产| 亚洲乱妇老熟女爽到高潮的片| 少妇午夜福利一区二区三区| 久久99精品久久久久久青青| 久久香蕉欧美精品| 亚洲综合久久国产一区二区| 风韵丰满妇啪啪区老老熟女杏吧| 欧美一区二区三区欧美日韩亚洲| av日韩在线一区二区三区| 加勒比无码专区中文字幕| 亚洲欧洲一区二区精品| 亚洲aⅴ无码专区在线观看q| 在线播放国产女同闺蜜| 55大东北熟女啪啪嗷嗷叫| 国产欧美日韩精品丝袜高跟鞋| 免费国产午夜理论片不卡| 枣阳市| 亚洲av成人网在线观看| 精品国产亚洲午夜精品a| 妺妺窝人体色www婷婷| 日本九州不卡久久精品一区| 大陆精大陆国产国语精品| 西安市| 亚洲最大福利视频网| 免费看欧美全黄成人片| 亚洲AV成人无码久久精品四虎| 国产精品国产亚洲看不卡| 免费无码黄十八禁网站| 亚洲精品成人区在线观看| 日韩av中文字幕有码| 午夜片神马影院福利| 国产精品午夜福利片国产| 亚洲另类丝袜综合网| 欧美丰满熟妇hdxx| 亚洲国产精品va在线观看麻豆 | 久久国产精品精品国产色婷婷| 日韩精品有码中文字幕| 精品久久久久久无码中文字幕| 亚洲V天堂V手机在线| 国产精品 无码专区|