做网站如何突出网站特色,wordpress slider pro,网址导航下载到桌面,老河口网站设计前言
使用kafkaStream进行流式计算时#xff0c;如果需要对数据进行状态处理#xff0c;那么常用的会遇到kafkaStream的store#xff0c;而store也有Local Store以及Global Store#xff0c;当然也可以使用其他方案的来进行状态保存#xff0c;文本主要理清楚kafkaStream…前言
使用kafkaStream进行流式计算时如果需要对数据进行状态处理那么常用的会遇到kafkaStream的store而store也有Local Store以及Global Store当然也可以使用其他方案的来进行状态保存文本主要理清楚kafkaStream中的Local Store以及Global Store之间的区别和用法以及什么时候选择何种store和当store无法满足我们需求时应该如何使用其他方案来进行数据的状态保存
本文所有方法和代码皆只针对kafka-streams的3.7.0版本pom如下
dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-streams/artifactIdversion3.7.0/version
/dependency由于不同版本的KafkaStream在使用上有较大区别也因为KafkaStream不同版本API改动较大所以如果版本不一致使用方法甚至是一些核心概念都会跟本文讲述有所出入并且KafkaStream由于相对小众文档也很少官网的文档也只是一些简单介绍所以需要注意避坑
Local Store和Global Store的共同点和区别点
共同点
1、都是用于流式计算中进行状态存储的
2、具体结构类似使用的都是如KeyValueStoreSessionStore等类
3、实际机制类似会通过内存、本地目录和kafka Topic的变更记录等方式来进行缓存数据更新和恢复
不同点
1、适用场景不同
Local Store 适合用于单个实例的状态管理适合处理单个分区的数据并且缓存数据不会多个实例共享
Global Store 适用于跨实例共享数据状态多个实例通过Topic中的更新记录来跟新进程中的数据
2、使用方法不同
Local Store 可以直接在代码中调用对应类型存储如KeyValueStore的put方法进行更新数据不需要考虑数据一致性因为可见性只有单个实例
Global Store 不能直接调用对应的put和delete方法所有更新和删除缓存都需要通过发送数据到Global 配置的topic中然后自行实现Topic数据消费者实现org.apache.kafka.streams.processor.api.Processor类在消费者类中进行数据更新等操作同时因为需要自己实现更新实例中的数据逻辑数据一致性也需要开发者自行处理虽然正常来说利用Kafka本身的特性很少出现数据一致性问题但是如果多实例之间性能差异和网络环境等差异容易将数据不一致的时长延长如果要求Store一致性强且容忍数据不一致时限短则需要注意考虑Store更新数据消费者的处理能力
3、扩展性
Local Store可以通过增加输入主题的分区数来扩展处理能力但每个实例仍然独立运行。
Global Store需要在多个实例之间共享状态因此在设计时需要考虑如何高效地管理和同步状态。
常见的Store 类型
org.apache.kafka.streams.state.KeyValueStore
org.apache.kafka.streams.state.SessionStore
org.apache.kafka.streams.state.TimestampedKeyValueStore
org.apache.kafka.streams.state.VersionedKeyValueStore
org.apache.kafka.streams.state.WindowStore需要根据实际使用场景选择合适的状态存储类
用法
Local Store
第一步先生成对应类型的StoreBuilder对象如我需要用KeyValueStore,然后状态存储的名字是testLocalStore这个名字不能重复因为会根据消费者id加储存名称创建对应的Topic当然如果是不同的KafkaStream程序消费者id不一致那么重复就没有关系了,因为是KeyValue类型的储存所以需要设定对应的Key和Value数据的序列化对象具体代码如下
StoreBuilderKeyValueStoreString, String kvBuilder Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(testLocalStore), Serdes.String(), Serdes.String());其中Stores.persistentKeyValueStore代表的我得存储是持久化的正常都是会用持久化当然也有存储一些不重要或者程序重启丢失也无所谓的状态数据可以使用Stores.inMemoryKeyValueStore以及基于LRU淘汰机制的储存Stores.lruMap第二个参数Serdes.String()代表存储数据的key是字符串第三个参数同理如果是要存储一些对象也可以使用自定义的序列化类实现
org.apache.kafka.common.serialization.Serializer序列化类以及反序列化类
org.apache.kafka.common.serialization.Deserializer然后定义好即可如
new Serdes.WrapperSerde(new KryoSerializer(TestStoreBean.class),new KryoDeserializer(TestStoreBean.class)其中KryoSerializer和KryoDeserializer是我自定义的使用Kryo序列化Java对象的类TestStoreBean是我保存的状态的数据封装bean
KryoSerializer代码如下
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;
import org.apache.kafka.common.serialization.Serializer;import java.io.ByteArrayOutputStream;/*** kryo序列化类* author Raye* since 2024-6-4*/
public class KryoSerializerT implements SerializerT {private static final ThreadLocalKryo KRYO_LOCAL new ThreadLocalKryo() {Overrideprotected Kryo initialValue() {Kryo kryo new Kryo();/*** 不要轻易改变这里的配置更改之后序列化的格式就会发生变化* 上线的同时就必须清除 Redis 里的所有缓存* 否则那些缓存再回来反序列化的时候就会报错*///支持对象循环引用否则会栈溢出kryo.setReferences(true); //默认值就是 true添加此行的目的是为了提醒维护者不要改变这个配置//不强制要求注册类注册行为无法保证多个 JVM 内同一个类的注册编号相同而且业务系统中大量的 Class 也难以一一注册kryo.setRegistrationRequired(false); //默认值就是 false添加此行的目的是为了提醒维护者不要改变这个配置return kryo;}};/*** 获得当前线程的 Kryo 实例** return 当前线程的 Kryo 实例*/public static Kryo getInstance() {return KRYO_LOCAL.get();}private ClassT clz;public KryoSerializer(ClassT clz) {this.clz clz;}Overridepublic byte[] serialize(String s, T t) {if(t null){return null;}ByteArrayOutputStream byteArrayOutputStream new ByteArrayOutputStream();Output output new Output(byteArrayOutputStream);Kryo kryo getInstance();kryo.writeObjectOrNull(output, t,clz);output.flush();return byteArrayOutputStream.toByteArray();}
}
KryoDeserializer代码如下
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import org.apache.kafka.common.serialization.Deserializer;import java.io.ByteArrayInputStream;/*** kryo反序列化类* author Raye* since 2024-6-4*/
public class KryoDeserializerT implements DeserializerT {private static final ThreadLocalKryo KRYO_LOCAL new ThreadLocalKryo() {Overrideprotected Kryo initialValue() {Kryo kryo new Kryo();/*** 不要轻易改变这里的配置更改之后序列化的格式就会发生变化* 上线的同时就必须清除 Redis 里的所有缓存* 否则那些缓存再回来反序列化的时候就会报错*///支持对象循环引用否则会栈溢出kryo.setReferences(true); //默认值就是 true添加此行的目的是为了提醒维护者不要改变这个配置//不强制要求注册类注册行为无法保证多个 JVM 内同一个类的注册编号相同而且业务系统中大量的 Class 也难以一一注册kryo.setRegistrationRequired(false); //默认值就是 false添加此行的目的是为了提醒维护者不要改变这个配置return kryo;}};/*** 获得当前线程的 Kryo 实例** return 当前线程的 Kryo 实例*/public static Kryo getInstance() {return KRYO_LOCAL.get();}private ClassT clz;public KryoDeserializer(ClassT clz) {this.clz clz;}Overridepublic T deserialize(String s, byte[] bytes) {if(bytes null){return null;}ByteArrayInputStream byteArrayInputStream new ByteArrayInputStream(bytes);Input input new Input(byteArrayInputStream);Kryo kryo getInstance();try {return kryo.readObjectOrNull(input, clz);}catch (Exception e){e.printStackTrace();}return null;}
}同理使用LocalStore时可以将代码替换成以下内容
StoreBuilderKeyValueStoreString, TestStoreBean kvBuilder Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(testLocalStore), Serdes.String(), new Serdes.WrapperSerde(new KryoSerializer(TestStoreBean.class),new KryoDeserializer(TestStoreBean.class));有了StoreBuilder对象之后直接在StreamsBuilder对象中添加即可
streamsBuilder.addStateStore(kvBuilder);需要使用时先在处理数据的Processor类中的init方法获取对应的状态存储对象
this.testLocalStore context.getStateStore(testLocalStore);然后就可以在process方法中调用testLocalStore的get、put、delete等方法操作状态存储数据了具体代码如下 Slf4jpublic static class StreamProcessor implements ProcessorString,String,String,String {private KeyValueStoreString,String testLocalStore;private ProcessorContext context;private String toTopic;Overridepublic void init(ProcessorContext context) {this.context context;this.testLocalStore context.getStateStore(testLocalStore);}public StreamProcessor(String toTopic) {this.toTopic toTopic;}Overridepublic void process(RecordString, String record) {testLocalStore.put(key1,testValue1);log.info(testLocalStore key1 : {},testLocalStore.get(key1));testLocalStore.delete(key1);context.forward(record,toTopic);}}其中实现的Processor类全称是org.apache.kafka.streams.processor.api.Processor上面代码只是在数据处理流程中简单保存了数据然后获取出来以及删除没有对流数据做任何处理就直接发送到输出的topic了
完整代码如下 Beanpublic KStreamString,String kStreamTestStore(StreamsBuilder streamsBuilder){log.info(init kStreamTestStore);StoreBuilderKeyValueStoreString, String kvBuilder Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(testLocalStore), Serdes.String(), Serdes.String());streamsBuilder.addStateStore(kvBuilder);KStreamString, String stream streamsBuilder.stream(fromTopic);stream.process(()-new StreamProcessor(toTopic), Named.as(fromTopic),testLocalStore);streamsBuilder.build().addSink(toTopic,toTopic,fromTopic);return stream;}注意由于使用Store需要通过ProcessorContext对象来获取Store对象所以在KafkaStream常用的一些map,mapValue,flatMapValues这些流式计算方法中是没办法使用的只能在一些更底层的Api中去使用如process
Global Store
同Local Store一样需要先生成对应类型的StoreBuilder对象代码跟Local Store一样
StoreBuilderKeyValueStoreString, String kvBuilder Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(testGlobalStore), Serdes.String(), Serdes.String());然后定义处理状态更新日志的Processor类在这个类中可以对缓存数据进行更新和删除操作其他地方都是不能直接修改Global Store的
public class GlobalStoreHandleProcessorK, V implements ProcessorK,V,Void,Void {private KeyValueStoreK, V store;private String storeName;public GlobalStoreHandleProcessor(String storeName) {this.storeName storeName;}Overridepublic void process(RecordK,V record) {if(record null || record.value() null) {return;}store.put(record.key(), record.value());}Overridepublic void init(ProcessorContext context) {this.store context.getStateStore(storeName);}}跟KafkaStream的process是一样的只需要在process方法中对缓存进行更新或者删除操作即可我这里只是简单put操作具体逻辑可以根据自己情况进行处理
在StreamsBuilder对象中添加StoreBuilder对象
streamsBuilder.addGlobalStore(kvBuilder,testGlobalStore, Consumed.with(Serdes.String(),Serdes.String()),()-new GlobalStoreHandleProcessor(testGlobalStore));其中第二个参数testGlobalStore是Global Store绑定的数据变更记录的Topic如果要更新则需要通过向这个topic发送数据来进行更新Global Store中的数据
处理数据的Processor类实例代码
public static class StreamProcessor implements ProcessorString,String,String,String {private KeyValueStoreString,String testGlobalStore;private ProcessorContext context;private String toTopic;Overridepublic void init(ProcessorContext context) {this.context context;this.testGlobalStore context.getStateStore(testGlobalStore);}public StreamProcessor(String toTopic) {this.toTopic toTopic;}Overridepublic void process(RecordString, String record) {testLocalStore.put(jsonObject.getString(key),jsonObject.getString(value));log.info(testLocalStore key1 : {},testGlobalStore.get(key1));//发送更新Global Store的数据context.forward(new Record(testGlobalKey,global value,record.timestamp()),testGlobalStore);context.forward(record,toTopic);}}与Local Store不同的是不能在处理数据流的时候对缓存进行put操作只能通过将数据发送到Global Store关联的topic中在GlobalStoreHandleProcessor中去做更新
完整代码如下 Beanpublic KStreamString,String kStreamTestStore(StreamsBuilder streamsBuilder){log.info(init kStreamTestStore);StoreBuilderKeyValueStoreString, String kvBuilder Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(testGlobalStore), Serdes.String(), Serdes.String());streamsBuilder.addGlobalStore(kvBuilder,testGlobalStore, Consumed.with(Serdes.String(),Serdes.String()),()-new GlobalStoreHandleProcessor(testGlobalStore));KStreamString, String stream streamsBuilder.stream(fromTopic);stream.process(()-new StreamProcessor(toTopic), Named.as(fromTopic));streamsBuilder.build().addSink(toTopic,toTopic,fromTopic);streamsBuilder.build().addSink(testGlobalStore,testGlobalStore,fromTopic);return stream;}与Local Store不同点在于不需要在process方法中添加store的名字但是因为要从process方法中直接将更新Store的数据发送到topic所以需要添加一个Global Store绑定的Topic的输出扩展也就是下面这行代码
streamsBuilder.build().addSink(testGlobalStore,testGlobalStore,fromTopic);不适合的场景
由于KafkaStream Store 没有自动过期数据和过期数据自动删除的概率可能是有但是我没有找到对应文档所以如果我们存储的key集合特别大并且需要自动过期和自动删除那么就不适合使用Store来处理了因为需要我们自行处理删除逻辑尤其是有些场景中并不会对过期的key进行访问所以采用惰性删除基本上不现实但是定时删除因为Store会存储到磁盘如果存储的key很多删除对应数据的时候耗时很长尤其是单次删除大量key的时候可能会直接超时并且还必须要自己处理定时删除的逻辑想要更好的去删除就需要大量时间去开发和优化。
虽然使用内存的Store能稍微好点但是毕竟单个进程内存有限并且正常流处理中如果需要保存状态那么肯定是希望进程重启之后能恢复数据避免计算出错的所以如果是有大量不重复key并且数据需要到期自动删除的话可以直接使用Redis做状态存储并且进过我得实际测试使用Redis并不比Store慢并且在key量越来越大的情况下Redis的性能是完全优于Store的只针对持久化的Store当然使用Redis还是会更使用Global Store一样需要考虑数据一致性的问题不过这个问题可以通过将相同key的数据从Kafka Topic就分配到同一个Topic分区中来避免