<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      nodejs中的kafkajs,消費順序,不重復消費

      參考:https://kafka.js.org/docs
      確保同一個消息發送到同一個partition,一個topic,一個partition,一個consumer,內部單線程消費
      1.封裝kafkaUtil類



      const { Kafka, logLevel } = require('kafkajs') //const cache = require('../conn/redis.js'); const kafka = new Kafka({ clientId: 'my-app', brokers: [ "lcoalhost:8092", "localhost:8093", "localhost:8094", "lcoalhost:8095", "localhost:8096", ], retry: { retries: 8 }, logLevel: logLevel.ERROR }) /** * 如果groupId已存在重復的,建立不同的kafka實例會報錯 */ /** * kafka生產者發送消息 * messages: [{ value: 'Hello KafkaJS user!', }, { value: 'Hello KafkaJS user2!', }], */ exports.producer = async (topic, groupId, msg) => { try { const producer = kafka.producer({ groupId: groupId }) await producer.connect() await producer.send({ topic: topic, messages: msg, acks: 1 }) } catch (error) { throw error; } } exports.consumer = async (topic, groupId, callback) => { try { const consumer = kafka.consumer({ groupId: groupId }) await consumer.connect() await consumer.subscribe({ topic: topic }) await consumer.run({ autoCommit: true, eachMessage: async ({ topic, partition, message }) => {
      //防止重復消費數據 await consumer.commitOffsets([{ topic: topic, partition: partition, offset: Number(message.offset) + 1 }]) let msg = message.value.toString() console.log(72, '消費者接收到的數據為:', msg); callback(msg); } }) } catch (err) { throw err; } }

      2.producer.js

         

      const kafka = require('./kafkaUtil');
      (async function () {
          const topic = 'MY——TOPIC1'
          const groupId = 'MY——TOPIC1'
          try {
              for (let i = 0; i < 10000; i++) {
                  await new Promise((resolve, reject) => {
                      setTimeout(async () => {
                          resolve(1)
                      }, 1000)
                  }).then(async () => {
                      console.log('發送的數據為:', i)
                      await kafka.producer(topic, groupId, [{
                          key: "a",//key值為了保證消費者按照生產者生產的數據順序,消費數據,key值必須一致;如果不需要消費者按照生產的順序消費,key去掉即可,參考: https://www.zhihu.com/question/266390197
                          value: `${i}`
                      }])
                  })
              }
          } catch (error) {
              console.log(14, error)
              throw error;
          }
      
      })()
      

      3.consumer.js

       

      const kafka = require('./kafkaUtil');
      (async function () {
          const fs = require('fs');
          let count = 1;
          const topic = 'MY——TOPIC1'
          const groupId = 'MY——TOPIC1'
          try {
              await kafka.consumer(topic, groupId, async (msg) => {
                  let str = `第${count}接收到的數據為:${msg}`;
                  count++;
                  fs.writeFileSync(`${process.cwd()}/test01.txt`, str, {
                      flag: 'a',
                  })
                  console.log(str)
              })
          } catch (error) {
              console.log(14, error)
              throw error;
          }
      })()

      經實際測試,沒有發現消費問題。如有發現問題,請多多指教,謝謝。。?! ?/p>

       
      posted @ 2020-05-16 00:24  江山一族  閱讀(3894)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 国产精品自在线拍国产手青青机版| 日韩V欧美V中文在线| 黄又色又污又爽又高潮| 精品嫩模福利一区二区蜜臀| 国产精品久久久久无码网站| 久久精品第九区免费观看| 国产suv精品一区二区五| 在线播放亚洲人成电影| 国产对白老熟女正在播放| 亚洲av肉欲一区二区| 国产av最新一区二区| 国产亚洲欧洲AⅤ综合一区| 精品久久丝袜熟女一二三| 日本熟妇人妻xxxxx人hd| 国内精品亚洲成av人片| 免费观看欧美猛交视频黑人| 我和亲妺妺乱的性视频| 久久久久蜜桃精品成人片公司| 少妇人妻偷人免费观看| 好硬好湿好爽好深视频| 中文字幕一区二区三区麻豆| 国产中文字幕精品免费| 亚洲日韩成人av无码网站| 国产偷国产偷亚洲高清人| 国产精品普通话国语对白露脸| 人妻夜夜爽天天爽一区 | 精品人人妻人人澡人人爽人人| 美乳丰满人妻无码视频| 涡阳县| 98久久人妻少妇激情啪啪| 久久先锋男人AV资源网站| 乌克兰丰满女人a级毛片右手影院| 国产精品无卡毛片视频| 欧洲免费一区二区三区视频| 亚洲天堂精品一区二区| 九九热精品在线观看| 中文字幕精品人妻av在线| julia无码中文字幕一区| 久久亚洲国产成人精品性色| 国产在线98福利播放视频| 精品久久精品午夜精品久久|