这篇文章主要介绍“Elasticsearch和MySQL之间的数据同步问题怎么解决”,在日常操作中,相信很多人在Elasticsearch和MySQL之间的数据同步问题怎么解决问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Elasticsearch和MySQL之间的数据同步问题怎么解决”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
Elasticsearch中的数据是来自于Mysql数据库的,因此当数据库中的数据进行增删改后,Elasticsearch中的数据,索引也必须跟着做出改变。而对于管理服务(MySQL)和搜索服务(Elasticsearch)往往会在不同的微服务上。
可以通过微服务之间的同步调用来解决数据同步问题,虽然实现起来比较简单,但是在搜索服务中引入管理服务时,业务的耦合度相对来说是比较高的。因此可以通过消息队列的形式来异步通知管理服务的改变。这样做的有优点是耦合度较低,但是依赖于消息队列的耦合度。
首先在两块微服务中引入RabbitMQ的依赖:
引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
创建常量类,将交换机和队列名称设置为常量,使用时方便取名。
public class MqConstants { /** * 交换机名称 */ public final static String HOTEL_EXCHANGE = "hotel.topic"; /** * 监听新增和修改的队列 */ public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue"; /** * 监听删除的队列 */ public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue"; /** * 新增或修改的路由键 */ public final static String HOTEL_INSERT_KEY = "hotel.insert"; /** * 删除的路由键 */ public final static String HOTEL_DELETE_KEY = "hotel.delete"; }
创建配置类,声明交换机,并将路由键,队列与交换机之间互相绑定:
@Configuration public class MqConfig { @Bean public TopicExchange topicExchange(){ return new TopicExchange(MqConstants.HOTEL_EXCHANGE,true,false); } @Bean public Queue insertQueue(){ return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true); } @Bean public Queue deleteQueue(){ return new Queue(MqConstants.HOTEL_DELETE_QUEUE,true); } @Bean public Binding insertQueueBinding(){ return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY); } @Bean public Binding deleteQueueBinding(){ return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY); } }
编写controller层,将酒店数据保存到数据库中,并将消息发送到消息队列里:
@Autowired private IHotelService hotelService; @Autowired private RabbitTemplate rabbitTemplate; @PutMapping public void save(@RequestBody Hotel hotel){ hotelService.save(hotel); rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId()); } @DeleteMapping("/{id}") public void deleteById(@PathVariable("id") Long id){ hotelService.removeById(id); rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id); }
在service中加入两个方法,根据id增加和根据id删除:
void insertById(Long id); void deleteById(Long id);
利用Ctrl+Alt+B的快捷键,在service的实现类里面重写方法进行实现:
@Override public void insertById(Long id) { try { Hotel hotel = getById(id); HotelDoc hotelDoc = new HotelDoc(hotel); // 1.准备Request IndexRequest request = new IndexRequest("hotel").id(hotelDoc.getId().toString()); // 2.准备请求参数DSL,其实就是文档的JSON字符串 request.source(JSON.toJSONString(hotelDoc), XContentType.JSON); // 3.发送请求 client.index(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } } @Override public void deleteById(Long id) { try { DeleteRequest request = new DeleteRequest("hotel", id.toString()); // 2.发送请求 client.delete(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } }
这样即可完成Elasticsearch和MySQL之间的数据同步。