当前位置: 首页 > news >正文

做投票链接网站网站建设详细流程视频

做投票链接网站,网站建设详细流程视频,珠海软件开发公司,网站页面自动还原代码本文模拟实际生产环境#xff0c;通过FileBeat采集日志信息到Kafka#xff0c;再通过Flink消费Kafka实时写入Doris。 文章目录 Filebeat采集日志到KafkaFlink消费Kafka实时写入Doris总结 Filebeat采集日志到Kafka 常见的日志采集工具有以下几种#xff1a;Flume、Logstash和… 本文模拟实际生产环境通过FileBeat采集日志信息到Kafka再通过Flink消费Kafka实时写入Doris。 文章目录 Filebeat采集日志到KafkaFlink消费Kafka实时写入Doris总结 Filebeat采集日志到Kafka 常见的日志采集工具有以下几种Flume、Logstash和Filebeat。 Flume采用Java编写它是一个分布式、高度可靠且高度可用的工具旨在高效地搜集、汇总和转移大量日志数据该工具拥有一个简洁且灵活的流数据流架构它配备了可调节的可靠性机制、故障切换以及恢复功能此外Flume通过简单且可扩展的数据模型支持在线分析应用程序。Logstash是一个开源的日志管理和分析工具它能够从多个数据源收集数据对数据进行转换和清洗并将处理后的数据传输到目标系统。Filebeat是一款go语言编写的日志文件收集工具当在服务器上部署其客户端后它会持续监听特定的日志目录或日志文件实时跟踪并读取这些文件的更新内容并将这些数据发送到指定的输出目标例如Elasticsearch或Kafka等。 这里选择Filebeat进行日志采集的主要原因在于其资源消耗极低相较于Flume和LogstashFilebeat占用的内存最少对CPU的负载也最小。它的运行进程十分稳定很少出现崩溃或宕机的情况。 首先下载Filebeat curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.12.0-linux-x86_64.tar.gz解压缩文件 tar xzvf filebeat-8.12.0-linux-x86_64.tar.gz进入目录 cd filebeat-8.12.0-linux-x86_64编写配置文件接入Kafka vim filebeat.yamlfilebeat.yaml的文件内容 filebeat.inputs: - type: logpaths:- /doc/input/*.log # 更换为你的日志文件路径 processors:- include_fields:fields: [message] output.kafka:# 更换为你的Kafka地址和主题.hosts: [192.168.235.130:9092]topic: k2ggcodec:format:string: %{[message]} 运行Filebeat采集日志 ./filebeat -e -c ./filebeat.yaml这是log日志的信息现要求保持原始格式发送到Kafka Filebeat采集日志信息发送到Kafka的主题消费者收到的信息如下Filebeat会添加一些自带的数据比如时间戳和元数据等但是一般情况下只需要采集message里面的信息通过filebeat.yaml中的processors和codec即可实现。 processors处理只保留 message 的字段信息其他字段将被丢弃codec用于定义数据的编码格式将 message 字段的值作为字符串发送到 kafka这样就可以保留日志信息的原始格式发送到Kafka。 消费者消费原始格式的日志消息 Flink消费Kafka实时写入Doris 在写入之前建立doris的数据表用于接收消费的信息 CREATE TABLE transactions (timestamp datetime,user_id INT,transaction_type VARCHAR(50),amount DECIMAL(15, 2),currency CHAR(3),status VARCHAR(20),description TEXT ) DISTRIBUTED BY HASH(user_id) BUCKETS 10 PROPERTIES(replication_num1);引入依赖 dependencygroupIdorg.apache.doris/groupIdartifactIdflink-doris-connector-1.16/artifactIdversion24.0.0/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-core/artifactIdversion1.17.0/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion1.17.0/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion1.17.0/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion1.17.0/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-common/artifactIdversion1.17.0/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java/artifactIdversion1.17.0/version/dependency主程序 package flink;import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.sink.DorisSink; import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.DataType;import java.util.Properties;public class DorisWrite {public static void main(String[] args) throws Exception {Properties props new Properties();//Kafka broker的地址props.put(bootstrap.servers, 192.168.235.130:9092);props.put(group.id, test);props.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.put(auto.offset.reset, latest);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(5000);env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//指定消费的主题FlinkKafkaConsumerString flinkKafkaConsumer new FlinkKafkaConsumer(k2gg,new SimpleStringSchema(),props);DorisSink.BuilderString builder DorisSink.builder();DorisOptions.Builder dorisBuilder DorisOptions.builder();//Doris的地址以及账号密码等信息dorisBuilder.setFenodes(192.168.235.130:8030).setTableIdentifier(test.transactions).setUsername(root).setPassword(1445413748);Properties pro new Properties();pro.setProperty(format, json);pro.setProperty(read_json_by_line, true);DorisExecutionOptions executionOptions DorisExecutionOptions.builder().setLabelPrefix(label-doris12System.currentTimeMillis()) //streamload label prefix,.setStreamLoadProp(pro).build();builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionOptions).setSerializer(new SimpleStringSerializer()) //serialize according to string.setDorisOptions(dorisBuilder.build());DataStreamSourceString dataStreamSource env.addSource(flinkKafkaConsumer);// 将Kafka数据转换为JSON格式DataStreamString jsonStream dataStreamSource.map(new MapFunctionString, String() {Overridepublic String map(String value) throws Exception {System.out.println(valuevalue);// 分割字符串String[] parts value.split(,);// 创建JSON字符串StringBuilder jsonString new StringBuilder();jsonString.append({);jsonString.append(\timestamp\:\).append(parts[0]).append(\,);jsonString.append(\user_id\:).append(parts[1]).append(,);jsonString.append(\transaction_type\:\).append(parts[2]).append(\,);jsonString.append(\amount\:).append(parts[3]).append(,);jsonString.append(\currency\:\).append(parts[4]).append(\,);jsonString.append(\status\:\).append(parts[5]).append(\,);jsonString.append(\description\:\).append(parts[6].replace(\, )).append(\);jsonString.append(});return jsonString.toString();}});jsonStream.print();jsonStream.sinkTo(builder.build());env.execute(flink kafka to doris by datastream);} } 运行主程序通过Flink消费Kafka的信息写入doris log日志的信息 登录Doris进行验证 mysql -h k8s-master -P 9030 -uroot -p这是没运行主程序之前doris的数据没有2024-10-15这一天的数据。 select * from transactions where date(timestamp) 2024-10-15;运行主程序之后Flink将Kafka主题的信息实时写入Doris。 总结 1.Filebeat格式问题 Filebeat采集日志格式会添加一些自带的额外信息一般情况下只需要message里面的字段信息那么yaml文件配置processors和codec属性即可。processors处理只保留 message 的字段信息其他字段将被丢弃codec用于定义数据的编码格式将 message 字段的值作为字符串发送到 kafka这样就可以保留日志信息的原始格式发送到Kafka。 2.Flink消费Kafka失败 Flink在消费Kafka主题的过程中不要往该主题发送其他格式的数据否则会解析失败尽量新建一个新主题来接收Filebeat采集过来的日志信息。如果还是执行失败可以尝试在setLabelPrefix添加一个时间戳这样保证每次生成的标签前缀都不一样这是因为客户端会生成一个唯一的标签来标识这次导入Doris的操作Doris服务器会根据这个标签来跟踪导入的进度和状态如果导入过程中出现问题Doris会保留失败的数据客户端就可以通过标签重新导入这些数据。 3.实时写入Doris失败 Flink处理字段的数据类型要与Doris匹配可以参考官方文档Doris 和 Flink 列类型映射关系。
http://www.hkea.cn/news/14433232/

相关文章:

  • 中小型网站建设多少钱seo快速排名软件
  • 苏州无锡市住房和城乡建设局网站如何免费开网店的步骤
  • 网站建设 今晟网络做网站一定要域名吗
  • 自己做盗版影视网站google帐户登录网站如何做的
  • 网站建设哪些好免费外网服务器ip地址
  • 南乐网站建设费用南京营销型网站制作
  • 宁波网站建设设计至诚服务wordpress数据库优化插件
  • 雕刻业务网站怎么做江门那里做公司网站好
  • 响应式网站模板是什么阿里接外包吗网站开发
  • 免费网站app源码互联云主机
  • 网站竞价排名商丘网络第一媒体
  • 宝塔建设网站域名进不去下载素材的网站
  • 怎么快速建设小型外贸网站餐饮网站建设规划书
  • 像聚美网站建设费用南昌seo计费管理
  • h5网站开发平台无极小说网
  • 如何做购物网站推广新乡网站seo优化
  • 网站开通宣传怎么写设计公司名字创意
  • 网站建设玖金手指谷哥三十兰州公司做网站
  • 网站提交入口大全房地产大数据获客软件
  • 个人网站毕业设计作品江门seo全网营销
  • 房屋出租网站模板wordpress批量注册账号
  • 手机上做整蛊网站php做网站首页修改
  • 怀柔手机网站建设装修设计软件网页版
  • 阿里云建站视频陕西富国建设工程有限公司网站
  • 珠海网站开发价格玛纳斯县住房和城乡建设局网站
  • 苏州大学网站建设长沙网络科技公司
  • 做网站要的图片斗鱼河南洛阳网络公司
  • 哈尔滨网站制作专业专科计算机哪个专业最吃香
  • 网站子页面如何做seoWordPress建页面
  • 网站建设开发电销话术巴鱼士设计师服务平台