基于Thrift实现跨语言服务

  1. 云栖社区>
  2. 博客>
  3. 正文

基于Thrift实现跨语言服务

shiyanjuncn 2016-04-14 09:40:05 浏览2469
展开阅读全文

假设,现在我们有这样一个需求:
要通过一个代理平台,将查询请求转发到后端服务器进行查询。后端存在多种查询服务器,查询方式也不同,比如,有基于SQL的关系数据库查询,也有基于搜索引擎Solr的查询。通过代理平台,将

服务暴露给具有任何编程语言技能的开发人员进行调用。
我们可以选择Thrift来定义语言中性的服务接口,然后通过Thrift编译器将定义生成多种编程语言的客户端代码框架,服务器端使用指定语言进行开发,如Java,最后通过连接Thrift服务器来进行查

询调用。
根据我们的需求,后端服务使用Java实现,而外部使用C#进行调用返回结果,再执行进一步的处理。

Thrift服务定义

首先,看一下,我们给出的示例服务定义,文件命名为queryproxy.thrift,内容如下所示:

01 namespace java org.shirdrn.queryproxy.thrift.protocol
02 namespace csharp Query.Proxy.Thrift.Protocol
03 namespace py queryproxy.thrift.protocol
04
05 typedef i16 short
06 typedef i32 int
07 typedef i64 long
08
09 enum QueryType {
10 SOLR = 1,
11 SQL = 2
12 }
13
14 struct QueryParams {
15 1:QueryType type,
16 2:string table,
17 3:list<string> paramList
18 }
19
20 struct QueryResult {
21 1:int offset,
22 2:int length
23 3:list<string> results
24 }
25
26 exception QueryFailureException {
27 1:string message
28 }
29
30 service QueryProxyService {
31
32 QueryResult query(1:QueryParams paramList) throws (1:QueryFailureException qe)
33
34 }

上面定义的内容的含义如下所示:

  • QueryType 指定查询类型,包括两种类型:查询Solr服务器,或SQL查询
  • QueryParams 用来设置请求参数
  • QueryResult 是返回结果对象,封装了查询结果列表,我们将查询结果以JSON列表形式返回
  • QueryFailureException 如果查询失败,返回该异常
  • QueryProxyService 定义了服务调用接口

编译Thrift服务定义

根据上面定义的服务,使用Thrift编译器生成不同编程语言的代码,我们生成用于服务器端的Java代码,和客户端服务调用的C#代码,执行命令如下所示:

1 thrift --gen java queryproxy.thrift
2 thrift --gen csharp queryproxy.thrift

然后可以在当前目录下面查看到编译生成的代码目录:

1 ls
2 gen-csharp gen-java

可以直接基于这些代码进行开发服务器端和客户端代码,详见后面说明。

Thrift服务实现

我们使用Java语言实现服务器端的Thrift服务。首先,需要从Thrift的发行包中给出的jar文件,添加到classpath中,基于该库文件进行服务开发。
通过上一步使用Thrift编译器编译生成的Java代码,可以看到一个服务接口:

1 org.shirdrn.queryproxy.thrift.protocol.QueryProxyService.Iface

我们要对两种类型的查询服务(Solr和SQL)给出实现,首先基于该服务接口来抽象出一层,在抽象类中定义了服务配置对象(读取Properties文件),其中配置文件内容大概如下所示:

1 query.proxy.thrift.port=9966
2 query.proxy.thrift.worker.thread.minCount=1
3 query.proxy.thrift.worker.thread.maxCount=200
4 query.proxy.solr.zkHost=master:2181

抽象服务类如下所示:

01 package org.shirdrn.queryproxy.common;
02
03 import java.io.Closeable;
04
05 import org.shirdrn.queryproxy.thrift.protocol.QueryProxyService.Iface;
06
07 public abstract class ConfiguredQueryService implements Iface, Closeable {
08
09 protected final Configurable context;
10
11 public ConfiguredQueryService(Configurable context) {
12 super();
13 this.context = context;
14 }
15 }

然后实现上面提到的两种类型的服务,都基于该抽象类进行开发。

  • Solr查询服务实现

因为后端已经存在一个Solr查询服务器集群(SolrCloud),我们实际上是通过solrj客户端调用来执行查询,所以Thrift服务端的查询也是基于这个原理。下面,看一下Solr查询服务实现类

SolrQueryService的实现内容,代码如下所示:

01 package org.shirdrn.queryproxy.thrift.service.solr;
02
03 import java.io.IOException;
04 import java.net.MalformedURLException;
05 import java.util.HashMap;
06 import java.util.Iterator;
07 import java.util.Map;
08
09 import org.apache.commons.logging.Log;
10 import org.apache.commons.logging.LogFactory;
11 import org.apache.solr.client.solrj.SolrServerException;
12 import org.apache.solr.client.solrj.impl.CloudSolrServer;
13 import org.apache.solr.client.solrj.response.QueryResponse;
14 import org.apache.solr.common.params.CommonParams;
15 import org.apache.solr.common.params.MapSolrParams;
16 import org.apache.solr.common.params.SolrParams;
17 import org.apache.thrift.TException;
18 import org.shirdrn.queryproxy.common.Configurable;
19 import org.shirdrn.queryproxy.common.ConfiguredQueryService;
20 import org.shirdrn.queryproxy.thrift.protocol.QueryFailureException;
21 import org.shirdrn.queryproxy.thrift.protocol.QueryParams;
22 import org.shirdrn.queryproxy.thrift.protocol.QueryResult;
23 import org.shirdrn.queryproxy.utils.ResultUtils;
24
25 public class SolrQueryService extends ConfiguredQueryService {
26
27 private static final Log LOG = LogFactory.getLog(SolrQueryService.class);
28 private CloudSolrServer solrServer;
29 private static final String writerType = "json";
30
31 public SolrQueryService(Configurable context) {
32 super(context);
33 String zkHost = context.get("query.proxy.solr.zkHost");
34 try {
35 solrServer = new CloudSolrServer(zkHost);
36 } catch (MalformedURLException e) {
37 throw new RuntimeException(e);
38 }
39 }
40
41 @Override
42 public QueryResult query(QueryParams params) throws QueryFailureException, TException {
43 int offset = 0;
44 int length = 10;
45 Map<String,String> map = new HashMap<>();
46 Iterator<String> iter = params.getParamListIterator();
47 while(iter.hasNext()) {
48 String kv = iter.next();
49 if(kv != null) {
50 String[] items = kv.split("=");
51 if(items.length == 2) {
52 String key = items[0].trim();
53 String value = items[1].trim();
54 map.put(key, value);
55 if(key.equals(CommonParams.START)) {
56 offset = Integer.parseInt(value);
57 }
58 if(key.equals(CommonParams.ROWS)) {
59 length = Integer.parseInt(value);
60 }
61 }
62 }
63 }
64 map.put("collection", params.getTable());
65 map.put("wt", writerType);
66 LOG.info("Solr params: " + map);
67
68 // query using Solr
69 QueryResponse response = null;
70 SolrParams solrParams = new MapSolrParams(map);
71 try {
72 response = solrServer.query(solrParams);
73 } catch (SolrServerException e) {
74 LOG.error("Failed to query solr server: ", e);
75 throw new QueryFailureException(e.toString());
76 }
77
78 // process result
79 QueryResult result = new QueryResult();
80 result.setOffset(offset);
81 result.setLength(length);
82 if(response != null) {
83 result.setResults(ResultUtils.getJSONResults(response));
84 }
85 return result;
86 }
87
88 @Override
89 public void close() throws IOException {
90 solrServer.shutdown();
91 }
92
93 }

为简单起见,上面只是使用了一个CloudSolrServer客户端来连接Solr服务器集群(通过ZooKeeper集群)。在query方法中,首先解析查询参数数据,然后构建成Solr查询支持的参数格式,提交到

Solr查询服务器集群,然后等待返回结果QueryResponse ,接着从返回的QueryResponse对象中提取查询命中的结果文档集合,然后转换成Thrift服务定义中满足的返回结果对象形式,通过下面的累

ResultUtils类来实现转换操作,getJSONResults(response)实现如下所示:

01 private static final String KEY_VERSION = "_version_";
02 private static final DateFormat DF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
03
04 public static List<String> getJSONResults(QueryResponse response) {
05 ListIterator<SolrDocument> iter = response.getResults().listIterator();
06 List<String> resultDocs = new ArrayList<String>();
07 while(iter.hasNext()) {
08 SolrDocument doc = iter.next();
09 JSONObject jDoc = new JSONObject();
10 Set<String> ks = doc.keySet();
11 if(ks.contains(KEY_VERSION)) {
12 ks.remove(KEY_VERSION);
13 }
14 for(String key : ks) {
15 Object v = doc.getFieldValue(key);
16 if(v instanceof Date) {
17 jDoc.put(key, DF.format((Date) v));
18 continue;
19 }
20 jDoc.put(key, v);
21 }
22 resultDocs.add(jDoc.toString());
23 }
24 return resultDocs;
25 }

一条结果,构建一个JSON对象,返回一个JSON对象列表。这样,Solr查询服务的Thrift服务就实现了。

  • SQL查询服务

基于关系数据库的SQL查询就比较容易了,我们简单地使用JDBC来直接进行。我们基于MysQL数据库,实现SQL查询的JDBC配置文件内容,如下所示:

1 jdbc.jdbcUrl=jdbc:mysql://localhost:3306/wordpress?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true
2 jdbc.driverClass=com.mysql.jdbc.Driver
3 jdbc.user=shirdrn
4 jdbc.password=shiyanjun

Thrift查询服务实现类为SQLQueryService,实现代码,如下所示:

01 package org.shirdrn.queryproxy.thrift.service.sql;
02
03 import java.io.IOException;
04 import java.sql.Connection;
05 import java.sql.DriverManager;
06 import java.sql.ResultSet;
07 import java.sql.SQLException;
08 import java.sql.Statement;
09
10 import org.apache.commons.logging.Log;
11 import org.apache.commons.logging.LogFactory;
12 import org.apache.thrift.TException;
13 import org.shirdrn.queryproxy.common.Configurable;
14 import org.shirdrn.queryproxy.common.ConfiguredQueryService;
15 import org.shirdrn.queryproxy.thrift.protocol.QueryFailureException;
16 import org.shirdrn.queryproxy.thrift.protocol.QueryParams;
17 import org.shirdrn.queryproxy.thrift.protocol.QueryResult;
18 import org.shirdrn.queryproxy.utils.PropertiesConfig;
19 import org.shirdrn.queryproxy.utils.ResultUtils;
20
21 public class SQLQueryService extends ConfiguredQueryService {
22
23 private static final Log LOG = LogFactory.getLog(SQLQueryService.class);
24 private static String JDBC_PROPERTIES = "jdbc.properties";
25 Configurable jdbcConf;
26 private String jdbcUrl;
27 private String user;
28 private String password;
29 Connection connection;
30
31 public SQLQueryService(Configurable context) {
32 super(context);
33 jdbcConf = new PropertiesConfig(JDBC_PROPERTIES);
34 String driverClass = jdbcConf.get("jdbc.driverClass");
35 try {
36 Class.forName(driverClass);
37 jdbcUrl = jdbcConf.get("jdbc.jdbcUrl");
38 user = jdbcConf.get("jdbc.user");
39 password = jdbcConf.get("jdbc.password");
40 } catch (ClassNotFoundException e) {
41 throw new RuntimeException(e);
42 } finally {
43 LOG.info("JDBC: driver=" + driverClass + ", url=" + jdbcUrl + ", user="+ user + ", password=******");
44 }
45 }
46
47 @Override
48 public QueryResult query(QueryParams params) throws QueryFailureException, TException {
49 QueryResult result = new QueryResult();
50 if(!params.getParamList().isEmpty()) {
51 // get SQL statement
52 String sql = params.getParamList().remove(0);
53 Connection conn = getConnection();
54 Statement stmt = null;
55 ResultSet rs = null;
56 try {
57 stmt = conn.createStatement();
58 rs = stmt.executeQuery(sql);
59 result.setResults(ResultUtils.getJSONResults(rs, params.getParamList()));
60 } catch (SQLException e) {
61 throw new QueryFailureException(e.toString());
62 }
63 }
64 return result;
65 }
66
67 private synchronized final Connection getConnection() {
68 try {
69 if(connection == null || connection.isClosed()) {
70 if(user != null) {
71 connection = DriverManager.getConnection(jdbcUrl, user, password);
72 } else {
73 connection = DriverManager.getConnection(jdbcUrl);
74 }
75 }
76 } catch (SQLException e) {
77 e.printStackTrace();
78 }
79 return connection;
80 }
81
82 @Override
83 public void close() throws IOException {
84 if(connection != null) {
85 try {
86 connection.close();
87 } catch (SQLException e) {
88 throw new IOException(e);
89 }
90 }
91 }
92 }

上面也使用了ResultUtils类实现了结果的转换方法,实现如下所示:

01 public static List<String> getJSONResults(ResultSet rs, List<String> fields) throwsSQLException {
02 List<String> results = new ArrayList<String>();
03 while(rs.next()) {
04 JSONObject jo = new JSONObject();
05 for(String field : fields) {
06 jo.put(field, rs.getObject(field).toString());
07 }
08 results.add(jo.toString());
09 }
10 return results;
11 }

返回一组JSON对象,客户端只需要单个解析每一个对象即可。这样基于SQL的Thrift服务也实现完成了。

上面的两类服务都已经实现了,我们最终还要组合成一个服务,然后通过Thrift协议暴露给外部。组合服务的实现类同样实现了org.shirdrn.queryproxy.thrift.protocol.QueryProxyService.Iface

接口,实现代码如下所示:

01 package org.shirdrn.queryproxy;
02
03 import java.io.Closeable;
04 import java.io.IOException;
05 import java.util.HashMap;
06 import java.util.Map;
07
08 import org.apache.commons.logging.Log;
09 import org.apache.commons.logging.LogFactory;
10 import org.apache.thrift.TException;
11 import org.shirdrn.queryproxy.common.Configurable;
12 import org.shirdrn.queryproxy.thrift.protocol.QueryFailureException;
13 import org.shirdrn.queryproxy.thrift.protocol.QueryParams;
14 import org.shirdrn.queryproxy.thrift.protocol.QueryProxyService.Iface;
15 import org.shirdrn.queryproxy.thrift.protocol.QueryResult;
16 import org.shirdrn.queryproxy.thrift.protocol.QueryType;
17 import org.shirdrn.queryproxy.utils.ReflectionUtils;
18
19 public class ThriftQueryService implements Iface {
20
21 private static final Log LOG = LogFactory.getLog(ThriftQueryService.class);
22 private Configurable context;
23 static Map<QueryType, Iface> SERVICES = new HashMap<QueryType, Iface>(0);
24 static {
25 Runtime.getRuntime().addShutdownHook(new Thread() {
26 @Override
27 public void run() {
28 for(Map.Entry<QueryType, Iface> entry : SERVICES.entrySet()) {
29 try {
30 ((Closeable) entry.getValue()).close();
31 } catch (IOException e) {
32 e.printStackTrace();
33 } finally {
34 LOG.info("Closed: type=" + entry.getKey() + ", service="+ entry.getValue());
35 }
36 }
37 }
38 });
39 }
40
41 @Override
42 public QueryResult query(QueryParams params) throws QueryFailureException, TException {
43 int type = params.getType().getValue();
44 Iface service = SERVICES.get(QueryType.findByValue(type));
45 if(service == null) {
46 throw new QueryFailureException("Unknown service: type=" + params.getType().name());
47 }
48 return service.query(params);
49 }
50
51 public void register(QueryType queryType, Class<?> serviceClass) {
52 Iface service = (Iface) ReflectionUtils.getInstance(serviceClass, newObject[] {context});
53 SERVICES.put(queryType, service);
54 }
55
56 public void setContext(Configurable context) {
57 this.context = context;
58 }
59 }

上面实现,就是通过一个注册方法,将前面实现的两类服务注册管理起来。
下面,我们看一下,基于Thrift的库将组合后的服务以Thrift协议暴露给外部,实际上就是单独启动了一个Thrift服务(关联一个端口),实现的Thrift服务器代码,如下所示:

01 package org.shirdrn.queryproxy;
02
03 import org.apache.commons.logging.Log;
04 import org.apache.commons.logging.LogFactory;
05 import org.apache.thrift.TProcessor;
06 import org.apache.thrift.protocol.TBinaryProtocol;
07 import org.apache.thrift.protocol.TBinaryProtocol.Factory;
08 import org.apache.thrift.server.TServer;
09 import org.apache.thrift.server.TThreadPoolServer;
10 import org.apache.thrift.server.TThreadPoolServer.Args;
11 import org.apache.thrift.transport.TServerSocket;
12 import org.apache.thrift.transport.TTransportException;
13 import org.shirdrn.queryproxy.common.Configurable;
14 import org.shirdrn.queryproxy.thrift.protocol.QueryType;
15 import org.shirdrn.queryproxy.thrift.protocol.QueryProxyService.Iface;
16 import org.shirdrn.queryproxy.thrift.protocol.QueryProxyService.Processor;