环境:es6.4.0 springBoot1.5.9 mysql8.X logstash6.4.0 kibana6.4.0 redhat6
开发工具:eclipse Oxygen.3a Release (4.7.3a)
开发步骤:
1.新建springBoot工程,过程略,网上有好多教程大家可以自己尝试
2.引入logstash6.4依赖
<!--elasticsearch 开始 -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.8.2</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.8.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client-sniffer</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<!--elasticsearch 结束 -->
3.yml配置文件
server:
  port: 8090
spring:
  application:
    name: Elasticsearch
  data:
    elasticsearch:
      cluster-name: elasticsearch-application
      cluster-nodes: 192.168.177.128:9200
  jackson:
    default-property-inclusion: non_null
logging:
  level:
    root: info
4.测试类
1 import java.io.IOException; 2 3 import org.elasticsearch.action.get.GetRequest; 4 import org.elasticsearch.action.get.GetResponse; 5 import org.elasticsearch.action.search.ClearScrollRequest; 6 import org.elasticsearch.action.search.ClearScrollResponse; 7 import org.elasticsearch.action.search.SearchRequest; 8 import org.elasticsearch.action.search.SearchResponse; 9 import org.elasticsearch.action.search.SearchScrollRequest; 10 import org.elasticsearch.client.RequestOptions; 11 import org.elasticsearch.client.RestHighLevelClient; 12 import org.elasticsearch.common.unit.TimeValue; 13 import org.elasticsearch.index.query.BoolQueryBuilder; 14 import org.elasticsearch.index.query.MatchQueryBuilder; 15 import org.elasticsearch.index.query.QueryBuilders; 16 import org.elasticsearch.index.query.WildcardQueryBuilder; 17 import org.elasticsearch.search.Scroll; 18 import org.elasticsearch.search.SearchHit; 19 import org.elasticsearch.search.SearchHits; 20 import org.elasticsearch.search.builder.SearchSourceBuilder; 21 import org.junit.Test; 22 import org.junit.runner.RunWith; 23 import org.springframework.beans.factory.annotation.Autowired; 24 import org.springframework.boot.test.context.SpringBootTest; 25 import org.springframework.test.context.junit4.SpringRunner; 26 27 import com.elasticsearch.Es_App; 28 29 30 /** 31 * @author 33 */ 34 @RunWith(SpringRunner.class) 35 @SpringBootTest(classes = Es_App.class) 36 public class test { 37 @Autowired 38 RestHighLevelClient client; 39 // 这个时间并不需要长到可以处理所有的数据,仅仅需要足够长来处理前一批次的结果。每个 scroll 请求(包含 scroll 参数)设置了一个新的失效时间。 40 final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L)); 41 42 @Test 43 public void getIndexTest() { 44 45 GetRequest request = new GetRequest("db_user", "user", "4"); 46 try { 47 48 GetResponse response = client.get(request); 49 System.out.println(response); 50 } catch (IOException e) { 51 52 e.printStackTrace(); 53 } 54 } 55 83 84 @Test 85 public void wildcard() throws Exception { 86 WildcardQueryBuilder 
mqb = QueryBuilders.wildcardQuery("username", "z*"); 87 WildcardQueryBuilder mqb1 = QueryBuilders.wildcardQuery("name.keyword", "李*"); 88 BoolQueryBuilder bq = QueryBuilders.boolQuery(); 89 bq.should(mqb); 90 bq.should(mqb1); 91 92 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); 93 sourceBuilder.size(300); 94 sourceBuilder.query(bq); 95 sourceBuilder.fetchSource(new String[] { "username", "name" }, null); 96 System.out.println(sourceBuilder.toString()); 97 98 SearchRequest searchRequest = new SearchRequest("db_user").types("user"); 99 searchRequest.source(sourceBuilder); 100 searchRequest.scroll(scroll); 101 102 SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); 103 104 SearchHit[] searchHits = searchResponse.getHits().getHits();106 for (SearchHit searchHit : searchHits) { 107 System.out.println(searchHit.getSourceAsString()); 108 //String sourceAsString = searchHit.getSourceAsString(); 109 } 110 111 // 遍历搜索命中的数据,直到没有数据 112 String scrollId = searchResponse.getScrollId(); 113 while (searchHits != null && searchHits.length > 0) { 114 SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId); 115 scrollRequest.scroll(scroll); 116 try { 117 searchResponse = client.scroll(scrollRequest, RequestOptions.DEFAULT); 118 } catch (IOException e) { 119 e.printStackTrace(); 120 } 121 scrollId = searchResponse.getScrollId(); 122 searchHits = searchResponse.getHits().getHits(); 123 if (searchHits != null && searchHits.length > 0) { 124 125 for (SearchHit searchHit : searchHits) { 126 System.out.println(searchHit.getSourceAsString()); 127 // String sourceAsString = searchHit.getSourceAsString(); 128 } 129 } 130 } 131 // 清除滚屏 132 ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); 133 clearScrollRequest.addScrollId(scrollId);// 也可以选择setScrollIds()将多个scrollId一起使用 134 ClearScrollResponse clearScrollResponse = null; 135 try { 136 clearScrollResponse = client.clearScroll(clearScrollRequest, 
RequestOptions.DEFAULT); 137 } catch (IOException e) { 138 e.printStackTrace(); 139 } 140 boolean succeeded = clearScrollResponse.isSucceeded(); 141 System.out.println("succeeded:" + succeeded); 142 } 143 144 @Test 145 public void test2() { 146 } 147 148 }
关于测试类说明:使用springBoot自动注入需要使用高版本的junit,博主在测试时使用的是4.12版本的junit,依赖如下:
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
</dependency>
es6中如果需要定义查询语句中的字段的类型,可以使用 字段.类型格式定义,如:username.keyword。
5.配置类:
import org.apache.http.HttpHost; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.FactoryBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; @Configuration public class ElasticsearchConfiguration implements FactoryBean<RestHighLevelClient>, InitializingBean, DisposableBean { private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchConfiguration.class); @Value("${spring.data.elasticsearch.cluster-nodes}") private String clusterNodes; private RestHighLevelClient restHighLevelClient; /** * 控制Bean的实例化过程 * * @return * @throws Exception */ @Override public RestHighLevelClient getObject() throws Exception { return restHighLevelClient; } /** * 获取接口返回的实例的class * * * @return */ @Override public Class<?> getObjectType() { return RestHighLevelClient.class; } @Override public void destroy() throws Exception { try { if (restHighLevelClient != null) { restHighLevelClient.close(); } } catch (final Exception e) { LOG.error("Error closing ElasticSearch client: ", e); } } @Override public boolean isSingleton() { return false; } @Override public void afterPropertiesSet() throws Exception { restHighLevelClient = buildClient(); } private RestHighLevelClient buildClient() { try { restHighLevelClient = new RestHighLevelClient( RestClient.builder( new HttpHost( clusterNodes.split(":")[0], Integer.parseInt(clusterNodes.split(":")[1]), "http"))); } catch (Exception e) { LOG.error(e.getMessage()); } return restHighLevelClient; } }
使用logstash将mysql数据库数据导入到es
logstash6.4.0安装 (linux):
1.下载linux版安装包,解压安装文件。
2. 在logstash解压包下创建mysql文件夹(博主是导入mysql数据库数据,所以文件夹名字为mysql),将mysql版本相对应的驱动jar包放入mysql文件夹中
3.创建logstash连接mysql数据库配置文件,如下:
input {
    stdin { }
    jdbc {
        # 连接的数据库地址和哪一个数据库,指定编码格式,禁用SSL协议,设定自动重连
        #jdbc_connection_string => "jdbc:mysql://192.168.177.128:3306/webject?characterEncoding=UTF-8&useSSL=false&autoReconnect=true"
        jdbc_connection_string => "jdbc:mysql://192.168.177.128:3306/webject?allowMultiQueries=true&useAffectedRows=true&useSSL=false&serverTimezone=Asia/Shanghai&autoReconnect=true&useUnicode=true&characterEncoding=utf8"
        # 你的账户密码
        jdbc_user => "root"
        jdbc_password => "123456"
        # 连接数据库的驱动包,建议使用绝对地址
        jdbc_driver_library => "mysql/mysql-connector-java-8.0.12.jar"
        # 这里不用动就好
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        # 处理中文乱码问题
        codec => plain { charset => "UTF-8" }
        # 使用其它字段追踪,而不是用时间
        use_column_value => true
        # 追踪的字段
        tracking_column => id
        record_last_run => true
        # 上一个sql_last_value值的存放文件路径,必须要在文件中指定字段的初始值
        last_run_metadata_path => "mysql/station_parameter.txt"
        jdbc_default_timezone => "Asia/Shanghai"
        #statement_filepath => "mysql/jdbc.sql"
        statement => "select * from user where id > :sql_last_value"
        # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
        clean_run => false
        # 这是控制定时的,重复执行导入任务的时间间隔,第一位是分钟
        schedule => "*/5 * * * *"
        type => "jdbc"
    }
}
filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}
output {
    elasticsearch {
        # 要导入到的Elasticsearch所在的主机
        hosts => "192.168.177.128:9200"
        # 要导入到的Elasticsearch的索引的名称
        index => "db_user"
        # 类型名称(类似数据库表名)
        document_type => "user"
        # 主键名称(类似数据库主键)
        document_id => "%{id}"
        # es 账号
        user => elastic
        password => changeme
    }
    stdout {
        # JSON格式输出
        codec => json_lines
    }
}
4.启动logstash
进入bin目录 输入以下启动命令: ./logstash -f mysql/jdbc.conf
说明:
导入后,以后导入数据会从最后的导入位置开始,博主在jdbc.conf中配置的追踪字段是id,所以下一次导入会从最后的一个id开始导入,如果需要重新开始导入,则删除mysql目录下的station_parameter.txt文件即可。
不同版本logstash连接数据的配置文件的格式可能有所区别。