模板网站zencart,初级软件工程师报考条件,上海建设摩托车科技有限公司官网,网络营销零基础培训前言#xff1a;利用netty异步事件驱动的网络通信模型#xff0c;来实现rpc通信
一、大致目录结构#xff1a; 二、两个端#xff1a;服务端#xff08;发布#xff09;#xff0c;客户端#xff08;订阅消费#xff09;#xff0c;上代码#xff1a;
1.服务端利用netty异步事件驱动的网络通信模型来实现rpc通信
一、大致目录结构 二、两个端服务端发布客户端订阅消费上代码
1.服务端发布
RPCServer
代码
public class RpcServer {private MapString, Object registryMap new HashMap();private ListString classCache new ArrayList();// 1.实现发布实现方式// 1.1查找指定目录下的所有接口放入一个集合中// 1.2遍历所有接口放入一个map中存放接口路径及接口名称// 1.3发送方法相关信息public void publish(String providerPackage) throws Exception {getProviderClass(providerPackage);doRegister();NioEventLoopGroup parentGroup new NioEventLoopGroup();NioEventLoopGroup childGroup new NioEventLoopGroup();try {ServerBootstrap serverBootstrap new ServerBootstrap();serverBootstrap.group(parentGroup, childGroup).option(ChannelOption.SO_BACKLOG, 1024).childOption(ChannelOption.SO_KEEPALIVE, true).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline socketChannel.pipeline();pipeline.addLast(new ObjectEncoder());pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));pipeline.addLast(new RpcServerHandler(registryMap));}});ChannelFuture future serverBootstrap.bind(9999).sync();System.out.println(服务端监听9999端口启动成功。。。);future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {parentGroup.shutdownGracefully();childGroup.shutdownGracefully();}}private void doRegister() throws Exception {if (classCache.size() 0) {for (String className :classCache){Class? clazz Class.forName(className);String interfaceName clazz.getInterfaces()[0].getName();registryMap.put(interfaceName, clazz.newInstance());}}}/*** 获取当前目录下的所有接口汇总到一个集合中* param providerPackage*/private void getProviderClass(String providerPackage) {URL resource this.getClass().getClassLoader().getResource(providerPackage.replaceAll(\\., /));File file null;if (resource ! null) {file new File(resource.getFile());}if (file ! null) {for (File f : Objects.requireNonNull(file.listFiles())) {if (f.isDirectory()) {getProviderClass(providerPackage . f.getName());} else if (f.getName().endsWith(.class)) {String fileName f.getName().replace(.class, ).trim();classCache.add(providerPackage . fileName);}}}}}
服务端处理器用于处理消费端发送过来的接口数据
代码
public class RpcServerHandler extends ChannelInboundHandlerAdapter {private MapString, Object registryMap;public RpcServerHandler(MapString, Object registryMap) {this.registryMap registryMap;}Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(rpc-server收到客户端消息 msg);if (msg instanceof InvokeMessage) {InvokeMessage method (InvokeMessage) msg;Object result rpc-server端没有该方法;// 判断是否存在该方法if (registryMap.containsKey(method.getClassName())) {Object provider registryMap.get(method.getClassName());result provider.getClass().getMethod(method.getMethodName(), method.getParamTypes()).invoke(provider, method.getParamValues());}// 把方法返回结果发给订阅者ctx.channel().writeAndFlush(result);ctx.close();}}Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}
接口的实现类这里的接口是客户端的接口Api
代码
public class SsenServiceApiImpl implements SsenServiceApi {Overridepublic String hellRpc(String name) {return name 实现类方法;}
}
2.客户端订阅消费
这里采用JDK原生的基于接口的动态代理f发
public class RpcProxy {// JDK基于接口的动态代理用于创建代理对象public static T T create(Class? clazz) {return (T) Proxy.newProxyInstance(clazz.getClassLoader(),new Class[]{clazz},new InvocationHandler() {Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {if (Object.class.equals(method.getDeclaringClass())) {return method.invoke(proxy, method, args);}// 通过netty将接口信息发送给提供者获取指定方法return RpcInvoke(clazz, method, args);}});}private static Object RpcInvoke(Class? clazz, Method method, Object[] args) {NioEventLoopGroup eventGroup new NioEventLoopGroup();RpcClientHandler rpcClientHandler new RpcClientHandler();try {Bootstrap bootstrap new Bootstrap();bootstrap.group(eventGroup).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline socketChannel.pipeline();pipeline.addLast(new ObjectEncoder());pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,ClassResolvers.cacheDisabled(null)));pipeline.addLast(rpcClientHandler);}});// 绑定指定服务地址ChannelFuture future bootstrap.connect(localhost, 9999).sync();// 指定接口信息发送给提供者InvokeMessage invokeMessage new InvokeMessage();invokeMessage.setClassName(clazz.getName());invokeMessage.setMethodName(method.getName());invokeMessage.setParamTypes(method.getParameterTypes());invokeMessage.setParamValues(args);future.channel().writeAndFlush(invokeMessage).sync();future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {eventGroup.shutdownGracefully();}return rpcClientHandler.getResult();}}