山东省建设注册执业中心网站,网站自己建设,咸阳网站建设,推荐几个安全免费的网站测试是否连接成功#xff1a; 在主节点flume目录下输入命令: bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.loggerinfo,console # 这个file_to_kafka.conf文件就是我们的配置文件 然后在另一台节点输入命令进行消费数据#xff1a; kafka-cons… 测试是否连接成功 在主节点flume目录下输入命令: bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.loggerinfo,console # 这个file_to_kafka.conf文件就是我们的配置文件 然后在另一台节点输入命令进行消费数据 kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log 然后再开一个主节点终端在这个主节点上面在对应生成数据的文件追加数据 这样就可以看见第一个主节点的终端和消费节点上面有数据变化了 下面这个是配置拦截器把json格式的内容进行消费其他的进行拦截 Flume采集数据到kafka的配置conf文件内容 #定义组件 #1、定义source、channel、agent名称 a1.sources r1 a1.channels c1 #配置source #2、描述source a1.sources.r1.type TAILDIR #指定监控的组名 a1.sources.r1.filegroups f1 #指定f1组监控的路径 a1.sources.r1.filegroups.f1 /opt/software/applog/log/app.* #指定断点续传的文件 a1.sources.r1.positionFile /opt/software/flume/taildir_position.json # 配置拦截器 a1.sources.r1.interceptors i1 a1.sources.r1.interceptors.i1.type com.atguigu.gmall.flume.interceptor.ETLInterceptor$Builder #配置channel #3、描述channel a1.channels.c1.type org.apache.flume.channel.kafka.KafkaChannel #指定kafka集群 a1.channels.c1.kafka.bootstrap.servers hadoop102:9092,hadoop103:9092 #指定数据写到kafka哪个topic a1.channels.c1.kafka.topic topic_log #是否以Event对象的形式写入kafka a1.channels.c1.parseAsFlumeEvent false #组装 #4、关联source-channel a1.sources.r1.channels c1 如果一开始测试我们flume和kafka是否能成功采集数据的时候我们应该先把拦截器的两行配置先删除后面再根据我们需要的内容进行拦截对应的内容。就比如我们期望我们采集到数据是json格式的如果不是json格式的话我们就放弃这个数据。 具体操作
1创建Maven工程flume-interceptor
2创建包com.gugu.gmall.flume.interceptor
3在pom.xml文件中添加如下配置 dependencies dependency groupIdorg.apache.flume/groupId artifactIdflume-ng-core/artifactId version1.9.0/version scopeprovided/scope /dependency dependency groupIdcom.alibaba/groupId artifactIdfastjson/artifactId version1.2.62/version /dependency /dependencies build plugins plugin artifactIdmaven-compiler-plugin/artifactId version2.3.2/version configuration source1.8/source target1.8/target /configuration /plugin plugin artifactIdmaven-assembly-plugin/artifactId configuration descriptorRefs descriptorRefjar-with-dependencies/descriptorRef /descriptorRefs /configuration executions execution idmake-assembly/id phasepackage/phase goals goalsingle/goal /goals /execution /executions /plugin /plugins /build 在com.gugu.gmall.flume.utils包下创建JSONUtil类
package com.gugu.gmall.flume.utils;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONException;public class JSONUtil {
/*
* 通过异常判断是否是json字符串
* 是返回true 不是返回false
* */public static boolean isJSONValidate(String log){try {JSONObject.parseObject(log);return true;}catch (JSONException e){return false;}}
} 在com.gugu.gmall.flume.interceptor包下创建ETLInterceptor类
package com.gugu.gmall.flume.interceptor;import com.atguigu.gmall.flume.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;public class ETLInterceptor implements Interceptor {Overridepublic void initialize() {}Overridepublic Event intercept(Event event) {//1、获取body当中的数据并转成字符串byte[] body event.getBody();String log new String(body, StandardCharsets.UTF_8);//2、判断字符串是否是一个合法的json是返回当前event不是返回nullif (JSONUtil.isJSONValidate(log)) {return event;} else {return null;}}Overridepublic ListEvent intercept(ListEvent list) {IteratorEvent iterator list.iterator();while (iterator.hasNext()){Event next iterator.next();if(intercept(next)null){iterator.remove();}}return list;}public static class Builder implements Interceptor.Builder{Overridepublic Interceptor build() {return new ETLInterceptor();}Overridepublic void configure(Context context) {}}Overridepublic void close() {}
} 然后进行打包复制到我们flume下的lib目录下就可以了 然后再和上面测试一样进行测试连接是否成功把非json格式的数据拦截成功 感谢各位的观看创作不易能不能给哥们来一个点赞呢
好了今天的分享就这么多了有什么不清楚或者我写错的地方请多多指教
私信评论我呗
关注我下一篇不迷路哦