咸宁网站建设,企业管理课程,天猫分销平台,seo博客网址温馨提示#xff1a;本文需要结合上一篇 gRPC 文章一起食用#xff0c;否则可能看不懂。 前面一篇文章松哥和大家聊了 gRPC 的基本用法#xff0c;今天我们再来稍微深入一点点#xff0c;来看下 gRPC 中四种不同的通信模式。
gRPC 中四种不同的通信模式分别是#xff1a;… 温馨提示本文需要结合上一篇 gRPC 文章一起食用否则可能看不懂。 前面一篇文章松哥和大家聊了 gRPC 的基本用法今天我们再来稍微深入一点点来看下 gRPC 中四种不同的通信模式。
gRPC 中四种不同的通信模式分别是
一元 RPC服务端流 RPC客户端流 RPC双向流 RPC
接下来松哥就通过四个完整的案例来分别和向伙伴们演示这四种不同的通信模式。
1. 准备工作
关于 gRPC 的基础知识我们就不啰嗦了咱们直接来看我今天的 proto 文件如下
这次我新建了一个名为 book.proto 的文件这里主要定义了一些图书相关的方法如下
syntax proto3;option java_multiple_files true;
option java_package org.javaboy.grpc.demo;
option java_outer_classname BookServiceProto;
import google/protobuf/wrappers.proto;package book;service BookService {rpc addBook(Book) returns (google.protobuf.StringValue);rpc getBook(google.protobuf.StringValue) returns (Book);rpc searchBooks(google.protobuf.StringValue) returns (stream Book);rpc updateBooks(stream Book) returns (google.protobuf.StringValue);rpc processBooks(stream google.protobuf.StringValue) returns (stream BookSet);
}message Book {string id 1;repeated string tags 2;string name 3;float price 4;string author 5;
}message BookSet {string id 1;repeated Book bookList 3;
}这个文件中有一些内容我们在上篇文章中都讲过了讲过的我就不再重复了我说一些上篇文章没有涉及到的东西
由于我们在这个文件中引用了 Google 提供的 StringValuegoogle.protobuf.StringValue所以这个文件上面我们首先用 import 导入相关的文件导入之后才可以使用。在方法参数和返回值中出现的 stream就表示这个方法的参数或者返回值是流的形式其实就是数据可以多次传输。message 中出现了一个上篇文章没有的关键字 repeated这个表示这个字段可以重复可以简单理解为这就是我们 Java 中的数组。
好了和上篇文章相比本文主要就是这几个地方不一样。
proto 文件写好之后按照上篇文章介绍的方法进行编译生成对应的代码这里就不再重复了。
2. 一元 RPC
一元 RPC 是一种比较简单的 RPC 模式其实说白了我们上篇文章和大家介绍的就是一种一元 RPC也就是客户端发起一个请求服务端给出一个响应然后请求结束。
上面我们定义的五个方法中addBook 和 getBook 都算是一种一元 RPC。
2.1 addBook
先来看 addBook 方法这个方法的逻辑很简单我们提前在服务端准备一个 Map 用来保存 BookaddBook 调用的时候就把 book 对象存入到 Map 中并且将 book 的 ID 返回大家就这样一件事来看看服务端的代码
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {private MapString, Book bookMap new HashMap();public BookServiceImpl() {Book b1 Book.newBuilder().setId(1).setName(三国演义).setAuthor(罗贯中).setPrice(30).addTags(明清小说).addTags(通俗小说).build();Book b2 Book.newBuilder().setId(2).setName(西游记).setAuthor(吴承恩).setPrice(40).addTags(志怪小说).addTags(通俗小说).build();Book b3 Book.newBuilder().setId(3).setName(水浒传).setAuthor(施耐庵).setPrice(50).addTags(明清小说).addTags(通俗小说).build();bookMap.put(1, b1);bookMap.put(2, b2);bookMap.put(3, b3);}Overridepublic void addBook(Book request, StreamObserverStringValue responseObserver) {bookMap.put(request.getId(), request);responseObserver.onNext(StringValue.newBuilder().setValue(request.getId()).build());responseObserver.onCompleted();}
}看过上篇文章的小伙伴我觉得这段代码应该很好理解。
客户端调用方式如下
public class BookServiceClient {public static void main(String[] args) throws InterruptedException {ManagedChannel channel ManagedChannelBuilder.forAddress(localhost, 50051).usePlaintext().build();BookServiceGrpc.BookServiceStub stub BookServiceGrpc.newStub(channel);addBook(stub);}private static void addBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {CountDownLatch countDownLatch new CountDownLatch(1);stub.addBook(Book.newBuilder().setPrice(99).setId(100).setName(java).setAuthor(javaboy).build(), new StreamObserverStringValue() {Overridepublic void onNext(StringValue stringValue) {System.out.println(stringValue.getValue() stringValue.getValue());}Overridepublic void onError(Throwable throwable) {}Overridepublic void onCompleted() {countDownLatch.countDown();System.out.println(添加完毕);}});countDownLatch.await();}
}这里我使用了 CountDownLatch 来实现线程等待等服务端给出响应之后客户端再结束。这里在回调的 onNext 方法中我们就可以拿到服务端的返回值。
2.2 getBook
getBook 跟上面的 addBook 类似先来看服务端代码如下
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {private MapString, Book bookMap new HashMap();public BookServiceImpl() {Book b1 Book.newBuilder().setId(1).setName(三国演义).setAuthor(罗贯中).setPrice(30).addTags(明清小说).addTags(通俗小说).build();Book b2 Book.newBuilder().setId(2).setName(西游记).setAuthor(吴承恩).setPrice(40).addTags(志怪小说).addTags(通俗小说).build();Book b3 Book.newBuilder().setId(3).setName(水浒传).setAuthor(施耐庵).setPrice(50).addTags(明清小说).addTags(通俗小说).build();bookMap.put(1, b1);bookMap.put(2, b2);bookMap.put(3, b3);}Overridepublic void getBook(StringValue request, StreamObserverBook responseObserver) {String id request.getValue();Book book bookMap.get(id);if (book ! null) {responseObserver.onNext(book);responseObserver.onCompleted();} else {responseObserver.onCompleted();}}
}这个 getBook 就是根据客户端传来的 id从 Map 中查询到一个 Book 并返回。
客户端调用代码如下
public class BookServiceClient {public static void main(String[] args) throws InterruptedException {ManagedChannel channel ManagedChannelBuilder.forAddress(localhost, 50051).usePlaintext().build();BookServiceGrpc.BookServiceStub stub BookServiceGrpc.newStub(channel);getBook(stub);}private static void getBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {CountDownLatch countDownLatch new CountDownLatch(1);stub.getBook(StringValue.newBuilder().setValue(2).build(), new StreamObserverBook() {Overridepublic void onNext(Book book) {System.out.println(book book);}Overridepublic void onError(Throwable throwable) {}Overridepublic void onCompleted() {countDownLatch.countDown();System.out.println(查询完毕);}});countDownLatch.await();}
}小伙伴们大概也能看出来addBook 和 getBook 基本上操作套路是一模一样的。
3. 服务端流 RPC
前面的一元 RPC客户端发起一个请求服务端给出一个响应请求就结束了。服务端流则是客户端发起一个请求服务端给一个响应序列这个响应序列组成一个流。
上面我们给出的 searchBook 就是这样一个例子searchBook 是传递图书的 tags 参数然后在服务端查询哪些书的 tags 满足条件将满足条件的书全部都返回去。
我们来看下服务端的代码
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {private MapString, Book bookMap new HashMap();public BookServiceImpl() {Book b1 Book.newBuilder().setId(1).setName(三国演义).setAuthor(罗贯中).setPrice(30).addTags(明清小说).addTags(通俗小说).build();Book b2 Book.newBuilder().setId(2).setName(西游记).setAuthor(吴承恩).setPrice(40).addTags(志怪小说).addTags(通俗小说).build();Book b3 Book.newBuilder().setId(3).setName(水浒传).setAuthor(施耐庵).setPrice(50).addTags(明清小说).addTags(通俗小说).build();bookMap.put(1, b1);bookMap.put(2, b2);bookMap.put(3, b3);}Overridepublic void searchBooks(StringValue request, StreamObserverBook responseObserver) {SetString keySet bookMap.keySet();String tags request.getValue();for (String key : keySet) {Book book bookMap.get(key);int tagsCount book.getTagsCount();for (int i 0; i tagsCount; i) {String t book.getTags(i);if (t.equals(tags)) {responseObserver.onNext(book);break;}}}responseObserver.onCompleted();}
}小伙伴们看下这段 Java 代码应该很好理解
首先从 request 中提取客户端传来的 tags 参数。遍历 bookMap查看每一本书的 tags 是否等于客户端传来的 tags如果相等说明添加匹配则通过 responseObserver.onNext(book); 将这本书写回到客户端。等所有操作都完成后执行 responseObserver.onCompleted();表示服务端的响应序列结束了这样客户端也就知道请求结束了。
我们来看看客户端的代码如下
public class BookServiceClient {public static void main(String[] args) throws InterruptedException {ManagedChannel channel ManagedChannelBuilder.forAddress(localhost, 50051).usePlaintext().build();BookServiceGrpc.BookServiceStub stub BookServiceGrpc.newStub(channel);searchBook(stub);}private static void searchBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {CountDownLatch countDownLatch new CountDownLatch(1);stub.searchBooks(StringValue.newBuilder().setValue(明清小说).build(), new StreamObserverBook() {Overridepublic void onNext(Book book) {System.out.println(book);}Overridepublic void onError(Throwable throwable) {}Overridepublic void onCompleted() {countDownLatch.countDown();System.out.println(查询完毕);}});countDownLatch.await();}
}客户端的代码好理解搜索的关键字是 明清小说每当服务端返回一次数据的时候客户端回调的 onNext 方法就会被触发一次当服务端之行了 responseObserver.onCompleted(); 之后客户端的 onCompleted 方法也会被触发。
这个就是服务端流客户端发起一个请求服务端通过 onNext 可以多次写回数据。
4. 客户端流 RPC
客户端流则是客户端发起多个请求服务端只给出一个响应。
上面的 updateBooks 就是一个客户端流的案例客户端想要修改图书可以发起多个请求修改多本书服务端则收集多次修改的结果将之汇总然后一次性返回给客户端。
我们先来看看服务端的代码
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {private MapString, Book bookMap new HashMap();public BookServiceImpl() {Book b1 Book.newBuilder().setId(1).setName(三国演义).setAuthor(罗贯中).setPrice(30).addTags(明清小说).addTags(通俗小说).build();Book b2 Book.newBuilder().setId(2).setName(西游记).setAuthor(吴承恩).setPrice(40).addTags(志怪小说).addTags(通俗小说).build();Book b3 Book.newBuilder().setId(3).setName(水浒传).setAuthor(施耐庵).setPrice(50).addTags(明清小说).addTags(通俗小说).build();bookMap.put(1, b1);bookMap.put(2, b2);bookMap.put(3, b3);}Overridepublic StreamObserverBook updateBooks(StreamObserverStringValue responseObserver) {StringBuilder sb new StringBuilder(更新的图书 ID 为);return new StreamObserverBook() {Overridepublic void onNext(Book book) {bookMap.put(book.getId(), book);sb.append(book.getId()).append(,);}Overridepublic void onError(Throwable throwable) {}Overridepublic void onCompleted() {responseObserver.onNext(StringValue.newBuilder().setValue(sb.toString()).build());responseObserver.onCompleted();}};}
}客户端每发送一本书来就会触发服务端的 onNext 方法然后我们在这方法中进行图书的更新操作并记录更新结果。最后我们在 onCompleted 方法中将更新结果汇总返回给客户端基本上就是这样一个流程。
我们再来看看客户端的代码
public class BookServiceClient {public static void main(String[] args) throws InterruptedException {ManagedChannel channel ManagedChannelBuilder.forAddress(localhost, 50051).usePlaintext().build();BookServiceGrpc.BookServiceStub stub BookServiceGrpc.newStub(channel);updateBook(stub);}private static void updateBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {CountDownLatch countDownLatch new CountDownLatch(1);StreamObserverBook request stub.updateBooks(new StreamObserverStringValue() {Overridepublic void onNext(StringValue stringValue) {System.out.println(stringValue.getValue() stringValue.getValue());}Overridepublic void onError(Throwable throwable) {}Overridepublic void onCompleted() {System.out.println(更新完毕);countDownLatch.countDown();}});request.onNext(Book.newBuilder().setId(1).setName(a).setAuthor(b).build());request.onNext(Book.newBuilder().setId(2).setName(c).setAuthor(d).build());request.onCompleted();countDownLatch.await();}
}在客户端这块updateBooks 方法会返回一个 StreamObserver 对象调用该对象的 onNext 方法就是给服务端传递数据了可以传递多个数据调用该对象的 onCompleted 方法就是告诉服务端数据传递结束了此时也会触发服务端的 onCompleted 方法服务端的 onCompleted 方法执行之后进而触发了客户端的 onCompleted 方法。
5. 双向流 RPC
双向流其实就是 3、4 小节的合体。即客户端多次发送数据服务端也多次响应数据。
我们先来看下服务端的代码
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {private MapString, Book bookMap new HashMap();private ListBook books new ArrayList();public BookServiceImpl() {Book b1 Book.newBuilder().setId(1).setName(三国演义).setAuthor(罗贯中).setPrice(30).addTags(明清小说).addTags(通俗小说).build();Book b2 Book.newBuilder().setId(2).setName(西游记).setAuthor(吴承恩).setPrice(40).addTags(志怪小说).addTags(通俗小说).build();Book b3 Book.newBuilder().setId(3).setName(水浒传).setAuthor(施耐庵).setPrice(50).addTags(明清小说).addTags(通俗小说).build();bookMap.put(1, b1);bookMap.put(2, b2);bookMap.put(3, b3);}Overridepublic StreamObserverStringValue processBooks(StreamObserverBookSet responseObserver) {return new StreamObserverStringValue() {Overridepublic void onNext(StringValue stringValue) {Book b Book.newBuilder().setId(stringValue.getValue()).build();books.add(b);if (books.size() 3) {BookSet bookSet BookSet.newBuilder().addAllBookList(books).build();responseObserver.onNext(bookSet);books.clear();}}Overridepublic void onError(Throwable throwable) {}Overridepublic void onCompleted() {BookSet bookSet BookSet.newBuilder().addAllBookList(books).build();responseObserver.onNext(bookSet);books.clear();responseObserver.onCompleted();}};}
}这段代码没有实际意义单纯为了给小伙伴们演示双向流我的操作逻辑是客户端传递多个 ID 到服务端然后服务端根据这些 ID 构建对应的 Book 对象然后三个三个一组再返回给客户端。客户端每次发送一个请求都会触发服务端的 onNext 方法我们在这个方法中对请求分组返回。最后如果还有剩余的请求我们在 onCompleted() 方法中返回。
再来看看客户端的代码
public class BookServiceClient {public static void main(String[] args) throws InterruptedException {ManagedChannel channel ManagedChannelBuilder.forAddress(localhost, 50051).usePlaintext().build();BookServiceGrpc.BookServiceStub stub BookServiceGrpc.newStub(channel);processBook(stub);}private static void processBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {CountDownLatch countDownLatch new CountDownLatch(1);StreamObserverStringValue request stub.processBooks(new StreamObserverBookSet() {Overridepublic void onNext(BookSet bookSet) {System.out.println(bookSet bookSet);System.out.println();}Overridepublic void onError(Throwable throwable) {}Overridepublic void onCompleted() {System.out.println(处理完毕);countDownLatch.countDown();}});request.onNext(StringValue.newBuilder().setValue(a).build());request.onNext(StringValue.newBuilder().setValue(b).build());request.onNext(StringValue.newBuilder().setValue(c).build());request.onNext(StringValue.newBuilder().setValue(d).build());request.onCompleted();countDownLatch.await();}
}这个客户端的代码跟第四小节一模一样不再赘述了。
好啦这就是松哥和小伙伴们介绍的 gRPC 的四种不同的通信模式文章中只给出了一些关键代码如果小伙伴们没看明白建议结合上篇文章一起阅读就懂啦