调试了一天了,无法解决rabbitmq的广播模式,两个消费者,一条A消费者接受到,一条B消费者接收到,不能同时接收到,必须是同一个queue,求大神解答,代码如下:
rabbitTemplate 和 cachingConnectionFactory工厂连接类
@Configuration
@EnableRabbit
public class RabbitConfiguration { @Autowired
RabbitProperties properties;
@Bean
public CachingConnectionFactory cachingConnectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setAddresses(properties.getAddresses());
factory.setUsername(properties.getUsername());
factory.setPassword(properties.getPassword());
factory.setVirtualHost(properties.getVirtualHost());
factory.setPublisherConfirms(true);
return factory;
} @Bean
public RabbitAdmin rabbitAdmin() {
return new RabbitAdmin(cachingConnectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory){
RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
}queue exchange listener 声明类
@Configuration("syncRabbitConfiguration")
@EnableRabbit
public class RabbitConfiguration {
public static final String QUEUE = "external_socket.request";
public static final String EXCHANGE = "external_socket.request";
@Autowired
CachingConnectionFactory cachingConnectionFactory; @Autowired
RabbitAdmin rabbitAdmin; @Autowired
SocketConsumer consumer;
@Bean
public FanoutExchange socketExchange(){
return new FanoutExchange(EXCHANGE);
}
@Bean
public Queue socketQueue(){
return new Queue(QUEUE);
} @Bean
public Binding socketBinding(Queue socketQueue,FanoutExchange socketExchange){
return BindingBuilder.bind(socketQueue).to(socketExchange);
} @Bean
public SimpleMessageListenerContainer smsContainer(Queue socketQueue){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(cachingConnectionFactory);
container.setAcknowledgeMode(AcknowledgeMode.NONE);
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setRabbitAdmin(rabbitAdmin);
container.setQueues(socketQueue);
container.setMessageListener(consumer);
container.setRecoveryInterval(MQConstant.Rabbit.DEFAULT_RECOVERY_INTERVAL);
return container;
}}
其中container.setAcknowledgeMode(AcknowledgeMode.NONE);
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
这几个参数我都调整过,没用,下面是消费者代码@Service
public class SocketConsumer implements ChannelAwareMessageListener { @Override
public void onMessage(Message message, Channel channel) throws Exception {
String body = null;
try {
body = new String(message.getBody(),"UTF-8");
SimpleDataDto dto = JSONObject.parseObject(body, SimpleDataDto.class);
System.out.println(dto.getSyncType()+":收到消息");
if(dto.getSyncType() == null){
List<SocketIOClient> allClient = SessionManager.getAllClient();
for (SocketIOClient socketIOClient : allClient) {
socketIOClient.sendEvent(dto.getDataType(),dto.getData());
}
}else{
SocketIOClient client = SessionManager.getClient(dto.getSyncType());
if(client !=null)
client.sendEvent(dto.getDataType(),dto.getData());
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}catch (Exception e){
e.printStackTrace();
} }
}发送代码就一行
rabbitTemplate.convertAndSend(RabbitConfiguration.EXCHANGE,"",dataDto);
通过自带的web管理段发布消息也是一样的效果
下面是管理端的截图exchange
queue实在找不到办法了,求解决啊!!!!!!!!!!
rabbitTemplate 和 cachingConnectionFactory工厂连接类
@Configuration
@EnableRabbit
public class RabbitConfiguration { @Autowired
RabbitProperties properties;
@Bean
public CachingConnectionFactory cachingConnectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setAddresses(properties.getAddresses());
factory.setUsername(properties.getUsername());
factory.setPassword(properties.getPassword());
factory.setVirtualHost(properties.getVirtualHost());
factory.setPublisherConfirms(true);
return factory;
} @Bean
public RabbitAdmin rabbitAdmin() {
return new RabbitAdmin(cachingConnectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory){
RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
}queue exchange listener 声明类
@Configuration("syncRabbitConfiguration")
@EnableRabbit
public class RabbitConfiguration {
public static final String QUEUE = "external_socket.request";
public static final String EXCHANGE = "external_socket.request";
@Autowired
CachingConnectionFactory cachingConnectionFactory; @Autowired
RabbitAdmin rabbitAdmin; @Autowired
SocketConsumer consumer;
@Bean
public FanoutExchange socketExchange(){
return new FanoutExchange(EXCHANGE);
}
@Bean
public Queue socketQueue(){
return new Queue(QUEUE);
} @Bean
public Binding socketBinding(Queue socketQueue,FanoutExchange socketExchange){
return BindingBuilder.bind(socketQueue).to(socketExchange);
} @Bean
public SimpleMessageListenerContainer smsContainer(Queue socketQueue){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(cachingConnectionFactory);
container.setAcknowledgeMode(AcknowledgeMode.NONE);
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setRabbitAdmin(rabbitAdmin);
container.setQueues(socketQueue);
container.setMessageListener(consumer);
container.setRecoveryInterval(MQConstant.Rabbit.DEFAULT_RECOVERY_INTERVAL);
return container;
}}
其中container.setAcknowledgeMode(AcknowledgeMode.NONE);
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
这几个参数我都调整过,没用,下面是消费者代码@Service
public class SocketConsumer implements ChannelAwareMessageListener { @Override
public void onMessage(Message message, Channel channel) throws Exception {
String body = null;
try {
body = new String(message.getBody(),"UTF-8");
SimpleDataDto dto = JSONObject.parseObject(body, SimpleDataDto.class);
System.out.println(dto.getSyncType()+":收到消息");
if(dto.getSyncType() == null){
List<SocketIOClient> allClient = SessionManager.getAllClient();
for (SocketIOClient socketIOClient : allClient) {
socketIOClient.sendEvent(dto.getDataType(),dto.getData());
}
}else{
SocketIOClient client = SessionManager.getClient(dto.getSyncType());
if(client !=null)
client.sendEvent(dto.getDataType(),dto.getData());
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}catch (Exception e){
e.printStackTrace();
} }
}发送代码就一行
rabbitTemplate.convertAndSend(RabbitConfiguration.EXCHANGE,"",dataDto);
通过自带的web管理段发布消息也是一样的效果
下面是管理端的截图exchange
queue实在找不到办法了,求解决啊!!!!!!!!!!
解决方案 »
免费领取超大流量手机卡,每月29元包185G流量+100分钟通话, 中国电信官方发货