Client
Client是一个类,通过这个类可以实现对ES集群的各种操作:Index, Get, Delete , Search,以及对ES集群的管理任务。
Client的构造需要基于TransportClient
TransportClient
TransportClient可以远程连接ES集群,通过一个传输模块,但是它不真正的连接到集群,只是获取集群的一个或多个初始传输地址,在每次请求动作时,才真正连接到ES集群。
Settings
Settings类主要是在启动Client之前,配置一些属性参数,主要配置集群名称cluster.name,还有其他参数:
client.transport.sniff 是否为传输client添加嗅探功能
client.transport.ignore_cluster_name 设为true,忽略连接节点的集群名称验证
client.transport.ping_timeout 设置ping节点时的时间限,默认5s
client.transport.nodes_sampler_interval 设置sample/ping nodes listed 间隔时间,默认5s
1
2
3
4
5
6
7
8
9
10
11
12
13
|
//通过Settings类设置属性参数
Settings settings = Settings.settingsBuilder().put(
"cluster.name"
,
"index-name"
).build();
//启动Client
Client client = TransportClient.builder().settings(settings).build().
addTransportAddress(
new
InetSocketTransportAddress(InetAddress.getByName(
"192.168.xxx.xxx"
),
9300
));
//如果不需要设置参数,直接如下
/*Client client = TransportClient.builder().build().
addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.xxx.xxx"),9300));*/
//关闭Clinet
client.close();
|
Document API
主要分为以下类:Index API , Get API , Delete API , Update API, Multi Get API, Bulk API
es中的增删改查
Index API可以索引一个典型的JSON文档到指定的索引中,并且可以使它可以检索。
产生JSON
JSON产生可以有以下几种方式:
手动拼接一个JSON字符串
使用Map
使用第三方库,比如Jackson
使用内置的XContentFactory.jsonBuilder()
每种类型都会转换为byte[],因此如果对象已经是这种形式,可以直接使用,jsonBuilder是一个高度优化了的JSON产生器,它直接构造byte[]
通过下边的代码讲解四种方法:index-api, get-api, delete-api, update-api
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
/**
* es-api的方法学习:
* 1.prepareIndex方法:索引数据到ElasticSearch
* 2.prepareGet方法:获取信息
* 3.prepareDelete方法:删除信息
* 4.update方法:更新信息
* 4.1 upsert:在使用update方法时:
* a:针对文档不存在的情况时,做出index数据的操作,update无效;
* b:如果文档存在,那么index数据操作无效,update有效;
*/
public
static
void
main(String[] args)
throws
IOException, InterruptedException, ExecutionException {
//通过Settings类设置属性参数
Settings settings = Settings.settingsBuilder().put(
"cluster.name"
,
"myApp"
).build();
//启动Client
Client client =
null
;
try
{
client = TransportClient.builder().settings(settings).build().
addTransportAddress(
new
InetSocketTransportAddress(InetAddress.getByName(
"101.200.124.27"
),
9300
));
}
catch
(UnknownHostException e) {
e.printStackTrace();
}
//执行操作
SimpleDateFormat df =
new
SimpleDateFormat(
"yyyy-MM-dd"
);
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder()
.startObject()
.field(
"user"
,
"yuchen"
)
.field(
"interest"
,
"reading book"
)
.field(
"insert_time"
,df.format(
new
Date()))
.endObject();
//1.prepareIndex方法:索引数据到ElasticSearch
IndexResponse response = client.prepareIndex(
"index-test"
,
"weibo"
,
"4"
)
.setSource(jsonBuilder)
.get();
String _index = response.getIndex();
String _type = response.getType();
String _id = response.getId();
long
_version = response.getVersion();
boolean
created = response.isCreated();
System.out.println(_index+
" "
+_type+
" "
+_id+
" "
+_version+
" "
+created);
//2.prepareGet方法:获取信息
GetResponse getResponse = client.prepareGet(
"index-test"
,
"weibo"
,
"1"
).get();
System.out.println(getResponse.getSourceAsString());
//3.prepareDelete方法:删除信息
DeleteResponse deleteResponse = client.prepareDelete(
"index-test"
,
"weibo"
,
"4"
).get();
System.out.println(deleteResponse.isFound());
//4.update方法:更新信息
UpdateRequest updateRequest =
new
UpdateRequest();
updateRequest.index(
"index-test"
);
updateRequest.type(
"weibo"
);
updateRequest.id(
"1"
);
updateRequest.doc(XContentFactory.jsonBuilder().startObject().field(
"interest"
,
"music"
).endObject());
UpdateResponse updateResponse = client.update(updateRequest).get();
System.out.println(updateResponse.isCreated());
//update方法: 可以为已有的文档添加新的字段
UpdateResponse updateResponse2 = client.prepareUpdate(
"index-test"
,
"weibo"
,
"1"
)
.setDoc(XContentFactory.jsonBuilder()
.startObject()
.field(
"interest2"
,
"reading"
)
.endObject()).get();
System.out.println(updateResponse2.isCreated());
//4.1 upsert:在使用update方法时,a:针对文档不存在的情况时,做出index数据的操作,update无效;
// b:如果文档存在,那么index数据操作无效,update有效;
//先构建一个IndexRequest
IndexRequest indexRequest =
new
IndexRequest(
"index-test"
,
"weibo"
,
"14"
);
indexRequest.source(XContentFactory.jsonBuilder()
.startObject()
.field(
"user"
,
"yuchen2"
)
.field(
"interest"
,
"eating"
)
.field(
"insert_time"
,df.format(
new
Date()))
.endObject());
//再构建一个UpdateRequest,并用IndexRequest关联
UpdateRequest updateRequest3 =
new
UpdateRequest(
"index-test"
,
"weibo"
,
"14"
);
updateRequest3.doc(XContentFactory.jsonBuilder()
.startObject()
.field(
"interest2"
,
"love"
)
.endObject()
).upsert(indexRequest);
client.update(updateRequest3).get();
if
(client !=
null
){
client.close();
}
}
|
批量操作
Multi Get Api 和 Bulk Api可进行批量的增删改查
使用Multi Get Api 批量获取:
1
2
3
4
5
6
7
8
9
10
|
//1. Muti-get Api
//可以指定单个id,也在index,type下指定一个id-list;也可以指定别的index/type
MultiGetResponse multiGetResponse = client.prepareMultiGet()
.add(
"index-test"
,
"weibo"
,
"1"
)
//指定单个id
.add(
"index-test"
,
"weibo"
,
"11"
,
"13"
,
"14"
)
//指定一个id-list
.add(
"index-other"
,
"news"
,
"1"
,
"3"
).get();
//指定别的index/type
for
(MultiGetItemResponse item:multiGetResponse){
GetResponse response = item.getResponse();
System.out.println(response.getSourceAsString());
}
|
Bulk Api批量增加:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
//2.Bulk Api:可以进行批量index和批量删除操作
//2.1批量增加
BulkRequestBuilder bulkRequest = client.prepareBulk();
bulkRequest.add(client.prepareIndex(
"index-test"
,
"weibo"
,
"20"
)
.setSource(XContentFactory.jsonBuilder()
.startObject()
.field(
"user"
,
"yuchen20"
)
.field(
"postDate"
,
new
Date())
.field(
"message"
,
"trying out Elasticsearch"
)
.endObject()
)
);
bulkRequest.add(client.prepareIndex(
"index-test"
,
"weibo"
,
"21"
)
.setSource(XContentFactory.jsonBuilder()
.startObject()
.field(
"user"
,
"yuchen21"
)
.field(
"postDate"
,
new
Date())
.field(
"message"
,
"trying out Elasticsearch"
)
.endObject()
)
);
BulkResponse bulkResponse = bulkRequest.get();
if
(bulkResponse.hasFailures()){
//...
}
|
Bulk Api批量删除:
1
2
3
4
5
6
7
8
9
10
11
|
//2.2批量删除
BulkRequestBuilder bulkRequest = client.prepareBulk();
bulkRequest.add(client.prepareDelete(
"index-test"
,
"weibo"
,
"20"
)
);
bulkRequest.add(client.prepareDelete(
"index-test"
,
"weibo"
,
"21"
)
);
BulkResponse bulkResponse = bulkRequest.get();
if
(bulkResponse.hasFailures()){
System.out.println(
"bulk error:"
+bulkResponse.buildFailureMessage());
}
|
Bulk Api 批量更新
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
//2.3批量更新
BulkRequestBuilder bulkRequest = client.prepareBulk();
bulkRequest.add(client.prepareUpdate(
"index-test"
,
"weibo"
,
"11"
).setDoc(XContentFactory
.jsonBuilder().startObject()
.field(
"country"
,
"China"
)
//新添加字段
.endObject()
)
);
bulkRequest.add(client.prepareUpdate(
"index-test"
,
"weibo"
,
"13"
).setDoc(XContentFactory
.jsonBuilder().startObject()
.field(
"user"
,
"yuchen13"
)
//更新字段
.endObject()
)
);
BulkResponse bulkResponse = bulkRequest.get();
if
(bulkResponse.hasFailures()){
System.out.println(
"bulk error:"
+bulkResponse.buildFailureMessage());
}
|
BulkProcessor设置批量请求的属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
//BulkProcessor
BulkProcessor bulkProcessor = BulkProcessor.builder(client,
new
BulkProcessor.Listener() {
@Override
public
void
beforeBulk(
long
arg0, BulkRequest arg1) {
//批量执行前做的事情
System.out.println(
"bulk api action starting..."
);
}
@Override
public
void
afterBulk(
long
arg0, BulkRequest arg1, Throwable arg2) {
System.out.println(
"exception:bukl api action ending...:"
+arg2.getMessage());
}
@Override
public
void
afterBulk(
long
arg0, BulkRequest arg1, BulkResponse arg2) {
//正常执行完毕后...
System.out.println(
"normal:bukl api action ending..."
);
}
})
//设置多种条件,对批量操作进行限制,达到限制中的任何一种触发请求的批量提交
.setBulkActions(
1000
)
//设置批量操作一次性执行的action个数,根据请求个数批量提交
//.setBulkSize(new ByteSizeValue(1,ByteSizeUnit.KB))//设置批量提交请求的大小允许的最大值
//.setFlushInterval(TimeValue.timeValueMillis(100))//根据时间周期批量提交请求
//.setConcurrentRequests(1)//设置允许并发请求的数量
//设置请求失败时的补偿措施,重复请求3次
//.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();
for
(
int
i =
0
;i<
100000
;i++){
bulkProcessor.add(
new
IndexRequest(
"index-test"
,
"weibo2"
,
""
+i).source(
XContentFactory
.jsonBuilder()
.startObject()
.field(
"name"
,
"yuchen"
+i)
.field(
"interest"
,
"love"
+i)
.endObject()));
}
bulkProcessor.awaitClose(
5
, TimeUnit.MINUTES);
//释放bulkProcessor资源
System.out.println(
"load succeed!"
);
|
默认的参数:
-
sets bulkActions to
1000
-
sets bulkSize to
5mb
-
does not set flushInterval
-
sets concurrentRequests to 1
-
sets backoffPolicy to an exponential backoff with 8 retries and a start delay of 50ms. The total wait time is roughly 5.1 seconds.
参考地址:
http://blog.csdn.net/wuyzhen_csdn/article/details/52381697
本文转自yunlielai51CTO博客,原文链接:http://blog.51cto.com/4925054/2084251,如需转载请自行联系原作者