Flume Interceptor
Purpose. Feature 1: check whether each record is valid JSON; if the format is valid, pass it along normally, otherwise drop it.
Feature 2: of the valid JSON records, only pass along those whose keys include "date".
1. Create a JSON utility class
package com.atguigu.gmall.flume.utils;

import com.alibaba.fastjson.JSONObject;

public class JSONUtil {

    // Check whether log is a valid JSON string.
    public static boolean isJSONValidate(String log) {
        try {
            // parseObject converts the string into a JSONObject; it throws if the string is not valid JSON
            JSONObject.parseObject(log);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    // Check whether the JSON contains the given key.
    // If log is not valid JSON, parseObject throws and we return false directly.
    public static boolean isContainsKey(String log, String key) {
        try {
            return JSONObject.parseObject(log).containsKey(key);
        } catch (Exception e) {
            return false;
        }
    }
}
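Before packaging, the two helpers can be sanity-checked locally with a small main method (a minimal sketch; the sample log strings and the JSONUtilTest class name are just for illustration):
package com.atguigu.gmall.flume.utils;

public class JSONUtilTest {
    public static void main(String[] args) {
        String goodLog = "{\"date\":\"2023-06-01\",\"id\":1}";  // valid JSON that contains "date"
        String noDateLog = "{\"id\":1}";                        // valid JSON without "date"
        String badLog = "not a json line";                      // not JSON at all

        System.out.println(JSONUtil.isJSONValidate(goodLog));          // true
        System.out.println(JSONUtil.isJSONValidate(badLog));           // false
        System.out.println(JSONUtil.isContainsKey(goodLog, "date"));   // true
        System.out.println(JSONUtil.isContainsKey(noDateLog, "date")); // false
        System.out.println(JSONUtil.isContainsKey(badLog, "date"));    // false (invalid JSON hits the catch branch)
    }
}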
2. Write the interceptor. Only the Event intercept(Event) and List<Event> intercept(List<Event>) methods need to be overridden. If the JSON being transported is not very complex, the filtering logic only has to live in the single-event intercept method; the batch version just delegates to it.
package com.atguigu.gmall.flume.intercepter;

import com.atguigu.gmall.flume.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

public class ETLIntercepter implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Read the event body as a UTF-8 string
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);

        // Feature 1: keep the event only if it is valid JSON, otherwise drop it
        // if (JSONUtil.isJSONValidate(log)) {
        //     return event;
        // } else {
        //     return null;
        // }

        // Feature 2: keep the event only if it is valid JSON and contains the key "date"
        if (JSONUtil.isContainsKey(log, "date")) {
            return event;
        } else {
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        // Check every event in the batch and remove the ones that do not qualify,
        // returning only the qualified events. If none qualify, the returned list is empty.
        Iterator<Event> iterator = list.iterator();
        while (iterator.hasNext()) {
            Event event = iterator.next();
            if (intercept(event) == null) {
                iterator.remove();
            }
        }
        return list;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new ETLIntercepter();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
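The interceptor can also be exercised outside a running agent by building a few events by hand (a minimal sketch, assuming the Flume SDK classes are on the classpath; the sample payloads and the ETLIntercepterTest class name are made up for illustration):
package com.atguigu.gmall.flume.intercepter;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ETLIntercepterTest {
    public static void main(String[] args) {
        Interceptor interceptor = new ETLIntercepter.Builder().build();
        interceptor.initialize();

        List<Event> events = new ArrayList<>();
        events.add(EventBuilder.withBody("{\"date\":\"2023-06-01\",\"id\":1}", StandardCharsets.UTF_8)); // kept
        events.add(EventBuilder.withBody("{\"id\":2}", StandardCharsets.UTF_8));                         // dropped: no "date" key
        events.add(EventBuilder.withBody("plain text line", StandardCharsets.UTF_8));                    // dropped: not JSON

        List<Event> kept = interceptor.intercept(events);
        System.out.println(kept.size()); // 1: only the record with a "date" key survives

        interceptor.close();
    }
}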
3. Build rtProj-1.0-SNAPSHOT-jar-with-dependencies.jar, upload it to Flume's lib directory, and then edit the Flume configuration file.
a1.sources = r1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = taildir
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /workplace/data/log*.*
a1.sources.r1.positionFile = /workplace/data/taildir_position.json
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.intercepter.ETLIntercepter$Builder
# i1.type is the fully qualified name of the interceptor's Builder class
# (in the IDE, right-click > Copy Reference, then replace the "." before "Builder" with "$")

# Use a Kafka channel which buffers events in Kafka
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = master:9092
a1.channels.c1.kafka.topic = test1

# Bind the source to the channel
a1.sources.r1.channels = c1
4. Start Flume to begin transferring data
bin/flume-ng agent --conf conf --conf-file ./conf/job/flume_to_kafka2.conf --name a1 -Dflume.root.logger=INFO,console
5. Append data to the monitored file and check the data in Kafka; only records that are valid JSON and contain a "date" key come through the filter.
/app/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.80.128:9092 --from-beginning --topic test1