ElasticSearch API for JAVA 学习笔记

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介:

Client

Client是一个类,通过这个类可以实现对ES集群的各种操作:Index, Get, Delete , Search,以及对ES集群的管理任务。

Client的构造需要基于TransportClient

TransportClient

TransportClient可以远程连接ES集群,通过一个传输模块,但是它不真正的连接到集群,只是获取集群的一个或多个初始传输地址,在每次请求动作时,才真正连接到ES集群。

Settings

Settings类主要是在启动Client之前,配置一些属性参数,主要配置集群名称cluster.name,还有其他参数:

client.transport.sniff  是否为传输client添加嗅探功能

client.transport.ignore_cluster_name 设为true,忽略连接节点的集群名称验证

client.transport.ping_timeout 设置ping节点时的时间限,默认5s

client.transport.nodes_sampler_interval 设置sample/ping nodes listed 间隔时间,默认5s

1
2
3
4
5
6
7
8
9
10
11
12
13
         //通过Settings类设置属性参数
         Settings settings = Settings.settingsBuilder().put( "cluster.name" , "index-name" ).build();
         
         //启动Client
         Client client = TransportClient.builder().settings(settings).build().
                 addTransportAddress( new  InetSocketTransportAddress(InetAddress.getByName( "192.168.xxx.xxx" ), 9300 ));
         
         //如果不需要设置参数,直接如下
         /*Client client = TransportClient.builder().build().
                 addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.xxx.xxx"),9300));*/
         
         //关闭Clinet
         client.close();

Document API

主要分为以下类:Index API , Get API , Delete API , Update API, Multi Get API, Bulk API

es中的增删改查

Index API可以索引一个典型的JSON文档到指定的索引中,并且可以使它可以检索。

产生JSON

JSON产生可以有以下几种方式:

手动拼接一个JSON字符串

使用Map

使用第三方库,比如Jackson

使用内置的XContentFactory.jsonBuilder()

每种类型都会转换为byte[],因此如果对象已经是这种形式,可以直接使用,jsonBuilder是一个高度优化了的JSON产生器,它直接构造byte[]

通过下边的代码讲解四种方法:index-api, get-api, delete-api, update-api

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/**
      * es-api的方法学习:
      * 1.prepareIndex方法:索引数据到ElasticSearch
      * 2.prepareGet方法:获取信息
      * 3.prepareDelete方法:删除信息
      * 4.update方法:更新信息
      *   4.1 upsert:在使用update方法时:
      *       a:针对文档不存在的情况时,做出index数据的操作,update无效;
      *        b:如果文档存在,那么index数据操作无效,update有效;
      */
     public  static  void  main(String[] args)  throws  IOException, InterruptedException, ExecutionException {
         //通过Settings类设置属性参数  
         Settings settings = Settings.settingsBuilder().put( "cluster.name" , "myApp" ).build();  
           
         //启动Client  
         Client client =  null ;
         try  {
             client = TransportClient.builder().settings(settings).build().  
                     addTransportAddress( new  InetSocketTransportAddress(InetAddress.getByName( "101.200.124.27" ), 9300 ));
         catch  (UnknownHostException e) {
             e.printStackTrace();
         }  
         //执行操作
         SimpleDateFormat df =  new  SimpleDateFormat( "yyyy-MM-dd" );
         XContentBuilder jsonBuilder = XContentFactory.jsonBuilder()
                 .startObject()
                 .field( "user" , "yuchen" )
                 .field( "interest" , "reading book" )
                 .field( "insert_time" ,df.format( new  Date()))
                 .endObject();
         //1.prepareIndex方法:索引数据到ElasticSearch
         IndexResponse response = client.prepareIndex( "index-test" , "weibo" , "4" )
             .setSource(jsonBuilder)
             .get();
         
         String _index = response.getIndex();
         String _type = response.getType();
         String _id = response.getId();
         long  _version = response.getVersion();
         boolean  created = response.isCreated();
         System.out.println(_index+ " " +_type+ " " +_id+ " " +_version+ " " +created);
         
         //2.prepareGet方法:获取信息
         GetResponse getResponse = client.prepareGet( "index-test" , "weibo" , "1" ).get();
         System.out.println(getResponse.getSourceAsString());
         
         //3.prepareDelete方法:删除信息
         DeleteResponse deleteResponse = client.prepareDelete( "index-test" , "weibo" , "4" ).get();
         System.out.println(deleteResponse.isFound());
         
         //4.update方法:更新信息
         UpdateRequest updateRequest =  new  UpdateRequest();
         updateRequest.index( "index-test" );
         updateRequest.type( "weibo" );
         updateRequest.id( "1" );
         updateRequest.doc(XContentFactory.jsonBuilder().startObject().field( "interest" , "music" ).endObject());
         UpdateResponse updateResponse = client.update(updateRequest).get();
         System.out.println(updateResponse.isCreated());
         //update方法: 可以为已有的文档添加新的字段
         UpdateResponse updateResponse2 = client.prepareUpdate( "index-test" "weibo" "1" )
             .setDoc(XContentFactory.jsonBuilder()
                 .startObject()
                 .field( "interest2" , "reading" )
                 .endObject()).get();
         System.out.println(updateResponse2.isCreated());
         //4.1 upsert:在使用update方法时,a:针对文档不存在的情况时,做出index数据的操作,update无效;
         //                          b:如果文档存在,那么index数据操作无效,update有效;
         //先构建一个IndexRequest
         IndexRequest indexRequest =  new  IndexRequest( "index-test" , "weibo" , "14" );
         indexRequest.source(XContentFactory.jsonBuilder()
                 .startObject()
                 .field( "user" , "yuchen2" )
                 .field( "interest" , "eating" )
                 .field( "insert_time" ,df.format( new  Date()))
                 .endObject());
         //再构建一个UpdateRequest,并用IndexRequest关联
         UpdateRequest updateRequest3 =  new  UpdateRequest( "index-test" , "weibo" , "14" );
         updateRequest3.doc(XContentFactory.jsonBuilder()
                 .startObject()
                 .field( "interest2" , "love" )
                 .endObject()
                 ).upsert(indexRequest);
         client.update(updateRequest3).get();
         
         if (client !=  null ){
             client.close();  
         }
     }

批量操作

Multi Get Api 和 Bulk Api可进行批量的增删改查

使用Multi Get Api 批量获取:

1
2
3
4
5
6
7
8
9
10
         //1. Muti-get Api
         //可以指定单个id,也在index,type下指定一个id-list;也可以指定别的index/type
         MultiGetResponse multiGetResponse = client.prepareMultiGet()
                 .add( "index-test" , "weibo" , "1" ) //指定单个id
                 .add( "index-test" , "weibo" , "11" , "13" , "14" ) //指定一个id-list
                 .add( "index-other" , "news" , "1" , "3" ).get(); //指定别的index/type
         for (MultiGetItemResponse item:multiGetResponse){
             GetResponse response = item.getResponse();
             System.out.println(response.getSourceAsString());
         }

Bulk Api批量增加:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
         //2.Bulk Api:可以进行批量index和批量删除操作
         //2.1批量增加
         BulkRequestBuilder bulkRequest = client.prepareBulk();
         bulkRequest.add(client.prepareIndex( "index-test" "weibo" "20" )
             .setSource(XContentFactory.jsonBuilder()
                     .startObject()
                         .field( "user" "yuchen20" )
                         .field( "postDate" new  Date())
                         .field( "message" "trying out Elasticsearch" )
                     .endObject()
                   )
           );
         bulkRequest.add(client.prepareIndex( "index-test" "weibo" "21" )
                 .setSource(XContentFactory.jsonBuilder()
                         .startObject()
                             .field( "user" "yuchen21" )
                             .field( "postDate" new  Date())
                             .field( "message" "trying out Elasticsearch" )
                         .endObject()
                       )
               );
         
         BulkResponse bulkResponse = bulkRequest.get();
         if (bulkResponse.hasFailures()){
             //...
         }

Bulk Api批量删除:

1
2
3
4
5
6
7
8
9
10
11
         //2.2批量删除
         BulkRequestBuilder bulkRequest = client.prepareBulk();
         bulkRequest.add(client.prepareDelete( "index-test" "weibo" "20" )
           );
         bulkRequest.add(client.prepareDelete( "index-test" "weibo" "21" )
               );
         
         BulkResponse bulkResponse = bulkRequest.get();
         if (bulkResponse.hasFailures()){
             System.out.println( "bulk error:" +bulkResponse.buildFailureMessage());
         }

Bulk Api 批量更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
         //2.3批量更新
         BulkRequestBuilder bulkRequest = client.prepareBulk();
         bulkRequest.add(client.prepareUpdate( "index-test" "weibo" "11" ).setDoc(XContentFactory
                 .jsonBuilder().startObject()
                 .field( "country" , "China" ) //新添加字段
                 .endObject()
                 )
           );
         bulkRequest.add(client.prepareUpdate( "index-test" "weibo" "13" ).setDoc(XContentFactory
                 .jsonBuilder().startObject()
                 .field( "user" , "yuchen13" ) //更新字段
                 .endObject()
                 )
               );
         
         BulkResponse bulkResponse = bulkRequest.get();
         if (bulkResponse.hasFailures()){
             System.out.println( "bulk error:" +bulkResponse.buildFailureMessage());
         }

BulkProcessor设置批量请求的属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
         //BulkProcessor
         BulkProcessor bulkProcessor = BulkProcessor.builder(client,  new  BulkProcessor.Listener() {
             @Override
             public  void  beforeBulk( long  arg0, BulkRequest arg1) {
                 //批量执行前做的事情
                 System.out.println( "bulk api action starting..." );
             }
             @Override
             public  void  afterBulk( long  arg0, BulkRequest arg1, Throwable arg2) {
                 System.out.println( "exception:bukl api action ending...:" +arg2.getMessage());
             }
             @Override
             public  void  afterBulk( long  arg0, BulkRequest arg1, BulkResponse arg2) {
                 //正常执行完毕后...
                 System.out.println( "normal:bukl api action ending..." );
             }
         })
         //设置多种条件,对批量操作进行限制,达到限制中的任何一种触发请求的批量提交
         .setBulkActions( 1000 ) //设置批量操作一次性执行的action个数,根据请求个数批量提交
         //.setBulkSize(new ByteSizeValue(1,ByteSizeUnit.KB))//设置批量提交请求的大小允许的最大值
         //.setFlushInterval(TimeValue.timeValueMillis(100))//根据时间周期批量提交请求
         //.setConcurrentRequests(1)//设置允许并发请求的数量
         //设置请求失败时的补偿措施,重复请求3次
         //.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
         .build();
         for ( int  i = 0 ;i< 100000 ;i++){
             bulkProcessor.add( new  IndexRequest( "index-test" , "weibo2" , "" +i).source(
                     XContentFactory
                     .jsonBuilder()
                     .startObject()
                     .field( "name" , "yuchen" +i)
                     .field( "interest" , "love" +i)
                     .endObject()));
         }
         bulkProcessor.awaitClose( 5 , TimeUnit.MINUTES); //释放bulkProcessor资源
         System.out.println( "load succeed!" );


默认的参数:

  • sets bulkActions to 1000

  • sets bulkSize to 5mb

  • does not set flushInterval

  • sets concurrentRequests to 1

  • sets backoffPolicy to an exponential backoff with 8 retries and a start delay of 50ms. The total wait time is roughly 5.1 seconds.


参考地址:

http://blog.csdn.net/wuyzhen_csdn/article/details/52381697











本文转自yunlielai51CTO博客,原文链接:http://blog.51cto.com/4925054/2084251,如需转载请自行联系原作者


相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
1天前
|
安全 Java API
java借助代理ip,解决访问api频繁导致ip被禁的问题
java借助代理ip,解决访问api频繁导致ip被禁的问题
|
3天前
|
存储 安全 Java
说说Java 8 引入的Stream API
说说Java 8 引入的Stream API
8 0
|
3天前
|
分布式计算 Java API
Java 8新特性之Lambda表达式与Stream API
【4月更文挑战第16天】本文将介绍Java 8中的两个重要新特性:Lambda表达式和Stream API。Lambda表达式是Java 8中引入的一种新的编程语法,它允许我们将函数作为参数传递给其他方法,从而使代码更加简洁、易读。Stream API是Java 8中引入的一种新的数据处理方式,它允许我们以声明式的方式处理数据,从而使代码更加简洁、高效。本文将通过实例代码详细讲解这两个新特性的使用方法和优势。
|
4天前
|
安全 Java API
RESTful API设计与实现:Java后台开发指南
【4月更文挑战第15天】本文介绍了如何使用Java开发RESTful API,重点是Spring Boot框架和Spring MVC。遵循无状态、统一接口、资源标识和JSON数据格式的设计原则,通过创建控制器处理HTTP请求,如示例中的用户管理操作。此外,文章还提及数据绑定、验证、异常处理和跨域支持。最后,提出了版本控制、安全性、文档测试以及限流和缓存的最佳实践,以确保API的稳定、安全和高效。
|
7天前
|
存储 Java 关系型数据库
掌握Java 8 Stream API的艺术:详解流式编程(一)
掌握Java 8 Stream API的艺术:详解流式编程
37 1
|
10天前
|
Java Maven 索引
java 链接Elasticsearch
java 链接Elasticsearch
|
16天前
|
前端开发 Java API
构建RESTful API:Java中的RESTful服务开发
【4月更文挑战第3天】本文介绍了在Java环境中构建RESTful API的重要性及方法。遵循REST原则,利用HTTP方法处理资源,实现CRUD操作。在Java中,常用框架如Spring MVC简化了RESTful服务开发,包括定义资源、设计表示层、实现CRUD、考虑安全性、文档和测试。通过Spring MVC示例展示了创建RESTful服务的步骤,强调了其在现代Web服务开发中的关键角色,有助于提升互操作性和用户体验。
构建RESTful API:Java中的RESTful服务开发
|
23天前
|
Java
elasticsearch使用java程序添加删除修改
elasticsearch使用java程序添加删除修改
9 0
|
26天前
|
Java 数据库连接 API
Java 学习路线:基础知识、数据类型、条件语句、函数、循环、异常处理、数据结构、面向对象编程、包、文件和 API
Java 是一种广泛使用的、面向对象的编程语言,始于1995年,以其跨平台性、安全性和可靠性著称,应用于从移动设备到数据中心的各种场景。基础概念包括变量(如局部、实例和静态变量)、数据类型(原始和非原始)、条件语句(if、else、switch等)、函数、循环、异常处理、数据结构(如数组、链表)和面向对象编程(类、接口、继承等)。深入学习还包括包、内存管理、集合框架、序列化、网络套接字、泛型、流、JVM、垃圾回收和线程。构建工具如Gradle、Maven和Ant简化了开发流程,Web框架如Spring和Spring Boot支持Web应用开发。ORM工具如JPA、Hibernate处理对象与数
90 3
|
27天前
|
分布式计算 Java 程序员
Java 8新特性之Lambda表达式与Stream API
本文将详细介绍Java 8中的两个重要新特性:Lambda表达式和Stream API。Lambda表达式是Java 8中引入的一种简洁、匿名的函数表示方法,它允许我们将函数作为参数传递给其他方法。而Stream API则是一种新的数据处理方式,它允许我们以声明式的方式处理数据,从而提高代码的可读性和可维护性。通过本文的学习,你将能够掌握Lambda表达式和Stream API的基本用法,以及如何在项目中应用这两个新特性。
30 10