
      Flume + Kafka + Flink Integration

      I. Install the software
      1. Install Flume
      cd /opt/Servers
      tar -zxvf flume1.8-bin.tar.gz
      gedit /opt/Servers/flume1.8/conf/kafka.properties
       
      agent.sources = s1
      agent.channels = c1
      agent.sinks = k1
      # Read data from the specified file
      agent.sources.s1.type=exec
      agent.sources.s1.command=tail -F /var/log/script/test111
      agent.sources.s1.channels=c1
      # Configure the transfer channel
      agent.channels.c1.type=memory
      agent.channels.c1.capacity=10000
      agent.channels.c1.transactionCapacity=100
      # Set up the Kafka sink
      agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
      # Kafka broker address and port
      agent.sinks.k1.brokerList=localhost:9092
      # Kafka topic
      agent.sinks.k1.topic=test5
      # Serialization class
      agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
      agent.sinks.k1.channel=c1
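       
      Note: brokerList, topic, and serializer.class are legacy property names; the Flume 1.8 Kafka sink documents kafka.bootstrap.servers and kafka.topic as the current keys, and serializer.class dates from the old Kafka 0.8 producer API. An equivalent sink block using the current names would be:
       
      agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
      agent.sinks.k1.kafka.bootstrap.servers = localhost:9092
      agent.sinks.k1.kafka.topic = test5
      agent.sinks.k1.channel = c1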
       
      2. Install Kafka
      Configuration file: /opt/Servers/kafka_2.11-2.0.0/config/server.properties
      See the separate installation document for setup details.
      
      # see kafka.server.KafkaConfig for additional details and defaults
      
      ############################# Server Basics #############################
      
      # The id of the broker. This must be set to a unique integer for each broker.
      broker.id=0
      
      ############################# Socket Server Settings #############################
      
      # The address the socket server listens on. It will get the value returned from 
      # java.net.InetAddress.getCanonicalHostName() if not configured.
      #   FORMAT:
      #     listeners = listener_name://host_name:port
      #   EXAMPLE:
      #     listeners = PLAINTEXT://your.host.name:9092
      listeners=PLAINTEXT://localhost:9092
      
      # Hostname and port the broker will advertise to producers and consumers. If not set, 
      # it uses the value for "listeners" if configured.  Otherwise, it will use the value
      # returned from java.net.InetAddress.getCanonicalHostName().
      #advertised.listeners=PLAINTEXT://your.host.name:9092
      
      # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
      #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
      
      # The number of threads that the server uses for receiving requests from the network and sending responses to the network
      num.network.threads=3
      
      # The number of threads that the server uses for processing requests, which may include disk I/O
      num.io.threads=8
      
      # The send buffer (SO_SNDBUF) used by the socket server
      socket.send.buffer.bytes=102400
      
      # The receive buffer (SO_RCVBUF) used by the socket server
      socket.receive.buffer.bytes=102400
      
      # The maximum size of a request that the socket server will accept (protection against OOM)
      socket.request.max.bytes=104857600
      
      
      ############################# Log Basics #############################
      
      # A comma separated list of directories under which to store log files
      log.dirs=/opt/Servers/Cache/kafka
      
      # The default number of log partitions per topic. More partitions allow greater
      # parallelism for consumption, but this will also result in more files across
      # the brokers.
      num.partitions=1
      
      # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
      # This value is recommended to be increased for installations with data dirs located in RAID array.
      num.recovery.threads.per.data.dir=1
      
      ############################# Internal Topic Settings  #############################
      # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
      # For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
      offsets.topic.replication.factor=1
      transaction.state.log.replication.factor=1
      transaction.state.log.min.isr=1
      
      ############################# Log Flush Policy #############################
      
      # Messages are immediately written to the filesystem but by default we only fsync() to sync
      # the OS cache lazily. The following configurations control the flush of data to disk.
      # There are a few important trade-offs here:
      #    1. Durability: Unflushed data may be lost if you are not using replication.
      #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
      #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
      # The settings below allow one to configure the flush policy to flush data after a period of time or
      # every N messages (or both). This can be done globally and overridden on a per-topic basis.
      
      # The number of messages to accept before forcing a flush of data to disk
      #log.flush.interval.messages=10000
      
      # The maximum amount of time a message can sit in a log before we force a flush
      #log.flush.interval.ms=1000
      
      ############################# Log Retention Policy #############################
      
      # The following configurations control the disposal of log segments. The policy can
      # be set to delete segments after a period of time, or after a given size has accumulated.
      # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
      # from the end of the log.
      
      # The minimum age of a log file to be eligible for deletion due to age
      log.retention.hours=168
      
      # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
      # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
      #log.retention.bytes=1073741824
      
      # The maximum size of a log segment file. When this size is reached a new log segment will be created.
      log.segment.bytes=1073741824
      
      # The interval at which log segments are checked to see if they can be deleted according
      # to the retention policies
      log.retention.check.interval.ms=300000
      
      ############################# Zookeeper #############################
      
      # Zookeeper connection string (see zookeeper docs for details).
      # This is a comma separated host:port pairs, each corresponding to a zk
      # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
      # You can also append an optional chroot string to the urls to specify the
      # root directory for all kafka znodes.
      zookeeper.connect=localhost:2181
      
      # Timeout in ms for connecting to zookeeper
      zookeeper.connection.timeout.ms=6000
      
      
      ############################# Group Coordinator Settings #############################
      
      # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
      # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
      # The default value for this is 3 seconds.
      # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
      # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
      group.initial.rebalance.delay.ms=0
       
      3. Install Flink (just extract the archive)
      See the separate installation document for details.
       
      4. Install ZooKeeper
      See the separate installation document for details.
       
      II. Start the services
      1. Start ZooKeeper
      cd /opt/Servers/zookeeper-3.4.10/bin
      ./zkServer.sh start
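       
      To confirm ZooKeeper is up, check its status; a single-node setup should report standalone mode:
      ./zkServer.sh status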
       
      2. Start Kafka (if the test5 topic does not exist yet, create it first; see the command below)
      cd /opt/Servers/kafka_2.11-2.0.0
      ./bin/kafka-server-start.sh config/server.properties &
      ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test5
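       
      Kafka 2.0 still manages topics through ZooKeeper, so a minimal single-partition topic matching the Flume sink can be created with:
      ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test5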
       
      3. Start Flume
      /opt/Servers/flume1.8/bin/flume-ng agent -c /opt/Servers/flume1.8/conf/ -f /opt/Servers/flume1.8/conf/kafka.properties -n agent -Dflume.root.logger=DEBUG,console
       
      4. Start Flink
      cd /opt/Servers/flink-1.7.2
      ./bin/start-cluster.sh
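       
      Once the cluster is up, the Flink web UI should be reachable at http://localhost:8081 (the default port), which is a quick way to confirm that the JobManager and a TaskManager are running.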
       
      5. Run the job code
      Run it directly from the IDE, or build the jar and submit it to Flink (in Maven, run clean first, then package):
      flink run -c com.heishuidi.FlinkKafka /opt/JavaProject/flinkTest/target/flinkTest-1.0-SNAPSHOT.jar
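       
      The jar's source is not reproduced in this post; the sketch below shows what a minimal com.heishuidi.FlinkKafka entry point could look like on Flink 1.7's universal Kafka connector (flink-connector-kafka_2.11). The group.id and job name are illustrative assumptions, not taken from the original project.
       
      package com.heishuidi;
      
      import java.util.Properties;
      
      import org.apache.flink.api.common.serialization.SimpleStringSchema;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
      
      public class FlinkKafka {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
              // Kafka consumer settings; group.id is an arbitrary choice for this sketch
              Properties props = new Properties();
              props.setProperty("bootstrap.servers", "localhost:9092");
              props.setProperty("group.id", "flink-kafka-test");
      
              // Read the test5 topic that the Flume sink writes to, as plain strings
              FlinkKafkaConsumer<String> consumer =
                      new FlinkKafkaConsumer<>("test5", new SimpleStringSchema(), props);
      
              DataStream<String> stream = env.addSource(consumer);
              // On a cluster, print() goes to the TaskManager's .out file, not the client console
              stream.print();
      
              env.execute("flink-kafka-test");
          }
      }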
       
      6. Start producing data into the /var/log/script directory, as shown below.
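       
      Since the Flume source tails /var/log/script/test111, appending lines to that file is enough to drive the whole pipeline end to end, for example:
       
      echo "hello flink" >> /var/log/script/test111
      
      Each appended line should then show up in the Flink job's output.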
       