当前位置: 首页 > news >正文

网站未在腾讯云备案网站之家

网站未在腾讯云备案,网站之家,app设计模板网站,网站建设详细方案今天的议题是:如何快速处理kafka的消息积压 通常的做法有以下几种: 增加消费者数增加 topic 的分区数,从而进一步增加消费者数调整消费者参数,如max.poll.records增加硬件资源 常规手段不是本文的讨论重点或者当上面的手段已经使…

今天的议题是:如何快速处理kafka的消息积压

通常的做法有以下几种:

  1. 增加消费者数
  2. 增加 topic 的分区数,从而进一步增加消费者数
  3. 调整消费者参数,如max.poll.records
  4. 增加硬件资源

常规手段不是本文的讨论重点或者当上面的手段已经使用过依然存在很严重的消息积压时该怎么办?本文给出一种增加消费者消费速率的方案。我们知道消息积压往往是因为生产速率远大于消费速率,本文的重点就是通过提高消费速率来解决消息积压。

经验判断,消费速率低下的主要原因往往都是数据处理时间长,业务逻辑复杂最终导致一次 poll 的时间被无限拉长,如果可以通过增加数据处理的线程数来降低一次 poll 的时间那么问题就解决了。但是需要注意一下几点:

  1. 业务逻辑对乱序数据不敏感,因为并行一定会导致乱序问题
  2. kafka 的消费者是线程不安全的
  3. 如何提交 offset

基于上述几点,思路就是消费者 poll 下来一批数据,交给多个线程去并行处理,消费者等待所有线程执行完后提交。为了减少线程的创建与销毁则维护一个线程池。代码如下:

第一步:创建一个MultipleConsumer类用于封装消费者和线程池

public class MultipleConsumer {private final KafkaConsumer<String, String> consumer;private final int threadNum;private final ExecutorService threadPool;private boolean isRunning = true;public MultipleConsumer(Properties properties, List<String> topics, int threadNum) {// 实例化消费者consumer = new KafkaConsumer<>(properties);// 订阅主题consumer.subscribe(topics);this.threadNum = threadNum;this.threadPool = Executors.newFixedThreadPool(threadNum);}
}

理论上相较于传统的消费速率可以提升 threadNum 倍。

第二步:因为需要并行处理一批 poll 数据,因此需要对数据进行切分,切分逻辑如下

private Map<Integer, List<ConsumerRecord<String, String>>> splitTask(ConsumerRecords<String, String> consumerRecords) {HashMap<Integer, List<ConsumerRecord<String, String>>> tasks = new HashMap<>();for (int i = 0; i < threadNum; i++) {tasks.put(i, new ArrayList<>());}int recordIndex = 0;for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {tasks.get(recordIndex % threadNum).add(consumerRecord);recordIndex++;}return tasks;}

这里采用轮训的方式且切分的个数与 threadNum 一致,尽可能保证每个线程处理的数据数量相差不大

第三步:定义一个静态内部类用来处理数据,并处理同步逻辑(因为需要等待所有线程执行完再提交 offset)

private static class InnerProcess implements Runnable {private final List<ConsumerRecord<String, String>> records;private final CountDownLatch countDownLatch;public InnerProcess(List<ConsumerRecord<String, String>> records, CountDownLatch countDownLatch) {this.records = records;this.countDownLatch = countDownLatch;}@Overridepublic void run() {try {// 处理消息for (ConsumerRecord<String, String> record : records) {System.out.println("topic: " + record.topic() + ", partition: " + record.partition() + ", offset: " + record.offset() + ", key: " + record.key() + ", value: " + record.value());TimeUnit.SECONDS.sleep(1);}} catch (InterruptedException e) {e.printStackTrace();} finally {countDownLatch.countDown();}}}

使用 CountDownLatch 实现线程同步逻辑,假设每条数据的业务处理时间为 1 s

第四步:消费者 poll 逻辑

public void start() {while (isRunning) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(5));if (!consumerRecords.isEmpty()) {// 分割任务Map<Integer, List<ConsumerRecord<String, String>>> splitTask = splitTask(consumerRecords);CountDownLatch countDownLatch = new CountDownLatch(threadNum);// 提交任务for (int i = 0; i < threadNum; i++) {threadPool.submit(new InnerProcess(splitTask.get(i), countDownLatch));}// 等待任务执行结束try {countDownLatch.await();} catch (InterruptedException e) {throw new RuntimeException(e);}// 提交偏移量consumer.commitAsync((map, e) -> {if (e != null) {System.out.println("提交偏移量失败");}});}}}

完整代码如下:

import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.*;/*** @author wjun* @date 2023/3/1 14:50* @email wjunjobs@outlook.com* @describe*/
public class MultipleConsumer {private final KafkaConsumer<String, String> consumer;private final int threadNum;private final ExecutorService threadPool;private boolean isRunning = true;public MultipleConsumer(Properties properties, List<String> topics, int threadNum) {// 实例化消费者consumer = new KafkaConsumer<>(properties);// 订阅主题consumer.subscribe(topics);this.threadNum = threadNum;this.threadPool = Executors.newFixedThreadPool(threadNum);}public void start() {while (isRunning) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(5));if (!consumerRecords.isEmpty()) {// 分割任务Map<Integer, List<ConsumerRecord<String, String>>> splitTask = splitTask(consumerRecords);CountDownLatch countDownLatch = new CountDownLatch(threadNum);// 提交任务for (int i = 0; i < threadNum; i++) {threadPool.submit(new InnerProcess(splitTask.get(i), countDownLatch));}// 等待任务执行结束try {countDownLatch.await();} catch (InterruptedException e) {throw new RuntimeException(e);}// 提交偏移量consumer.commitAsync((map, e) -> {if (e != null) {System.out.println("提交偏移量失败");}});}}}private Map<Integer, List<ConsumerRecord<String, String>>> splitTask(ConsumerRecords<String, String> consumerRecords) {HashMap<Integer, List<ConsumerRecord<String, String>>> tasks = new HashMap<>();for (int i = 0; i < threadNum; i++) {tasks.put(i, new ArrayList<>());}int recordIndex = 0;for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {tasks.get(recordIndex % threadNum).add(consumerRecord);recordIndex++;}return tasks;}public void stop() {isRunning = false;threadPool.shutdown();}private static class InnerProcess implements Runnable {private final List<ConsumerRecord<String, String>> records;private final CountDownLatch countDownLatch;public InnerProcess(List<ConsumerRecord<String, String>> records, CountDownLatch countDownLatch) {this.records = records;this.countDownLatch = countDownLatch;}@Overridepublic void run() {try {// 处理消息for (ConsumerRecord<String, String> record : records) {System.out.println("topic: " + record.topic() + ", partition: " + record.partition() + ", offset: " + record.offset() + ", key: " + record.key() + ", value: " + record.value());TimeUnit.SECONDS.sleep(1);}} catch (InterruptedException e) {e.printStackTrace();} finally {countDownLatch.countDown();}}}
}

测试一下:

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;import java.util.ArrayList;
import java.util.List;
import java.util.Properties;/*** @author wjun* @date 2023/3/1 16:03* @email wjunjobs@outlook.com* @describe*/
public class MultipleConsumerTest {private static final Properties properties = new Properties();private static final List<String> topics = new ArrayList<>();public static void before() {properties.put("bootstrap.servers", "localhost:9092");properties.put("group.id", "test");properties.put("enable.auto.commit", "false");properties.put("auto.commit.interval.ms", "1000");properties.put("session.timeout.ms", "30000");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");topics.add("multiple_demo");}public static void main(String[] args) {new MultipleConsumer(properties, topics, 5).start();}
}

20 条数据的处理事件只需要 4s(threadNume = 5,即缩短 5 倍)

image-20230301172739782

但是此方法的缺点:

  1. 只适用于业务逻辑复杂导致的处理时间长的场景
  2. 对数据乱序不敏感的业务场景
http://www.hkea.cn/news/701156/

相关文章:

  • 开发微信小程序的流程广州seo优化电话
  • 小企业网站建设和管理全能搜
  • 无棣县建设局网站游戏优化大师下载安装
  • 小额贷款 网站模板品牌推广软文
  • 网站建设开发成本天津百度搜索网站排名
  • 做的好的营销型网站有哪些内容外贸网站外链平台
  • 东营网站建设预算价格百度网盘网页版入口官网
  • 网站建设中标公告18款禁用看奶app入口
  • 网站运营人员岗位职责长沙正规seo优化价格
  • cnzz统计代码放在后台网站为什么没显示seo的英文全称是什么
  • 杭州企业网站建设方案广告门
  • 自己免费做网站(二)seo优化公司信
  • 广州外贸b2b网站建设刷钻业务推广网站
  • 做企业网站用什么怎样宣传自己的品牌
  • 濮阳做网站的公司我的百度账号
  • 美食网站开发如何做好网络营销工作
  • 网站建设案例资料今天的新闻内容
  • 台州专业网站建设方案seo软文代写
  • 个人网站 百度推广全球搜索大全
  • 网站消息推送5118素材网站
  • 天津 响应式网站设计企业网站模板免费
  • 网站用花生壳nas做存储百度seo发包工具
  • wordpress cache深圳纯手工seo
  • 怎样找到正规代加工网站百度地图3d实景地图
  • 潍坊网站建设公司网站搭建免费
  • 惠州做网站好的公司下载百度语音导航地图安装
  • 春节网站怎么做小说排行榜百度搜索风云榜
  • 商城服务是什么软件seo是指什么岗位
  • 无锡网站建设有限公司网站快速收录的方法
  • 网站建设通报推广网站多少钱