当前位置: 首页 > news >正文

廊坊建设企业网站wordpress搜索根据范围

廊坊建设企业网站,wordpress搜索根据范围,微信开放平台账号怎么注销,毕业设计代做网站都可信么目录 消息队列简介消息队列的应用场景异步处理系统解耦流量削峰日志处理 消息队列的两种模式点对点模式发布订阅模式 Kafka简介及应用场景Kafka比较其他MQ的优势Kafka目录结构搭建Kafka集群编写Kafka一键启动/关闭脚本 Kafka基础操作创建topic生产消息到Kafka从Kafka消费消息使… 目录 消息队列简介消息队列的应用场景异步处理系统解耦流量削峰日志处理 消息队列的两种模式点对点模式发布订阅模式 Kafka简介及应用场景Kafka比较其他MQ的优势Kafka目录结构搭建Kafka集群编写Kafka一键启动/关闭脚本 Kafka基础操作创建topic生产消息到Kafka从Kafka消费消息使用 Kafka Tools 操作Kafka带Security连接Kafka Tool Java编程操作Kafka同步生产消息到Kafka中使用同步等待的方式发送消息异步使用带有回调函数方法生产消息 从Kafka的topic中消费消息 Kafka 重要概念brokerZookeeperproducer生产者consumer消费者consumer group消费者组主题Topic分区Partitions副本Replicas偏移量offset消费者组 Kafka生产者幂等性幂等性原理 Kafka 事务事务操作APIKafka事务编程事务相关属性配置Kafka事务编程案例 消息队列简介 消息队列经常缩写为MQ。从字面上来理解消息队列是一种用来存储消息的队列。例如Java中的队列 // 1. 创建一个保存字符串的队列 QueueString stringQueue new LinkedListString(); // 2. 往消息队列中放入消息 stringQueue.offer(message); // 3. 从消息队列中取出消息并打印 System.out.println(stringQueue.poll());上述代码创建了一个队列先往队列中添加了一个消息然后又从队列中取出了一个消息。这说明了队列是可以用来存取消息的。我们可以简单理解消息队列就是将需要传输的数据存放在队列中。   消息队列中间件就是用来存储消息的软件组件。消息队列有很多例如Kafka、RabbitMQ、ActiveMQ、RocketMQ、ZeroMQ 等。 消息队列的应用场景 异步处理 例如在电商网站中新的用户注册时需要将用户的信息保存到数据库中同时还需要额外发送注册的邮件通知、以及短信注册码给用户。   但因为发送邮件、发送注册短信需要连接外部的服务器需要额外等待一段时间此时就可以使用消息队列来进行异步处理从而实现快速响应。 系统解耦 流量削峰 日志处理 大型电商网站淘宝、京东、国美、苏宁…、App抖音、美团、滴滴等等需要分析用户行为要根据用户的访问行为来发现用户的喜好以及活跃情况需要在页面上收集大量的用户访问信息。 消息队列的两种模式 点对点模式 消息发送者生产消息发送到消息队列中然后消息接收者从消息队列中取出并且消费消息。消息被消费以后消息队列中不再有存储所以消息接收者不可能消费到已经被消费的消息。 特点 每个消息只有一个接收者Consumer(即一旦被消费消息就不再在消息队列中)发送者和接收者间没有依赖性发送者发送消息之后不管有没有接收者在运行都不会影响到发送者下次发送消息接收者在成功接收消息之后需向队列应答成功以便消息队列删除当前接收的消息 发布订阅模式 特点 每个消息可以有多个订阅者发布者和订阅者之间有时间上的依赖性。针对某个主题Topic的订阅者它必须创建一个订阅者之后才能消费发布者的消息。为了消费消息订阅者需要提前订阅该角色主题并保持在线运行 Kafka简介及应用场景 Apache Kafka是一个分布式流平台。一个分布式的流平台应该包含3点关键的能力 发布和订阅流数据流类似于消息队列或者是企业消息传递系统以容错的持久化方式存储数据流处理数据流 通常将Apache Kafka用在两类程序 建立实时数据管道以可靠地在系统或应用程序之间获取数据 构建实时流应用程序以转换或响应数据流 Producers可以有很多的应用程序将消息数据放入到Kafka集群中。 Consumers可以有很多的应用程序将消息数据从Kafka集群中拉取出来。 ConnectorsKafka的连接器可以将数据库中的数据导入到Kafka也可以将Kafka的数据导出到数据库中。 Stream Processors流处理器可以Kafka中拉取数据也可以将数据写入到Kafka中。 Kafka比较其他MQ的优势 特性ActiveMQRabbitMQKafkaRocketMQ所属社区/公司ApacheMozilla Public LicenseApacheApache/Ali成熟度成熟成熟成熟比较成熟生产者-消费者模式支持支持支持支持发布-订阅支持支持支持支持REQUEST-REPLY支持支持-支持API完备性高高高低静态配置多语言支持支持JAVA优先语言无关支持JAVA优先支持单机呑吐量万级最差万级十万级十万级最高消息延迟-微秒级毫秒级-可用性高主从高主从非常高分布式高消息丢失-低理论上不会丢失-消息重复-可控制理论上会有重复-事务支持不支持支持支持文档的完备性高高高中提供快速入门有有有无首次部署难度-低中高 Kafka目录结构 使用的Kafka版本为2.4.1。 目录名称说明binKafka的所有执行脚本都在这里。例如启动Kafka服务器、创建Topic、生产者、消费者程序等等configKafka的所有配置文件libs运行Kafka所需要的所有JAR包logsKafka的所有日志文件如果Kafka出现一些问题需要到该目录中去查看异常信息site-docsKafka的网站帮助文件 搭建Kafka集群 使用的Kafka版本为2.4.1是2020年3月12日发布的版本。 注Kafka 的版本号为kafka_2.12-2.4.1因为Kafka 主要是使用scala语言开发的2.12为scala 的版本号。 创建并解压 sudo mkdir export cd /export sudo mkdir server sudo mkdir software sudo chmod 777 software/ sudo chmod 777 server/ cd /export/software/ tar -xvzf kafka_2.12-2.4.1.tgz -C ../server/修改 server.properties # 创建Kafka数据的位置 mkdir /export/server/kafka_2.12-2.4.1/data vim /export/server/kafka_2.12-2.4.1/config/server.properties # 指定broker的id broker.id0 # 指定Kafka数据的位置 log.dirs/export/server/kafka_2.12-2.4.1/data # 配置zk的三个节点 zookeeper.connect10.211.55.8:2181,10.211.55.9:2181,10.211.55.7:2181其余两台服务器重复以上步骤,仅修改 broker.id 为不同。 配置KAFKA_HOME环境变量 sudo su vim /etc/profile export KAFKA_HOME/export/server/kafka_2.12-2.4.1 export PATH:$PATH:${KAFKA_HOME} #源文件无下面这条需手动添加 export PATH每个节点加载环境变量 source /etc/profile启动服务器 # 启动ZooKeeper # 启动Kafka,需要在kafka根目录下启动 cd /export/server/kafka_2.12-2.4.1nohup bin/kafka-server-start.sh config/server.properties # 测试Kafka集群是否启动成功 bin/kafka-topics.sh --bootstrap-server 10.211.55.8:9092 --list # 无报错打印为空编写Kafka一键启动/关闭脚本 为了方便将来进行一键启动、关闭Kafka可以编写一个shell脚本来操作只要执行一次该脚本就可以快速启动或关闭Kafka。 准备 slave 配置文件用于保存要启动哪几个节点上的kafka # 创建 /export/onekey 目录 sudo mkdir onekeycd /export/onekey sudo su #新建slave文件 touch slave#slave中写入以下内容 10.211.55.8 10.211.55.9 10.211.55.7编写start-kafka.sh脚本 vim start-kafka.shcat /export/onekey/slave | while read line do {echo $linessh $line source /etc/profile;export JMX_PORT9988;nohup ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties /dev/nul* 21 wait } done编写stop-kafka.sh脚本 vim stop-kafka.shcat /export/onekey/slave | while read line do {echo $linessh $line source /etc/profile;jps |grep Kafka |cut -d -f1 |xargs kill -s 9wait } done给start-kafka.sh、stop-kafka.sh配置执行权限 chmod ux start-kafka.sh chmod ux stop-kafka.sh# 执行一键启动、一键关闭注执行shell脚本需实现服务器间ssh免密登录 ./start-kafka.sh ./stop-kafka.sh# 当查看日志发生Error connecting to node ubuntu2:9092错误时需在三台服务器上配置如下命令以ubuntu2为例,另外两台同样的规则配置 # sudo vim /etc/hosts # 10.211.55.8 ubuntu1 # 10.211.55.7 ubuntu3Kafka基础操作 创建topic 创建一个topic主题。Kafka中所有的消息都是保存在主题中要生产消息到Kafka首先必须要有一个确定的主题。 # 创建名为test的主题 bin/kafka-topics.sh --create --bootstrap-server 10.211.55.8:9092 --topic test # 查看目前Kafka中的主题 bin/kafka-topics.sh --list --bootstrap-server 10.211.55.8:9092 # 成功打印出 test生产消息到Kafka 使用Kafka内置的测试程序生产一些消息到Kafka的test主题中。 bin/kafka-console-producer.sh --broker-list 10.211.55.8:9092 --topic test # “”表示等待输入从Kafka消费消息 再开一个窗口 # 使用消费 test 主题中的消息。 bin/kafka-console-consumer.sh --bootstrap-server 10.211.55.8:9092 --topic test --from-beginning# 实现了生产者发送消息消费者接受消息使用 Kafka Tools 操作Kafka 带Security连接Kafka Tool Java编程操作Kafka 导入Maven Kafka pom.xml 依赖 repositories!-- 代码库 --repositoryidcentral/idurlhttp://maven.aliyun.com/nexus/content/groups/public///urlreleasesenabledtrue/enabled/releasessnapshotsenabledtrue/enabledupdatePolicyalways/updatePolicychecksumPolicyfail/checksumPolicy/snapshots/repository /repositoriesdependencies!-- kafka客户端工具 --dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion2.4.1/version/dependency!-- 工具类 --dependencygroupIdorg.apache.commons/groupIdartifactIdcommons-io/artifactIdversion1.3.2/version/dependency!-- SLF桥接LOG4J日志 --dependencygroupIdorg.slf4j/groupIdartifactIdslf4j-log4j12/artifactIdversion1.7.6/version/dependency!-- SLOG4J日志 --dependencygroupIdlog4j/groupIdartifactIdlog4j/artifactIdversion1.2.16/version/dependency /dependenciesbuildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdversion3.7.0/versionconfigurationsource1.8/sourcetarget1.8/target/configuration/plugin/plugins /build log4j.properties放入到resources文件夹中 log4j.rootLoggerINFO,stdout log4j.appender.stdoutorg.apache.log4j.ConsoleAppender log4j.appender.stdout.layoutorg.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern%5p - %m%n同步生产消息到Kafka中 创建用于连接Kafka的Properties配置 Properties props new Properties(); //这个配置是 Kafka 生产者和消费者必须要指定的一个配置项它用于指定 Kafka 集群中的一个或多个 broker 地址生产者和消费者将使用这些地址与 Kafka 集群建立连接。 props.put(bootstrap.servers, 192.168.88.100:9092); //这行代码将 acks 配置设置为 all。acks 配置用于指定消息确认的级别。在此配置下生产者将等待所有副本都成功写入后才会认为消息发送成功。这种配置级别可以确保数据不会丢失但可能会影响性能。 props.put(acks, all); //这行代码将键key序列化器的类名设置为 org.apache.kafka.common.serialization.StringSerializer。键和值都需要被序列化以便于在网络上传输。这里使用的是一个字符串序列化器它将字符串序列化为字节数组以便于发送到 Kafka 集群。 props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer); //这行代码将值value序列化器的类名设置为 org.apache.kafka.common.serialization.StringSerializer。这里同样使用的是一个字符串序列化器它将字符串序列化为字节数组以便于发送到 Kafka 集群。 props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);创建一个生产者对象 KafkaProducer调用 send 发送1-100消息到指定Topic test并获取返回值Future该对象封装了返回值再调用一个Future.get() 方法等待响应关闭生产者 使用同步等待的方式发送消息 import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future;/*** Kafka的生产者程序会将消息创建出来并发送到Kafka集群中* 1. 创建用于连接Kafka的Properties配置* 2. 创建一个生产者对象KafkaProducer* 3. 调用send发送1-100消息到指定Topic test并获取返回值Future该对象封装了返回值* 4. 再调用一个Future.get()方法等待响应* 5. 关闭生产者*/ public class KafkaProducerTest {public static void main(String[] args) throws ExecutionException, InterruptedException {// 创建用于连接Kafka的Properties配置Properties props new Properties();props.put(bootstrap.servers, 172.xx.xx.1x8:9092);props.put(acks, all);props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);props.put(security.protocol, SASL_PLAINTEXT);props.put(sasl.mechanism, PLAIN);props.put(sasl.jaas.config, org.apache.kafka.common.security.plain.PlainLoginModule required username\xxxx\ password\xxxx\;);// 实现生产者的幂等性props.put(enable.idempotence,true);// 创建一个生产者对象KafkaProducerKafkaProducerString, String kafkaProducer new KafkaProducer(props);// 发送1-100的消息到指定的topic中for (int i 0; i 100; i) {// 一、使用同步等待的方式发送消息// 构建一条消息直接new ProducerRecord//test这个参数是指定 Kafka 主题topic的名称表示这条记录将被发送到哪个主题中。// null这个参数表示记录的键key。在 Kafka 中每条消息都可以有一个键值对键是一个可选参数如果没有设置则为 null。//i 这个参数表示记录的值value。这里的 i 是一个整数通过将它转换为字符串来设置记录的值。这个值将被序列化为字节数组并被发送到 Kafka 集群。ProducerRecordString, String producerRecord new ProducerRecord(test, null, i );FutureRecordMetadata future kafkaProducer.send(producerRecord);// 调用Future的get方法等待响应future.get();System.out.println(第 i 条消息写入成功);}// 关闭生产者kafkaProducer.close();} }异步使用带有回调函数方法生产消息 如果想获取生产者消息是否成功或者成功生产消息到 Kafka 中后执行一些其他动作。此时可以很方便地使用带有回调函数来发送消息。 在发送消息出现异常时能够及时打印出异常信息在发送消息成功时打印 Kafka 的 topic 名字、分区id、offset import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future;/*** Kafka的生产者程序会将消息创建出来并发送到Kafka集群中* 1. 创建用于连接Kafka的Properties配置* 2. 创建一个生产者对象KafkaProducer* 3. 调用send发送1-100消息到指定Topic test并获取返回值Future该对象封装了返回值* 4. 再调用一个Future.get()方法等待响应* 5. 关闭生产者*/ public class KafkaProducerTest {public static void main(String[] args) throws ExecutionException, InterruptedException {// 创建用于连接Kafka的Properties配置Properties props new Properties();props.put(bootstrap.servers, 172.16.4.158:9092);props.put(acks, all);props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);props.put(security.protocol, SASL_PLAINTEXT);props.put(sasl.mechanism, PLAIN);props.put(sasl.jaas.config, org.apache.kafka.common.security.plain.PlainLoginModule required username\admin\ password\admin\;);//实现生产者的幂等性props.put(enable.idempotence,true);// 创建一个生产者对象KafkaProducerKafkaProducerString, String kafkaProducer new KafkaProducer(props);// 发送1-100的消息到指定的topic中for (int i 0; i 100; i) {// 二、使用异步回调的方式发送消息ProducerRecordString, String producerRecord new ProducerRecord(test, null, i );//使用匿名内部类实现Callback接口该接口中表示Kafka服务器响应给客户端会自动调用onCompletion方法//metadata消息的元数据属于哪个topic、属于哪个partition、对应的offset是什么//exception这个对象Kafka生产消息封装了出现的异常如果为null表示发送成功如果不为null表示出现异常。kafkaProducer.send(producerRecord, new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {// 1. 判断发送消息是否成功if(exception null) {// 发送成功// 主题String topic metadata.topic();// 分区idint partition metadata.partition();// 偏移量long offset metadata.offset();System.out.println(topic: topic 分区id partition 偏移量 offset);}else {// 发送出现错误System.out.println(生产消息出现异常);// 打印异常消息System.out.println(exception.getMessage());// 打印调用栈System.out.println(exception.getStackTrace());}}});}// 4.关闭生产者kafkaProducer.close();} }从Kafka的topic中消费消息 从 test topic中将消息都消费并将记录的offset、key、value打印出来。 创建Kafka消费者配置 Properties props new Properties(); //这一行将属性bootstrap.servers的值设置为node1.itcast.cn:9092。这是Kafka生产者和消费者所需的Kafka集群地址和端口号。 props.setProperty(bootstrap.servers, node1.itcast.cn:9092); //这一行将属性group.id的值设置为test。这是消费者组的唯一标识符。所有属于同一组的消费者将共享一个消费者组ID。 props.setProperty(group.id, test); //这一行将属性enable.auto.commit的值设置为true。这表示消费者是否应该自动提交偏移量。 props.setProperty(enable.auto.commit, true); //这一行将属性auto.commit.interval.ms的值设置为1000。这是消费者自动提交偏移量的时间间隔以毫秒为单位。 props.setProperty(auto.commit.interval.ms, 1000); //这两行将属性key.deserializer和value.deserializer的值都设置为org.apache.kafka.common.serialization.StringDeserializer。这是用于反序列化Kafka消息的Java类的名称。在这种情况下消息的键和值都是字符串类型因此使用了StringDeserializer类来反序列化它们。 props.setProperty(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer); props.setProperty(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);创建Kafka消费者订阅要消费的主题使用一个while循环不断从Kafka的topic中拉取消息将将记录record的offset、key、value都打印出来 import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Arrays; import java.util.Properties;/*** 消费者程序* 1.创建Kafka消费者配置* 2.创建Kafka消费者* 3.订阅要消费的主题* 4.使用一个while循环不断从Kafka的topic中拉取消息* 5.将将记录record的offset、key、value都打印出来*/ public class KafkaConsumerTest {public static void main(String[] args) throws InterruptedException {// 创建Kafka消费者配置Properties props new Properties();props.setProperty(bootstrap.servers, 172.16.4.158:9092);props.setProperty(group.id, test);props.setProperty(enable.auto.commit, true);props.setProperty(auto.commit.interval.ms, 1000);props.setProperty(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.setProperty(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.put(security.protocol, SASL_PLAINTEXT);props.put(sasl.mechanism, PLAIN);props.put(sasl.jaas.config, org.apache.kafka.common.security.plain.PlainLoginModule required username\xxxx\ password\xxxx\;);// 创建Kafka消费者KafkaConsumerString, String kafkaConsumer new KafkaConsumer(props);// 订阅要消费的主题// 指定消费者从哪个topic中拉取数据kafkaConsumer.subscribe(Arrays.asList(test));// 使用一个while循环不断从Kafka的topic中拉取消息while(true) {// Kafka的消费者一次拉取一批的数据ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(5));// 将将记录record的offset、key、value都打印出来for (ConsumerRecordString, String consumerRecord : consumerRecords) {// 主题String topic consumerRecord.topic();// offset这条消息处于Kafka分区中的哪个位置long offset consumerRecord.offset();// key和valueString key consumerRecord.key();String value consumerRecord.value();System.out.println(topic: topic offset: offset key: key value: value);}}} }Kafka 重要概念 broker 一个Kafka的集群通常由多个broker组成这样才能实现负载均衡、以及容错。broker是无状态Sateless的它们是通过ZooKeeper来维护集群状态。一个Kafka的broker每秒可以处理数十万次读写每个broker都可以处理TB消息而不影响性能。 Zookeeper ZK 用来管理和协调 broker并且存储了 Kafka 的元数据例如有多少topic、partition、consumer。ZK 服务主要用于通知生产者和消费者 Kafka 集群中有新的 broker 加入、或者 Kafka 集群中出现故障的 broker。 注Kafka正在逐步想办法将 ZooKeeper 剥离维护两套集群成本较高社区提出KIP-500就是要替换掉ZooKeeper的依赖。“Kafka on Kafka”——Kafka自己来管理自己的元数据。 Kafka Tool 可以查看ZooKeeper配置 producer生产者 生产者负责将数据推送给broker的 topic consumer消费者 消费者负责从broker的 topic 中拉取数据并自己进行处理 consumer group消费者组 consumer group是 Kafka 提供的可扩展且具有容错性的消费者机制。一个消费者组可以包含多个消费者。一个消费者组有一个唯一的IDgroup Id。组内的消费者一起消费主题的所有分区数据。 主题Topic 主题是一个逻辑概念用于生产者发布数据消费者拉取数据。Kafka 中的主题必须要有标识符而且是唯一的Kafka中可以有任意数量的主题没有数量上的限制。在主题中的消息是有结构的一般一个主题包含某一类消息。一旦生产者发送消息到主题中这些消息就不能被更新更改 分区Partitions 在Kafka集群中主题被分为多个分区。在 Kafka 中同一个 topic 的消息可以被分配到不同的分区中具体分配规则取决于 partitioner。 Kafka 提供了默认的 partitioner 实现称为 DefaultPartitioner其将消息的 key如果存在进行哈希然后根据哈希值确定该消息应该被分配到哪个分区。如果消息没有 key则采用轮询的方式将消息分配到不同的分区中。 除了默认的 partitioner用户还可以自定义 partitioner 实现以满足不同的需求。自定义 partitioner 实现需要实现 Kafka 提供的 Partitioner 接口并在生产者配置中指定使用该 partitioner。   无论是使用默认的 partitioner 还是自定义 partitioner都需要遵循以下规则 对于同一个 key始终分配到同一个分区中。对于没有 key 的消息应该采用随机或轮询的方式将消息分配到不同的分区中。 需要注意的是分区数的变化也可能导致消息分配到不同的分区中。例如当某个 topic 的分区数发生变化时之前已经写入的消息可能会被重新分配到不同的分区中。因此在生产者代码中应该谨慎处理分区数的变化以避免数据丢失或重复。 副本Replicas 副本可以确保某个服务器出现故障时确保数据依然可用。在Kafka中一般都会设计副本的个数1。 偏移量offset offset 记录着下一条将要发送给 Consumer 的消息的序号。默认 Kafka 将 offset 存储在ZooKeeper 中。在一个分区中消息是有顺序的方式存储着每个在分区的消费都是有一个递增的id。这个就是偏移量offset。偏移量在分区中才是有意义的。在分区之间offset 是没有任何意义的。 消费者组 Kafka 支持有多个消费者同时消费一个主题中的数据。启动两个消费者共同来消费 test 主题的数据。 修改生产者程序让生产者不停地每3秒生产1-100个数字 // 发送1-100数字到Kafka的test主题中 while(true) {for (int i 1; i 100; i) {// 注意send方法是一个异步方法它会将要发送的数据放入到一个buffer中然后立即返回// 这样可以让消息发送变得更高效producer.send(new ProducerRecord(test, i ));}Thread.sleep(3000); }同时运行两个消费者 可以发现只有一个消费者程序能够拉取到消息。想要让两个消费者同时消费消息必须要给 test 主题添加一个分区。 # 设置 test topic为2个分区 bin/kafka-topics.sh --zookeeper 10.211.55.8:2181 -alter --partitions 2 --topic test重新运行生产者、两个消费者程序就可以看到两个消费者都可以消费Kafka Topic的数据了。 Kafka生产者幂等性 拿http举例来说一次或多次请求得到地响应是一致的网络超时等问题除外换句话说就是执行多次操作与执行一次操作的影响是一样的。如果某个系统是不具备幂等性的如果用户重复提交了某个表格就可能会造成不良影响。例如用户在浏览器上点击了多次提交订单按钮会在后台生成多个一模一样的订单。 Kafka生产者幂等性在生产者生产消息时如果出现 retry 时有可能会一条消息被发送了多次如果 Kafka 不具备幂等性的就有可能会在 partition 中保存多条一模一样的消息。 //配置幂等性 props.put(enable.idempotence,true);幂等性原理 为了实现生产者的幂等性Kafka引入了 Producer IDPID和 Sequence Number的概念。 PID每个Producer在初始化时都会分配一个唯一的PID这个PID对用户来说是透明的。Sequence Number针对每个生产者对应PID发送到指定主题分区的消息都对应一个从0开始递增的 Sequence Number。 Kafka 事务 Kafka事务是2017年Kafka 0.11.0.0引入的新特性。   类似于数据库的事务。Kafka事务指的是 生产者生产消息 以及消费者提交offset 的操作可以在一个原子操作中要么都成功要么都失败。尤其是在生产者、消费者并存时事务的保障尤其重要。consumer-transform-producer模式 事务操作API Producer接口中定义了以下5个事务相关方法 initTransactions初始化事务要使用Kafka事务必须先进行初始化操作。beginTransaction开始事务启动一个Kafka事务。sendOffsetsToTransaction提交偏移量批量地将分区对应的offset发送到事务中方便后续一块提交。commitTransaction提交事务提交事务。abortTransaction放弃事务取消事务。 Kafka事务编程 事务相关属性配置 生产者 // 配置事务的id开启了事务会默认开启幂等性 props.put(transactional.id, first-transactional);消费者 // 消费者需要设置隔离级别 props.put(isolation.level,read_committed); // 关闭自动提交开启事务的不能开启offset自动提交假设每秒提交一次offset不受事务控制 props.put(enable.auto.commit, false);Kafka事务编程案例 需求在Kafka的 topic 【ods_user】中有一些用户数据数据格式如下 姓名,性别,出生日期 张三,1,1980-10-09 李四,0,1985-11-01需要编写程序将用户的性别转换为 男、女1-男0-女转换后将数据写入到topic 【dwd_user】中。 要求使用事务保障要么消费了数据同时写入数据到 topic提交offset。要么全部失败。 启动生产者控制台程序模拟数据 # 创建名为ods_user和dwd_user的主题 bin/kafka-topics.sh --create --bootstrap-server 10.211.55.8:9092 --topic ods_user bin/kafka-topics.sh --create --bootstrap-server 10.211.55.8:9092 --topic dwd_user# 窗口一生产数据到 ods_user bin/kafka-console-producer.sh --broker-list 10.211.55.8:9092 --topic ods_user# 窗口二从 dwd_user 消费数据 bin/kafka-console-consumer.sh --bootstrap-server 10.211.55.8:9092 --topic dwd_user --from-beginning --isolation-level read_committed创建消费者代码 createConsumer方法该方法中返回一个消费者订阅【ods_user】主题。注意需要配置事务隔离级别、关闭自动提交。 //创建消费者public static ConsumerString, String createConsumer() {// 1. 创建Kafka消费者配置Properties props new Properties();props.setProperty(bootstrap.servers, 10.211.55.8:9092);props.setProperty(group.id, ods_user);props.put(isolation.level,read_committed);props.setProperty(enable.auto.commit, false);props.setProperty(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.setProperty(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);// 2. 创建Kafka消费者KafkaConsumerString, String consumer new KafkaConsumer(props);// 3. 订阅要消费的主题consumer.subscribe(Arrays.asList(ods_user));return consumer; }编写创建生产者代码 createProducer方法返回一个生产者对象。注意需要配置事务的id开启了事务会默认开启幂等性。 注如果使用了事务不要使用异步发送 //创建生产者public static ProducerString, String createProduceer() {// 1. 创建生产者配置Properties props new Properties();props.put(bootstrap.servers, 10.211.55.8:9092);props.put(transactional.id, dwd_user);props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);// 2. 创建生产者ProducerString, String producer new KafkaProducer(props);return producer;}编写代码消费并生产数据 步骤 调用之前实现的方法创建消费者、生产者对象 。生产者调用 initTransactions 初始化事务。编写一个while死循环在while循环中不断拉取数据进行处理后再写入到指定的topic。 while循环中 生产者开启事务消费者拉取消息遍历拉取到的消息并进行预处理将1转换为男0转换为女生产消息到 topic【dwd_user】中提交偏移量到事务中提交事务捕获异常如果出现异常则取消事务 public static void main(String[] args) {// 调用之前实现的方法创建消费者、生产者对象ConsumerString, String consumer createConsumer();ProducerString, String producer createProducer();// 初始化事务producer.initTransactions();// 在while死循环中不断拉取数据进行处理后再写入到指定的topicwhile (true) {try {// 1. 开启事务producer.beginTransaction();// 2. 定义Map结构用于保存分区对应的offsetMapTopicPartition, OffsetAndMetadata offsetCommits new HashMap();// 2. 拉取消息ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(2));for (ConsumerRecordString, String record : records) {// 3. 保存偏移量//将当前消息所属分区的偏移量保存到HashMap中并且将偏移量加1以便下次从此偏移量开始消费消息。offsetCommits.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() 1));// 4. 进行转换处理String[] fields record.value().split(,);fields[1] fields[1].equalsIgnoreCase(1) ? 男 : 女;String message fields[0] , fields[1] , fields[2];// 5. 生产消息到dwd_userproducer.send(new ProducerRecord(dwd_user, message));}// 6. 提交偏移量到事务producer.sendOffsetsToTransaction(offsetCommits, ods_user);// 7. 提交事务producer.commitTransaction();}catch (Exception e) {// 8. 放弃事务producer.abortTransaction();}}}将已消费的消息的偏移量提交到生产者的事务中是为了确保在生产者发送消息到新的主题之前已经消费的消息的偏移量已经被记录下来并保存在事务中。   如果不提交偏移量则可能会导致已经消费的消息在下一次启动消费者时重复消费。   因此将偏移量提交到生产者的事务中是非常重要的可以确保消费者在下一次启动时可以正确地从上次停止的位置继续消费。 测试 成功转化并消费 模拟异常测试事务 // 3. 保存偏移量 offsetCommits.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() 1));// 4. 进行转换处理 String[] fields record.value().split(,); fields[1] fields[1].equalsIgnoreCase(1) ? 男:女; String message fields[0] , fields[1] , fields[2];// 模拟异常 int i 1/0;// 5. 生产消息到dwd_user producer.send(new ProducerRecord(dwd_user, message));启动程序一次抛出异常。再启动程序一次还是抛出异常。直到我们处理该异常为止。 可以消费到消息但如果中间出现异常的话offset是不会被提交的除非消费、生产消息都成功才会提交事务。
http://www.hkea.cn/news/14276535/

相关文章:

  • 网站做收付款接口网站公司怎么做业务
  • 开发高端网站开发如何给企业做网络推广赚钱
  • 如何做好网站需求分析做钓鱼网站会被抓判刑吗
  • 仿99健康网网站源码长沙做网站湖南微联讯点不错
  • 大连零基础网站建设教学在哪里福州做网站哪家公司好
  • pc端网站转手机站怎么做个人怎么做淘宝客网站吗
  • 网页设计素材网站大全龙岗专业网站建设
  • 做网站需要多少钱 网络服务网页设计与网站建设作业
  • 哪些彩票网站可做代理赚钱乐都营销型网站建设
  • 上海工程建设信息网站北京营销推广公司
  • dede采集规则下载网站网络营销公司简介
  • 西安网站建设seo优化什么网站做的好看的
  • 移动电商网站开发需求做国际贸易的网站
  • 网站的头尾和导航的公用文件电子商务有限公司有哪些
  • 百度网站提交收录怎么样建设网站赚钱
  • 网站如何做微信支付宝支付宝支付接口如何用phpstorm做网站
  • seo网站优化对象网站建设百度云
  • 传统网站怎么做前端模块公司要招个做网站的人
  • 百科网站程序网站建设哪家好就推 鹏博资讯
  • 企业网站优化服务公司团关系转接网站建设
  • 网站的整体风格包括建设银行园区公积金管理中心网站
  • 3万网站建设费会计分录济南酷火网站建设
  • 营销型网站传统网站网络seo优化平台
  • wordpress视频网站模板下载门户网站建设采购
  • 网站的关于我们怎么做东莞市网络营销推广多少钱
  • 县区网站集约化建设做网站怎么和广告公司合作
  • 中国建设银行网站怎么改支付密码忘了怎么办手工建站与模板网站的区别
  • 网站建设参考网站的说明书建筑工程网名大全霸气
  • 如何建立优秀企业网站专门做羽毛球的网站
  • 医院建设网站意义小程序搭建价格