当前位置: 首页 > news >正文

网站首页样式51栗子

网站首页样式,51栗子,手机做图片的网站,wordpress极简杂志主题一、发送消息细节 在发送消息的特别注意: 在版本 3.0 中#xff0c;以前返回 ListenableFuture 的方法已更改为返回 CompletableFuture。为了便于迁移#xff0c;2.9 版本添加了一个方法 usingCompletableFuture#xff08;#xff09;#xff0c;该方法为 CompletableFu…一、发送消息细节 在发送消息的特别注意: 在版本 3.0 中以前返回 ListenableFuture 的方法已更改为返回 CompletableFuture。为了便于迁移2.9 版本添加了一个方法 usingCompletableFuture该方法为 CompletableFuture 返回类型提供了相同的方法;此方法不再可用。 1.1 ProducerConfig 在spring kafka项目当中,提供了Kafka 生产者的相关配置.在类ProducerConfig当中,其值分别定义在不同的常量类当中. 结合上篇当中发送消息的时候控制台输出的日志,具体字段含义如下所示: 通用配置 acks 1生产者要求领导者在确认消息写入之前收到的最少同步副本数量。设置为 1 表示领导者成功写入消息后即确认不等待副本同步完成。auto.include.jmx.reporter true自动包含用于Java Management ExtensionsJMX的报告器以便通过 JMX 监控生产者的指标。batch.size 16384当多个消息发送到同一分区时生产者将尝试在单个请求中发送消息的批量大小以字节为单位。bootstrap.servers [ip:9092]Kafka 集群的服务器地址列表用于建立初始连接。buffer.memory 33554432生产者可以用来缓冲等待发送到服务器的记录的总内存大小以字节为单位。client.id rj-spring-kafka-demo-producer-1生产者的客户端 ID用于在 Kafka 服务器端标识此生产者。compression.type none消息的压缩类型可以是 none、gzip、snappy 等。connections.max.idle.ms 540000在关闭不活动的连接之前连接可以保持空闲的最长时间以毫秒为单位。delivery.timeout.ms 120000消息发送的超时时间包括所有可能的重试。enable.idempotence false是否启用生产者的幂等性确保在重试时不会产生重复的消息。 enable.metrics.push true是否启用推送生产者的指标数据到外部系统。interceptor.classes []生产者拦截器的类列表用于在发送消息之前或之后执行自定义逻辑。key.serializer class.org.apache.kafka.common.serialization.StringSerializer用于序列化消息键的类。linger.ms 0生产者在发送批次之前等待的额外时间以毫秒为单位以允许更多消息积累在批次中。max.block.ms 60000生产者在发送消息或获取元数据等操作中阻塞的最长时间。max.in.flight.requests.per.connection 5在单个连接上允许的最大未确认请求数量。max.request.size 1048576生产者请求的最大大小以字节为单位。metadata.max.age.ms 300000元数据如主题分区信息的过期时间以毫秒为单位之后将强制刷新元数据。metadata.max.idle.ms 300000元数据在没有任何更新的情况下保持有效的最长时间。metric.reporters []自定义的指标报告器类列表。 metrics.num.samples 2用于计算指标的样本数量。metrics.recording.level INFO指标记录的级别例如 INFO、DEBUG 等。metrics.sample.window.ms 30000用于计算指标的时间窗口大小以毫秒为单位。partitioner.adaptive.partitioning.enable true是否启用自适应分区功能根据负载动态调整分区分配。partitioner.availability.timeout.ms 0分区器在确定分区不可用时等待的时间以毫秒为单位。 partitioner.class null自定义分区器的类如果未设置则使用默认分区器。partitioner.ignore.keys false是否忽略消息的键不基于键进行分区。receive.buffer.bytes 32768套接字接收缓冲区的大小以字节为单位。reconnect.backoff.max.ms 1000重新连接的最大退避时间以毫秒为单位。reconnect.backoff.ms 50重新连接的初始退避时间以毫秒为单位。request.timeout.ms 30000生产者请求的超时时间包括发送请求和接收响应的时间。retries 3生产者在发送消息失败时的重试次数。retry.backoff.max.ms 1000重试之间的最大退避时间以毫秒为单位。retry.backoff.ms 100重试之间的初始退避时间以毫秒为单位。 SASL 相关配置用于安全认证): sasl.client.callback.handler.class nullSASL 客户端回调处理程序的类。sasl.jaas.config nullJava Authentication and Authorization ServiceJAAS配置用于 SASL 认证。sasl.kerberos.kinit.cmd /usr/bin/kinitKerberos 的 kinit 命令路径。sasl.kerberos.min.time.before.relogin 60000Kerberos 重新登录之前的最小时间以毫秒为单位。sasl.kerberos.service.name nullKerberos 服务名称。sasl.kerberos.ticket.renew.jitter 0.05Kerberos 票证更新的抖动因子。sasl.kerberos.ticket.renew.window.factor 0.8Kerberos 票证更新的窗口因子。sasl.login.callback.handler.class nullSASL 登录回调处理程序的类。sasl.login.class nullSASL 登录机制的类。sasl.login.connect.timeout.ms nullSASL 登录连接超时时间以毫秒为单位。sasl.login.read.timeout.ms nullSASL 登录读取超时时间以毫秒为单位。sasl.login.refresh.buffer.seconds 300SASL 登录刷新缓冲区时间以秒为单位。sasl.login.refresh.min.period.seconds 60SASL 登录刷新的最小周期以秒为单位。sasl.login.refresh.window.factor 0.8SASL 登录刷新的窗口因子。sasl.login.refresh.window.jitter 0.05SASL 登录刷新的抖动因子。sasl.login.retry.backoff.max.ms 10000SASL 登录重试的最大退避时间以毫秒为单位。sasl.login.retry.backoff.ms 100SASL 登录重试的初始退避时间以毫秒为单位。sasl.mechanism GSSAPISASL 认证机制如 GSSAPI、PLAIN 等。sasl.oauthbearer.clock.skew.seconds 30OAuth Bearer 令牌的时钟偏差时间以秒为单位。sasl.oauthbearer.expected.audience null预期的 OAuth Bearer 令牌受众。sasl.oauthbearer.expected.issuer null预期的 OAuth Bearer 令牌发行者。sasl.oauthbearer.jwks.endpoint.refresh.ms 3600000OAuth Bearer JWKS 端点的刷新时间以毫秒为单位。sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms 10000OAuth Bearer JWKS 端点重试的最大退避时间以毫秒为单位。sasl.oauthbearer.jwks.endpoint.retry.backoff.ms 100OAuth Bearer JWKS 端点重试的初始退避时间以毫秒为单位。sasl.oauthbearer.jwks.endpoint.url nullOAuth Bearer JWKS 端点的 URL。sasl.oauthbearer.scope.claim.name scopeOAuth Bearer 令牌范围声明的名称。sasl.oauthbearer.sub.claim.name subOAuth Bearer 令牌主题声明的名称。sasl.oauthbearer.token.endpoint.url nullOAuth Bearer 令牌端点的 URL。 安全协议相关配置 security.protocol PLAINTEXT生产者使用的安全协议如 PLAINTEXT、SSL、SASL_PLAINTEXT 等。security.providers null安全提供程序的类列表。 网络相关配置 send.buffer.bytes 131072套接字发送缓冲区的大小以字节为单位。socket.connection.setup.timeout.max.ms 30000套接字连接设置的最大超时时间以毫秒为单位。socket.connection.setup.timeout.ms 10000套接字连接设置的初始超时时间以毫秒为单位。 SSL 相关配置用于加密连接 ssl.cipher.suites nullSSL 加密套件列表。ssl.enabled.protocols [TLSv1.2, TLSv1.3]启用的 SSL 协议版本列表。ssl.endpoint.identification.algorithm httpsSSL 端点标识算法。ssl.engine.factory.class nullSSL 引擎工厂的类。ssl.key.password nullSSL 密钥密码。ssl.keymanager.algorithm SunX509SSL 密钥管理器算法。ssl.keystore.certificate.chain nullSSL 密钥库证书链。ssl.keystore.key nullSSL 密钥库的密钥。ssl.keystore.location nullSSL 密钥库的位置。ssl.keystore.password nullSSL 密钥库的密码。ssl.keystore.type JKSSSL 密钥库的类型。ssl.protocol TLSv1.3SSL 协议版本。ssl.provider nullSSL 提供程序。ssl.secure.random.implementation nullSSL 安全随机数生成器的实现。ssl.trustmanager.algorithm PKIXSSL 信任管理器算法。ssl.truststore.certificates nullSSL 信任库证书。ssl.truststore.location nullSSL 信任库的位置。ssl.truststore.password nullSSL 信任库的密码。ssl.truststore.type JKSSSL 信任库的类型。 事务相关配置 transaction.timeout.ms 60000事务的超时时间以毫秒为单位。transactional.id null事务 ID用于标识一个事务性生产者。 序列化相关配置 value.serializer class org.apache.kafka.common.serialization.StringSerializer用于序列化消息值的类。 1.2 sendDefault CompletableFutureSendResultK, V sendDefault(V data);该api要求向模板提供的默认主题发送消息.要使用该模板您可以配置生产者工厂并在模板的构造函数中提供它. Configuration public class KafkaConfig {Beanpublic MapString, Object producerConfig(){MapString, Object map new HashMap();map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ip:9092);map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);map.put(ProducerConfig.RETRIES_CONFIG, 3);return map;}Beanpublic ProducerFactoryInteger, Object producerFactory(){return new DefaultKafkaProducerFactory(producerConfig());}Beanpublic KafkaTemplateInteger,Object kafkaTemplate(){KafkaTemplateInteger, Object kafkaTemplate new KafkaTemplate(producerFactory());// 设置默认主题kafkaTemplate.setDefaultTopic(rj-default-topic);return kafkaTemplate;} }此时发送的时候,可以不必指定主题了,而直接将消息发送到我们自己定义的默认的主题当中了 GetMapping(/default)public String sendDefaultMsg(String msg) throws ExecutionException, InterruptedException {CompletableFutureSendResultInteger, Object completableFuture kafkaTemplate.sendDefault(msg);SendResultInteger, Object sendResult completableFuture.get();log.info(sendResult:{}, sendResult);return 向默认主题发送消息;}从版本 2.5 开始您现在可以覆盖工厂的 ProducerConfig 属性以从同一工厂创建具有不同生产者配置的模板。 Beanpublic KafkaTemplateInteger,Object kafkaTemplate(){KafkaTemplateInteger, Object kafkaTemplate new KafkaTemplate(producerFactory());// 设置默认主题kafkaTemplate.setDefaultTopic(rj-default-topic);return kafkaTemplate;}/*** 从同一个工厂创建具有不同生产者的配置的模块* param producerFactory* return*/Beanpublic KafkaTemplateString, String stringKafkaTemplate(ProducerFactoryString, String producerFactory){return new KafkaTemplate(producerFactory, Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));}// 如果要重用ProducerFactory,则必须修改一下ProducerFactory的初始的泛型,修改为如下的格式Beanpublic ProducerFactory?, ? producerFactory(){return new DefaultKafkaProducerFactory(producerConfig());}当然以上的ProducerFactory相关配置的属性,也可以在application.yml配置文件当中进行配置. 1.3 Message接口 在使用KafkaTemplate发送数据的时候,可以直接发送一个Message.方法定义如下所示: Overridepublic CompletableFutureSendResultK, V send(Message? message) {ProducerRecord?, ? producerRecord this.messageConverter.fromMessage(message, this.defaultTopic);if (!producerRecord.headers().iterator().hasNext()) { // possibly no Jacksonbyte[] correlationId message.getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);if (correlationId ! null) {producerRecord.headers().add(KafkaHeaders.CORRELATION_ID, correlationId);}}return observeSend((ProducerRecordK, V) producerRecord);}这里需要注意, Message是在org.springframework.messaging包当中定义的,定义接口如下所示: // 带有 headers 和 body 的通用消息表示形式。 public interface MessageT {/*** Return the message payload.* 消息体*/T getPayload();/*** Return message headers for the message (never {code null} but may be empty).* 消息头,可以在发送的时候指定*/MessageHeaders getHeaders();}这里我们使用实现类GenericMessage即可: // 推荐使用这个构造方法,简单方便 public GenericMessage(T payload, MapString, Object headers) {this(payload, new MessageHeaders(headers)); }public GenericMessage(T payload, MessageHeaders headers) {Assert.notNull(payload, Payload must not be null);Assert.notNull(headers, MessageHeaders must not be null);this.payload payload;this.headers headers; }测试代码: GetMapping(/message)public String sendMessage(){MapString, Object map new HashMap();// 向 Kafka 发送数据时包含主题的headermap.put(KafkaHeaders.TOPIC, Constants.Kafka.TOPIC_NAME);map.put(KafkaHeaders.KEY, rj);// 包含从中接收消息的主题的header。map.put(KafkaHeaders.RECEIVED_TOPIC, Constants.Kafka.TOPIC_NAME);// 创建MessageHeaders对象MessageHeaders messageHeaders new MessageHeaders(map);// 构建Message对象MessageString message new GenericMessage(hello, message!, messageHeaders);// 将Message发送到指定的topicCompletableFutureSendResultInteger, Object completableFuture kafkaTemplate.send(message);completableFuture.whenComplete((result, ex) - {if (ex null) {System.out.println(发送成功);} else {System.out.println(发送失败);}});return 发送成功!;}上述Map集合的key直接使用定义好的即可: KafkaHeaders, 用的时候,需要啥就添加啥. 注意事项: 使用的KafkaTemplate发送消息的时候,要注意泛型匹配的问题.这里步及到key、value的序列化与反序列化操作.如果重用了ProducerFactory则需要注意使用的泛型和发送消息的类型是否能匹配得上. 如上所示: 我们使用的的是private final KafkaTemplateString, String kafkaTemplate,我们通过配置文件,注入到容器的类型是: Bean public KafkaTemplateString, String stringKafkaTemplate(ProducerFactoryString, String producerFactory){return new KafkaTemplate(producerFactory,// 【注意】: 这里重新设置了value的序列化,而对于key的序列化是在构建ProducerFactory的时候,传入的. Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)); }而注入到容器当中ProducerFactory对象,在构建的时候,分别设置了key、value的序列化规则. 以上的泛型必须得能匹配上,或者可以直接转换,否则会抛出异常. 1.4 ProducerListener 使用ProducerListener配置KafkaTemplate以获取带有发送结果成功或失败的异步回调而不是等待Future完成。下面的清单显示了ProducerListener接口的定义 public interface ProducerListenerK, V {// 发送成功void onSuccess(ProducerRecordK, V producerRecord, RecordMetadata recordMetadata);// 发送失败void onError(ProducerRecordK, V producerRecord, RecordMetadata recordMetadata,Exception exception); }默认情况下模板配置了 LoggingProducerListener它会记录错误并且在发送成功时不执行任何操作。 Slf4j public class CustomProducerListener implements ProducerListenerString, String {Overridepublic void onSuccess(ProducerRecordString, String producerRecord, RecordMetadata recordMetadata) {String topic producerRecord.topic();String key producerRecord.key();String value producerRecord.value();Long timestamp producerRecord.timestamp();int partition recordMetadata.partition();long offset recordMetadata.offset();log.info(消息发送成功topic:{},key:{},value:{},timestamp:{},partition:{},offset:{}, topic, key, value, timestamp, partition, offset);}Overridepublic void onError(ProducerRecordString, String producerRecord, RecordMetadata recordMetadata, Exception exception) {String topic producerRecord.topic();String key producerRecord.key();String value producerRecord.value();Long timestamp producerRecord.timestamp();int partition recordMetadata.partition();long offset recordMetadata.offset();log.error(消息发送失败topic:{},key:{},value:{},timestamp:{},partition:{},offset:{}, exception message: {}, topic, key, value, timestamp, partition, offset, exception.getMessage());} }将自定义ProducerListener的对象,配置到KafkaTemplate当中 /*** 从同一个工厂创建具有不同生产者的配置的模块* param producerFactory* return*/ Bean public KafkaTemplateString, String stringKafkaTemplate(ProducerFactoryString, String producerFactory){KafkaTemplateString, String stringStringKafkaTemplate new KafkaTemplate(producerFactory, Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));// 配置ProducerListenerstringStringKafkaTemplate.setProducerListener(new CustomProducerListener());// 配置默认主题stringStringKafkaTemplate.setDefaultTopic(rj-string-topic);return stringStringKafkaTemplate; }发送消息: GetMapping(/listener) public String sendMsgProducerListener(String msg) {CompletableFutureSendResultString, String completableFuture stringKafkaTemplate.sendDefault(msg);completableFuture.whenComplete((result, ex) - {if (ex null) {System.out.println(发送成功);} else {System.out.println(发送失败);}});return 发送成功!; }控制台日志输出如下所示: 2024-09-24T19:20:07.06108:00 INFO 16436 --- [rj-spring-kafka-demo] [demo-producer-1] c.r.k.listener.CustomProducerListener : 消息发送成功topic:rj-string-topic,key:null,value:listener,timestamp:null,partition:0,offset:01.5 发送结果监听CompletableFuture 发送消息的send 方法返回 CompletableFutureSendResult。您可以向侦听器注册回调以异步接收发送的结果. CompletableFutureSendResultInteger, String future template.send(topic-name, msg data); future.whenComplete((result, ex) - {... });如果你希望阻塞发送线程等待结果你可以调用 future 的 get 方法;建议使用带有 timeout 的方法。如果你已经设置了 linger.ms你可能希望在等待之前调用 flush或者为了方便起见模板有一个带有 autoFlush 参数的构造函数该参数会导致模板在每次发送时 flush 在使用 Kafka 生产者发送消息时通常会得到一个表示发送任务的Future对象。如果调用future.get()方法发送线程会被阻塞直到发送结果返回。这意味着发送线程会暂停执行等待消息成功发送到 Kafka 集群并获取结果。然而直接使用get()方法可能会导致线程长时间阻塞在实际应用中可能不太理想所以建议使用带有超时参数的get(long timeout, TimeUnit unit)方法这样可以在一定时间后如果还未获取到结果就不再等待避免无限期阻塞。linger.ms和flush linger.ms属性 当设置了linger.ms生产者属性时生产者会在发送消息时等待一段时间让更多的消息积累在一个批次中以提高发送效率。如果在这段时间内积累了足够多的消息生产者会将这些消息作为一个批次发送出去。 flush()方法 如果希望立即发送部分批处理的消息而不是等待linger.ms指定的时间可以调用flush()方法。这个方法会强制生产者立即发送当前缓冲区内的消息而不管是否满足批次大小或等待时间的条件。 带有autoFlush参数的构造函数 为了方便起见KafkaTemplate有一个带有autoFlush参数的构造函数。当设置autoFlush为true时每次发送消息后模板会自动调用flush()方法确保消息立即发送出去。这在需要立即确认消息发送的场景中非常有用但可能会降低发送效率因为每次发送都不会等待批次积累。 在使用 Kafka 生产者时需要根据实际需求合理选择是否阻塞发送线程等待结果以及是否使用flush()方法或带有autoFlush参数的构造函数来控制消息的发送时机。如果设置了linger.ms属性并且需要在特定情况下立即发送部分批处理的消息可以考虑调用flush()方法或使用带有autoFlush参数的构造函数。 linger.ms属性可以在配置文件当中进行配置. Bean public MapString, Object producerConfig(){// ...// 配置linger.msmap.put(ProducerConfig.LINGER_MS_CONFIG, 500);return map; }SendResult 有两个属性ProducerRecord 和 RecordMetadata。 public class SendResultK, V {// ProducerRecord是生产者发送消息时使用的数据结构private final ProducerRecordK, V producerRecord;// 当生产者成功发送消息后会返回一个RecordMetadata对象private final RecordMetadata recordMetadata;public SendResult(ProducerRecordK, V producerRecord, RecordMetadata recordMetadata) {this.producerRecord producerRecord;this.recordMetadata recordMetadata;}public ProducerRecordK, V getProducerRecord() {return this.producerRecord;}public RecordMetadata getRecordMetadata() {return this.recordMetadata;}Overridepublic String toString() {return SendResult [producerRecord this.producerRecord , recordMetadata this.recordMetadata ];}}1.5.1 ProducerRecord 主题Topic 确定消息的归属主题。Kafka 中的不同主题用于区分不同类型的消息流。例如一个电商系统可能有 “订单主题”、“用户行为主题” 等。 生产者通过指定主题将消息发送到 Kafka 集群中的相应主题 分区Partition 分区的目的是为了实现可扩展性和并行处理。Kafka 将一个主题的数据分布在多个分区上不同的分区可以由不同的消费者或消费者组同时消费。 可以手动指定消息发送到特定分区通常根据消息的键或者特定的业务规则来决定分区。如果不指定Kafka 会根据默认的分区策略来分配分区。 键Key 键在消息处理中有多种用途。一方面它可以用于确定消息的分区。例如基于键的哈希值来决定消息发送到哪个分区这样可以确保具有相同键的消息被发送到同一个分区方便后续的有序处理。 键也可以在消费者端用于消息的分组和聚合。例如在处理订单数据时可以根据订单 ID 作为键将同一订单的不同状态更新消息发送到同一个分区方便消费者对同一订单的消息进行有序处理。 值Value 这是消息的实际内容可以是任何可序列化的对象。例如在一个日志系统中值可以是一条日志记录在电商系统中值可以是一个订单对象或者用户行为事件。 消息头部Headers 消息头部提供了一种在消息中添加额外元数据的方式。这些元数据可以用于传递特定的业务信息或者用于消息的路由和处理。 例如可以在头部添加消息的来源系统、消息的类型、处理优先级等信息 1.5.2 RecordMetadata 主题Topic 确认消息最终被发送到的主题与ProducerRecord中的主题相对应。这可以用于验证消息是否被发送到了正确的主题。 分区Partition 指示消息存储在哪个分区。在消费者端可以根据分区信息来确定从哪个分区读取消息。 对于需要对特定分区进行监控或管理的场景分区信息非常重要。 偏移量Offset 偏移量是消息在分区中的唯一标识它代表了消息在分区中的位置顺序。每个分区中的消息都有一个连续的偏移量。 消费者通过偏移量来确定已经消费到了哪个位置以便在下次消费时从正确的位置继续读取消息。 偏移量也可以用于数据恢复和重新处理消息的场景。例如如果消费者出现故障在恢复时可以根据存储的偏移量重新开始消费。 时间戳Timestamp 时间戳可以由生产者在发送消息时指定也可以由 Kafka 自动生成。时间戳可以用于基于时间的消息处理和查询。 例如可以根据时间戳来查询特定时间段内的消息或者对消息进行时间序列分析 二、接收消息细节 本小节,我们着重介绍一下KafkaListener这个注解的使用. 2.1 字段解释 KafkaListener是 Spring Kafka 提供的用于监听 Kafka 主题消息的注解。 字段名称类型说明idString为监听器指定一个唯一的标识符。如果不指定会自动生成一个 ID。这个 ID 可以用于在代码中通过KafkaListenerEndpointRegistry来获取特定的监听器容器。此外如果设置了这个值并且idIsGroup()为false或者同时设置了groupId()那么这个 ID 将覆盖消费者工厂配置中的groupId属性。支持 SpELSpring Expression Language表达式#{...}和属性占位符${...}。containerFactoryString指定用于创建消息监听器容器的KafkaListenerContainerFactory的 bean 名称。如果不指定则使用默认的容器工厂如果存在。SpEL 表达式可以评估为容器工厂实例或 bean 名称。topicsString[]指定监听器要监听的一个或多个 Kafka 主题名称。可以是主题名称、属性占位符键或 SpEL 表达式表达式必须解析为主题名称。使用组管理Kafka 会将分区分配给组内成员。与topicPattern()和topicPartitions()互斥。topicPatternString使用正则表达式指定要监听的 Kafka 主题模式。可以是主题模式、属性占位符键或 SpEL 表达式表达式必须解析为主题模式支持字符串或java.util.regex.Pattern结果类型。使用组管理Kafka 会将分区分配给组内成员。与topics()和topicPartitions()互斥。topicPartitionsTopicPartition[]当使用手动主题/分区分配时指定监听器要监听的主题和分区。与topicPattern()和topics()互斥。containerGroupString如果提供了这个值监听器容器将被添加到一个以这个值为名称、类型为CollectionMessageListenerContainer的 bean 中。这允许例如遍历这个集合来启动/停止一部分容器。从版本 2.7.3 开始这种集合 beans 已被弃用在 2.8 版本中将被移除。取而代之的是应该使用名称为containerGroup .group且类型为org.springframework.kafka.listener.ContainerGroup的 bean。支持 SpEL 表达式和属性占位符。errorHandlerString设置一个KafkaListenerErrorHandler bean 的名称在监听器方法抛出异常时调用。如果是 SpEL 表达式可以评估为KafkaListenerErrorHandler实例或 bean 名称。groupIdString覆盖消费者工厂的group.id属性仅针对这个监听器。支持 SpEL 表达式和属性占位符。idIsGroupboolean如果groupId()未提供当这个值为true时使用id()如果提供了作为消费者的group.id属性当这个值为false时使用消费者工厂中的group.id属性。clientIdPrefixString如果提供了这个值将覆盖消费者工厂配置中的客户端 ID 属性。对于每个容器实例会添加一个后缀‘-n’以确保在使用并发时的唯一性。支持 SpEL 表达式和属性占位符。beanRefString一个伪 bean 名称在这个注解中的 SpEL 表达式中用于引用定义这个监听器的当前 bean。这允许访问封闭 bean 中的属性和方法。默认值为’__listener’。concurrencyString覆盖容器工厂的concurrency设置针对这个监听器。可以是属性占位符或 SpEL 表达式评估为一个Number然后使用Number#intValue()获取值。autoStartupString设置为true或false以覆盖容器工厂中的默认自动启动设置。可以是属性占位符或 SpEL 表达式评估为Boolean或String然后使用Boolean#parseBoolean(String)获取值。propertiesString[]Kafka 消费者属性它们将覆盖消费者工厂中具有相同名称的任何属性如果消费者工厂支持属性覆盖。支持的语法与 JavaProperties文件中的键值对语法相同。group.id和client.id将被忽略。支持 SpEL 表达式和属性占位符。SpEL 表达式必须解析为String、String[]或CollectionString其中数组或集合的每个成员是一个属性名值格式与上述语法相同。splitIterablesboolean当为false且返回类型是Iterable时将结果作为单个回复记录的值返回而不是为每个元素创建单独的记录。默认值为true。如果回复类型是IterableMessage?则忽略此设置。contentTypeConverterString设置一个SmartMessageConverter如CompositeMessageConverter的 bean 名称结合org.springframework.messaging.MessageHeaders#CONTENT_TYPE头来执行转换到所需类型。如果是 SpEL 表达式可以评估为SmartMessageConverter实例或 bean 名称。batchString覆盖容器工厂的batchListener属性。监听器方法签名应该接收一个List?。如果不设置将使用容器工厂的设置。不支持 SpEL 和属性占位符因为监听器类型不能是可变的。filterString设置一个RecordFilterStrategy bean 名称以覆盖在容器工厂上配置的策略。如果是 SpEL 表达式可以评估为RecordFilterStrategy实例或 bean 名称。infoString静态信息将作为一个头添加键为org.springframework.kafka.support.KafkaHeaders#LISTENER_INFO。例如可以在RecordInterceptor、RecordFilterStrategy或监听器本身中用于任何目的。支持 SpEL 表达式和属性占位符但必须解析为String或byte[]。如果使用输入记录的头创建出站记录这个头将被剥离。containerPostProcessorString设置一个ContainerPostProcessor的 bean 名称允许在创建和配置监听器容器后对其进行自定义。这个后处理器仅应用于当前监听器容器与ContainerCustomizer不同后者应用于所有监听器容器。这个后处理器在容器自定义器如果存在之后应用。 2.2 获取Header的值 在监听消息的时候,可以通过Header(headers)的方式读取发送消息设置的headers的值. KafkaListener(groupId rj, topics Constants.Kafka.TOPIC_NAME)public void listen2(String msg, Header(KafkaHeaders.RECEIVED_TOPIC) String topic){log.info(receive msg: {}, topic: {}, msg, topic);}日志输出: 2024-09-24T20:20:57.50108:00 INFO 1692 --- [rj-spring-kafka-demo] [ntainer#0-0-C-1] com.rj.kafka.listener.UserListener : receive msg: hello, message!, topic: rj-topic2.3 获取ConsumerRecord ConsumerRecord是消费者从 Kafka 主题中读取消息时所使用的数据结构, 具体包含如下细节: 主题和分区信息 topic() 返回消息所属的 Kafka 主题名称。主题是消息的逻辑分类用于区分不同类型的消息流。例如一个电商系统可能有 “订单主题”、“用户行为主题” 等。通过这个方法可以确定消息来自哪个主题方便消费者根据不同的主题进行不同的处理逻辑。 partition() 返回消息所在的分区编号。Kafka 将一个主题的数据分布在多个分区上不同的分区可以由不同的消费者或消费者组同时消费以实现可扩展性和并行处理。了解消息所在的分区可以用于一些特定的场景例如在需要对特定分区进行监控或管理时或者在一些需要根据分区进行数据处理的情况下。 偏移量信息 offset() 返回消息在所属分区中的偏移量。偏移量是消息在分区中的唯一标识代表消息在分区中的位置顺序。随着新消息的不断写入偏移量会不断递增。消费者通过偏移量来确定已经消费到了哪个位置以便在下次消费时从正确的位置继续读取消息。偏移量也可以用于数据恢复和重新处理消息的场景。例如如果消费者出现故障在恢复时可以根据存储的偏移量重新开始消费。 键和值信息 key() 返回消息的键。消息的键可以用于分区策略以及在一些场景下可以帮助消费者进行消息的有序处理。例如基于键的哈希值来决定消息发送到哪个分区这样可以确保具有相同键的消息被发送到同一个分区方便后续的有序处理。键的类型通常是可序列化的对象可以根据具体的业务需求来设置和使用。 value() 返回消息的实际内容。这是消费者真正关心的消息数据可以是任何可序列化的对象。例如在一个日志系统中值可以是一条日志记录在电商系统中值可以是一个订单对象或者用户行为事件。 时间戳信息 timestamp() 返回消息的时间戳。时间戳可以由生产者在发送消息时指定也可以由 Kafka 自动生成。时间戳可以用于基于时间的消息处理和查询。例如可以根据时间戳来查询特定时间段内的消息或者对消息进行时间序列分析。 headers headers() 返回一个Headers对象其中包含了消息的头部信息。消息头部提供了一种在消息中添加额外元数据的方式。这些元数据可以用于传递特定的业务信息或者用于消息的路由和处理。例如可以在头部添加消息的来源系统、消息的类型、处理优先级等信息。消费者可以通过读取头部信息来进行更加灵活的消息处理。 同样的操作,也可以在获取方法的参数内传入ConsumerRecord,从中获取需要的信息.操作如下所示: KafkaListener(groupId rj, topics Constants.Kafka.TOPIC_NAME)public void listen2(ConsumerRecordString, String record, String msg, Header(KafkaHeaders.RECEIVED_TOPIC) String topic){log.info(receive record: {}, record);log.info(receive msg: {}, topic: {}, msg, topic);}日志输出: 2024-09-24T20:34:47.75908:00 INFO 11296 --- [rj-spring-kafka-demo] [ntainer#0-0-C-1] com.rj.kafka.listener.UserListener : receive record: ConsumerRecord(topic rj-topic, partition 0, leaderEpoch 0, offset 10, CreateTime 1727181287120, serialized key size 2, serialized value size 15, headers RecordHeaders(headers [], isReadOnly false), key rj, value hello, message!) 2024-09-24T20:34:47.75908:00 INFO 11296 --- [rj-spring-kafka-demo] [ntainer#0-0-C-1] com.rj.kafka.listener.UserListener : receive msg: hello, message!, topic: rj-topic2.4 Acknowledgment 在 Kafka 中Acknowledgment确认机制主要用于确保消息被正确处理和持久化以保证数据的可靠性。 在 Kafka 生产者发送消息和消费者处理消息的过程中确认机制起着关键作用。它确保消息在不同阶段的成功处理从而提高系统的可靠性和数据的完整性。如果没有适当的确认机制可能会出现消息丢失或重复处理的情况这在许多关键业务场景中是不可接受的。 生产者端的确认机制 acks acks是生产者的一个重要配置参数用于控制生产者在发送消息后等待多少个副本的确认。 设置acks0, 表示 生产者在将消息发送到 Kafka 服务器后不会等待任何确认。这种设置提供了最低的延迟但也意味着如果在消息发送后但在 Kafka 服务器成功存储之前发生故障消息可能会丢失。适用于对延迟要求极高且可以容忍消息丢失的场景例如实时日志收集其中丢失少量日志不会对系统产生重大影响。设置acks1表示只要领导者副本成功接收到消息并写入磁盘生产者就会认为消息发送成功。这种设置提供了较低的延迟但如果领导者副本在确认后出现故障可能会导致消息丢失。如果领导者副本在确认后但在其他副本同步之前出现故障可能会导致消息丢失。适用于对延迟有一定要求但也需要一定程度可靠性的场景例如一些实时数据分析系统其中少量数据丢失可以通过后续的数据处理进行补偿。设置acksall或acks-1表示生产者需要等待所有同步副本都成功接收到消息并写入磁盘后才认为消息发送成功。这提供了最高的可靠性但会增加延迟因为生产者需要等待多个副本的确认。确保了即使领导者副本出现故障消息也不会丢失因为其他同步副本中仍然存在该消息。适用于对数据可靠性要求极高的场景例如金融交易系统或关键业务数据的处理。 消费者端的确认机制 自动提交和手动提交 Kafka 消费者可以选择自动提交偏移量或手动提交偏移量。自动提交消费者会定期自动提交已经消费的消息的偏移量。这种方式比较方便但可能会导致在处理消息过程中出现故障时已经消费的消息被重复处理。例如如果消费者在自动提交偏移量后但在完成消息处理之前出现故障重新启动后会从上次提交的偏移量开始消费导致已经处理过的消息被再次处理。手动提交消费者可以在处理完一批消息后手动提交偏移量。这提供了更精细的控制可以确保在消息被正确处理后才提交偏移量避免重复处理。手动提交可以在代码中通过调用commitSync()同步提交或commitAsync()异步提交方法来实现。 提交偏移量的时机 在手动提交时需要谨慎选择提交偏移量的时机。一般来说可以在确认消息已经被成功处理并持久化到外部系统如果需要后再提交偏移量。例如在一个数据处理管道中消费者从 Kafka 读取消息进行数据转换和存储到数据库中。只有在数据库存储成功后才提交偏移量以确保消息不会被重复处理。 确认机制与性能的权衡 延迟和吞吐量 较高的确认级别如acksall和手动提交偏移量通常会增加延迟因为生产者和消费者需要等待更多的确认。这可能会影响系统的整体性能和吞吐量。在设计系统时需要根据具体的业务需求和性能要求来权衡可靠性和性能。如果业务对数据的可靠性要求非常高可以选择更严格的确认机制但可能需要接受较低的吞吐量和较高的延迟。如果性能是关键因素可以适当降低确认级别或使用自动提交偏移量但需要注意可能出现的数据丢失和重复处理的风险。 在接口 Acknowledgment当中的acknowledge()十分重要, 这个方法用于消费者确认已经成功处理了一条消息。当消费者调用这个方法时Kafka 会记录该消费者对特定消息的处理确认并且可以根据配置决定是否更新消费者的偏移量。 使用场景: 在手动提交偏移量的情况下消费者通常在确认消息已经被成功处理后调用这个方法。例如在一个数据处理管道中消费者从 Kafka 读取消息进行数据转换和存储到数据库中。只有在数据库存储成功后才调用acknowledge()方法来确认消息的处理。正确使用这个方法可以确保消息不会被重复处理同时也可以保证在出现故障时能够正确地恢复处理进度。如果消费者在处理消息后没有正确地确认可能会导致消息被重复处理或者在消费者出现故障后无法正确地恢复处理进度。 KafkaListener(groupId rj, topics Constants.Kafka.TOPIC_NAME)public void listen2(ConsumerRecordString, String record, String msg, Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Acknowledgment acknowledgment){log.info(receive record: {}, record);log.info(receive msg: {}, topic: {}, msg, topic);// 处理各种各样的业务逻辑之后,再进行消息确认.acknowledgment.acknowledge();}三、总结 本文对Kafka的消息发送、接收当中常用的功能做了一些较为详细的分析.这些在实际开发当中较为常用. 下一篇着重介绍一下kafka当中有关集群的知识.
http://www.hkea.cn/news/14261892/

相关文章:

  • 买一个成品网站多少钱阿里云注销网站
  • 东莞设计网站建设方案电子商务网站的开发方式
  • 单位网站建设维护论文企业网页建设公司费用怎么收
  • 网站建设 外包是什么意思网站建设软件下载
  • 水果门户网站建设加强公司门户网站建设方案
  • 占酷设计网站官网入口the7 wordpress哪个好
  • 西安自助建站系统宣传页免费模板
  • 图书网站建设费用明细网站制作 常州
  • 设计师接单网站微软手机做网站服务器吗
  • 关于网站建设的问题鹤壁建设企业网站公司
  • 河南金城建设工程有限公司网站网站建设推销话术
  • 做镜像网站wordpress 自定义js
  • 网站建设需要提供功能目录吗免费wordpress主机
  • 建设网站企业哪家好漯河企业网站建设
  • 厦门电商网站建设有口碑的网站建设
  • 网站开发技术大学教材宁德市人社局
  • 网站的特效代码微信最火公众号排行
  • 无锡做网站优化价格上海软件系统开发公司
  • 网站建设与管理专业课程网站建立费用多少钱
  • 墓地网站建设价格媒体平台化
  • 网站搭建的费用嵌入式软件开发前景怎么样
  • 免费个人博客建站网站优化外包顾问
  • 做信息网站的盈利方式有哪些开发新闻类网站
  • 做网站的技术体系安装ss和wordpress
  • 一 网站开发背景做盗版漫画网站
  • 网站建网站建设seo帮帮您温州seo排名优化
  • 做网站的服务器用什么 系统好人人开发app
  • 网站集约化建设工作汇报做网站和网页
  • 网站建设演示pptwordpress 外企模板
  • 长沙做信息seo网站淄博企业网站