域名与网站名称的关系,抚州营销型网站建设,麻章手机网站建设公司,厦门旅游网站设计目录 一、前置知识1、Lambda表达式2、函数式接口 Function3、StreamAPI4、Reactive-Stream1#xff09;几个实际的问题2#xff09;Reactive-Stream是什么#xff1f;3#xff09;核心接口4#xff09;处理器 Processor5#xff09;总结 二、Reactor核心1、Reactor1… 目录 一、前置知识1、Lambda表达式2、函数式接口 Function3、StreamAPI4、Reactive-Stream1几个实际的问题2Reactive-Stream是什么3核心接口4处理器 Processor5总结 二、Reactor核心1、Reactor1介绍2响应式编程3Reactor核心特性1、Mono和Flux2、subscribe()3、流的取消4、BaseSubscriber5、背压Backpressure 和请求重塑Reshape Requests6、以编程的方式创建队列7、handle()的使用8、自定义线程调度 课程内容
一、前置知识
1、Lambda表达式
interface MyInterface {int sum(int i, int j);
}interface MyHaha {int haha();default int heihei() {return 2;}; //默认实现
}FunctionalInterface //检查注解帮我们快速检查我们写的接口是否函数式接口
interface MyHehe {int hehe(int i);}/*** lambda简化函数式接口实例创建** param args*/public static void aaaa(String[] args) {//1、自己创建实现类对象MyInterface myInterface new MyInterfaceImpl();System.out.println(myInterface.sum(1, 2));//2、创建匿名实现类MyInterface myInterface1 new MyInterface() {Overridepublic int sum(int i, int j) {return i * i j * j;}};
// System.out.println(myInterface1.sum(2, 3));//冗余写法//3、lambda表达式:语法糖 参数列表 箭头 方法体MyInterface myInterface2 (x, y) - {return x * x y * y;};System.out.println(myInterface2.sum(2, 3));}//参数位置最少情况MyHaha myHaha () - {return 1;};MyHehe myHehe y - {return y * y;};MyHehe hehe2 y - y - 1;//总结//1)、参数类型可以不写只写(参数名)参数变量名随意定义;// 参数表最少可以只有一个 ()或者只有一个参数名//2、方法体如果只有一句话{} 可以省略2、函数式接口 Function
接口中有且只有一个未实现的方法这个接口就叫做函数式接口
函数式接口的出入参定义 1、有入参无出参【消费者】BiConsumer BiConsumerString,String function (a,b)-{ //能接受两个入参System.out.println(哈哈a呵呵b);};function.accept(1,2);2、有入参有出参【多功能函数】 Function
FunctionString,Integer function (String x) - Integer.parseInt(x);System.out.println(function.apply(2));3、无入参无出参【普通函数】
Runnable runnable () - System.out.println(aaa);
new Thread(runnable).start();4、无入参 有出参【提供者】 supplier
SupplierString supplier ()- UUID.randomUUID().toString();
String s supplier.get();
System.out.println(s);java.util.function包下的所有function定义 ● Consumer 消费者 ● function 功能函数 ● Supplier 提供者 ● Predicate 断言 get/test/apply/accept调用的函数方法
3、StreamAPI
中间操作Intermediate Operations
filter过滤 挑出我们用的元素map 映射 一一映射a 变成 b mapToInt、mapToLong、mapToDouble flatMap一对多映射
filter、 map、mapToInt、mapToLong、mapToDouble flatMap、flatMapToInt、flatMapToLong、flatMapToDouble mapMulti、mapMultiToInt、mapMultiToLong、mapMultiToDouble、 parallel、unordered、onClose、sequential distinct、sorted、peek、limit、skip、takeWhile、dropWhile、
终止操作Terminal Operation forEach、forEachOrdered、toArray、reduce、collect、toList、min、 max、count、anyMatch、allMatch、noneMatch、findFirst、findAny、iterator 4、Reactive-Stream
1几个实际的问题
当请求量巨大的时候tomcat会被压垮此时就需要采取背压的策略让tomcat根据自己的能力主动去消费请求。
服务器核心数固定时多线程情况下线程越多越好吗 当核心数固定时线程并不是越多越好操作系统是分时间片执行任务的当线程越多时线程间的切换就越频繁导致cpu性能消耗越多。
2Reactive-Stream是什么
Reactive-Streams 是一个标准规范定义了异步数据流处理的 API 和行为规则专注于解决异步流式数据的**背压Backpressure**问题。
主要特性
异步通过非阻塞方式处理数据流。流式处理支持连续数据流的逐步消费避免一次性加载大量数据。背压机制允许消费者控制生产者的速率防止消费者被超量数据淹没。非阻塞在处理数据时不阻塞线程提高资源利用率。
背压机制Backpressure Reactive-Streams 的核心之一是通过 Subscription 提供背压支持。
消费者可以通过 request(n) 方法控制生产者的生产速率。 如果消费者处理能力不足可以减少请求数据量避免内存溢出或阻塞。
使用场景 事件流处理如消息队列、用户事件。 高性能网络请求如 RESTful API、WebSocket。 大数据流处理需要逐步消费大规模数据的场景。 异步系统集成将不同系统间的数据流通过异步方式连接起来。 3核心接口
Publisher发布者产生数据流Subscriber订阅者 消费数据流Subscription订阅关系 订阅关系是发布者和订阅者之间的关键接口。订阅者通过订阅来表示对发布者产生的数据的兴趣。订阅者可以请求一定数量的元素也可以取消订阅。Processor处理器 处理器是同时实现了发布者和订阅者接口的组件。它可以接收来自一个发布者的数据进行处理并将结果发布给下一个订阅者。处理器在Reactor中充当中间环节代表一个处理阶段允许你在数据流中进行转换、过滤和其他操作。
【扩展】 以前的编程模型是命令式编程 过程编程全自定义 流式编程是响应式|声明式编程说清楚要干什么最终结果要怎么样
public class MyFlowDemo {public static void main(String[] args) {// 1、定义一个发布者发布数据SubmissionPublisherString publisher new SubmissionPublisher();//3、定义一个订阅者 订阅者感兴趣发布者的数据Flow.SubscriberString subscriber new Flow.SubscriberString() {private Flow.Subscription subscription;Override //在订阅时 onXxxx在xxx事件发生时执行这个回调public void onSubscribe(Flow.Subscription subscription) {System.out.println(Thread.currentThread() 订阅开始了 subscription);this.subscription subscription;//从上游请求一个数据subscription.request(1);}Override //在下一个元素到达时 执行这个回调 接受到新数据public void onNext(String item) {System.out.println(Thread.currentThread() 订阅者接受到数据 item);//从上游请求一个数据subscription.request(1);}Override //在错误发生时public void onError(Throwable throwable) {System.out.println(Thread.currentThread() 订阅者接受到错误信号 throwable);}Override //在完成时public void onComplete() {System.out.println(Thread.currentThread() 订阅者接受到完成信号);}};publisher.subscribe(subscriber);for (int i 0; i 10; i) {publisher.submit(p- i);}try {Thread.sleep(20000);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}4处理器 Processor public class FlowDemo {//定义流中间操作处理器 只用写订阅者的接口static class MyProcessor extends SubmissionPublisherString implements Flow.ProcessorString,String {private Flow.Subscription subscription; //保存绑定关系Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println(processor订阅绑定完成);this.subscription subscription;subscription.request(1); //找上游要一个数据}Override //数据到达触发这个回调public void onNext(String item) {System.out.println(processor拿到数据item);//再加工item 哈哈;submit(item);//把我加工后的数据发出去subscription.request(1); //再要新数据}Overridepublic void onError(Throwable throwable) {}Overridepublic void onComplete() {}}/*** 1、Publisher发布者* 2、Subscriber订阅者* 3、Subscription 订阅关系* 4、Processor 处理器* param args*///发布订阅模型观察者模式public static void main(String[] args) throws InterruptedException {//1、定义一个发布者 发布数据SubmissionPublisherString publisher new SubmissionPublisher();//2、定一个中间操作 给每个元素加个 哈哈 前缀MyProcessor myProcessor1 new MyProcessor();//3、定义一个订阅者 订阅者感兴趣发布者的数据Flow.SubscriberString subscriber new Flow.SubscriberString() {private Flow.Subscription subscription;Override //在订阅时 onXxxx在xxx事件发生时执行这个回调public void onSubscribe(Flow.Subscription subscription) {System.out.println(Thread.currentThread()订阅开始了subscription);this.subscription subscription;//从上游请求一个数据subscription.request(1);}Override //在下一个元素到达时 执行这个回调 接受到新数据public void onNext(String item) {System.out.println(Thread.currentThread()订阅者接受到数据item);if(item.equals(p-7)){subscription.cancel(); //取消订阅}else {subscription.request(1);}}Override //在错误发生时public void onError(Throwable throwable) {System.out.println(Thread.currentThread()订阅者接受到错误信号throwable);}Override //在完成时public void onComplete() {System.out.println(Thread.currentThread()订阅者接受到完成信号);}};//4、绑定发布者和订阅者publisher.subscribe(myProcessor1); //此时处理器相当于订阅者myProcessor1.subscribe(subscriber); //此时处理器相当于发布者//绑定操作就是发布者记住了所有订阅者都有谁有数据后给所有订阅者把数据推送过去。// publisher.subscribe(subscriber);for (int i 0; i 10; i) {//发布10条数据if(i 5){
// publisher.closeExceptionally(new RuntimeException(5555));}else {publisher.submit(p-i);}//publisher发布的所有数据在它的buffer区//中断
// publisher.closeExceptionally();}//ReactiveStream//jvm底层对于整个发布订阅关系做好了 异步缓存区处理 响应式系统//发布者通道关闭publisher.close();// publisher.subscribe(subscriber2);//发布者有数据订阅者就会拿到Thread.sleep(20000);}
}
5总结 二、Reactor核心
响应式编程
1、底层基于数据缓冲队列 消息驱动模型 异步回调机制 2、编码流式编程 链式调用 声明式API 3、效果优雅全异步 消息实时处理 高吞吐量 占用少量资源
回调机制类似于SpringBoot的事件机制在SpringBoot应用的启动过程中触发事件。
1、Reactor 1介绍
Reactor 是一个用于在JVM构建非阻塞应用的响应式编程框架
2响应式编程 3Reactor核心特性
1、Mono和Flux
Mono: 0|1 数据流 Flux: N数据流
响应式流元素内容 信号完成/异常
2、subscribe()
传递钩子函数
flux.subscribe(v- System.out.println(v v), //流元素消费throwable - System.out.println(throwable throwable), //感知异常结束()- System.out.println(流结束了...) //感知正常结束
);自定义消费者
flux.subscribe(new BaseSubscriberString() {// 生命周期钩子1 订阅关系绑定的时候触发Overrideprotected void hookOnSubscribe(Subscription subscription) {// 流被订阅的时候触发System.out.println(绑定了...subscription);//找发布者要数据request(1); //要1个数据
// requestUnbounded(); //要无限数据}Overrideprotected void hookOnNext(String value) {System.out.println(数据到达正在处理value);request(1); //要1个数据}// hookOnComplete、hookOnError 二选一执行Overrideprotected void hookOnComplete() {System.out.println(流正常结束...);}Overrideprotected void hookOnError(Throwable throwable) {System.out.println(流异常...throwable);}Overrideprotected void hookOnCancel() {System.out.println(流被取消...);}Overrideprotected void hookFinally(SignalType type) {System.out.println(最终回调...一定会被执行);}});
3、流的取消
消费者调用 cancle() 取消流的订阅 flux.subscribe(new BaseSubscriberString() {// 生命周期钩子1 订阅关系绑定的时候触发Overrideprotected void hookOnSubscribe(Subscription subscription) {// 流被订阅的时候触发System.out.println(绑定了...subscription);//找发布者要数据request(1); //要1个数据
// requestUnbounded(); //要无限数据}Overrideprotected void hookOnNext(String value) {System.out.println(数据到达正在处理value);if(value.equals(哈哈5)){cancel(); //取消流}request(1); //要1个数据}// hookOnComplete、hookOnError 二选一执行Overrideprotected void hookOnComplete() {System.out.println(流正常结束...);}Overrideprotected void hookOnError(Throwable throwable) {System.out.println(流异常...throwable);}Overrideprotected void hookOnCancel() {System.out.println(流被取消...);}Overrideprotected void hookFinally(SignalType type) {System.out.println(最终回调...一定会被执行);}});4、BaseSubscriber
自定义消费者推荐直接编写 BaseSubscriber 的逻辑
5、背压Backpressure 和请求重塑Reshape Requests
背压 request(1)
Overrideprotected void hookOnSubscribe(Subscription subscription) {request(1); //要1个数据}Overrideprotected void hookOnNext(Integer value) {request(1); //要1个数据}buffer缓冲
FluxListInteger flux Flux.range(1, 10) //原始流10个.buffer(3).log();//缓冲区缓冲3个元素: 消费一次最多可以拿到三个元素 凑满数批量发给消费者
//
// //一次发一个一个一个发
// 10元素buffer(3)消费者请求4次数据消费完成6、以编程的方式创建队列
同步环境-generate // 同步情况下创建流public void generate() {// 以编程方式创建序列
// FluxObject flux Flux.generate(sink - {
// for (int i 0; i 10; i) {
// sink.next(哈哈哈 i);
// }
// });// FluxObject flux Flux.generate(() - 0, (state, sink) - {
// sink.next(state);
// return state 1;
// });FluxObject flux Flux.generate(() - 0, (state, sink) - {if (state 5) {sink.next(state);} else {sink.complete(); // 流创建完成}return state 1;});flux.log().subscribe(System.out::println);}流可以被取消
// 可取消public static void disposable() throws IOException {FluxObject flux Flux.generate(() - 0, (state, sink) - {sink.next(state);return state 1;});// disposable可以被取消Disposable disposable flux.log().subscribe(System.out::println);new Thread(() -{try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}disposable.dispose();}).start();System.in.read();}多线程-create 异步创建flux对象
7、handle()的使用
自定义流中元素处理规则 // 测试handle// 自定义流中元素处理规则public static void handle() {Flux.range(1,10).handle((value, sink)-{System.out.println(拿到的值 value);String v 转换 value;sink.next(v);}).subscribe(System.out::println);}8、自定义线程调度
指定发布者和订阅者的处理线程 // 自定义线程调度public static void custom() {Scheduler s Schedulers.newParallel(parallel-scheduler, 4);final FluxString flux Flux.range(1, 2).map(i - 10 i).log().publishOn(s) // 改变发布者的线程
// .subscribeOn() //改变订阅者的线程.map(i - value i);//只要不指定线程池默认发布者用的线程就是订阅者的线程new Thread(() - flux.subscribe(System.out::println)).start();}