当前位置: 首页 > news >正文

山东省建设注册执业中心网站网站自己建设

山东省建设注册执业中心网站,网站自己建设,咸阳网站建设,推荐几个安全免费的网站测试是否连接成功#xff1a; 在主节点flume目录下输入命令: bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.loggerinfo,console # 这个file_to_kafka.conf文件就是我们的配置文件 然后在另一台节点输入命令进行消费数据#xff1a; kafka-cons… 测试是否连接成功 在主节点flume目录下输入命令: bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.loggerinfo,console # 这个file_to_kafka.conf文件就是我们的配置文件 然后在另一台节点输入命令进行消费数据 kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log 然后再开一个主节点终端在这个主节点上面在对应生成数据的文件追加数据 这样就可以看见第一个主节点的终端和消费节点上面有数据变化了  下面这个是配置拦截器把json格式的内容进行消费其他的进行拦截 Flume采集数据到kafka的配置conf文件内容 #定义组件 #1、定义source、channel、agent名称 a1.sources r1 a1.channels c1 #配置source #2、描述source a1.sources.r1.type TAILDIR #指定监控的组名 a1.sources.r1.filegroups f1 #指定f1组监控的路径 a1.sources.r1.filegroups.f1 /opt/software/applog/log/app.* #指定断点续传的文件 a1.sources.r1.positionFile /opt/software/flume/taildir_position.json # 配置拦截器 a1.sources.r1.interceptors  i1 a1.sources.r1.interceptors.i1.type com.atguigu.gmall.flume.interceptor.ETLInterceptor$Builder #配置channel #3、描述channel a1.channels.c1.type org.apache.flume.channel.kafka.KafkaChannel #指定kafka集群 a1.channels.c1.kafka.bootstrap.servers hadoop102:9092,hadoop103:9092 #指定数据写到kafka哪个topic a1.channels.c1.kafka.topic topic_log #是否以Event对象的形式写入kafka a1.channels.c1.parseAsFlumeEvent false #组装  #4、关联source-channel a1.sources.r1.channels c1 如果一开始测试我们flume和kafka是否能成功采集数据的时候我们应该先把拦截器的两行配置先删除后面再根据我们需要的内容进行拦截对应的内容。就比如我们期望我们采集到数据是json格式的如果不是json格式的话我们就放弃这个数据。 具体操作 1创建Maven工程flume-interceptor 2创建包com.gugu.gmall.flume.interceptor 3在pom.xml文件中添加如下配置 dependencies     dependency         groupIdorg.apache.flume/groupId         artifactIdflume-ng-core/artifactId         version1.9.0/version         scopeprovided/scope     /dependency     dependency         groupIdcom.alibaba/groupId         artifactIdfastjson/artifactId         version1.2.62/version     /dependency /dependencies build     plugins         plugin             artifactIdmaven-compiler-plugin/artifactId             version2.3.2/version             configuration                 source1.8/source                 target1.8/target             /configuration         /plugin         plugin             artifactIdmaven-assembly-plugin/artifactId             configuration                 descriptorRefs                     descriptorRefjar-with-dependencies/descriptorRef                 /descriptorRefs             /configuration             executions                 execution                     idmake-assembly/id                     phasepackage/phase                     goals                         goalsingle/goal                     /goals                 /execution             /executions         /plugin     /plugins /build 在com.gugu.gmall.flume.utils包下创建JSONUtil类 package com.gugu.gmall.flume.utils; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONException;public class JSONUtil { /* * 通过异常判断是否是json字符串 * 是返回true 不是返回false * */public static boolean isJSONValidate(String log){try {JSONObject.parseObject(log);return true;}catch (JSONException e){return false;}} } 在com.gugu.gmall.flume.interceptor包下创建ETLInterceptor类  package com.gugu.gmall.flume.interceptor;import com.atguigu.gmall.flume.utils.JSONUtil; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.List;public class ETLInterceptor implements Interceptor {Overridepublic void initialize() {}Overridepublic Event intercept(Event event) {//1、获取body当中的数据并转成字符串byte[] body event.getBody();String log new String(body, StandardCharsets.UTF_8);//2、判断字符串是否是一个合法的json是返回当前event不是返回nullif (JSONUtil.isJSONValidate(log)) {return event;} else {return null;}}Overridepublic ListEvent intercept(ListEvent list) {IteratorEvent iterator list.iterator();while (iterator.hasNext()){Event next iterator.next();if(intercept(next)null){iterator.remove();}}return list;}public static class Builder implements Interceptor.Builder{Overridepublic Interceptor build() {return new ETLInterceptor();}Overridepublic void configure(Context context) {}}Overridepublic void close() {} } 然后进行打包复制到我们flume下的lib目录下就可以了 然后再和上面测试一样进行测试连接是否成功把非json格式的数据拦截成功 感谢各位的观看创作不易能不能给哥们来一个点赞呢 好了今天的分享就这么多了有什么不清楚或者我写错的地方请多多指教 私信评论我呗 关注我下一篇不迷路哦
http://www.hkea.cn/news/14330087/

相关文章:

  • 什么是速成网站access2003做网站
  • 广州网站建设网站托管运营宁德市蕉城区
  • 长春自助建站软件php初学者网站
  • 做自媒体要知道的网站百度没有排名的点击软件
  • 宁波品牌网站设计价格自定义域名
  • 建设工业网站首页wordpress公司展示网站
  • 汝南专业网站建设外贸销售平台有哪些
  • 汕头市门户网站建设wordpress寻模板
  • 小学门户网站建设情况汇报做算法题网站
  • 软件开网站建设骗术wordpress插件丢失
  • 企业网站域名注册查询网站建设开发报价方案模板
  • 湘潭做网站 联系磐石网络wordpress后台样式
  • 没有域名的网站做装修行业营销型网站
  • 财税营销型网站泉州企业网站维护制作
  • 北京专业网站设计推荐wordpress excerpt
  • 建设一个聊天类的网站西安建设工程交易信息网
  • 怎么在百度做网站推广页面设计美工
  • 溧阳有没有做网站的公司孝感市网站建设公司
  • wap网站还用吗做网站大约多少钱
  • 建设新网站征求意见保定市清苑区网站建设
  • 摄影师网站模板陕西建设执业中心网站办事大厅
  • 全能网站建设完全自学手册昆明网站设计都需要设计什么
  • 网站页脚怎么做能好看点免费app制作工具
  • 做论文常用网站漳州 网站设计
  • 做电子请帖的网站公司网站建设需要注意事项
  • 上门做网站视频素材网免费
  • wordpress实现注册功能长沙优化网站服务
  • 中兴豫建设管理有限公司网站wordpress副标题修改代码
  • 帮别人做视频剪辑的网站wix和wordpress区别
  • 如何在八戒网便宜做网站网站制作案例