【NFDM-388】新足裏 生足とストッキングの足裏 Vol.3
前边咱们先容了Spring Boot 整合 Elasticsearch 完毕数据查询检索的功能,在试验名目中,咱们的数据一般存储在数据库中,而且跟着业务的发送,数据也会随时变化。
那么奈何保证数据库中的数据与Elasticsearch存储的索引数据保捏一致呢? 最原始的决策便是:当数据发生增更正操作时同步更新Elasticsearch。然而这么的诡计耦合太高。接下来咱们先容一种特别浮浅的数据同步方式:Logstash 数据同步。
一、Logstash简介 1.什么是Logstashlogstash是一个开源的劳动器端数据处置器具。浮浅来说,便是一根具备及时数据传输智商的管说念,谨慎将数据信息从管说念的输入端传输到管说念的输出端;与此同期这根管说念还不错让你左证我方的需求在中间加上滤网,Logstash提供里好多功能远大的滤网以骄横你的各式期骗场景。
Logstash常用于日记系统中作念日记相聚栽种,最常用于ELK中行为日记相聚器使用。
2.Logstash的架构旨趣Logstash的基本过程架构:input=》 filter =》 output 。
input(输入):相聚各式方式,大小和着手数据,从各个劳动器中相聚数据。常用的有:jdbc、file、syslog、redis等。
filter(过滤器)谨慎数据处置与迂回。主淌若将event通过output发出之前对其完毕的某些处置功能。
免费视频output(输出):将咱们过滤出的数据保存到那些数据库和关系存储中,。
【NFDM-388】新足裏 生足とストッキングの足裏 Vol.3
3.Logstash奈何与Elasticsearch数据同步
试验名目中,咱们不成能通过手动添加的方式将数据插入索引库,是以需要借助第三方器具,将数据库的数据同步到索引库。此时,Logstash出现了,它不错将不同数据库的数据同步到Elasticsearch中。保证数据库与Elasticsearch的数据保捏一致。
当今救济数据库与ES数据同步的插件有好多,个东说念主以为Logstash是繁多同步mysql数据到es的插件中,最褂讪况兼最容易建树的一个。
二、装配LogstashLogstash的使用本领也很浮浅,底下老师一下,Logstash是奈何使用的。需要讲明的是:这里以windows 环境为例,演示Logstash的装配和建树。
1.下载Logstash领先,下载对应版块的Logstash包,不错通过上头提供下载elasticsearch的地址进行下载,完成后解压。
上头是Logstash解压后的目次,咱们需要关怀是bin目次中的实行文献和config中的建树文献。一般坐褥情况下,会使用Linux劳动器,况兼会将Logstash建树成自启动的劳动。这里测试的话,凯旋启动。
2.建树Logstash接下来,建树Logstash。需要咱们编写建树文献,左证官网和网上提供的建树文献,将其进行修改。
第一步:在Logstash根目次下创建mysql文献夹,添加mysql.conf建树文献,建树Logstash需要的相应信息,具体建树如下:
input { stdin { } jdbc { # mysql数据库聚会 jdbc_connection_string => "jdbc:mysql://localhost:3306/book_test?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC" # mysqly用户名和密码 jdbc_user => "root" jdbc_password => "root" # 驱动建树 jdbc_driver_library => "C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\mysql-connector-java-8.0.20.jar" # 驱动类名 jdbc_driver_class => "com.mysql.cj.jdbc.Driver" #jdbc_paging_enabled => "true" #jdbc_page_size => "50000" jdbc_default_timezone => "Asia/Shanghai" # 实行指定的sql文献 statement_filepath => "C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\sql\bookquery.sql" use_column_value => true # 是否将字段名迂回为小写,默许true(如果罕见据序列化、反序列化需求,淡薄改为false); lowercase_column_names => false # 需要纪录的字段,用于增量同步,需是数据库字段 tracking_column => updatetime # Value can be any of: numeric,timestamp,Default value is "numeric" tracking_column_type => timestamp # record_last_run前次数据存放位置; record_last_run => true #上一个sql_last_value值的存放文献旅途, 必须要在文献中指定字段的驱动值 last_run_metadata_path => "C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\sql\logstash_default_last_time.log" # 是否排除last_run_metadata_path的纪录,需要增量同步时此字段必须为false; clean_run => false # 设立监听 各字段含义 分 时 天 月 年 ,默许一王人为*代表含义:每分钟都更新 schedule => "* * * * *" # 索引类型 type => "id" } } output { elasticsearch { #es劳动器 hosts => ["10.2.1.231:9200"] #ES索引称号 index => "book" #自增ID document_id => "%{id}" } stdout { codec => json_lines } }
第二步:将mysql-connector-java.jar 拷贝到前边建树的目次下。上头的mysql.conf建树的是:C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\
mysql-connector-java-8.0.20.jar。那么jar包拷贝到此目次下即可:
上头是mysql的驱动,如果是sqlserver数据库,下载SqlServer对应的驱动即可。搁置的位置要与mysql.conf 建树文献中的jdbc_driver_library 地址保捏一致。
第三步:创建sql目次,创建bookquery.sql文献用于保存需要实行的sql 剧本。示例代码如下:
select * from book where updatetime >= :sql_last_value order by updatetime desc
这里使用的增量更新,是以使用:sql_last_value 纪录上一次纪录的终末技能。
3.启动Logstash投入logstash的bin目次,实行如下号令:
logstash.bat -f C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\mysql.conf
启动告捷之后,Logstash就会自动定时将数据写入到Elasticsearch。如下图所示:
同步完成后,咱们使用Postman查询Elasticsearch,考据索引是否都创建告捷。在postman中,发送 Get 央求:
http://10.2.1.231:9200/book/_search 。复返收尾如下图所示:
不错看到,数据库中的数据照旧通过Logstash同步至Elasticsearch。讲明Logstash建树告捷。
三、创建查询劳动数据同步完成后,接下来咱们使用Spring Boot 构建Elasticsearch查询劳动。领先创建Spring Boot名目并整合Elasticsearch,这个之前都照旧先容过,不澄莹的一又友不错看我之前的著作。
接下来演示奈何封装完好的数据查询劳动。
1.数据实体@Document( indexName = "book" , replicas = 0) public class Book { @Id private Long id; @Field(analyzer = "ik_max_word",type = FieldType.Text) private String bookName; @Field(analyzer = "ik_max_word",type = FieldType.Text) private String author; private float price; private int page; @Field(type = FieldType.Date,format = DateFormat.custom,pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") private Date createTime; @Field(type = FieldType.Date,format = DateFormat.custom,pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") private Date updateTime; @Field(analyzer = "ik_max_word",type = FieldType.Text) private String category; public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getBookName() { return bookName; } public void setBookName(String bookName) { this.bookName = bookName; } public String getAuthor() { return author; } public void setAuthor(String author) { this.author = author; } public float getPrice() { return price; } public void setPrice(float price) { this.price = price; } public int getPage() { return page; } public void setPage(int page) { this.page = page; } public String getCategory() { return category; } public void setCategory(String category) { this.category = category; } public Book(){ } public Date getCreateTime() { return createTime; } public void setCreateTime(Date createTime) { this.createTime = createTime; } public Date getUpdateTime() { return updateTime; } public void setUpdateTime(Date updateTime) { this.updateTime = updateTime; } }2.央求封装类
public class BookQuery { public String category; public String bookName; public String author; public int priceMin; public int priceMax; public int pageMin; public int pageMax; public String sort; public String sortType; public int page; public int limit; }3.创建Controller放手器
@RestController public class ElasticSearchController { @Autowired private ElasticsearchRestTemplate elasticsearchRestTemplate; /** * 查询信息 * @param * @return */ @PostMapping(value = "/book/query") public JSONResult query(@RequestBody BookQuery bookQuery){ Query query= getQueryBuilder(bookQuery); SearchHits<Book> searchHits = elasticsearchRestTemplate.search(query, Book.class); List<SearchHit<Book>> result = searchHits.getSearchHits(); return JSONResult.ok(result); } public Query getQueryBuilder(BookQuery query) { BoolQueryBuilder builder = boolQuery(); // 匹配器 腌臜查询部分,分析器使用ik (ik_max_word) List<QueryBuilder> must = builder.must(); if (query.getBookName()!=null && !query.getBookName().isEmpty()) must.add(wildcardQuery("bookName", "*" +query.getBookName()+ "*")); if (query.getCategory()!=null && !query.getCategory().isEmpty()) must.add(wildcardQuery("category", "*" +query.getCategory()+ "*")); if (query.getAuthor()!=null && !query.getAuthor().isEmpty()) must.add(wildcardQuery("author", "*" +query.getAuthor()+ "*")); // 筛选器 精准查询部分 List<QueryBuilder> filter = builder.filter(); // 限度查询 if (query.getPriceMin()>0 && query.getPriceMax()>0) { RangeQueryBuilder price = rangeQuery("price").gte(query.getPriceMin()).lte(query.getPriceMax()); filter.add(price); } // 限度查询 if (query.getPageMin()>0 && query.getPageMax()>0) { RangeQueryBuilder page = rangeQuery("page").gte(query.getPageMin()).lte(query.getPageMax()); filter.add(page); } // 分页 PageRequest pageable = PageRequest.of(query.getPage() - 1, query.getLimit()); // 排序 SortBuilder sort = SortBuilders.fieldSort("price").order(SortOrder.DESC); //设立高亮遵守 String preTag = "<font color='#dd4b39'>";//google的色值 String postTag = "</font>"; HighlightBuilder.Field highlightFields = new HighlightBuilder.Field("category").preTags(preTag).postTags(postTag); Query searchQuery = new NativeSearchQueryBuilder() .withQuery(builder) .withHighlightFields(highlightFields) .withPageable(pageable) .withSort(sort) .build(); return searchQuery; } }4.测西席证
启动名目,在Postman中,央求
http://localhost:8080/book/query 接口查询竹帛信息数据。稽查接口复返情况。
咱们看到接口告捷复返数据。讲明数据查询劳动创建告捷。
终末
以上,咱们就把使用Spring Boot + Elasticsearch + Logstash 完毕完好的数据查询检索劳动先容完毕。