<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Python向Kafka發消息

      后端研發可以提供一個向kafka發消息的接口,用requests向接口post消息就行:

      import requests
      import json
      import time
      
      now = int(time.time())
      n = 10
      while n > 0:
          tt = now - n * 60
          data = {
              "queue": "alarm-dog-alarm-dog-test",
              "payload": "{\"test\":80,\"notice_time\":%d}" % tt
          }
          header = {"Content-Type": "application/json"}
          res = requests.post(url="http://10.90.100.130:8088/v1/kafka/send", headers=header, data=json.dumps(data))
          print(res.status_code)
          print(res.content)
          n -= 1

      如果沒有提供接口,可以借助python-kafka庫連接kafka,模擬生產者向kafka發消息:

      同步發送消息:

      from kafka import KafkaProducer
      import json
      
      # 創建一個KafkaProducer實例,指定Kafka服務器地址
      producer = KafkaProducer(bootstrap_servers='http://10.90.100.130:8088')
      # 要發送的消息內容
      message = {'test': 80, 'notice_time': 5}
      # 將消息轉換為JSON字符串格式(也可以是其他格式,如純文本)
      message_json = json.dumps(message)
      # 發送消息到指定的Kafka主題,這里主題名稱是'my_topic'
      producer.send('alarm-dog-alarm-dog-test', value=message_json.encode('utf - 8'))
      # 確保所有消息都已發送
      producer.flush()
      # 關閉生產者連接
      producer.close()

      異步發送消息

      from kafka import KafkaProducer
      import json
      import time
      
      # 創建一個KafkaProducer實例,設置異步發送和回調函數
      producer = KafkaProducer(bootstrap_servers='http://10.90.100.130:8088',
                acks='all',
                retries=3,
                value_deliver_callback=lambda m: print(f"消息已發送到主題{m.topic()},分區{m.partition()}"))
      
      # 要發送的消息內容
      message = {'test': 80, 'notice_time': 6}
      message_json = json.dumps(message)
      
      # 異步發送消息到'my_topic'主題
      future = producer.send('alarm-dog-alarm-dog-test', value=message_json.encode('utf - 8'))
      try:
      record_metadata = future.get(timeout=10)
      print(f"消息已發送到主題{record_metadata.topic()},分區{record_metadata.partition()},偏移量{record_metadata.offset()}")
      except Exception as e:
      print(f"發送消息時出錯: {e}")
      # 關閉生產者連接
      producer.close()

       

      posted @ 2024-11-03 17:17  海布里Simple  閱讀(379)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 亚洲春色在线视频| 县级市| 日韩精品一区二区三区激情视频| 伊人久久大香线蕉综合观| 久久精品国产99久久久古代| 免费无遮挡毛片中文字幕| 日本三级香港三级人妇99| 午夜高清国产拍精品福利| V一区无码内射国产| 浑源县| 人妻系列无码专区无码中出| 久9视频这里只有精品| 望奎县| 国产微拍一区二区三区四区| 国产乱人伦AV在线麻豆A| 维西| 午夜成人精品福利网站在线观看 | 国产成人亚洲综合图区| 国产精品区免费视频| 国产熟女激情一区二区三区| 午夜福利在线观看6080| 精品国产成人一区二区| 亚洲欧美成人综合久久久| 日本亚洲一区二区精品| 久久香蕉国产线看观看猫咪av| 精品人妻午夜福利一区二区| 国产欧美在线一区二区三| 奇米四色7777中文字幕| 97久久久精品综合88久久| 精品一区二区三区四区激情| 精品人妻人人做人人爽| 国产欧美另类久久久精品不卡| 最近中文字幕国产精品| av一本久道久久波多野结衣| 久久夜色精品国产亚洲a| 中文字幕亚洲人妻系列| 浮梁县| 亚洲av永久无码天堂影院| 东京热大乱系列无码| 国产欧美在线手机视频| 亚洲最大成人在线播放|