RabbitMq学习——Springboot整合rabbitmq之fanout广播配置(三)

一、简介

fanout配置是一种不用配置routingkey的一中交换机形式。
只要队列和指定的交换机进行了绑定配置,当交换机收到生产者推送来的消息后,就会将消息广播至已绑定好的队列中。
流程如图所示:
在这里插入图片描述
在这里插入图片描述

二、fanout交换机的配置

2.1、配置文件编写前的说明

在这里插入图片描述
如上图所示,我们需要配置一个交换机,两个消息队列,并与指定的交换机进行绑定操作。

2.2、配置文件的编写

2.2.1、bean的配置

我们给交换机起名:fanoutExchange
给队列起名:fanoutQueue1fanoutQueue2


分别向servlet容器中注入交换机exchange的实例化bean、两个queue的实例化bean、以及两个消息队列分别和这个交换机绑定的bean配置。

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 广播模式的交换机和队列配置
 * @author 7651
 *
 */
@Configuration
public class FanoutExchangeRabbitMqConfig {
	private static Logger log = LoggerFactory.getLogger(FanoutExchangeRabbitMqConfig.class);
	
	/**
	 * 申明配置交换机
	 * @return
	 */
	@Bean(name="getFanoutExchange")
	public FanoutExchange getFanoutExchange(){
		return new FanoutExchange("fanoutExchange", true, false);
	}
	
	/**
	 * 申明队列1
	 * @return
	 */
	@Bean(name="getFanoutQueue1")
	public Queue getFanoutQueue1(){
		//name, durable, exclusive, autoDelete, arguments
		//name, durable, exclusive, autoDelete
		return new Queue("fanoutQueue1", true, false, false);
	}
	
	/**
	 * 申明队列2
	 * @return
	 */
	@Bean(name="getFanoutQueue2")
	public Queue getFanoutQueue2(){
		//name, durable, exclusive, autoDelete, arguments
		//name, durable, exclusive, autoDelete
		return new Queue("fanoutQueue2", true, false, false);
	}
	
	/**
	 * 绑定队列1
	 * @return
	 */
	@Bean
	public Binding bindFanoutExchangeAndQueue1(@Qualifier(value="getFanoutExchange") FanoutExchange getFanoutExchange,
			@Qualifier(value="getFanoutQueue1") Queue getFanoutQueue1){
		return BindingBuilder.bind(getFanoutQueue1).to(getFanoutExchange);
	}
	/**
	 * 绑定队列2
	 * @return
	 */
	@Bean
	public Binding bindFanoutExchangeAndQueue2(@Qualifier(value="getFanoutExchange") FanoutExchange getFanoutExchange,
			@Qualifier(value="getFanoutQueue2") Queue getFanoutQueue2){
		return BindingBuilder.bind(getFanoutQueue2).to(getFanoutExchange);
	}
}

2.2.2、消息发送

这里沿用上一讲中的消息发送方法。

public interface IMessageServcie {
	public void sendMessage(String exchange,String routingKey,String msg);
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import cn.linkpower.service.IMessageServcie;

@Component
public class MessageServiceImpl implements IMessageServcie,ConfirmCallback,ReturnCallback {
	
	private static Logger log = LoggerFactory.getLogger(MessageServiceImpl.class);
	
	@Autowired
	private RabbitTemplate rabbitTemplate;
	
	@Override
	public void sendMessage(String exchange,String routingKey,String msg) {
		//消息发送失败返回到队列中, yml需要配置 publisher-returns: true
		rabbitTemplate.setMandatory(true);
		//消息消费者确认收到消息后,手动ack回执
		rabbitTemplate.setConfirmCallback(this);
		rabbitTemplate.setReturnCallback(this);
		
		//发送消息
		rabbitTemplate.convertAndSend(exchange,routingKey,msg);
	}

	@Override
	public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
		log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" ");
	}
	
	@Override
	public void confirm(CorrelationData correlationData, boolean ack, String cause) {
		log.info("---- confirm ----");
		if(ack){
			log.info("---- confirm ----ack==true  cause="+cause);
		}else{
			log.info("---- confirm ----ack==false  cause="+cause);
		}
	}

}

2.2.3、消息的发送控制类

由于我们的消息生产者生产消息,发送队列之前,是先将消息推送至消息转发器,然后由消息转发器广播消息至每个绑定的消息队列中。

所以,我们只需要将消息发送至指定的exchange即可。

	@Autowired
	private IMessageServcie messageServiceImpl;

	@RequestMapping("/sendFanoutMsg")
	@ResponseBody
	public String sendFanoutMsg(HttpServletRequest request){
		String msg = request.getParameter("msg");
		messageServiceImpl.sendMessage("fanoutExchange", "", msg);
		return "sendFanoutMsg";
	}

2.2.4、消息消费者

1、配置好了交换机,消息队列以及将消息队列和消息转发器事项了绑定操作。
2、配置好了消息生产者发送消息至指定的消息转发器上。

所以此处我们需要配置消息消费者消费rabbitmq中的消息信息。
我们上面配置的是两个消息队列,所以我们需要配置两个消费者bean ,分别监听对应的队列即可。

消费者一:@RabbitListener(queues=“fanoutQueue1”)

import java.io.IOException;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

@Component
@RabbitListener(queues="fanoutQueue1")
public class FanoutMsgConsumerQueue1 {
	
	@RabbitHandler
	public void process(String msg,Channel channel, Message message) throws IOException {
		//拿到消息延迟消费
		try {
			Thread.sleep(1000*8);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		
		try {
			//正确执行  手动ack
			//假设收到消息失败呢?  假定收到消息是 message 表示失败
			if("message".equalsIgnoreCase(msg)){
				/*channel.basicNack(message.getMessageProperties().getDeliveryTag(),
						false, false);*/
				channel.basicReject(message.getMessageProperties().getDeliveryTag(),
						true);
				System.err.println("get fanout msg1 failed msg = "+msg);
			}else{
				channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
				System.out.println("get fanout msg1 success msg = "+msg);
			}
			
		} catch (Exception e) {
			//消费者处理出了问题,需要告诉队列信息消费失败
			channel.basicNack(message.getMessageProperties().getDeliveryTag(),
					false, false);
			System.err.println("get fanout msg1 failed msg = "+msg);
		}
		
		
	}
}

消费者二:@RabbitListener(queues=“fanoutQueue2”)

import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

@Component
@RabbitListener(queues="fanoutQueue2")
public class FanoutMsgConsumerQueue2 {
	
	@RabbitHandler
	public void process(String msg,Channel channel, Message message) throws IOException {
		//拿到消息延迟消费
		try {
			Thread.sleep(1000*8);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		
		try {
			//正确执行  手动ack
			//假设收到消息失败呢?  假定收到消息是 message 表示失败
			if("message".equalsIgnoreCase(msg)){
				/*channel.basicNack(message.getMessageProperties().getDeliveryTag(),
						false, false);*/
				channel.basicReject(message.getMessageProperties().getDeliveryTag(),
						true);
				System.err.println("get fanout msg2 failed msg = "+msg);
			}else{
				channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
				System.out.println("get fanout msg2 success msg = "+msg);
			}
			
		} catch (Exception e) {
			//消费者处理出了问题,需要告诉队列信息消费失败
			channel.basicNack(message.getMessageProperties().getDeliveryTag(),
					false, false);
			System.err.println("get fanout msg2 failed msg = "+msg);
		}
		
		
	}
}

2.3、测试

在这里插入图片描述

2019-12-24 15:49:20.268 ---- [AMQP Connection 127.0.0.1:5672] ---- INFO  cn.linkpower.service.impl.MessageServiceImpl - ---- confirm ----
2019-12-24 15:49:20.269 ---- [AMQP Connection 127.0.0.1:5672] ---- INFO  cn.linkpower.service.impl.MessageServiceImpl - ---- confirm ----ack==true  cause=null
get fanout msg1 success msg = 111112233
get fanout msg2 success msg = 111112233

标签