Rabbitmq线程池创建 如何利用Rabbitmq来创建连接池,满足高并发情况下的连接和释放 解决方案 » 免费领取超大流量手机卡,每月29元包185G流量+100分钟通话, 中国电信官方发货 基于RabbitMQ.Client组件实现RabbitMQ可复用的 ConnectionPool(连接池)参考下 正好前一段时间研究过 RabbitMQ 集成 Spring,分享一下自己的经验,希望对你有帮助我开始使用原生的API 集成 RabbitMQ,代码如下public class RabbitMQDirectRouteInfoConsumer { private static final String QUEUE_NAME = "log.info.queue"; public static void main(String[] argv) throws Exception { Connection connection = RabbitMQFactory.getFactory().newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(RabbitMQDirectRouteProducer.EXCHANGE_NAME, "direct", true); Map args = new HashMap(); args.put("x-message-ttl", 60000); args.put("x-dead-letter-exchange", RabbitMQDirectRouteProducer.DEAD_LETTER_EXCHANGE); args.put("x-dead-letter-routing-key", RabbitMQDirectRouteProducer.DEAD_LETTER_ROUTE_KEY); channel.queueDeclare(QUEUE_NAME, true, false, false, args); channel.queueBind(QUEUE_NAME, RabbitMQDirectRouteProducer.EXCHANGE_NAME, "info"); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(new RabbitMQDirectRouteInfoConsumer().getClass().getCanonicalName() + " Received '" + envelope.getRoutingKey() + "':'" + message + "'" + " at " + System.currentTimeMillis()); channel.basicReject(envelope.getDeliveryTag(), true);// channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, false, consumer); }}我也考虑到了消费速度问题,考虑接入多线程,看代码只能在 Consumer 应答的时候接入,并且需要把 channel 传入到异步线程里面,这样增加了状态的维护成本同时也给一起异常情况对于消息的reject造成功能,所以最后我引入了spring-messaging 很方便的解决了这个问题。大致代码如下@Bean SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConcurrentConsumers(5); container.setMaxConcurrentConsumers(10); container.setConnectionFactory(connectionFactory); container.setQueueNames(queueName); container.setMessageListener(listenerAdapter); return container; }其原理也是参考了 JAVA concurrent 包下面线程池的设计,只是这里的每一个线程都是一个 Consumer 每一个 Comsumer 对应一个 RabbitMQ 的链接,这样他们的应答的回复都不会互相干扰,同时 Spring Messaging 内部维护了线程池的数量,可以使用 setConcurrentConsumers 和setMaxConcurrentConsumers 直接 配置。具体的官网代码如下,希望对你有帮助。https://spring.io/guides/gs/messaging-rabbitmq/小编为订阅号「码匠笔记」号主,先后就职于 ThoughtWorks、阿里巴巴等互联网公司的经验分享,包含但不限于 JAVA、并发编程、性能优化、架构设计、小程序、开源软件等。有兴趣可以关注一波,一起学习、讨论。 请问免费类库靠什么盈利? javascript 获取表格元素值 javaEE 域名问题 SSH开发中有哪些适合做报表的组件 列表自动静态刷新 请求帮助:hibernate 如何设置子类条件进行查询 <c:foreach>后页面仍然显示的是标签代码而不是数值 在线等 如何使用自己定义的wsdl文件,谢谢 EJB本地接口的客户端,无法找到JNDI对象! 关于maven elcipse tomcat配置问题 数据结构处理 Web项目同步云服务器问题
我开始使用原生的API 集成 RabbitMQ,代码如下public class RabbitMQDirectRouteInfoConsumer {
private static final String QUEUE_NAME = "log.info.queue"; public static void main(String[] argv) throws Exception {
Connection connection = RabbitMQFactory.getFactory().newConnection();
final Channel channel = connection.createChannel();
channel.exchangeDeclare(RabbitMQDirectRouteProducer.EXCHANGE_NAME, "direct", true);
Map args = new HashMap();
args.put("x-message-ttl", 60000);
args.put("x-dead-letter-exchange", RabbitMQDirectRouteProducer.DEAD_LETTER_EXCHANGE);
args.put("x-dead-letter-routing-key", RabbitMQDirectRouteProducer.DEAD_LETTER_ROUTE_KEY);
channel.queueDeclare(QUEUE_NAME, true, false, false, args);
channel.queueBind(QUEUE_NAME, RabbitMQDirectRouteProducer.EXCHANGE_NAME, "info");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(new RabbitMQDirectRouteInfoConsumer().getClass().getCanonicalName() + " Received '" + envelope.getRoutingKey() + "':'" + message + "'" + " at " + System.currentTimeMillis());
channel.basicReject(envelope.getDeliveryTag(), true);
// channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}我也考虑到了消费速度问题,考虑接入多线程,看代码只能在 Consumer 应答的时候接入,并且需要把 channel 传入到异步线程里面,这样增加了状态的维护成本同时也给一起异常情况对于消息的reject造成功能,所以最后我引入了spring-messaging 很方便的解决了这个问题。大致代码如下@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConcurrentConsumers(5);
container.setMaxConcurrentConsumers(10);
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
其原理也是参考了 JAVA concurrent 包下面线程池的设计,只是这里的每一个线程都是一个 Consumer 每一个 Comsumer 对应一个 RabbitMQ 的链接,这样他们的应答的回复都不会互相干扰,同时 Spring Messaging 内部维护了线程池的数量,可以使用 setConcurrentConsumers 和setMaxConcurrentConsumers 直接 配置。具体的官网代码如下,希望对你有帮助。
https://spring.io/guides/gs/messaging-rabbitmq/小编为订阅号「码匠笔记」号主,先后就职于 ThoughtWorks、阿里巴巴等互联网公司的经验分享,包含但不限于 JAVA、并发编程、性能优化、架构设计、小程序、开源软件等。有兴趣可以关注一波,一起学习、讨论。