dubbo/dubbox 增加原生thrift及avro支持

简介: (facebook) thrift / (hadoop) avro / (google) probuf(grpc)是近几年来比较抢眼的高效序列化/rpc框架,dubbo框架虽然有thrift的支持,但是依赖的版本较早,只支持0.8.0,而且还对协议做一些扩展,并非原生的thrift协议。

(facebook) thrift / (hadoop) avro / (google) probuf(grpc)是近几年来比较抢眼的高效序列化/rpc框架,dubbo框架虽然有thrift的支持,但是依赖的版本较早,只支持0.8.0,而且还对协议做一些扩展,并非原生的thrift协议。

github上虽然也有朋友对dubbo做了扩展支持原生thrift,但是代码实在太多了,只需要一个类即可:

Thrift2Protocal.java:

package com.alibaba.dubbo.rpc.protocol.thrift2;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.protocol.AbstractProxyProtocol;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import java.lang.reflect.Constructor;

/**
 * 为dubbo-rpc添加"原生thrift"支持
 * by 杨俊明(http://yjmyzz.cnblogs.com/)
 */
public class Thrift2Protocol extends AbstractProxyProtocol {
    public static final int DEFAULT_PORT = 33208;
    private static final Logger logger = LoggerFactory.getLogger(Thrift2Protocol.class);

    public int getDefaultPort() {
        return DEFAULT_PORT;
    }

    @Override
    protected <T> Runnable doExport(T impl, Class<T> type, URL url)
            throws RpcException {

        logger.info("impl => " + impl.getClass());
        logger.info("type => " + type.getName());
        logger.info("url => " + url);

        TProcessor tprocessor;
        TNonblockingServer.Args tArgs = null;
        String iFace = "$Iface";
        String processor = "$Processor";
        String typeName = type.getName();
        TNonblockingServerSocket transport;
        if (typeName.endsWith(iFace)) {
            String processorClsName = typeName.substring(0, typeName.indexOf(iFace)) + processor;
            try {
                Class<?> clazz = Class.forName(processorClsName);
                Constructor constructor = clazz.getConstructor(type);
                try {
                    tprocessor = (TProcessor) constructor.newInstance(impl);
                    transport = new TNonblockingServerSocket(url.getPort());
                    tArgs = new TNonblockingServer.Args(transport);
                    tArgs.processor(tprocessor);
                    tArgs.transportFactory(new TFramedTransport.Factory());
                    tArgs.protocolFactory(new TCompactProtocol.Factory());
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    throw new RpcException("Fail to create thrift server(" + url + ") : " + e.getMessage(), e);
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                throw new RpcException("Fail to create thrift server(" + url + ") : " + e.getMessage(), e);
            }
        }

        if (tArgs == null) {
            logger.error("Fail to create thrift server(" + url + ") due to null args");
            throw new RpcException("Fail to create thrift server(" + url + ") due to null args");
        }
        final TServer thriftServer = new TNonblockingServer(tArgs);

        new Thread(new Runnable() {
            public void run() {
                logger.info("Start Thrift Server");
                thriftServer.serve();
                logger.info("Thrift server started.");
            }
        }).start();

        return new Runnable() {
            public void run() {
                try {
                    logger.info("Close Thrift Server");
                    thriftServer.stop();
                } catch (Throwable e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        };
    }

    @Override
    protected <T> T doRefer(Class<T> type, URL url) throws RpcException {

        logger.info("type => " + type.getName());
        logger.info("url => " + url);

        try {
            TSocket tSocket;
            TTransport transport;
            TProtocol protocol;
            T thriftClient = null;
            String iFace = "$Iface";
            String client = "$Client";

            String typeName = type.getName();
            if (typeName.endsWith(iFace)) {
                String clientClsName = typeName.substring(0, typeName.indexOf(iFace)) + client;
                Class<?> clazz = Class.forName(clientClsName);
                Constructor constructor = clazz.getConstructor(TProtocol.class);
                try {
                    tSocket = new TSocket(url.getHost(), url.getPort());
                    transport = new TFramedTransport(tSocket);
                    protocol = new TCompactProtocol(transport);
                    thriftClient = (T) constructor.newInstance(protocol);
                    transport.open();
                    logger.info("thrift client opened for service(" + url + ")");
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    throw new RpcException("Fail to create remoting client:" + e.getMessage(), e);
                }
            }
            return thriftClient;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
    }

}

重写父类AbstractProxyProtocol的二个抽象方法doExport及doRefer即可,doExport用于对外暴露RPC服务,在这个方法里启动thrift server,dubbo service provider在启动时会调用该方法。而doRefer用于dubbo service consumer发现服务后,获取对应的rpc-client。 

参考这个思路,avro也很容易集成进来:

AvroProtocol.java

package com.alibaba.dubbo.rpc.protocol.avro;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.protocol.AbstractProxyProtocol;
import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.reflect.ReflectRequestor;
import org.apache.avro.ipc.reflect.ReflectResponder;

import java.net.InetSocketAddress;

/**
 * 为dubbo-rpc添加avro支持
 * by 杨俊明(http://yjmyzz.cnblogs.com/)
 */
public class AvroProtocol extends AbstractProxyProtocol {
    public static final int DEFAULT_PORT = 40881;
    private static final Logger logger = LoggerFactory.getLogger(AvroProtocol.class);

    public int getDefaultPort() {
        return DEFAULT_PORT;
    }

    @Override
    protected <T> Runnable doExport(T impl, Class<T> type, URL url)
            throws RpcException {

        logger.info("impl => " + impl.getClass());
        logger.info("type => " + type.getName());
        logger.info("url => " + url);

        final Server server = new NettyServer(new ReflectResponder(type, impl),
                new InetSocketAddress(url.getHost(), url.getPort()));
        server.start();

        return new Runnable() {
            public void run() {
                try {
                    logger.info("Close Avro Server");
                    server.close();
                } catch (Throwable e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        };
    }

    @Override
    protected <T> T doRefer(Class<T> type, URL url) throws RpcException {

        logger.info("type => " + type.getName());
        logger.info("url => " + url);

        try {
            NettyTransceiver client = new NettyTransceiver(new InetSocketAddress(url.getHost(), url.getPort()));
            T ref = ReflectRequestor.getClient(type, client);
            logger.info("Create Avro Client");
            return ref;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
    }

}

不要忘记在META-INF/dubbo/internal下添加名为com.alibaba.dubbo.rpc.Protocal的文件,内容为:

avro=com.alibaba.dubbo.rpc.protocol.avro.AvroProtocol

接下来谈谈如何打包到dubbo的jar里:  

dubbo-rpc/pom.xml里,把二个新增的项目加进来:

    <modules>
        ...
        <module>dubbo-rpc-avro</module>
        ...
        <module>dubbo-rpc-thrift2</module>
        ...
            
    </modules>

然后dubbo/pom.xml里:

   <artifactSet>
       <includes>
    ...
           <include>com.alibaba:dubbo-rpc-api</include>
           <include>com.alibaba:dubbo-rpc-avro</include>
          ...
           <include>com.alibaba:dubbo-rpc-thrift2</include>
           ...
       </includes>
   </artifactSet>    

dependencies节也要增加:

<dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>dubbo-rpc-thrift2</artifactId>
            <version>${project.parent.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.thrift</groupId>
                    <artifactId>libthrift</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>dubbo-rpc-avro</artifactId>
            <version>${project.parent.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.avro</groupId>
                    <artifactId>avro</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.avro</groupId>
                    <artifactId>avro-ipc</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

这样打包出来的dubbo-xxx.jar里,就包括新增的Protocol。至于google的protobuf,目前处于3.x -beta阶段,等以后出正式版了,再看情况整合起来。

以上代码已经提交到github:https://github.com/yjmyzz/dubbox (版本号:2.8.4a

thrift/avro协议的使用示例见:https://github.com/yjmyzz/dubbox-sample

最后,对dubbo/thrift/avro/rest这4种协议,做了下简单的对比测试,测试用例很简单:

  public String ping() {
        return "pong";
    }

客户端调用ping方法,服务器返回字符串"pong",在mac book pro上做5万次调用,结果如下:

dubbo RPC testing => 
 50000次RPC调用(dubbo协议),共耗时14778毫秒,平均3383.407715/秒
avro RPC testing => 
 50000次RPC调用(avro协议),共耗时10707毫秒,平均4669.842285/秒
thrift RPC testing => 
 50000次RPC调用(thrift协议),共耗时4667毫秒,平均10713.520508/秒
REST testing => 
 50000次REST调用,共耗时112699毫秒,平均443.659668/秒

这跟预期一致,REST走http协议,自然最慢,avro与dubbo底层的网络通讯都是借助netty实现,在同一个数量级,但是avro的二进制序列化效率更高,所以略快,而thrift则是从里到外,全都是facebook自己实现的,性能最优,完胜其它协议。

个人建议:对于一个服务接口,对外同时提供thrift、REST二种形式的服务实现,内部子系统之间用thrift方式调用(因为thrift跨语言,其实从外部进来的调用,也可以用thrift-rpc方式),一些不方便直接用thrift-client调用的场景,仍然走传统的REST.

目录
相关文章
|
Kubernetes Dubbo 应用服务中间件
Dubbo3实践:基于 API-SERVER 的原生 K8S Service
> 该示例演示了直接以 API-SERVER 为注册中心,将 Dubbo 应用部署到 Kubernetes 并复用 Kubernetes Native Service 的使用示例。 > > 此示例的局限在于需要授予每个 Dubbo 应用访问 API-SERVER 特定资源的权限,同时直接访问和监听 API-SERVER 对中小集群来说并没有什么问题,但对于较大规模集群而言可能给 API-SERV
409 0
|
Dubbo Java 测试技术
分布式RPC框架性能大比拼 dubbo、motan、rpcx、gRPC、thrift的性能比较
Dubbo 是阿里巴巴公司开源的一个Java高性能优秀的服务框架,使得应用可通过高性能的 RPC 实现服务的输出和输入功能,可以和 Spring框架无缝集成。不过,略有遗憾的是,据说在淘宝内部,dubbo由于跟淘宝另一个类似的框架HSF(非开源)有竞争关系,导致dubbo团队已经解散(参见http://www.oschina.net/news/55059/druid-1-0-9 中的评论),反到是当当网的扩展版本仍在持续发展,墙内开花墙外香。
6798 0
|
JSON 运维 Dubbo
Dubbo 3.1.0 正式发布,数据面原生接入 Service Mesh
Apache Dubbo 3.1.0 作为 Dubbo 规划中的一个重要里程碑版本,标记着 Dubbo 在数据面上全面拥抱 Service Mesh 的工作模式,原生支持接入到 Service Mesh 的体系中。在 8 月 22 日,Dubbo 3.1.0 版本通过社区投票,正式对外发布。
Dubbo 3.1.0 正式发布,数据面原生接入 Service Mesh
|
Dubbo Java 关系型数据库
【分布式事务】tcc-transaction分布式TCC型事务框架搭建与实战案例(基于Dubbo/Dubbox)
作者个人研发的在高并发场景下,提供的简单、稳定、可扩展的延迟消息队列框架,具有精准的定时任务和延迟队列处理功能。自开源半年多以来,已成功为十几家中小型企业提供了精准定时调度方案,经受住了生产环境的考验。为使更多童鞋受益,现给出开源框架地址: https://github.com/sunshinelyz/mykit-delay PS: 欢迎各位Star源码,也可以pr你牛逼哄哄的代码。
203 0
|
Dubbo Java 关系型数据库
tcc-transaction分布式TCC型事务框架搭建与实战案例(基于Dubbo/Dubbox)
一定分布式开发经验的朋友都知道,产品/项目/系统最初为了能够快速迭代上线,往往不太注重产品/项目/系统的高可靠性、高性能与高扩展性,采用单体应用和单实例数据库的架构方式快速迭代开发;当产品/项目/系统做到一定规模的时候,原有的系统架构则不足以支撑义务发展需要
401 0
|
存储 Kubernetes Dubbo
Dubbo 3.0 前瞻之对接 Kubernetes 原生服务
Dubbo 与 Kubernetes 的调度体系的结合,可以让原本需要管理两套平台的运维成本大大减低,而且 Dubbo 适配了 Kubernetes 原生服务也可以让框架本身更加融入云原生体系。基于 Dubbo 3.0 的全新应用级服务发现模型可以更容易对齐 Kubernetes 的服务模型。
Dubbo 3.0 前瞻之对接 Kubernetes 原生服务
|
Dubbo Java 应用服务中间件
分布式服务框架 dubbo/dubbox 入门示例
dubbo是一个分布式的服务架构,可直接用于生产环境作为SOA服务框架。 官网首页:http://dubbo.io/ ,官方用户指南 http://dubbo.io/User+Guide-zh.htm上面的几张图画得不错,完全可以当做SOA架构的学习资料 淘宝将这个项目开源出来以后,得到了不少同行的支持,包括: 当当网的扩展版本dubbox :https://github.
2055 0
|
5月前
|
负载均衡 Dubbo 应用服务中间件
微服务技术系列教程(31) - Dubbo-原理及负载均衡分析
微服务技术系列教程(31) - Dubbo-原理及负载均衡分析
54 0
|
5月前
|
Dubbo Java 应用服务中间件
微服务技术系列教程(30) - Dubbo-SpringCloud与Dubbo区别
微服务技术系列教程(30) - Dubbo-SpringCloud与Dubbo区别
46 0
|
4月前
|
Dubbo Java 应用服务中间件
阿里巴巴资深架构师深度解析微服务架构设计之SpringCloud+Dubbo
软件架构是一个包含各种组织的系统组织,这些组件包括Web服务器,应用服务器,数据库,存储,通讯层),它们彼此或和环境存在关系。系统架构的目标是解决利益相关者的关注点。

热门文章

最新文章