当前位置: 首页 > news >正文

做网站需要每年交钱吗学校网站查询学历

做网站需要每年交钱吗,学校网站查询学历,德州核酸检测最新公告,珠海h5模板建站Kafka-Eagle 监控 Kafka-Eagle 框架可以监控 Kafka 集群的整体运行情况#xff0c;在生产环境中经常使用。 MySQL环境准备 Kafka-Eagle 的安装依赖于 MySQL#xff0c;MySQL 主要用来存储可视化展示的数据。 安装步骤参考#xff1a;P61 尚硅谷 kafka监控_MySQL环境准备 …Kafka-Eagle 监控 Kafka-Eagle 框架可以监控 Kafka 集群的整体运行情况在生产环境中经常使用。 MySQL环境准备 Kafka-Eagle 的安装依赖于 MySQLMySQL 主要用来存储可视化展示的数据。 安装步骤参考P61 尚硅谷 kafka监控_MySQL环境准备 Kafka 环境准备 关闭 Kafka 集群 [atguiguhadoop102 kafka]$ kf.sh stop修改 /opt/module/kafka/bin/kafka-server-start.sh [atguiguhadoop102 kafka]$ vim bin/kafka-server-start.sh修改如下参数值 if [ x$KAFKA_HEAP_OPTS x ]; thenexport KAFKA_HEAP_OPTS-Xmx1G -Xms1G fi为 if [ x$KAFKA_HEAP_OPTS x ]; thenexport KAFKA_HEAP_OPTS-server -Xms2G -Xmx2G -XX:PermSize128m -XX:UseG1GC -XX:MaxGCPauseMillis200 -XX:ParallelGCThreads8 -XX:ConcGCThreads5 -XX:InitiatingHeapOccupancyPercent70export JMX_PORT9999#export KAFKA_HEAP_OPTS-Xmx1G -Xms1G fi初始内存只分配1G如果要使用 Eagle 功能我们可以将内存设置为 2G。 注意修改之后在启动 Kafka 之前要分发至其他节点。 [atguiguhadoop102 bin]$ xsync kafka-server-start.shKafka-Eagle 安装 官网https://www.kafka-eagle.org/ 上传压缩包 kafka-eagle-bin-2.0.8.tar.gz 到集群 /opt/software 目录 解压到本地 [atguiguhadoop102 software]$ tar -zxvf kafka-eagle-bin-2.0.8.tar.gz进入刚才解压的目录 [atguiguhadoop102 kafka-eagle-bin-2.0.8]$ ll总用量 79164 -rw-rw-r--. 1 atguigu atguigu 81062577 10 月 13 00:00 efak-web-2.0.8-bin.tar.gz将 efak-web-2.0.8-bin.tar.gz 解压至 /opt/module [atguiguhadoop102 kafka-eagle-bin-2.0.8]$ tar -zxvf efak-web-2.0.8-bin.tar.gz -C /opt/module/修改名称 [atguiguhadoop102 module]$ mv efak-web-2.0.8/ efak修改配置文件/opt/module/efak/conf/system-config.properties [atguiguhadoop102 conf]$ vim system-config.properties###################################### # multi zookeeper kafka cluster list # Settings prefixed with kafka.eagle. will be deprecated, use efak.instead ###################################### efak.zk.cluster.aliascluster1 cluster1.zk.listhadoop102:2181,hadoop103:2181,hadoop104:2181/kafka ###################################### # zookeeper enable acl ###################################### cluster1.zk.acl.enablefalse cluster1.zk.acl.schemadigest cluster1.zk.acl.usernametest cluster1.zk.acl.passwordtest123 ###################################### # broker size online list ###################################### cluster1.efak.broker.size20 ###################################### # zk client thread limit ###################################### kafka.zk.limit.size32 ###################################### # EFAK webui port ###################################### efak.webui.port8048 ###################################### # kafka jmx acl and ssl authenticate ###################################### cluster1.efak.jmx.aclfalse cluster1.efak.jmx.userkeadmin cluster1.efak.jmx.passwordkeadmin123 cluster1.efak.jmx.sslfalse cluster1.efak.jmx.truststore.location/data/ssl/certificates/kafka.truststore cluster1.efak.jmx.truststore.passwordke123456 ###################################### # kafka offset storage ###################################### # offset 保存在 kafka cluster1.efak.offset.storagekafka ###################################### # kafka jmx uri ###################################### cluster1.efak.jmx.uriservice:jmx:rmi:///jndi/rmi://%s/jmxrmi ###################################### # kafka metrics, 15 days by default ###################################### efak.metrics.chartstrue efak.metrics.retain15 ###################################### # kafka sql topic records max ###################################### efak.sql.topic.records.max5000 efak.sql.topic.preview.records.max10 ###################################### # delete kafka topic token ###################################### efak.topic.tokenkeadmin ###################################### # kafka sasl authenticate ###################################### cluster1.efak.sasl.enablefalse cluster1.efak.sasl.protocolSASL_PLAINTEXT cluster1.efak.sasl.mechanismSCRAM-SHA-256 cluster1.efak.sasl.jaas.configorg.apache.kafka.common.security.scram.ScramL oginModule required usernamekafka passwordkafka-eagle; cluster1.efak.sasl.client.id cluster1.efak.blacklist.topics cluster1.efak.sasl.cgroup.enablefalse cluster1.efak.sasl.cgroup.topics cluster2.efak.sasl.enablefalse cluster2.efak.sasl.protocolSASL_PLAINTEXT cluster2.efak.sasl.mechanismPLAIN cluster2.efak.sasl.jaas.configorg.apache.kafka.common.security.plain.PlainL oginModule required usernamekafka passwordkafka-eagle; cluster2.efak.sasl.client.id cluster2.efak.blacklist.topics cluster2.efak.sasl.cgroup.enablefalse cluster2.efak.sasl.cgroup.topics ###################################### # kafka ssl authenticate ###################################### cluster3.efak.ssl.enablefalse cluster3.efak.ssl.protocolSSL cluster3.efak.ssl.truststore.location cluster3.efak.ssl.truststore.password cluster3.efak.ssl.keystore.location cluster3.efak.ssl.keystore.password cluster3.efak.ssl.key.password cluster3.efak.ssl.endpoint.identification.algorithmhttps cluster3.efak.blacklist.topics cluster3.efak.ssl.cgroup.enablefalse cluster3.efak.ssl.cgroup.topics ###################################### # kafka sqlite jdbc driver address ###################################### # 配置 mysql 连接 efak.drivercom.mysql.jdbc.Driver efak.urljdbc:mysql://hadoop102:3306/ke?useUnicodetruecharacterEncodingUTF-8zeroDateTimeBehaviorconvertToNull efak.usernameroot efak.password000000 ###################################### # kafka mysql jdbc driver address ###################################### #efak.drivercom.mysql.cj.jdbc.Driver #efak.urljdbc:mysql://127.0.0.1:3306/ke?useUnicodetruecharacterEncodingUTF-8zeroDateTimeBehaviorconvertToNull #efak.usernameroot #efak.password123456添加环境变量 [atguiguhadoop102 conf]$ sudo vim /etc/profile.d/my_env.sh# kafkaEFAK export KE_HOME/opt/module/efak export PATH$PATH:$KE_HOME/bin注意source /etc/profile [atguiguhadoop102 conf]$ source /etc/profile启动 注意启动之前需要先启动 zk 以及 kafka [atguiguhadoop102 kafka]$ kf.sh start启动 efak [atguiguhadoop102 efak]$ bin/ke.sh startVersion 2.0.8 -- Copyright 2016-2021 ***************************************************************** * EFAK Service has started success. * Welcome, Now you can visit http://192.168.10.102:8048 * Account:admin ,Password:123456 ***************************************************************** * Usage ke.sh [start|status|stop|restart|stats] /Usage * Usage https://www.kafka-eagle.org/ /Usage *****************************************************************如果停止 efak执行命令 [atguiguhadoop102 efak]$ bin/ke.sh stopKafka-Eagle 页面操作 登录页面查看监控数据 http://192.168.10.102:8048/ 主面板 Brokers Topics Zookeepers Consumers 大屏信息 Kafka-Kraft 模式 Kafka-Kraft 架构 左图为 Kafka 现有架构元数据在 zookeeper 中运行时动态选举 controller由 controller 进行 Kafka 集群管理。 右图为 kraft 模式架构实验性不再依赖 zookeeper 集群而是用三台 controller 节点代替 zookeeper元数据保存在 controller 中由 controller 直接进行 Kafka 集群管理。 这样做的好处有以下几个 Kafka 不再依赖外部框架而是能够独立运行controller 管理集群时不再需要从 zookeeper 中先读取数据集群性能上升由于不依赖 zookeeper集群扩展时不再受到 zookeeper 读写能力限制controller 不再动态选举而是由配置文件规定。 这样我们可以有针对性的加强 controller 节点的配置而不是像以前一样对随机 controller 节点的高负载束手无策。 Kafka-Kraft 集群部署 Kafka-Kraft 集群启动停止脚本 在 /home/atguigu/bin 目录下创建文件 kf2.sh 脚本文件 [atguiguhadoop102 bin]$ vim kf2.sh脚本如下 #! /bin/bashcase $1 in start){for i in hadoop102 hadoop103 hadoop104doecho --------启动 $i Kafka2-------ssh $i /opt/module/kafka2/bin/kafka-server-start.sh -daemon /opt/module/kafka2/config/kraft/server.propertiesdone };; stop){for i in hadoop102 hadoop103 hadoop104doecho --------停止 $i Kafka2-------ssh $i /opt/module/kafka2/bin/kafka-server-stop.sh done };; esac添加执行权限 [atguiguhadoop102 bin]$ chmod x kf2.sh启动集群命令 [atguiguhadoop102 ~]$ kf2.sh start停止集群命令 [atguiguhadoop102 ~]$ kf2.sh stopKafka 集成 Kafka 集成 Flume Flume 是一个在大数据开发中非常常用的组件可以用于 Kafka 的生产者也可以用于 Kafka 的消费者。 Flume 环境准备 启动 kafka 集群 [atguiguhadoop102 ~]$ zk.sh start [atguiguhadoop102 ~]$ kf.sh start启动 kafka 消费者 [atguiguhadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic firstFlume 安装步骤 参考P66 尚硅谷 Kafka 集成 Flume 环境准备 Flume 生产者 通过 Flume 实时监控 app.log 文件数据的变化使用 taildir source支持断点续传、实时监控文件变化并获取到数据由于我们传输的就是普通的日志没有必要追求太高的可靠性使用 memory channel完全基于内存速度非常快断电后会丢数据最多丢 100 条日志因为内存大小最大上线就是 100数据是发往到 kafka 的所以使用 kafka sink发到 first 主题中启动消费者消费。 配置 Flume 在 hadoop102 节点的 Flume 的 job 目录下创建 file_to_kafka.conf [atguiguhadoop102 flume]$ mkdir jobs [atguiguhadoop102 flume]$ vim jobs/file_to_kafka.conf 配置文件内容如下 # 1 组件定义 a1.sources r1 a1.sinks k1 a1.channels c1# 2 配置source a1.sources.r1.type TAILDIR a1.sources.r1.filegroups f1 a1.sources.r1.filegroups.f1 /opt/module/applog/app.* # 监控文件目录 a1.sources.r1.positionFile /opt/module/flume/taildir_position.json # offset文件 支持断点续传# 3 配置channel a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100# 4 配置sink a1.sinks.k1.type org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers hadoop102:9092,hadoop103:9092,hadoop104:9092 a1.sinks.k1.kafka.topic first a1.sinks.k1.kafka.flumeBatchSize 20 a1.sinks.k1.kafka.producer.acks 1 a1.sinks.k1.kafka.producer.linger.ms 1# 5 拼接组件 a1.sources.r1.channels c1 a1.sinks.k1.channel c1启动 Flume [atguiguhadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/file_to_kafka.conf 向 /opt/module/applog/app.log 里追加数据查看 kafka 消费者消费情况 [atguiguhadoop102 module]$ mkdir applog [atguiguhadoop102 applog]$ echo hello /opt/module/applog/app.log观察 kafka 消费者能够看到消费的 hello 数据 Flume 消费者 Flume 作为消费者首先肯定选用 kafka source通道选择 memory channel打印到控制台选择 logger sink 配置 Flume 在 hadoop102 节点的 Flume 的 /opt/module/flume/jobs 目录下创建 kafka_to_file.conf [atguiguhadoop102 jobs]$ vim kafka_to_file.conf配置文件内容如下 # 1 组件定义 a1.sources r1 a1.sinks k1 a1.channels c1# 2 配置source a1.sources.r1.type org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize 50 a1.sources.r1.batchDurationMillis 200 a1.sources.r1.kafka.bootstrap.servers hadoop102:9092 a1.sources.r1.kafka.topics first a1.sources.r1.kafka.consumer.group.id custom.g.id# 3 配置channel a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100# 4 配置sink a1.sinks.k1.type logger# 5 拼接组件 a1.sources.r1.channels c1 a1.sinks.k1.channel c1启动 Flume [atguiguhadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/kafka_to_file.conf -Dflume.root.loggerINFO,console启动 kafka 生产者 [atguiguhadoop103 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first并输入数据例如hello 观察控制台输出的日志 Kafka 集成 Flink Flink是一个在大数据开发中非常常用的组件可以用于 Kafka 的生产者也可以用于 Kafka 的消费者。 Flink 环境准备 创建一个 maven 项目 flink-kafka 添加配置文件 dependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion1.13.0/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java_2.12/artifactIdversion1.13.0/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients_2.12/artifactIdversion1.13.0/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka_2.12/artifactIdversion1.13.0/version/dependency /dependencies将 log4j.properties 文件添加到 resources 里面就能更改打印日志的级别为 error log4j.rootLoggererror, stdout,R log4j.appender.stdoutorg.apache.log4j.ConsoleAppender log4j.appender.stdout.layoutorg.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%nlog4j.appender.Rorg.apache.log4j.RollingFileAppender log4j.appender.R.File../log/agent.log log4j.appender.R.MaxFileSize1024KB log4j.appender.R.MaxBackupIndex1log4j.appender.R.layoutorg.apache.log4j.PatternLayout log4j.appender.R.layout.ConversionPattern%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n在 java 文件夹下创建包名为 com.atguigu.flink Flink 生产者 在 com.atguigu.flink 包下创建 java 类FlinkKafkaProducer1系统也有一个 FlinkKafkaProducer会重名所以这里命名为 1。 import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig;import java.util.ArrayList; import java.util.Properties;public class FlinkKafkaProducer1 {public static void main(String[] args) throws Exception {// 0 初始化flink环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3); // 3个槽 对应kafka主题题的3个分区// 1 准备数据源 读取集合中数据ArrayListString wordsList new ArrayList();wordsList.add(hello);wordsList.add(atguigu);DataStreamString stream env.fromCollection(wordsList);// 2 kafka生产者配置信息Properties properties new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092);// 3 创建kafka生产者FlinkKafkaProducerString kafkaProducer new FlinkKafkaProducer(first,new SimpleStringSchema(), // 序列化和反序列化模板类 string类型properties);// 4 生产者和flink流关联stream.addSink(kafkaProducer);// 5 执行env.execute();} }启动Kafka消费者 [atguiguhadoop104 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first执行 FlinkKafkaProducer1 程序观察 kafka 消费者控制台情况 Q 为什么先接收到 atguigu然后才是 hello 呢 A 在 Flink 中对于并行度大于 1 的情况不同的算子实例是并行运行的也就是说当你的 env.setParallelism(3) 时会有 3 个线程同时运行。在你的例子中hello 和 atguigu 可能由不同的线程处理并且处理的顺序是不确定的。如果你希望严格按照顺序处理你可以将并行度设置为 1即 env.setParallelism(1)。但是这样可能会影响处理速度。此外Flink 也提供了一些方法来保证在并行处理时的顺序可以查阅相关资料来了解更多。 Flink 消费者 在 com.atguigu.flink 包下创建 java 类FlinkKafkaConsumer1 import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;public class FlinkKafkaConsumer1 {public static void main(String[] args) throws Exception {// 0 初始化flink环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);// 1 kafka消费者配置信息Properties properties new Properties();properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092);// group.id可选不配置不会报错// 2 创建kafka消费者FlinkKafkaConsumerString kafkaConsumer new FlinkKafkaConsumer(first,new SimpleStringSchema(),properties);// 3 消费者和flink流关联env.addSource(kafkaConsumer).print();// 4 执行env.execute();} }启动 FlinkKafkaConsumer1 消费者 启动 kafka 生产者 [atguiguhadoop103 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first观察 IDEA 控制台数据打印 有 3 个消费者并行消费因为只发了两条消息所以这里只有 1 和 3。 Kafka 集成 SpringBoot SpringBoot 是一个在 JavaEE 开发中非常常用的组件。可以用于 Kafka 的生产者也可以用于 SpringBoot 的消费者。 跟之前不太一样的是外部数据是通过接口的方式发送到 SpringBoot 程序然后 SpringBoot 接收到这个接口的数据然后再发送到 kafka 集群。 SpringBoot 环境准备 在 IDEA 中安装 lombok 插件 在 Plugins 下搜索 lombok 然后在线安装即可安装后注意重启 创建一个 Spring Initializr 注意有时候SpringBoot官方脚手架不稳定我们切换国内地址https://start.aliyun.com 项目名称 springboot 添加项目依赖 检查自动生成的配置文件 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.6.1/versionrelativePath/ !-- lookup parent from repository --/parentgroupIdcom.atguigu/groupIdartifactIdspringboot/artifactIdversion0.0.1-SNAPSHOT/versionnamespringboot/namedescriptionDemo project for Spring Boot/descriptionpropertiesjava.version1.8/java.version/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka-test/artifactIdscopetest/scope/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactIdconfigurationexcludesexcludegroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/exclude/excludes/configuration/plugin/plugins/build/projectSpringBoot 生产者 修改 SpringBoot 核心配置文件 application.propeties添加生产者相关信息 # 应用名称 spring.application.nameatguigu_springboot_kafka# 指定kafka的地址 spring.kafka.bootstrap-servershadoop102:9092,hadoop103:9092,hadoop104:9092# 指定key和value的序列化器 spring.kafka.producer.key-serializerorg.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializerorg.apache.kafka.common.serialization.StringSerializer创建 controller 从浏览器接收数据并写入指定的 topic import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;RestController public class ProducerController {// Kafka模板用来向kafka发送数据AutowiredKafkaTemplateString, String kafka;RequestMapping(/atguigu)public String data(String msg) {kafka.send(first, msg);return ok;} }在浏览器中给 /atguigu 接口发送数据 http://localhost:8080/atguigu?msghello kafka 消费者接收到数据 SpringBoot 消费者 修改 SpringBoot 核心配置文件 application.propeties # 消费者配置开始 # 指定kafka的地址 spring.kafka.bootstrap-servershadoop102:9092,hadoop103:9092,hadoop104:9092# 指定key和value的反序列化器 spring.kafka.consumer.key-deserializerorg.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializerorg.apache.kafka.common.serialization.StringDeserializer# 指定消费者组的group_id spring.kafka.consumer.group-idatguigu # 消费者配置结束创建类消费 Kafka 中指定 topic 的数据 import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.KafkaListener;Configuration public class KafkaConsumer {// 指定要监听的topicKafkaListener(topics first)public void consumeTopic(String msg) { // 参数: 收到的valueSystem.out.println(收到的信息: msg);} }向 first 主题发送数据 [atguiguhadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic firstatguiguSpringBoot 消费者接收到数据 Kafka 集成 Spark Spark 是一个在大数据开发中非常常用的组件可以用于 Kafka 的生产者也可以用于 Kafka 的消费者。 Spark 环境准备 Scala 环境准备 参考P73 尚硅谷 Kafka 集成 Spark 生产者 Spark 的底层源码是用 Scala 编写的。 创建一个 maven 项目 spark-kafka 在项目 spark-kafka 上点击右键Add Framework Support 勾选 scala 在 main 下创建 scala 文件夹并右键 Mark Directory as Sources Root 在 scala 下创建包名为 com.atguigu.spark 添加配置文件 dependenciesdependencygroupIdorg.apache.spark/groupIdartifactIdspark-streaming-kafka-0-10_2.12/artifactIdversion3.0.0/version/dependency /dependencies将 log4j.properties 文件添加到 resources 里面就能更改打印日志的级别为 error log4j.rootLoggererror, stdout,R log4j.appender.stdoutorg.apache.log4j.ConsoleAppender log4j.appender.stdout.layoutorg.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%nlog4j.appender.Rorg.apache.log4j.RollingFileAppender log4j.appender.R.File../log/agent.log log4j.appender.R.MaxFileSize1024KB log4j.appender.R.MaxBackupIndex1log4j.appender.R.layoutorg.apache.log4j.PatternLayout log4j.appender.R.layout.ConversionPattern%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%nSpark 生产者 在 com.atguigu.spark 包下创建 scala ObjectSparkKafkaProducer import java.util.Properties import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}object SparkKafkaProducer {def main(args: Array[String]): Unit {// 0 kafka配置信息val properties new Properties()properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092,hadoop103:9092,hadoop104:9092)properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])// 1 创建kafka生产者var producer new KafkaProducer[String, String](properties)// 2 发送数据for (i - 1 to 5) {producer.send(new ProducerRecord[String, String](first, atguigu i))}// 3 关闭资源producer.close()} }启动 Kafka 消费者 [atguiguhadoop104 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first执行 SparkKafkaProducer 程序观察 kafka 消费者控制台情况 Spark 消费者 添加配置文件 dependenciesdependencygroupIdorg.apache.spark/groupIdartifactIdspark-streaming-kafka-0-10_2.12/artifactIdversion3.0.0/version/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-core_2.12/artifactIdversion3.0.0/version/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-streaming_2.12/artifactIdversion3.0.0/version/dependency /dependencies在 com.atguigu.spark 包下创建 scala ObjectSparkKafkaConsumer import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}object SparkKafkaConsumer {def main(args: Array[String]): Unit {// 1.创建SparkConfval sparkConf: SparkConf new SparkConf().setAppName(sparkstreaming).setMaster(local[*])// 2.创建StreamingContext 初始化上下文环境// Seconds(3)时间窗口批处理间隔表示每隔3秒钟Spark Streaming就会收集一次数据进行处理。val ssc new StreamingContext(sparkConf, Seconds(3))// 3.定义Kafka参数kafka集群地址、消费者组名称、key序列化、value序列化val kafkaPara: Map[String, Object] Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG - hadoop102:9092,hadoop103:9092,hadoop104:9092,ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG - classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG - classOf[StringDeserializer],ConsumerConfig.GROUP_ID_CONFIG - atguiguGroup)// 4.读取Kafka数据创建DStreamval kafkaDStream: InputDStream[ConsumerRecord[String, String]] KafkaUtils.createDirectStream[String, String](ssc, // 上下文环境LocationStrategies.PreferConsistent, // 数据存储位置 优先位置ConsumerStrategies.Subscribe[String, String](Set(first), kafkaPara) // 消费策略订阅多个主题配置参数 )// 5.将每条消息的KV取出val valueDStream: DStream[String] kafkaDStream.map(record record.value())// 6.计算WordCountvalueDStream.print()// 7.开启任务 并阻塞使程序一直执行ssc.start()ssc.awaitTermination()} }启动 SparkKafkaConsumer 消费者 启动 kafka 生产者 [atguiguhadoop103 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first观察IDEA控制台数据打印 笔记整理自b站尚硅谷视频教程【尚硅谷】Kafka3.x教程从入门到调优深入全面
http://www.hkea.cn/news/14573161/

相关文章:

  • 购物网站两化融合建设项目报告游戏是怎么做的视频网站
  • 国外用python做的网站网站的前端开发
  • 中山网站建设开发备案的网站换空间
  • 网站开发培训成都顺义哪里有做网站设计的
  • 谷歌生成在线网站地图wordpress激活邮件
  • 免费建网站平台访问国外网站速度慢
  • 台州企业网站搭建价格网站建设和注册
  • 东莞市专注网站建设品牌网页设计与制作课程教学痛点
  • 杭州高端网站设计公司互联网外包公司有哪些
  • 百度生成手机网站东莞微信网站建设信息
  • 网站权限配置如何制作简易个人网站
  • 网站问卷调查怎么做静态网站flash
  • 甘肃省省建设厅网站有创意的设计工作室名字
  • 上海好的高端网站建设大学做视频网站设计
  • 自建网站多少钱慈溪企业网站
  • 郑州建站地方同城网站开发
  • 免费网站正能量app应用大全重庆微信网站建设价格
  • 网站管理后台地址怎么查询南京关键词seo公司
  • 租房网站开发wordpress issingle
  • 网站建设、微信小程序、修改wordpress图标
  • 营销网站的例子互动网站建设多少钱
  • 湘潭做网站找磐石网络一流郑州百度推广托管
  • 网站建设相关费用2023年中国进入一级战备状态了吗
  • 哪个网站做二手叉车回收好js+下载服务器wordpress
  • 匿名聊天网站怎么做腾讯广告建站工具
  • 个人网站建设知乎做翻译兼职的网站
  • 成都网站建设低价wordpress媒体库只有2m
  • 资源站 wordpresswordpress修改主页模板
  • 贵南县公司网站建设2017网络公司排名
  • 河北建站公司哪里的网站建设