spark2.1.0之源码分析——RPC服务器TransportServer

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/81062342 提示:阅读本文前最好先阅读:《Spark2.
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/81062342

提示:阅读本文前最好先阅读:

  1. 《Spark2.1.0之内置RPC框架》
  2. 《spark2.1.0之源码分析——RPC配置TransportConf》
  3. 《spark2.1.0之源码分析——RPC客户端工厂TransportClientFactory》

       TransportServer是RPC框架的服务端,可提供高效的、低级别的流服务。在说明《Spark2.1.0之内置RPC框架》一文所展示的图1中的记号②时提到过TransportContext的createServer方法用于创建TransportServer,其实现见代码清单1。

代码清单1         创建RPC服务端

  public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) {
    return new TransportServer(this, null, port, rpcHandler, bootstraps);
  }

  public TransportServer createServer(
      String host, int port, List<TransportServerBootstrap> bootstraps) {
    return new TransportServer(this, host, port, rpcHandler, bootstraps);
  }

  public TransportServer createServer(List<TransportServerBootstrap> bootstraps) {
    return createServer(0, bootstraps);
  }

  public TransportServer createServer() {
    return createServer(0, Lists.<TransportServerBootstrap>newArrayList());
  }

代码清单1中列出了四个名为createServer的重载方法,但是它们最终调用了TransportServer的构造器(见代码清单2)来创建TransportServer实例。

代码清单2         TransportServer的构造器

  public TransportServer(
      TransportContext context,
      String hostToBind,
      int portToBind,
      RpcHandler appRpcHandler,
      List<TransportServerBootstrap> bootstraps) {
    this.context = context;
    this.conf = context.getConf();
    this.appRpcHandler = appRpcHandler;
    this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));

    try {
      init(hostToBind, portToBind);
    } catch (RuntimeException e) {
      JavaUtils.closeQuietly(this);
      throw e;
    }
  }

TransportServer的构造器中的各个变量分别为:

  • context:即参数传递的TransportContext的引用;
  • conf:即TransportConf,这里通过调用TransportContext的getConf获取;
  • appRpcHandler:即RPC请求处理器RpcHandler;
  • bootstraps:即参数传递的TransportServerBootstrap列表;

TransportServer的构造器(见代码清单2)中调用了init方法,init方法用于对TransportServer进行初始化,见代码清单3。

代码清单3         TransportServer的初始化

  private void init(String hostToBind, int portToBind) {
    // 根据Netty的API文档,Netty服务端需同时创建bossGroup和workerGroup
    IOMode ioMode = IOMode.valueOf(conf.ioMode());
    EventLoopGroup bossGroup =
      NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
    EventLoopGroup workerGroup = bossGroup;
    // 创建一个汇集ByteBuf但对本地线程缓存禁用的分配器
    PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
      conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
    // 创建Netty的服务端根引导程序并对其进行配置
    bootstrap = new ServerBootstrap()
      .group(bossGroup, workerGroup)
      .channel(NettyUtils.getServerChannelClass(ioMode))
      .option(ChannelOption.ALLOCATOR, allocator)
      .childOption(ChannelOption.ALLOCATOR, allocator);

    if (conf.backLog() > 0) {
      bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
    }
    if (conf.receiveBuf() > 0) {
      bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
    }
    if (conf.sendBuf() > 0) {
      bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
    }
    // 为根引导程序设置管道初始化回调函数
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
        RpcHandler rpcHandler = appRpcHandler;
        for (TransportServerBootstrap bootstrap : bootstraps) {
          rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
        }
        context.initializePipeline(ch, rpcHandler);
      }
    });
    // 给根引导程序绑定Socket的监听端口
    InetSocketAddress address = hostToBind == null ?
        new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
    channelFuture = bootstrap.bind(address);
    channelFuture.syncUninterruptibly();

    port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
    logger.debug("Shuffle server started on port: {}", port);
  }

代码清单3中TransportServer初始化的步骤如下:

  1. 创建bossGroup和workerGroup(根据Netty的API文档,Netty服务端需同时创建bossGroup和workerGroup。);
  2. 创建一个汇集ByteBuf但对本地线程缓存禁用的分配器;

  3. 调用Netty的API创建Netty的服务端根引导程序并对其进行配置;

  4. 为根引导程序设置管道初始化回调函数,此回调函数首先设置TransportServerBootstrap到根引导程序中,然后调用TransportContext的initializePipeline方法初始化Channel的pipeline;

  5. 给根引导程序绑定Socket的监听端口,最后返回监听的端口。

提示:代码清单3中使用了NettyUtils工具类的很多方法,在《附录G Netty与NettyUtils》中有对它们的详细介绍。EventLoopGroup、PooledByteBufAllocator、ServerBootstrap都是Netty提供的API,对于它们的更多介绍,请访问http://netty.io/

 

 

关于《Spark内核设计的艺术 架构设计与实现》

经过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:

 

纸质版售卖链接如下:

京东:https://item.jd.com/12302500.html

相关文章
|
4月前
|
Java 应用服务中间件 API
干翻RPC系列之HesssionRPC:HesssionRPC的开发体验和源码分析
干翻RPC系列之HesssionRPC:HesssionRPC的开发体验和源码分析
|
3月前
|
网络协议 调度 C语言
live555 RTSP服务器与客户端通信源码分析
live555已经发展了很多年,不过最新的live555版本,笔者没有编译通过,最终选择了2019.8.28的live555代码,如果有需要的同学,可以自行去Index of /pub/contrib/live555/ (videolan.org)去下载,不过需要自己去编译,我的编译环境是windows版本,网上有很多关于如何将其编译为VS版本的live555的,如果有需要的同学,可以在博客下留言,我会给你发一个(自己对一些代码进行了注释,不过都是自己的理解,不一定正确)。对于代码的分析:RTSP服务器使用的testOnDemandRTSPServer.cpp,RTSP客户端使用的testRT
87 0
|
7月前
|
负载均衡 Dubbo Java
RPC框架-dubbo:架构及源码分析-初篇
在自学或面试dubbo时,相关的问题有很多,例如dubbo 的基本工作原理,这是使用过dubbo后应该知道的。包括dubbo的分层架构、长短链接选择、二进制协议支持;之后是使用方式(服务的注册、发现、调用方式),基础配置(超时时间、线程数),这些是最基本的。 在这些问题之后,就可以继续深入底层:关于连接方式,使用长连接还是短连接?为什么? dubbo的二进制协议支持哪些,之间有什么区别/优缺点等等,也可以考察在使用过程中遇到过哪些问题,是如何解决的。这些都需要深入理解,并且有真实、长时间使用经验。
139 0
|
前端开发 Java 应用服务中间件
《SpringBoot启动流程七》:源码分析SpringBoot如何内嵌并启动Tomcat服务器的?
《SpringBoot启动流程七》:源码分析SpringBoot如何内嵌并启动Tomcat服务器的?
318 0
《SpringBoot启动流程七》:源码分析SpringBoot如何内嵌并启动Tomcat服务器的?
|
消息中间件 Java RocketMQ
RocketMQ源码分析-Rpc通信模块(remoting)二
今天继续RocketMQ-Rpc通信模块(remoting)的源码分析。上一章提到了主要的start()方法执行流程,如果有不清楚的地方可以一起讨论哈,这篇文章会继续解读主要方法,按照惯例先看看NettyRemotingAbstract的类图,看类图知方法。和NettyEventExecutor以及MQ的交互流程。 按照惯例先看看NettyRemotingAbstract的类图,看类图知方法,文中会挑重要方法和主要流程解读。
405 0
RocketMQ源码分析-Rpc通信模块(remoting)二
|
消息中间件 编解码 网络协议
RocketMQ源码分析-Rpc通信模块(remoting)一
上篇文章分析了Rocketmq的nameServer的源码,在继续分析源码之前,先考虑一个问题,设计一个mq并且是高性能的mq最最核心的问题是什么,我个人认为主要是有俩个方面,1:消息的网络传输,2:消息的读写,这两个决定了mq的高性能。
470 0
RocketMQ源码分析-Rpc通信模块(remoting)一
|
SpringCloudAlibaba 负载均衡 关系型数据库
RPC框架(5 - 实现基于 Nacos 的服务器注册与发现)
RPC框架(5 - 实现基于 Nacos 的服务器注册与发现)
|
编解码 JSON 网络协议
透视RPC协议:SOFA-BOLT协议源码分析
最近在看Netty相关的资料,刚好SOFA-BOLT是一个比较成熟的Netty自定义协议栈实现,于是决定研读SOFA-BOLT的源码,详细分析其协议的组成,简单分析其客户端和服务端的源码实现。当前阅读的源码是2021-08左右的SOFA-BOLT仓库的master分支源码。
289 0
透视RPC协议:SOFA-BOLT协议源码分析
【Zookeeper】源码分析之服务器(五)之ObserverZooKeeperServer
前面分析了FollowerZooKeeperServer,接着分析ObserverZooKeeperServer。
111 0
【Zookeeper】源码分析之服务器(四)之FollowerZooKeeperServer
 前面分析了LeaderZooKeeperServer,接着分析FollowerZooKeeperServer。
125 0