如何利用Rabbitmq来创建连接池,满足高并发情况下的连接和释放

解决方案 »

  1.   

    基于RabbitMQ.Client组件实现RabbitMQ可复用的 ConnectionPool(连接池)参考下
      

  2.   

    正好前一段时间研究过 RabbitMQ 集成 Spring,分享一下自己的经验,希望对你有帮助
    我开始使用原生的API 集成 RabbitMQ,代码如下public class RabbitMQDirectRouteInfoConsumer {
        private static final String QUEUE_NAME = "log.info.queue";    public static void main(String[] argv) throws Exception {
            Connection connection = RabbitMQFactory.getFactory().newConnection();
            final Channel channel = connection.createChannel();
            channel.exchangeDeclare(RabbitMQDirectRouteProducer.EXCHANGE_NAME, "direct", true);
            Map args = new HashMap();
            args.put("x-message-ttl", 60000);
            args.put("x-dead-letter-exchange", RabbitMQDirectRouteProducer.DEAD_LETTER_EXCHANGE);
            args.put("x-dead-letter-routing-key", RabbitMQDirectRouteProducer.DEAD_LETTER_ROUTE_KEY);
            channel.queueDeclare(QUEUE_NAME, true, false, false, args);
            channel.queueBind(QUEUE_NAME, RabbitMQDirectRouteProducer.EXCHANGE_NAME, "info");
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(new RabbitMQDirectRouteInfoConsumer().getClass().getCanonicalName() + " Received '" + envelope.getRoutingKey() + "':'" + message + "'" + " at " + System.currentTimeMillis());
                    channel.basicReject(envelope.getDeliveryTag(), true);
    //                channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }我也考虑到了消费速度问题,考虑接入多线程,看代码只能在 Consumer 应答的时候接入,并且需要把 channel 传入到异步线程里面,这样增加了状态的维护成本同时也给一起异常情况对于消息的reject造成功能,所以最后我引入了spring-messaging 很方便的解决了这个问题。大致代码如下@Bean
        SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                                 MessageListenerAdapter listenerAdapter) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConcurrentConsumers(5);
            container.setMaxConcurrentConsumers(10);
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(queueName);
            container.setMessageListener(listenerAdapter);
            return container;
        }
    其原理也是参考了 JAVA concurrent 包下面线程池的设计,只是这里的每一个线程都是一个 Consumer 每一个 Comsumer 对应一个 RabbitMQ 的链接,这样他们的应答的回复都不会互相干扰,同时 Spring Messaging 内部维护了线程池的数量,可以使用 setConcurrentConsumers 和setMaxConcurrentConsumers  直接 配置。具体的官网代码如下,希望对你有帮助。
    https://spring.io/guides/gs/messaging-rabbitmq/小编为订阅号「码匠笔记」号主,先后就职于 ThoughtWorks、阿里巴巴等互联网公司的经验分享,包含但不限于 JAVA、并发编程、性能优化、架构设计、小程序、开源软件等。有兴趣可以关注一波,一起学习、讨论。