RabbitMq学习——Springboot整合rabbitmq之fanout广播配置(三)
一、简介
fanout配置是一种不用配置routingkey的一中交换机形式。
只要队列和指定的交换机进行了绑定配置,当交换机收到生产者推送来的消息后,就会将消息广播至已绑定好的队列中。
流程如图所示:
二、fanout交换机的配置
2.1、配置文件编写前的说明
如上图所示,我们需要配置一个交换机,两个消息队列,并与指定的交换机进行绑定操作。
2.2、配置文件的编写
2.2.1、bean的配置
我们给交换机起名:fanoutExchange。
给队列起名:fanoutQueue1、fanoutQueue2。
分别向servlet容器中注入交换机exchange的实例化bean、两个queue的实例化bean、以及两个消息队列分别和这个交换机绑定的bean配置。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 广播模式的交换机和队列配置
* @author 7651
*
*/
@Configuration
public class FanoutExchangeRabbitMqConfig {
private static Logger log = LoggerFactory.getLogger(FanoutExchangeRabbitMqConfig.class);
/**
* 申明配置交换机
* @return
*/
@Bean(name="getFanoutExchange")
public FanoutExchange getFanoutExchange(){
return new FanoutExchange("fanoutExchange", true, false);
}
/**
* 申明队列1
* @return
*/
@Bean(name="getFanoutQueue1")
public Queue getFanoutQueue1(){
//name, durable, exclusive, autoDelete, arguments
//name, durable, exclusive, autoDelete
return new Queue("fanoutQueue1", true, false, false);
}
/**
* 申明队列2
* @return
*/
@Bean(name="getFanoutQueue2")
public Queue getFanoutQueue2(){
//name, durable, exclusive, autoDelete, arguments
//name, durable, exclusive, autoDelete
return new Queue("fanoutQueue2", true, false, false);
}
/**
* 绑定队列1
* @return
*/
@Bean
public Binding bindFanoutExchangeAndQueue1(@Qualifier(value="getFanoutExchange") FanoutExchange getFanoutExchange,
@Qualifier(value="getFanoutQueue1") Queue getFanoutQueue1){
return BindingBuilder.bind(getFanoutQueue1).to(getFanoutExchange);
}
/**
* 绑定队列2
* @return
*/
@Bean
public Binding bindFanoutExchangeAndQueue2(@Qualifier(value="getFanoutExchange") FanoutExchange getFanoutExchange,
@Qualifier(value="getFanoutQueue2") Queue getFanoutQueue2){
return BindingBuilder.bind(getFanoutQueue2).to(getFanoutExchange);
}
}
2.2.2、消息发送
这里沿用上一讲中的消息发送方法。
public interface IMessageServcie {
public void sendMessage(String exchange,String routingKey,String msg);
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import cn.linkpower.service.IMessageServcie;
@Component
public class MessageServiceImpl implements IMessageServcie,ConfirmCallback,ReturnCallback {
private static Logger log = LoggerFactory.getLogger(MessageServiceImpl.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void sendMessage(String exchange,String routingKey,String msg) {
//消息发送失败返回到队列中, yml需要配置 publisher-returns: true
rabbitTemplate.setMandatory(true);
//消息消费者确认收到消息后,手动ack回执
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
//发送消息
rabbitTemplate.convertAndSend(exchange,routingKey,msg);
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" ");
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("---- confirm ----");
if(ack){
log.info("---- confirm ----ack==true cause="+cause);
}else{
log.info("---- confirm ----ack==false cause="+cause);
}
}
}
2.2.3、消息的发送控制类
由于我们的消息生产者生产消息,发送队列之前,是先将消息推送至消息转发器,然后由消息转发器广播消息至每个绑定的消息队列中。
所以,我们只需要将消息发送至指定的exchange即可。
@Autowired
private IMessageServcie messageServiceImpl;
@RequestMapping("/sendFanoutMsg")
@ResponseBody
public String sendFanoutMsg(HttpServletRequest request){
String msg = request.getParameter("msg");
messageServiceImpl.sendMessage("fanoutExchange", "", msg);
return "sendFanoutMsg";
}
2.2.4、消息消费者
1、配置好了交换机,消息队列以及将消息队列和消息转发器事项了绑定操作。
2、配置好了消息生产者发送消息至指定的消息转发器上。
所以此处我们需要配置消息消费者消费rabbitmq中的消息信息。
我们上面配置的是两个消息队列,所以我们需要配置两个消费者bean ,分别监听对应的队列即可。
消费者一:@RabbitListener(queues=“fanoutQueue1”)
import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Component
@RabbitListener(queues="fanoutQueue1")
public class FanoutMsgConsumerQueue1 {
@RabbitHandler
public void process(String msg,Channel channel, Message message) throws IOException {
//拿到消息延迟消费
try {
Thread.sleep(1000*8);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
//正确执行 手动ack
//假设收到消息失败呢? 假定收到消息是 message 表示失败
if("message".equalsIgnoreCase(msg)){
/*channel.basicNack(message.getMessageProperties().getDeliveryTag(),
false, false);*/
channel.basicReject(message.getMessageProperties().getDeliveryTag(),
true);
System.err.println("get fanout msg1 failed msg = "+msg);
}else{
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
System.out.println("get fanout msg1 success msg = "+msg);
}
} catch (Exception e) {
//消费者处理出了问题,需要告诉队列信息消费失败
channel.basicNack(message.getMessageProperties().getDeliveryTag(),
false, false);
System.err.println("get fanout msg1 failed msg = "+msg);
}
}
}
消费者二:@RabbitListener(queues=“fanoutQueue2”)
import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Component
@RabbitListener(queues="fanoutQueue2")
public class FanoutMsgConsumerQueue2 {
@RabbitHandler
public void process(String msg,Channel channel, Message message) throws IOException {
//拿到消息延迟消费
try {
Thread.sleep(1000*8);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
//正确执行 手动ack
//假设收到消息失败呢? 假定收到消息是 message 表示失败
if("message".equalsIgnoreCase(msg)){
/*channel.basicNack(message.getMessageProperties().getDeliveryTag(),
false, false);*/
channel.basicReject(message.getMessageProperties().getDeliveryTag(),
true);
System.err.println("get fanout msg2 failed msg = "+msg);
}else{
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
System.out.println("get fanout msg2 success msg = "+msg);
}
} catch (Exception e) {
//消费者处理出了问题,需要告诉队列信息消费失败
channel.basicNack(message.getMessageProperties().getDeliveryTag(),
false, false);
System.err.println("get fanout msg2 failed msg = "+msg);
}
}
}
2.3、测试
2019-12-24 15:49:20.268 ---- [AMQP Connection 127.0.0.1:5672] ---- INFO cn.linkpower.service.impl.MessageServiceImpl - ---- confirm ----
2019-12-24 15:49:20.269 ---- [AMQP Connection 127.0.0.1:5672] ---- INFO cn.linkpower.service.impl.MessageServiceImpl - ---- confirm ----ack==true cause=null
get fanout msg1 success msg = 111112233
get fanout msg2 success msg = 111112233