基于netty实现Dubbo RPC调用

netty是基于NIO(同步非阻塞)开发的网络通信框架;对比传统BIO(阻塞IO),其并发性能有很大提升。而dubbo的底层就是使用netty作为网络框架,本文就手写简单的基于netty的RPC框架。

1 设计步骤

定义一个通用接口,作为服务提供者(provider)和消费者(consumer)之间的操作纽带

创建一个服务提供者,实现通用接口,并返回处理结果;网络方面监听消费者请求

创建一个服务消费者,通过代理模式调用远程服务接口

1.1 程序目录

在这里插入图片描述

1.2 定义一个通用接口

public interface TestService {
    String hello(String msg);
}

2 服务提供者模块

2.1 接口实现

public class TestServiceImpl implements TestService {
    @Override
    public String hello(String msg) {

        System.out.println("TestServiceImpl中hello被调用,参数:" + msg);
        return "你好客户端,我已经收到你的消息:" + msg;
    }
}

2.2 定义一个服务启动类

public class ServerBootStrap {
    public static void main(String[] args) {
        NettyServer nettyServer = new NettyServer(40004);
        nettyServer.init();
    }
}

2.3 创建netty服务端

**此步骤是netty常规服务端创建方式**

public class NettyServer {

    private int port;

    public NettyServer(int port) {
        this.port = port;
    }

    public void init() {
		//创建一个用于接收连接的线程组,参数代表线程个数
        EventLoopGroup boss = new NioEventLoopGroup(1);
        //创建处理操作时间的线程组,没有参数netty会默认线程为内核数*2
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap
                    .group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new NettyServerHandler());

                        }
                    });

            ChannelFuture ch = serverBootstrap.bind(port).sync();
            ch.channel().closeFuture().sync();

        } catch (Exception ex) {
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

}

2.4 服务端业务处理Handler

channelRead0方法用于接收客户端传来的信息,同时对数据进行校验
校验成功后,截取有效参数调用服务接口

public class NettyServerHandler extends SimpleChannelInboundHandler {

    private static String head = "dubbo#TestServie#";
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

        System.out.println("收到客户端数据:" + msg);

        if (msg.toString().startsWith(head)) {
            TestService testService = new TestServiceImpl();
            String result = testService.hello(msg.toString().substring(head.length()));

            ctx.writeAndFlush(result);
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("接收到连接请,channelActive被调用:" + ctx.channel().remoteAddress());
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("读取完成");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("断开连接");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("抛出异常");
        ctx.channel().close();
    }
}

3 消费者模块

3.1 创建消费者启动程序

public class ClientBootStrap {
    public static void main(String[] args) {
        NettyClient nettyClient = new NettyClient("127.0.0.1", 40004);
        String head = "dubbo#TestServie#";
       // nettyClient.init();
        TestService service = (TestService) nettyClient.getBean(TestService.class, head);
        String result = service.hello("你好,我是服务消费者");
        System.out.println("调用返回了结果:" + result);
    }
}

3.2 创建消费者网络通信模块

通过代理模式调用

public class NettyClient {

    private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private static NettyClientHandler nettyClientHandler;

    private String host;
    private int port;

    public NettyClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    //编写一个代理 请求服务提供者接口
    public Object getBean(final Class<?> serviceClass, final String providerName) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serviceClass}, ((proxy, method, args) -> {
                    System.out.println("开始执行代理");
                    if (nettyClientHandler == null)
                        init();
                    System.out.println("设置代理参数");
                    nettyClientHandler.setPara(providerName + args[0].toString());

                    return executorService.submit(nettyClientHandler).get();
                }));
    }


    private static void init() {
        System.out.println("开始执行init方法");
        nettyClientHandler = new NettyClientHandler();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(worker)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(nettyClientHandler);
                        }
                    });

            bootstrap.connect("127.0.0.1", 40004).sync();
            // future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
//            worker.shutdownGracefully();
//            System.out.println("执行结束");
        }
    }
}

3.3 创建消费者业务处理handler

成员变量para: 为调用远程接口服务的参数
成员变量result::为调用远程服务器接口返回结果
需要注意的是该handller实现了Callable接口中call()方法;
执行步骤为:
	1、连接建立成功后执行channelActive方法
	2、执行call方法发送数据到服务端,同时阻塞线程
	3、服务端返回结果后执行channelRead0方法,唤醒线程,
	4、执行call方法中wait()后面的步骤,返回结果
public class NettyClientHandler extends SimpleChannelInboundHandler implements Callable {
    private ChannelHandlerContext context;
    private String para;
    private String result;
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.context = ctx;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("断开连接");
    }

    @Override
    public synchronized Object call() throws Exception {
        System.out.println("发送call消息:" + para);
        context.writeAndFlush(para);
        wait();
        return result;
    }

    @Override
    protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
        result = o.toString();
        System.out.println("收到服务端的返回消息:" + o);
        notify();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("客户端发生异常");
    }

    void setPara(String str) {
        this.para = str;
    }
}

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>