专注Java领域技术
我们一直在努力

二十、网络编程-Netty-在-Dubbo-中如何应用

前言

众所周知,国内知名框架 Dubbo 底层使用的是 Netty 作为网络通信,那么内部到底是如何使用的呢?今天我们就来一探究竟。

1. dubbo 的 Consumer 消费者如何使用 Netty

注意:此次代码使用了从 githubclonedubbo 源码中的 dubbo-demo 例子。

代码如下:

System.setProperty("java.net.preferIPv4Stack", "true");
   ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
   context.start();
    // @1
   DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy
   int a = 0;
   while (true) {
       try {
           Thread.sleep(1000);
           System.err.println( ++ a + " ");

           String hello = demoService.sayHello("world"); // call remote method
           System.out.println(hello); // get result

       } catch (Throwable throwable) {
           throwable.printStackTrace();
       }
   }

当代码执行到 @1 的时候,会调用 Spring 容器的 getBean 方法,而 dubbo 扩展了 FactoryBean,所以,会调用 getObject 方法,该方法会创建代理对象。

这个过程中会调用 DubboProtocol 实例的 getClients(URL url)方法,当这个给定的 URLclient 没有初始化则创建,然后放入缓存,代码如下:

这个 initClient 方法就是创建 Nettyclient 的。

最终调用的就是抽象父类 AbstractClient 的构造方法,构造方法中包含了创建 Socket 客户端,连接客户端等行为。

public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
   doOpen();
   connect();
}

doOpent 方法用来创建 Nettybootstrap

protected void doOpen() throws Throwable {
   NettyHelper.setNettyLoggerFactory();
   bootstrap = new ClientBootstrap(channelFactory);
   bootstrap.setOption("keepAlive", true);
   bootstrap.setOption("tcpNoDelay", true);
   bootstrap.setOption("connectTimeoutMillis", getTimeout());
   final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
   bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
       public ChannelPipeline getPipeline() {
           NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
           ChannelPipeline pipeline = Channels.pipeline();
           pipeline.addLast("decoder", adapter.getDecoder());
           pipeline.addLast("encoder", adapter.getEncoder());
           pipeline.addLast("handler", nettyHandler);
           return pipeline;
       }
   });
}

connect 方法用来连接提供者:

protected void doConnect() throws Throwable {
   long start = System.currentTimeMillis();
   ChannelFuture future = bootstrap.connect(getConnectAddress());
   boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
   if (ret && future.isSuccess()) {
       Channel newChannel = future.getChannel();
       newChannel.setInterestOps(Channel.OP_READ_WRITE);
   } 
}

上面的代码中,调用了 bootstrapconnect 方法,熟悉的 Netty 连接操作。当然这里使用的是 jbossnetty3,稍微有点区别。当连接成功后,注册写事件,准备开始向提供者传递数据。

main 方法中调用 demoService.sayHello(“world”) 的时候,最终会调用 HeaderExchangeChannelrequest 方法,通过 channel 进行请求。

public ResponseFuture request(Object request, int timeout) throws RemotingException {
   Request req = new Request();
   req.setVersion("2.0.0");
   req.setTwoWay(true);
   req.setData(request);
   DefaultFuture future = new DefaultFuture(channel, req, timeout);
   channel.send(req);
   return future;
}

send 方法中最后调用 jboss Netty 中继承了 NioSocketChannelNioClientSocketChannelwrite 方法。完成了一次数据的传输。

  1. dubboProvider 提供者如何使用 Netty
    Provider demo 代码:
System.setProperty("java.net.preferIPv4Stack", "true");
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-provider.xml"});
context.start();
System.in.read(); // press any key to exit

Provider 作为被访问方,肯定是一个Server 模式的 Socket。如何启动的呢?

Spring 容器启动的时候,会调用一些扩展类的初始化方法,比如继承了 InitializingBeanApplicationContextAwareApplicationListener

dubbo 创建了 ServiceBean 继承了一个监听器。Spring 会调用他的 onApplicationEvent 方法,该类有一个 export 方法,用于打开 ServerSocket

然后执行了 DubboProtocolcreateServer 方法,然后创建了一个 NettyServer 对象。NettyServer 对象的 构造方法同样是 doOpen 方法和。代码如下:

protected void doOpen() throws Throwable {
   NettyHelper.setNettyLoggerFactory();
   ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
   ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
   ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
   bootstrap = new ServerBootstrap(channelFactory);

   final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
   channels = nettyHandler.getChannels();
   bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
       public ChannelPipeline getPipeline() {
           NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
           ChannelPipeline pipeline = Channels.pipeline();
           pipeline.addLast("decoder", adapter.getDecoder());
           pipeline.addLast("encoder", adapter.getEncoder());
           pipeline.addLast("handler", nettyHandler);
           return pipeline;
       }
   });
   channel = bootstrap.bind(getBindAddress());
}

该方法中,看到了熟悉的 boss 线程,worker 线程,和 ServerBootstrap,在添加了编解码 handler 之后,添加一个 NettyHandler,最后调用 bind 方法,完成绑定端口的工作。和我们使用 Netty 是一摸一样。

3. 总结

可以看到,dubbo 使用 Netty 还是挺简单的,消费者使用 NettyClient,提供者使用 NettyServerProvider 启动的时候,会开启端口监听,使用我们平时启动 Netty 一样的方式。

ClientSpring getBean 的时候,会创建 Client,当调用远程方法的时候,将数据通过 dubbo 协议编码发送到 NettyServer,然后 NettServer 收到数据后解码,并调用本地方法,并返回数据,完成一次完美的 RPC 调用。

好,关于 dubbo 如何使用 Netty 就简短的介绍到这里。

写完了如果写得有什么问题,希望读者能够给小编留言,也可以点击此处扫下面二维码关注微信公众号

赞(0) 打赏
未经允许不得转载:Java小咖秀 » 二十、网络编程-Netty-在-Dubbo-中如何应用
免责声明

抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

专注Java技术 100年

联系我们联系我们

你默默的关注就是最好的打赏~

支付宝扫一扫打赏

微信扫一扫打赏