生产者消费者导入MQ的依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加配置
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456
virtual-host: myHost

RabbitMQ的配置信息
两个消息队列 分别是数据增改 和 删除的队列

创建主题(topic)类型的交换机,并绑定刚刚创建的两个消息队列,并分别设置相应的key,消费者可以通过不同的key来判断该消费那一条消息队列的数据。

生产者的配置
所谓的生产者就是我们数据库的服务方,当我们对数据库的数据进行增删改的时候,我们应该像消息队列发送消息来通知ES我们进行了增删改操作,以便ES进行数据的同步。

/**
* RabbitMQ的配置
*/
@Configuration
public class RabbitMQConfig {

public static final String QUEUE_COURSE_SAVE = "queue.course.save";
public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
public static final String KEY_COURSE_SAVE = "key.course.save";
public static final String KEY_COURSE_REMOVE = "key.course.remove";
public static final String COURSE_EXCHANGE = "edu.course.exchange";

@Bean
public Queue queueCourseSave() {
return new Queue(QUEUE_COURSE_SAVE);
}

@Bean
public Queue queueCourseRemove() {
return new Queue(QUEUE_COURSE_REMOVE);
}

@Bean
public TopicExchange topicExchange() {
return new TopicExchange(COURSE_EXCHANGE);
}

@Bean
public Binding bindCourseSave() {
return BindingBuilder.bind(queueCourseSave()).to(topicExchange()).with(KEY_COURSE_SAVE);
}

@Bean
public Binding bindCourseRemove() {
return BindingBuilder.bind(queueCourseRemove()).to(topicExchange()).with(KEY_COURSE_REMOVE);

 

生产者控制层
生产者发送消息的主要方法

@Autowired
RabbitTemplate rabbitTemplate;
rabbitTemplate.convertAndSend(交换机的名称,消息的key,消息内容);

 

@Slf4j
@RestController
public class CourseController {

@Autowired
private ICourseService courseService;

@Autowired
RabbitTemplate rabbitTemplate;


@PostMapping("/course-upload")
public ResponseEntity<String> upload(MultipartFile file) throws IOException {
//创建文件输入流
InputStream inputStream = file.getInputStream();
//获得文件名
String filename = file.getOriginalFilename();
//调用文件上传方法
OSSUtil.upload(inputStream,filename);
//回调上传的文件
String url = OSSUtil.getURL(filename);
//返回前端
return ResponseEntity.ok(url);
}

@GetMapping("/courses")
public ResponseEntity<Page<Course>> findAllPage(@RequestParam("current") Integer current, @RequestParam("PAGE_SIZE") Integer PAGE_SIZE){
Page<Course> page = new Page<>(current,PAGE_SIZE);
return ResponseEntity.ok(courseService.page(page));
}

@GetMapping("/course")
public ResponseEntity<Course> findOne(@RequestParam("id") Integer id){
return ResponseEntity.ok(courseService.findOne(id));
}

@PostMapping("/course")
public ResponseEntity<String> add(@RequestBody Course course){
courseService.save(course);
rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,RabbitMQConfig.KEY_COURSE_SAVE, JSON.toJSONString(course));
return ResponseEntity.ok("ok");
}

@PutMapping("/course")
public ResponseEntity<String> modify(@RequestBody Course course){
courseService.updateById(course);
rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,RabbitMQConfig.KEY_COURSE_SAVE,JSON.toJSONString(course));
return ResponseEntity.ok("ok");
}

@DeleteMapping("/course/{id}")
public ResponseEntity<String> deleteProductHandovers(@PathVariable("id") Integer id){
courseService.removeById(id);
rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,RabbitMQConfig.KEY_COURSE_REMOVE,id);
return ResponseEntity.ok("ok");
}
}

消费者对消息队列进行监听
所谓的消费者就是ES服务的操作方,通过实时的对消息队列的监听,通过消息队列对应的key值来进行选择服务的调用,不同的key调用不同的服务,获取服务方传输的数据,然后进行数据的同步。

@Slf4j
@Component
public class CourseMQListener {

public static final String QUEUE_COURSE_SAVE = "queue.course.save";
public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
public static final String KEY_COURSE_SAVE = "key.course.save";
public static final String KEY_COURSE_REMOVE = "key.course.remove";
public static final String COURSE_EXCHANGE = "course.exchange";

@Autowired
ICourseService courseService;

/**
* 监听课程添加操作
*/
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = QUEUE_COURSE_SAVE, durable = "true"),
exchange = @Exchange(value = COURSE_EXCHANGE,
type = ExchangeTypes.TOPIC,
ignoreDeclarationExceptions = "true")
, key = KEY_COURSE_SAVE)})
public void receiveCourseSaveMessage(String message) {
try {
log.info("课程添加:{}",message);
Course course = JSON.parseObject(message,Course.class);
//将消息转为课程,保存到es中
courseService.saveOrUpdate(course);
log.info("添加完成:{}",course);
} catch (Exception ex) {
ex.printStackTrace();
}
}

/**
* 监听课程删除操作
*/
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = QUEUE_COURSE_REMOVE, durable = "true"),
exchange = @Exchange(value = COURSE_EXCHANGE,
type = ExchangeTypes.TOPIC,
ignoreDeclarationExceptions = "true")
, key = KEY_COURSE_REMOVE)})
public void receiveCourseDeleteMessage(Long id) {
try {
courseService.removeById(id);
log.info("课程删除完成:{}",id);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}



本期内容就到这里啦~以上内容均可在 方包博客http://fang1688.cn 网站直接搜索名称访问哦。欢迎感兴趣的小伙伴试试,如果本文对您有帮助,也请帮忙点个 赞 + 在看 啦!❤️

欢迎大家加入方包的优派编程学习圈子,和多名小伙伴们一起交流学习,向方包 1 对 1 提问、跟着方包做项目、领取大量编程资源等。Q群763256989欢迎想一起学习进步的小伙伴~

另外方包最近开发了一款工具类的小程序方包工具箱」,功能包括:抖音、小红书、快手去水印,天气预报,小说在线免费阅读(内含上万部热门小说),历史今天,生成图片二维码,图片识别文字,ai伪原创文章,数字摇号抽奖,文字转语音MP3功能...

送福利!关注下方的公众号:优派编程回复资料,即可获得软件app下载资源和python、java等编程学习资料!

   
点击卡片关注「优派编程」
定期分享 it编程干货

 ⬇️ 点击链接阅读原文直达 方包博客

1 对 “利用RabbitMQ实现mysql与ElasticSearch的数据同步”的想法;

发表评论

您的电子邮箱地址不会被公开。

+ 43 = 49