网站功能建设中,WordPress使用微博外链,邹城有做网站的吗,张家界旅游网站响应式流是什么#xff1f; 响应式流旨在为无阻塞异步流处理提供一个标准。它旨在解决处理元素流的问题——如何将元素流从发布者传递到订阅者#xff0c;而不需要发布者阻塞#xff0c;或订阅者有无限制的缓冲区或丢弃。 响应式流模型存在两种基本的实现机制。一种就是传统…响应式流是什么 响应式流旨在为无阻塞异步流处理提供一个标准。它旨在解决处理元素流的问题——如何将元素流从发布者传递到订阅者而不需要发布者阻塞或订阅者有无限制的缓冲区或丢弃。 响应式流模型存在两种基本的实现机制。一种就是传统开发模式下的“拉”模式即消费者主动从生产者拉取元素而另一种就是“推”模式在这种模式下生产者将元素推送给消费者。相较于“拉”模式“推”模式下的数据处理的资源利用率更好下图所示的就是一种典型的推模式处理流程。 数据流的生产者会持续地生成数据并推送给消费者。这里就引出了流量控制问题即如果数据的生产者和消费者处理数据的速度是不一致的我们应该如何确保系统的稳定性呢
流量控制
生产者生产数据的速率小于消费者的场景 这种场景对于消费者来说没啥压力正常消费就好了这里也就不需要所谓的流量控制了。
生产者生产数据的速率大于消费者的场景 生产者生产数据的速率大于消费者的场景应该是我们业务中经常遇到的场景了这种场景由于消费者处理不过来导致崩溃业界通常的做法是在生产者与消费者之间加一个队列做缓冲。我们知道队列具有存储与转发的功能所以可以用它来进行一定的流量控制。 如何对于流量进行很好的控制这就转变到了如何设计好一个队列了目前 Java 业界主流的队列有以下三种
无界队列
界队列在原则上是拥有无线大小容量的队列可以存放生产者产生的所有消息。 优势确保消费者消费到所有的数据 劣势系统的回弹性降低任何一个系统不可能拥有无限的资源一旦内存等资源耗尽系统就可能会有崩溃的风险。
有界丢弃队列 为了避免上面无界队列的弊端有界丢弃队列采用的是如果队列满了就会采用丢弃后面传入的值这里可以设置一些丢弃策略比如说按照优先级或先进先出等。 优势考虑到资源的限制适合允许丢消息的业务场景。 劣势消息重要性很高的场景不建议采取这种队列
有界阻塞队列 像一些支付金融级别的场景是不允许丢数据的所以我们引出有界阻塞队列我们会在队列消息数量达到上限后阻塞生产者而不是直接丢弃消息。 优势解决了不允许丢数据的业务场景 劣势当队列满了的时候会阻塞生产者停止生产数据这种场景不可能实现异步操作的。
所以无论从回弹性、弹性还是即时响应性出发上述的队列都不是响应式流的上佳解决办法。
背压机制 上面说的那几种队列纯“推”模式下的数据流量会有很多不可控制的因素并不能直接应用而是需要在“推”模式和“拉”模式之间考虑一定的平衡性从而优雅地实现流量控制。这就需要引出响应式系统中非常重要的一个概念——背压机制Backpressure。 什么是背压简单来说就是下游能够向上游反馈流量请求的机制。通过前面的分析我们知道如果消费者消费数据的速度赶不上生产者生产数据的速度时它就会持续消耗系统的资源直到这些资源被消耗殆尽。 这个时候就需要有一种机制使得消费者可以根据自身当前的处理能力通知生产者来调整生产数据的速度这种机制就是背压。采用背压机制消费者会根据自身的处理能力来请求数据而生产者也会根据消费者的能力来生产数据从而在两者之间达成一种动态的平衡确保系统的即时响应性。
响应式流规范 有了背压机制我们再来看下响应式流是如何基于这种机制去设计的一套规范规范详情请参考Reactive Streams
Java API 的响应式流只定义了四个核心接口 PublisherT SubscriberT Subscription ProcessorT,R
publisherT
Publisher 代表的就是一种可以生产无限数据的发布者接口如下
public interface PublisherT {public void subscribe(Subscriber? super T s);
} 可以看到Publisher 里的 subscribe 方法传入的是 Subscriber 接口其实这里用的是回调Publisher 根据收到的请求向当前订阅者 Subscriber 发送元素。
SubscriberT
Subscriber 代表的是一种可以从发布者那里订阅并接收元素的订阅者接口如下
public interface SubscriberT {public void onSubscribe(Subscription s);public void onNext(T t);public void onError(Throwable t);public void onComplete();
} Subscriber 接口定义的这组方法构成了数据流请求和处理的基本流程其中onSubscribe() 从命名上看就是一个回调方法当发布者的 subscribe() 方法被调用时就会触发这个回调。而在该方法中有一个参数 Subscription可以把这个 Subscription 看作是一种用于订阅的上下文对象。Subscription 对象中包含了这次回调中订阅者想要向发布者请求的数据个数。 当订阅关系已经建立那么发布者就可以调用订阅者的 onNext() 方法向订阅者发送一个数据。这个过程是持续不断的直到所发送的数据已经达到 Subscription 对象中所请求的数据个数。这时候 onComplete() 方法就会被触发代表这个数据流已经全部发送结束。而一旦在这个过程中出现了异常那么就会触发 onError() 方法我们可以通过这个方法捕获到具体的异常信息进行处理而数据流也就自动终止了。
Subscription
Subscription 代表的就是一种订阅上下文对象它在订阅者和发布者之间进行传输从而在两者之间形成一种契约关系接口如下
public interface Subscription {public void request(long n);public void cancel();
}
这里的 request() 方法用于请求 n 个元素订阅者可以通过不断调用该方法来向发布者请求数据而 cancel() 方法显然是用来取消这次订阅。请注意Subscription 对象是确保生产者和消费者针对数据处理速度达成一种动态平衡的基础也是流量控制中实现背压机制的关键所在。 ProcessorT,R
Processor 代表的就是订阅者和发布者的处理阶段Processor 接口继承了 Publisher 和 Subscriber 接口。它用于转换发布者——订阅者管道中的元素。Processor订阅类型 T 的数据元素接收并转换为类型 R 的数据并发布变换后的数据。接口如下
public interface ProcessorT,R extends SubscriberT, PublisherR {}
图显示了处理者在发布者——订阅和管道中作为转换器的作用可以拥有多个处理者。 总结 响应式流规范定义的很简洁但实现起来并不简单发布者和订阅者之间的所有交互的异步性质以及背压机制使得实现变得复杂。 响应式流规范非常灵活还可以提供独立的“推”模型和“拉”模型。如果为了实现纯“推”模型我们可以考虑一次请求足够多的元素而对于纯“拉”模型相当于就是在每次调用 Subscriber 的 onNext() 方法时只请求一个新元素。 JDK 9 中提供了 Flow 响应式流接口与响应式流兼容的接口可以看得出JDK 团队后续的发展趋势也是想往响应式流这块靠近。