<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      springboot~kafka-stream實現實時統計

      實時統計,也可以理解為流式計算,一個輸入流,一個輸出流,源源不斷。

      Kafka Stream

      Kafka Stream是Apache Kafka從0.10版本引入的一個新Feature。它是提供了對存儲于Kafka內的數據進行流式處理和分析的功能。

      Kafka Stream的特點

      • Kafka Stream提供了一個非常簡單而輕量的Library,它可以非常方便地嵌入任意Java應用中,也可以任意方式打包和部署
      • 除了Kafka外,無任何外部依賴
      • 充分利用Kafka分區機制實現水平擴展和順序性保證
      • 通過可容錯的state store實現高效的狀態操作(如windowed join和aggregation)
      • 支持正好一次處理語義
      • 提供記錄級的處理能力,從而實現毫秒級的低延遲
      • 支持基于事件時間的窗口操作,并且可處理晚到的數據(late arrival of records)
      • 同時提供底層的處理原語Processor(類似于Storm的spout和bolt),以及高層抽象的DSL(類似于Spark的map/group/reduce)

      相關術語

      源處理器和Sink處理器是Kafka Streams中的兩個重要組件,它們分別用于從輸入流獲取數據并將處理后的數據發送到輸出流。以下是它們的工作流程的文字圖示表達:

      [Source Processor] -> [Processor Topology] -> [Sink Processor]
      
      1. 源處理器(Source Processor)

        • 源處理器負責從一個或多個輸入主題(topics)中提取數據,并將數據轉換為KStream或KTable對象。
        • 它通常是處理拓撲結構的起點,從一個或多個輸入主題中讀取數據,并將其發送到處理拓撲中的下一個處理器。
      2. Sink 處理器(Sink Processor)

        • Sink處理器負責將經過處理的數據發送到一個或多個輸出主題,或者執行其他終端操作。
        • 它通常是處理拓撲結構的終點,在處理拓撲的最后階段接收處理后的數據,并將其發送到輸出主題,或者執行其他終端操作,如存儲到數據庫、發送到外部系統等。
      3. Processor Topology

        • 處理拓撲包含了源處理器、中間處理器和Sink處理器,它定義了數據流的處理邏輯。
        • 在處理拓撲中,數據流會通過一系列的處理器進行轉換、聚合和處理,最終到達Sink處理器,完成整個處理流程。

      通過這種處理流程,Kafka Streams可以實現對數據流的靈活處理和轉換,使得你能夠方便地構建實時流處理應用程序。

      kafka stream demo

      依賴

      <!-- kafka -->
      <dependency>
          <groupId>org.springframework.kafka</groupId>
          <artifactId>spring-kafka</artifactId>
          <version>2.5.5.RELEASE</version>
      </dependency>
      <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>2.5.1</version>
      </dependency>
      <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-streams</artifactId>
        <version>2.5.1</version>
      </dependency>
      

      環境準備

      • 安裝kafka
      • 建立topic,我以keycloak為例,它有login_in這個主題,用來記錄登錄信息
      • 建立topic,如total_record,用來存儲login_in的實時統計的結果
      • 可使用springboot繼承的消費者,去消費total_record,如寫入數據庫進行持久化

      業務代碼

      • 配置類
      @Configuration
      @EnableKafkaStreams
      public class KafkaStreamConfig {
      
      	private static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;
      
      	@Value("${spring.kafka.bootstrap-servers}")
      	private String hosts;
      
      	@Value("${spring.kafka.consumer.group-id}")
      	private String group;
      
      	@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
      	public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
      		Map<String, Object> props = new HashMap<>();
      		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
      		props.put(StreamsConfig.APPLICATION_ID_CONFIG, group + "_stream_aid");
      		props.put(StreamsConfig.CLIENT_ID_CONFIG, group + "_stream_cid");
      		props.put(StreamsConfig.RETRIES_CONFIG, 3);
      		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");//從最近的消息開始消費
      		props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
      		props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
      		return new KafkaStreamsConfiguration(props);
      	}
      
      }
      
      • 消費類
      @Configuration
      @Slf4j
      public class KafkaStreamListener {
      
      	@Autowired
      	ReportLoginTypeMapper reportLoginTypeMapper;
      	@KafkaListener(topics = "total_record")
      	public void listen(ConsumerRecord<String, String> record) {
      		// 將時間戳轉換為本地日期時間
      		LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), ZoneId.systemDefault());
      		ReportLoginType reportLoginType=new ReportLoginType();
      		reportLoginType.setLoginType(record.key());
      		reportLoginType.setCreateAt(dateTime);
      		reportLoginType.setCount(Integer.parseInt(record.value()));
      		reportLoginTypeMapper.insert(reportLoginType);
      	}
      
      	@Bean
      	public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
      		KStream<String, String> stream = streamsBuilder.stream("KC_LOGIN");
      		KStream<String, String> serializedStream = stream.mapValues(jsonString -> {
      			// 分組依據
      			if (JSONUtil.parseObj(jsonString).containsKey("details")) {
      				JSONObject details = JSONUtil.parseObj(jsonString).getJSONObject("details");
      				if (details.containsKey("loginType")) {
      					String loginType = details.getStr("loginType");
      					return loginType;
      				}
      				return "";
      			}
      			else {
      				return "";
      			}
      		});
      		/**
      		 * 處理消息的value
      		 */
      		serializedStream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
      			@Override
      			public Iterable<String> apply(String value) {
      				return Arrays.asList(value.split(" "));
      			}
      		}).filter((key, value) -> !value.equals(""))
      				// 按照value進行聚合處理
      				.groupBy((key, value) -> value)// 這進而的value是kafka的消息內容
      				// 時間窗口
      				.windowedBy(TimeWindows.of(Duration.ofSeconds(60)))
      				// 統計單詞的個數
      				.count()
      				// 轉換為kStream
      				.toStream().map((key, value) -> {
      					// key是分組的key,它是一個window對象,它里面有分組key和時間窗口的開始時間和結束時間,方便后期我們統計,value是分組count的結果
      					return new KeyValue<>(key.toString(), value.toString());
      				})
      				// 發送消息
      				.to("topic-out");
      		return stream;
      	}
      
      }
      

      上面代碼在分組統計之后,給把數據發到topic-out的kafka主題里,需要注意kafka主題的key是一個代碼分組key和窗口期的字符串,方便我們后期做數據統計,一般這些窗口期的數據和key一樣,會寫到數據表里,像我們查詢數據表時,會根據它們選擇最大的value值,因為同一窗口里的計數,我們取最大就可以,它已經包含了相同窗口期的其它值。

      select login_type,window_start,window_end,max(count) FROM report_login_type
      where login_type='password' and create_at>='2024-01-10 14:00:00' 
      group by login_type,window_start,window_end
      

      最后看一下total_record的內容

      posted @ 2024-01-09 16:20  張占嶺  閱讀(1262)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 龙川县| 欧洲精品色在线观看| 麻豆精品一区二区视频在线| 国色天香成人一区二区| 骚虎视频在线观看| 亚洲全网成人资源在线观看| 漂亮的人妻不敢呻吟被中出| 在线观看无码av免费不卡网站| 精品一区二区不卡免费| 熟女在线视频一区二区三区| 国产午精品午夜福利757视频播放| 久久国产精品成人免费| 国产69成人精品视频免费| 国产精品十八禁在线观看| 最新精品露脸国产在线| 成人自拍小视频在线观看| 亚洲av永久无码精品网站| 久久er99热精品一区二区 | 蜜桃一区二区三区在线看| 国产日韩一区二区在线| 中文字幕av无码免费一区| 欧美成人www免费全部网站| 亚洲人成电影在线天堂色| 欧美猛少妇色xxxxx| 亚洲欧洲无码av电影在线观看| 人妻av无码系列一区二区三区| 中文字幕乱码熟女人妻水蜜桃| 日韩一区二区三区女优丝袜| 377P欧洲日本亚洲大胆| 铜陵市| 人人爽人人爽人人片av东京热| 亚洲精品午夜国产VA久久成人| 精品国产迷系列在线观看| 2022最新国产在线不卡a| 亚洲欧美日韩国产四季一区二区三区| 中文字幕国产精品自拍| 99精品久久免费精品久久| 日韩欧美在线综合网另类| 五月丁香激激情亚洲综合| 大地资源免费视频观看| 开阳县|