当前位置: 首页 > news >正文

网站公司简介模板免费下载成都互联网制作

网站公司简介模板免费下载,成都互联网制作,高明网站设计收费,法律建设企业网站前言 1. 高吞吐量#xff08;High Throughput#xff09; Kafka 设计的一个核心特性是高吞吐量。它能够每秒处理百万级别的消息#xff0c;适合需要高频次、低延迟消息传递的场景。即使在大规模分布式环境下#xff0c;它也能保持很高的吞吐量和性能#xff0c;支持低延…前言 1. 高吞吐量High Throughput Kafka 设计的一个核心特性是高吞吐量。它能够每秒处理百万级别的消息适合需要高频次、低延迟消息传递的场景。即使在大规模分布式环境下它也能保持很高的吞吐量和性能支持低延迟的数据传输。 2. 可扩展性Scalability Kafka 具有强大的可扩展性。它支持水平扩展可以轻松增加更多的节点来处理更多的数据流量。Kafka 通过分区机制Partitioning和分布式架构确保即使数据量剧增也能够平滑地扩展。 分区消息被划分成多个分区partition每个分区可以独立存储和读取数据从而支持并行处理。 副本Kafka 会将每个分区的数据复制到多个节点上以确保高可用性和容错性。 3. 高可用性和容错性Fault Tolerance Kafka 提供了内建的高可用性和容错机制。它通过将消息复制到多个代理broker上来确保数据的可靠性即使部分节点出现故障数据仍然可以从其他副本中恢复。这种机制使得 Kafka 能够承受硬件故障而不会丢失数据。 4. 持久化和日志存储Durability and Log Storage Kafka 的消息是持久化存储的意味着消息会被写入磁盘并按日志顺序存储。这使得 Kafka 不仅可以作为一个消息队列还能用作长期的日志存储系统能够回溯历史数据。消息默认保留在 Kafka 中一段时间消费者可以根据需要按需读取。 5. 低延迟Low Latency Kafka 的设计能够提供低延迟的数据传输特别适合于实时流处理的应用场景。Kafka 能够以毫秒级的延迟来传输大量数据这对许多实时数据处理应用至关重要比如实时分析、监控系统等。 6. 强大的消费模型Consumer Model Kafka 提供了灵活的消费模型允许消费者以不同的方式读取消息 发布/订阅模式多个消费者可以同时订阅相同的主题Topic消息将被广播给所有消费者。 点对点模式消费者组Consumer Group机制允许多个消费者协调工作处理不同的消息分区。 消息偏移量Kafka 维护每个消费者的消息偏移量offset消费者可以从任意位置开始读取消息支持灵活的消息消费和重播。 7. 分布式和横向扩展Distributed and Horizontal Scaling Kafka 本身是一个分布式系统设计上支持横向扩展。随着系统需求的增加可以简单地增加更多的 Kafka 节点分区和副本会在节点之间自动重新平衡确保负载均匀分布和高可用性。 8. 支持流处理Stream Processing Kafka 不仅是一个消息队列也可以作为流处理平台。通过 Kafka Streams API可以对流数据进行实时处理例如聚合、过滤、连接等。此外Kafka 可以与其他流处理框架如 Apache Flink、Apache Spark集成构建复杂的数据处理管道。 9. 灵活的集成能力Integration Capabilities Kafka 被广泛集成到各种技术栈中包括传统数据库、数据仓库、大数据系统、微服务架构等。Kafka 的强大支持让它成为企业架构中的核心组件。 Kafka ConnectKafka Connect 是 Kafka 官方提供的一个框架用于将 Kafka 与外部系统如数据库、文件系统、Hadoop、Elasticsearch 等进行连接。它简化了系统之间的数据同步和集成。 10. 广泛的社区支持和生态系统 Kafka 拥有活跃的开源社区和庞大的生态系统。它得到了许多企业的广泛使用拥有大量的插件和扩展工具可以满足各种需求。Kafka 的生态系统包括但不限于 Kafka Streams用于实时数据流处理。 Kafka Connect用于与外部系统集成。 KSQL用于在 Kafka 上执行 SQL 查询的流处理工具。 11. 使用场景 Kafka 被广泛应用于以下场景 日志收集与传输Kafka 可以作为一个日志聚合平台将来自不同服务或系统的日志收集起来并传输到中央日志分析平台。 实时数据处理Kafka 用于支持实时数据处理和流计算例如监控系统、推荐系统、用户行为分析等。 事件驱动架构Kafka 非常适合于事件驱动的微服务架构帮助微服务之间实现解耦和异步通信。 数据管道Kafka 作为一个高效的数据管道广泛用于将不同系统中的数据实时传输到数据仓库、数据湖或分析平台。 总结 Kafka 是一个高性能、可扩展、容错且持久化的消息队列系统非常适合处理大规模的实时数据流。它在大数据流处理、日志聚合、微服务架构、事件驱动架构等多个领域都有广泛应用。如果你的系统需要处理高吞吐量、低延迟、可扩展的消息传递和实时数据流处理那么 Kafka 是一个非常合适的选择。 使用教程 1.导入依赖 !-- Kafka --dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependency 2.导入配置 # Spring spring:kafka:producer:# Kafka服务器bootstrap-servers: 你自己的kafka服务器地址# 开启事务必须在开启了事务的方法中发送否则报错transaction-id-prefix: kafkaTx-# 发生错误后消息重发的次数开启事务必须设置大于0。retries: 3# acks0 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks1 只要集群的首领节点收到消息生产者就会收到一个来自服务器成功响应。# acksall 只有当所有参与复制的节点全部收到消息时生产者才会收到一个来自服务器的成功响应。# 开启事务时必须设置为allacks: all# 当有多个消息需要被发送到同一个分区时生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小按照字节数计算。batch-size: 16384# 生产者内存缓冲区的大小。buffer-memory: 1024000# 键的序列化方式key-serializer: org.springframework.kafka.support.serializer.JsonSerializer# 值的序列化方式建议使用Json这种序列化方式可以无需额外配置传输实体类value-serializer: org.springframework.kafka.support.serializer.JsonSerializerconsumer:# Kafka服务器bootstrap-servers: 你自己的kafka服务器地址group-id: firstGroup# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式如1S,1M,2H,5D#auto-commit-interval: 2s# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理# earliest当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时从头开始消费分区的记录# latest当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时消费新产生的该分区下的数据在消费者启动之后生成的记录# none当各分区都存在已提交的offset时从提交的offset开始消费只要有一个分区不存在已提交的offset则抛出异常auto-offset-reset: latest# 是否自动提交偏移量默认值是true为了避免出现重复数据和数据丢失可以把它设置为false然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式#key-deserializer: org.apache.kafka.common.serialization.StringDeserializerkey-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer# 值的反序列化方式建议使用Json这种序列化方式可以无需额外配置传输实体类value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer# 配置消费者的 Json 反序列化的可信赖包反序列化实体类需要properties:spring:json:trusted:packages: *# 这个参数定义了poll方法最多可以拉取多少条消息默认值为500。如果在拉取消息的时候新消息不足500条那有多少返回多少如果超过500条每次只返回500。# 这个默认值在有些场景下太大有些场景很难保证能够在5min内处理完500条消息# 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,# 然后这批消息会被分配到另一个消费者中还是会处理不完这样这批消息就永远也处理不完。# 要避免出现上述问题提前评估好处理一条消息最长需要多少时间然后覆盖默认的max.poll.records参数# 注需要开启BatchListener批量监听才会生效如果不开启BatchListener则不会出现reBalance情况max-poll-records: 3properties:# 两次poll之间的最大间隔默认值为5分钟。如果超过这个间隔会触发reBalancemax:poll:interval:ms: 600000# 当broker多久没有收到consumer的心跳请求后就触发reBalance默认值是10ssession:timeout:ms: 10000listener:# 在侦听器容器中运行的线程数一般设置为 机器数*分区数concurrency: 4# 自动提交关闭需要设置手动消息确认ack-mode: manual_immediate# 消费监听接口监听的主题不存在时默认会报错所以设置为false忽略错误missing-topics-fatal: false# 两次poll之间的最大间隔默认值为5分钟。如果超过这个间隔会触发reBalancepoll-timeout: 600000 3.config文件(一共4个,已经封装好直接使用即可) import org.apache.kafka.clients.consumer.ConsumerConfig; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringBootConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.support.serializer.JsonDeserializer;import java.util.HashMap; import java.util.Map;/*** kafka配置也可以写在yml这个文件会覆盖yml*/ SpringBootConfiguration public class KafkaConsumerConfig {Value(${spring.kafka.consumer.bootstrap-servers})private String bootstrapServers;Value(${spring.kafka.consumer.group-id})private String groupId;Value(${spring.kafka.consumer.enable-auto-commit})private boolean enableAutoCommit;Value(${spring.kafka.properties.session.timeout.ms})private String sessionTimeout;Value(${spring.kafka.properties.max.poll.interval.ms})private String maxPollIntervalTime;Value(${spring.kafka.consumer.max-poll-records})private String maxPollRecords;Value(${spring.kafka.consumer.auto-offset-reset})private String autoOffsetReset;Value(${spring.kafka.listener.concurrency})private Integer concurrency;Value(${spring.kafka.listener.missing-topics-fatal})private boolean missingTopicsFatal;Value(${spring.kafka.listener.poll-timeout})private long pollTimeout;Beanpublic MapString, Object consumerConfigs() {MapString, Object propsMap new HashMap(16);propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//是否自动提交偏移量默认值是true为了避免出现重复数据和数据丢失可以把它设置为false然后手动提交偏移量propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);//自动提交的时间间隔自动提交开启时生效propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 2000);//该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理//earliest当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时从头开始消费分区的记录//latest当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时消费新产生的该分区下的数据在消费者启动之后生成的记录//none当各分区都存在已提交的offset时从提交的offset开始消费只要有一个分区不存在已提交的offset则抛出异常propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);//两次poll之间的最大间隔默认值为5分钟。如果超过这个间隔会触发reBalancepropsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime);//这个参数定义了poll方法最多可以拉取多少条消息默认值为500。如果在拉取消息的时候新消息不足500条那有多少返回多少如果超过500条每次只返回500。//这个默认值在有些场景下太大有些场景很难保证能够在5min内处理完500条消息//如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,//然后这批消息会被分配到另一个消费者中还是会处理不完这样这批消息就永远也处理不完。//要避免出现上述问题提前评估好处理一条消息最长需要多少时间然后覆盖默认的max.poll.records参数//注需要开启BatchListener批量监听才会生效如果不开启BatchListener则不会出现reBalance情况propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);//当broker多久没有收到consumer的心跳请求后就触发reBalance默认值是10spropsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);//序列化建议使用Json这种序列化方式可以无需额外配置传输实体类propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);return propsMap;}Beanpublic ConsumerFactoryObject, Object consumerFactory() {//配置消费者的 Json 反序列化的可信赖包反序列化实体类需要try (JsonDeserializerObject deserializer new JsonDeserializer()) {deserializer.trustedPackages(*);return new DefaultKafkaConsumerFactory(consumerConfigs(), new JsonDeserializer(), deserializer);}}/*** KafkaListenerContainerFactory 用来做消费者的配置* return*/Beanpublic KafkaListenerContainerFactoryConcurrentMessageListenerContainerObject, Object kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactoryObject, Object factory new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory());//在侦听器容器中运行的线程数一般设置为 机器数*分区数factory.setConcurrency(concurrency);//消费监听接口监听的主题不存在时默认会报错所以设置为false忽略错误factory.setMissingTopicsFatal(missingTopicsFatal);// 自动提交关闭需要设置手动消息确认factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);factory.getContainerProperties().setPollTimeout(pollTimeout);// 设置为批量监听需要用List接收// factory.setBatchListener(true);return factory;} }import org.apache.kafka.clients.producer.ProducerConfig; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringBootConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.kafka.transaction.KafkaTransactionManager;import java.util.HashMap; import java.util.Map;SpringBootConfiguration public class KafkaProviderConfig {Value(${spring.kafka.producer.bootstrap-servers})private String bootstrapServers;Value(${spring.kafka.producer.transaction-id-prefix})private String transactionIdPrefix;Value(${spring.kafka.producer.acks})private String acks;Value(${spring.kafka.producer.retries})private String retries;Value(${spring.kafka.producer.batch-size})private String batchSize;Value(${spring.kafka.producer.buffer-memory})private String bufferMemory;Beanpublic MapString, Object producerConfigs() {MapString, Object props new HashMap(16);props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);//acks0 生产者在成功写入消息之前不会等待任何来自服务器的响应。//acks1 只要集群的首领节点收到消息生产者就会收到一个来自服务器成功响应。//acksall 只有当所有参与复制的节点全部收到消息时生产者才会收到一个来自服务器的成功响应。//开启事务必须设为allprops.put(ProducerConfig.ACKS_CONFIG, acks);//发生错误后消息重发的次数开启事务必须大于0props.put(ProducerConfig.RETRIES_CONFIG, retries);//当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送//批次的大小可以通过batch.size 参数设置.默认是16KB//较小的批次大小有可能降低吞吐量批次大小为0则完全禁用批处理。//比如说kafka里的消息5秒钟Batch才凑满了16KB才能发送出去。那这些消息的延迟就是5秒钟//实测batchSize这个参数没有用props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);//有的时刻消息比较少,过了很久,比如5min也没有凑够16KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间,//即使数据没达到16KB,也将这个批次发送出去props.put(ProducerConfig.LINGER_MS_CONFIG, 5000);//生产者内存缓冲区的大小props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);//反序列化和生产者的序列化方式对应props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);return props;}Beanpublic ProducerFactoryObject, Object producerFactory() {DefaultKafkaProducerFactoryObject, Object factory new DefaultKafkaProducerFactory(producerConfigs());//开启事务会导致 LINGER_MS_CONFIG 配置失效factory.setTransactionIdPrefix(transactionIdPrefix);return factory;}Beanpublic KafkaTransactionManagerObject, Object kafkaTransactionManager(ProducerFactoryObject, Object producerFactory) {return new KafkaTransactionManager(producerFactory);}Beanpublic KafkaTemplateObject, Object kafkaTemplate() {return new KafkaTemplate(producerFactory());} }import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.kafka.support.ProducerListener; import org.springframework.lang.Nullable; import org.springframework.stereotype.Component;/*** kafka消息发送回调*/ Component public class KafkaSendResultHandler implements ProducerListenerObject, Object {Overridepublic void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {System.out.println(消息发送成功 producerRecord.toString());}Overridepublic void onError(ProducerRecord producerRecord, Nullable RecordMetadata recordMetadata, Exception exception) {System.out.println(消息发送失败 producerRecord.toString() exception.getMessage());} }import org.apache.kafka.clients.consumer.Consumer; import org.springframework.kafka.listener.KafkaListenerErrorHandler; import org.springframework.kafka.listener.ListenerExecutionFailedException; import org.springframework.lang.NonNull; import org.springframework.messaging.Message; import org.springframework.stereotype.Component;/*** 异常处理*/ Component public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {OverrideNonNullpublic Object handleError(NonNull Message? message, NonNull ListenerExecutionFailedException exception) {return new Object();}OverrideNonNullpublic Object handleError(NonNull Message? message, NonNull ListenerExecutionFailedException exception,Consumer?, ? consumer) {System.out.println(消息详情 message);System.out.println(异常信息 exception);System.out.println(消费者详情 consumer.groupMetadata());System.out.println(监听主题 consumer.listTopics());// TODO 消费记录return KafkaListenerErrorHandler.super.handleError(message, exception, consumer);} }4.代码实现 生产者: Autowiredprivate KafkaTemplateObject,Object kafkaTemplate;public void test() {// 生成一个随机的 UUID 字符串String message UUID.randomUUID().toString();// 使用 KafkaTemplate 将消息发送到 Kafka 中的 test 主题// KafkaTemplate.send() 方法的第一个参数是目标主题名第二个参数是要发送的消息内容kafkaTemplate.send(test, message);// 发送成功后Kafka 会异步处理消息返回一个 Future 对象 // 如果需要进一步处理发送成功与否的回调可以通过该对象的回调接口进行处理} 消费者: import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component;Component Log4j2 public class Consumer {Autowiredprivate StringRedisTemplate redisTemplate;private static final String REDIS_SET_KEY test;// Kafka 监听器配置KafkaListener(topics test,containerFactory kafkaListenerContainerFactory, errorHandler myKafkaListenerErrorHandler)public void consumer(ConsumerRecordObject, Object consumerRecord, Acknowledgment acknowledgment) {String key (String) consumerRecord.key();Object value consumerRecord.value();// 记录消费的基本信息便于追踪log.info(开始消费消息Topic: {}, Partition: {}, Offset: {}, 消费内容: {}, consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), value);try {// Redis 去重确保消息没有重复消费Long result redisTemplate.opsForSet().add(REDIS_SET_KEY, key);// 如果 Redis 返回 1表示该 key 尚未消费才进行后续处理if (result ! null result 1) {// 处理业务逻辑// 手动提交偏移量acknowledgment.acknowledge();log.info(消费成功消费的信息: {}, value);} else {log.info(消息已消费过跳过处理。key: {}, key);acknowledgment.acknowledge();}} catch (Exception e) {log.error(消费失败错误信息: {}, e.getMessage(), e);// 如果发生异常进行重试处理acknowledgment.nack(1000); // 可配置重试时间}}// 可选定时清理 Redis 集合中的已消费记录Scheduled(fixedDelay 3600000) // 每小时清理一次public void cleanUpRedis() {// 可以根据消息处理的需要调整清理策略redisTemplate.delete(REDIS_SET_KEY);log.info(已清理 Redis 中的消费记录);} }
http://www.hkea.cn/news/14281558/

相关文章:

  • 先锋设计网站什么是网站静态页面
  • 外贸推广建站公司微网站建设最新报价
  • 济南想做网站淘宝付费推广
  • 微网站的案例手机app制作网站模板
  • 邯郸网站制作设计网站排版
  • 傻瓜做网站软件wordpress全站广告位
  • 自学做网站的做网站的挣钱么
  • 做网站直接开二级域名做网站挂广告赚多少
  • 做网站最烂公司企业网络营销是什么
  • 有哪些基于网站开发的报表设计器七牛云wordpress 代码
  • 网站设置专栏网站建设客户案例
  • 外国网站设计风格网站是com好点还是cn
  • 微网站怎么免费做网站建设实现功能
  • 上海网站建设报价单艺术创意设计图片大全
  • 增加网站备案宁波营销团队外包
  • 免费的建站软件推荐下载wordpress有趣的插件
  • 优化网站价格广告公司简介文案
  • 湖南自考网站建设与管理建站之星网站建设下载版
  • 网站名拍卖价格网站建设需求计划书
  • 如何做自己的淘宝网站wordpress做的企业官网
  • 如何设计制作企业网站中山seo
  • 苏州网站建设开发云南省建设工程招标投标行业协会网站
  • dedecms模板站源码django校园网站开发
  • 长春网站网站建设广州公司注册流程及材料
  • 唯美音乐图文网站建设惠州seo网站管理
  • 服务器iis做网站网站建设与网页设计pdf
  • 网站后台系统的易用性网站解析域名
  • 营销型企业网站建设的内容网络维护工作总结范文
  • 海门网站制作如何建设和优化网站
  • 无锡定制网站建设公众号如何推广产品