<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      單機配置kafka和zookeeper

      1:環境準備

      jdk 推薦oracle,不建議open sdk

      在/etc/profile加入下列環境變量

      在PATH中將jdk和jre的bin加入path里面

      $JAVA_HOME/bin:$JRE_HOME/bin

      2:安裝zookeeper

      下載zookeeper tar

      url: https://archive.apache.org/dist/zookeeper/zookeeper-3.4.5/zookeeper-3.4.5.tar.gz

      將壓縮包移動到/usr/local下面

      tar -zxvf ***

       

      更改配置文件

      (1)將conf/zoo_sample.cfg更改為zoo.cfg

      (2)更改配置如下

      注意:

        *其中默認port為2181。

        *datadir需手動創建 mkdir -p datadir

             *注釋掉的參數在單機中無用

      (3)加入環境變量 /etc/profile

       

      測試:

      可以cd 到bin目錄下通過執行 zkServer.sh shell腳本啟動、停止或者查看狀態

      eg:

      ./zkServer.sh start/stop/status

       

      3:安裝kafka

      先下載tar包、解壓、mv到/usr/local下面

      wget https://www.apache.org/dyn/closer.cgi?path=/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz
      sudo mv *** /usr/local
      cd /usr/local
      sudo tar -zxvf ****
      sudo rm **.tar.gz

       

      修改config目錄下配置文件

      vim server.properties

      修改如下參數

      #broker.id需改成正整數,單機為1就好
      broker.id=1
      #指定端口號
      port=9092
      #localhost這一項還有其他要修改,詳細見下面說明
      host.name=localhost
      #指定kafka的日志目錄
      log.dirs=/usr/local/kafka_2.11-0.11.0.0/kafka-logs
      #連接zookeeper配置項,這里指定的是單機,所以只需要配置localhost,若是實際生產環境,需要在這里添加其他ip地址和端口號
      zookeeper.connect=localhost:2181

       

      vim zookeeper.properties

      修改如下參數

      #數據目錄
      dataDir=/usr/local/kafka_2.11-0.11.0.0/zookeeper/data
      #客戶端端口
      clientPort=2181
      host.name=localhost

      producer.properties and consumer.properties

      zookeeper.connect=localhost:2181

       

      4:啟動kafka

      bin/zookeeper-server-start.sh config/zookeeper.properties
      bin/kafka-server-start.sh config/server.properties

      終端都是info log

       

      注意:如果是非root用戶,可能會使用到sudo去啟動zookeeper和kafka,但是那樣會失敗必須將kafka整個目錄下的文件的owner和group都改為自己的用戶。

      可以通過jps命令查看兩者是否啟動成功

      2576 QuorumPeerMain表示zookeeper

      如果發現沒啟動成功,可以在zookeeper/config/zookeeper.out里debug。

       

      成功啟動后,可以簡單通過demo進行測試

      github有各種語言的kafka支持

      url:https://github.com/edenhill/librdkafka/tree/v0.9.5

       

       

      可以簡單通過go的demo進行測試

      consumer

      import (
          "fmt"
          "github.com/confluentinc/confluent-kafka-go/kafka"
      )
      
      func main() {
      
          c, err := kafka.NewConsumer(&kafka.ConfigMap{
              "bootstrap.servers": "localhost",
              "group.id":          "myGroup",
              "auto.offset.reset": "earliest",
          })
      
          if err != nil {
              panic(err)
          }
      
          c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)
      
          for {
              msg, err := c.ReadMessage(-1)
              if err == nil {
                  fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
              } else {
                  fmt.Printf("Consumer error: %v (%v)\n", err, msg)
                  break
              }
          }
      
          c.Close()
      }

      producer

      import (
          "fmt"
          "github.com/confluentinc/confluent-kafka-go/kafka"
      )
      
      func main() {
      
          p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
          if err != nil {
              panic(err)
          }
      
        
          go func() {
              for e := range p.Events() {
                  switch ev := e.(type) {
                  case *kafka.Message:
                      if ev.TopicPartition.Error != nil {
                          fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
                      } else {
                          fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
                      }
                  }
              }
          }()
      
       
          topic := "myTopic"
          for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
              p.Produce(&kafka.Message{
                  TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
                  Value:          []byte(word),
              }, nil)
          }
      
      
          p.Flush(15 * 1000)
      }

       

      posted @ 2018-05-24 13:38  柳下_MBX  閱讀(596)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 欧美人与性囗牲恔配| 狠狠色丁香婷婷亚洲综合| 蜜臀久久精品亚洲一区| 久久亚洲精品国产精品| 99亚洲男女激情在线观看| 国产免费无遮挡吸奶头视频| 国产另类ts人妖一区二区| 久久国产精品色av免费看| 久久国语对白| a级国产乱理伦片在线观看al| 亚洲区综合区小说区激情区| 性欧美VIDEOFREE高清大喷水 | 国产精品高清中文字幕| 亚洲中文字幕无码中文字| 亚洲免费观看一区二区三区| 日本亚洲一区二区精品久久| 欧美国产日韩久久mv| 精品素人AV无码不卡在线观看| 精品日韩色国产在线观看| 好男人社区神马在线观看www | 精品国产美女福到在线不卡| 无码中文字幕人妻在线一区| 日韩一区二区三区不卡片| 一个色的导航| 亚洲成A人片在线观看的电影| 国产精品 无码专区| 女人下边被添全过视频的网址 | 久久热精品视频在线视频| 中文字幕日韩精品有码| 丰满岳乱妇久久久| 国产一区精品在线免费看| 小鲜肉自慰网站xnxx| 中文字幕结果国产精品| 日本一区不卡高清更新二区| 日本强伦片中文字幕免费看| 久久精品久久黄色片看看| 美女自卫慰黄网站| 国产v综合v亚洲欧美久久| 色一伊人区二区亚洲最大| 唐人社视频呦一区二区| 国产精品大片中文字幕|