<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      flume 攔截器

      目的:功能1:判斷json文件,如何格式合格則正常傳輸,否則就不傳輸

                功能2:判斷出合格的json文件,并且key值中包含“date”才進行傳輸

      一、創建一個Json的工具類

      package com.atguigu.gmall.flume.utils;
      import com.alibaba.fastjson.JSONObject;
      import org.apache.flume.Event;
      
      public class JSONUtil {
      //    public static void main(String[] args) {
      //        String log = "{date:2}";
      //        String key = "date";
      //        System.out.println(isJSONValidate(log));
      //        System.out.println(isContainsKey(log, key));
      ////        System.out.println(JSONObject.parseObject(log).containsKey(key));
      //    }
      
      //    判斷log是否為一個合法的json
          public static boolean isJSONValidate(String log) {
              try {
                  JSONObject.parseObject(log);  //是將str轉化為相應的JSONObject對象
                  return true;
              } catch (Exception e) {
      //            e.printStackTrace();
                  return false;
              }
          }
      
          //  判斷json中是否包含key值,其中如果log不是合格的json,直接走異常返回false
          public static boolean isContainsKey(String log,String key){
              try {
                  if (JSONObject.parseObject(log).containsKey(key)) {
                      return true;
                  } else{
                      return  false;
                  }
              } catch (Exception e) {
      //            e.printStackTrace();
                  return false;
              }
          }
      }
      

        二、編寫攔截器,只需要重寫Event intercept和List<Event> intercept方法即可。如果傳輸的json文件不是很復雜,只需要重寫單個event的intercept方法即可

      package com.atguigu.gmall.flume.intercepter;
      
      import com.alibaba.fastjson.JSON;
      import com.alibaba.fastjson.JSONArray;
      import com.alibaba.fastjson.JSONObject;
      import com.atguigu.gmall.flume.utils.JSONUtil;
      import org.apache.flume.Context;
      import org.apache.flume.Event;
      import org.apache.flume.interceptor.Interceptor;
      
      import java.nio.charset.StandardCharsets;
      import java.util.ArrayList;
      import java.util.Iterator;
      import java.util.List;
      
      public class ETLIntercepter implements Interceptor {
          @Override
          public void initialize() {
          }
      
          @Override
              public Event intercept(Event event) {
      //        獲取body中的數據
              byte[] body = event.getBody();
              String log= new String(body, StandardCharsets.UTF_8);
      //        1.判斷是否為合法的json,是:返回event , 否:返回null
      //        if(JSONUtil.isJSONValidate(log ){
      //            return event;
      //        }else{
      //            return null;
      //        }
      //        2.判斷是否為合法的json,且是都包含key值“date”
              if(JSONUtil.isContainsKey(log,"date") ){
                  return event;
              }else{
                  return null;
              }
          }
      
          @Override
          public List<Event> intercept(List<Event> list) {
      //        判斷事件list中的event是都合格,如果不合格就將event的刪掉,返回合格的event列表
      //        如果list中的event都不合格,則返回null
              Iterator<Event> iterator = list.iterator();
              while(iterator.hasNext()){
                  Event event = iterator.next();
                  if(intercept(event) == null){
                      iterator.remove();
                  }
              }
              return list;
          }
      
          @Override
          public void close() {
          }
      
          public static  class Builder implements Interceptor.Builder{
              @Override
              public Interceptor build() {
                  return new ETLIntercepter();
              }
              @Override
              public void configure(Context context) {
              }
          }
      }
      

      三、生成rtProj-1.0-SNAPSHOT-jar-with-dependencies.jar包上傳到flume的lib目錄下,然后再編輯flume的配置文件

      a1.sources = r1
      a1.channels = c1
       
      # Describe/configure the source
      a1.sources.r1.type = taildir
      a1.sources.r1.filegroups=f1
      a1.sources.r1.filegroups.f1 = /workplace/data/log*.*
      a1.sources.r1.positionFile  = /workplace/data/taildir_position.json   
      a1.sources.r1.interceptors = i1
      a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.intercepter.ETLIntercepter$Builder 
      #i1.type :此地址是攔截器中Builder的地址(右鍵點擊Copy Reference,記得將.Builder中的 . 換成$號)
       
      # Use a channel which buffers events in memory
      a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
      a1.channels.c1.kafka.bootstrap.servers=master:9092
      a1.channels.c1.kafka.topic = test1
       
      # Bind the source and sink to the channel
      a1.sources.r1.channels = c1

       四、啟動flume開始傳輸數據

      bin/flume-ng agent --conf conf --conf-file ./conf/job/flume_to_kafka2.conf --name a1 -Dflume.root.logger=INFO,consol
      

      五、往文件里寫數據,查看fafka中的數據,發現已經能成功過濾出正常json文件,且含有date的key值的數據了

      /app/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.80.128:9092 --from-beginning --topic test1

       

      posted @ 2022-11-01 11:53  欣欣姐  Views(180)  Comments(0)    收藏  舉報
      主站蜘蛛池模板: 52熟女露脸国语对白视频| 黄又色又污又爽又高潮| 国产精品一区二区国产馆| 国产成人无码免费视频麻豆| 仲巴县| 成人久久精品国产亚洲av| 亚洲一区二区精品动漫| 亚洲色成人网站www永久四虎| 国产精品国产三级国产试看 | 日韩中文字幕免费在线观看| 国产视频一区二区在线看| 亚洲欧美一区二区三区图片| 国产99视频精品免费专区| 自拍偷区亚洲综合第二区| a级国产乱理伦片在线观看al| 久久国产精品成人影院| 久久国产精品精品国产色婷婷| 国产一区二区不卡精品视频| 韩国午夜理伦三级| 国产午夜福利免费入口| 久久国产成人高清精品亚洲| 国产精品女在线观看| 欧美高清一区三区在线专区| 久青草视频在线免费观看| 欧美日产国产精品日产| 中文字幕人妻在线精品| 亚洲欧美精品一中文字幕| 国产精品亚洲一区二区三区| 亚洲欧美牲交| 亚洲日韩亚洲另类激情文学| 国产69精品久久久久久妇女迅雷| 综合偷自拍亚洲乱中文字幕| 亚洲 中文 欧美 日韩 在线| 国产伦一区二区三区久久| 男女xx00上下抽搐动态图| 999精品色在线播放| 屁股中文字幕一二三四区人妻| 深夜释放自己在线观看| 9久9久热精品视频在线观看| 欧美激情内射喷水高潮| 久久久久夜夜夜精品国产|