rabbitmq基本操作


rabbitmq基本操作

这次把一些可能实际工作中遇到的问题整理(或许以后会遇到更多问题),
翻出之前的笔记,之前在学习的时候记录了笔记,但是也没有整理,知识也没有体系.所以这只是老笔记的整理.

1. rabbitmq基本概念

安装:各个系统不同,网上可以轻松找到自己对应的系统.

1.1 四个核心概念

用一张图可以很好的表示
XBn2Pf.png
生产者:生产者生产的数据与交换机为一对一关系
交换机:交换机与队列为一对多关系
队列:一个队列只能对应一个消费者
消费者:绑定多个消费者消息也只能被其中一名消费者获取

XBnIqs.png

当然如果没有使用过mq,在这里硬想这四个概念也只能记住名字.安装好之后实际操作,然后进控制台点点点,就能理解的更深了.

1.2 其他概念

  • broker:
    又称 Server,接收客户端的连接,实现AMQP实体服务,接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
  • Connection
    连接,应用服务与Server的连接,本质上是基于tcp的链接.
  • Channel
    信道,客户端可建立多个Channel,每个Channel代表一个会话任务
  • message
    MessageProperties 和 body 构成.BasicProperties (基本API)可对消息的优先级、过期时间等参数进行设置
  • Exchange
    交换机,消息将根据 routeKey 被交换机转发给对应的绑定队列
  • queue
    消息最终被送到这里等待消费者取走,参数中的Auto-delete意为当前队列的最后一个消息被取出后是否自动删除
  • Binding
    exchange 和 queue 之间的虚拟连接,二者通过 routingkey 进行绑定
  • Routingkey:
    路由规则,交换机可以用它来确定消息改被路由到哪里
  • Virtual host
    虚拟主机,用于进行逻辑隔离,是最上层的消息路由,一个虚拟主机中可以有多个 Exchange 和 Queue,同一个虚拟主机中不能有名称一样的 Exchange 和 Queue

XBnHI0.png

2. 基本的消息模型

rabbitmq官方支持六种,常用的也就四种.

2.1 maven和虚拟主机

  • xml
    <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
        </dependencies>
  • 虚拟主机

创建虚拟主机和用户

XBnqiV.png

2.2 workQueues模式

  • 生产者
public static void workQueues() throws IOException, TimeoutException {
        //1.链接
        ConnectionFactory factory= new ConnectionFactory();
        //参数
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/hcq");
        factory.setUsername("hcq");
        factory.setPassword("121056");
        //链接
        Connection connection = factory.newConnection();
        //channel
        Channel channel = connection.createChannel();
        //队列
        channel.queueDeclare("workQueues", true, false, false, null);
        //发送
        String body="workQueues";
        channel.basicPublish("","workQueues",null,body.getBytes());
        //释放
        channel.close();
        connection.close();
    }
  • 消费者
public static void workQueues() throws IOException, TimeoutException {
       //1.链接
       ConnectionFactory factory= new ConnectionFactory();
       //参数
       factory.setHost("127.0.0.1");
       factory.setPort(5672);
       factory.setVirtualHost("/hcq");
       factory.setUsername("hcq");
       factory.setPassword("121056");
       //链接
       Connection connection = factory.newConnection();
       //channel
       Channel channel = connection.createChannel();
       //队列
       channel.queueDeclare("workQueues", true, false, false, null);
       //接受
       Consumer consumer=new DefaultConsumer(channel){
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               System.out.println("body:"+new String(body));
           }
       };
       channel.basicConsume("workQueues",true,consumer);
   }

可以理解为没有指定交换机,而是走了队列,多个消费者消费的话也只能轮询消费.

发布模式

可以理解为没有指定routingKey,交换机会把消息发送到与它绑定的所有队列中.

  • 生产者
public class PubPro {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.链接
        ConnectionFactory factory= new ConnectionFactory();
        //参数
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/hcq");
        factory.setUsername("hcq");
        factory.setPassword("121056");
        //链接
        Connection connection = factory.newConnection();
        //channel
        Channel channel = connection.createChannel();
        //交换机
        channel.exchangeDeclare("start-fanout", BuiltinExchangeType.FANOUT,true,true,false,null);
        //队列
        channel.queueDeclare("start-fanout_queue1",true,false,false,null);
        channel.queueDeclare("start-fanout_queue2",true,false,false,null);
        //帮顶
        channel.queueBind("start-fanout_queue1","start-fanout","");
        channel.queueBind("start-fanout_queue2","start-fanout","");
        //发送
        String body="pub-pub:"+System.currentTimeMillis();
        channel.basicPublish("start-fanout","",null,body.getBytes());
    }

}
  • 消费者

消费者1,消费者2,需要监听不同消息队列.

public class PubCon2 {
    public static void main(String[] args) throws IOException, TimeoutException {
//1.链接
        ConnectionFactory factory= new ConnectionFactory();
        //参数
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/hcq");
        factory.setUsername("hcq");
        factory.setPassword("121056");
        //链接
        Connection connection = factory.newConnection();
        //channel
        Channel channel = connection.createChannel();
        //接受
        Consumer consumer=new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body:"+new String(body));
            }
        };
        channel.basicConsume("start-fanout_queue2",true,consumer);
        //释放--消费者不需要关闭资源
        //channel.close();
        //connection.close();
    }

}

直连模式

可以理解为只有routingKey完全相同,才会把消息发送到与routingKey匹配的队列上.

  • 消费者
public class RoutingPro {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.链接
        ConnectionFactory factory= new ConnectionFactory();
        //参数
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/hcq");
        factory.setUsername("hcq");
        factory.setPassword("121056");
        //链接
        Connection connection = factory.newConnection();
        //channel
        Channel channel = connection.createChannel();
        //交换机
        channel.exchangeDeclare("start-route", BuiltinExchangeType.DIRECT,true,true,false,null);
        //队列
        channel.queueDeclare("start-route_queue1",true,false,false,null);
        channel.queueDeclare("start-route_queue2",true,false,false,null);
        //帮顶
        channel.queueBind("start-route_queue1","start-route","info");
        channel.queueBind("start-route_queue2","start-route","info");
        channel.queueBind("start-route_queue2","start-route","error");
        //发送
        String body="route:info:"+System.currentTimeMillis();
        String body2="route:error:"+System.currentTimeMillis();
        channel.basicPublish("start-route","info",null,body.getBytes());
        channel.basicPublish("start-route","error",null,body2.getBytes());
    }

}
  • 生产者
public class RoutingCon1 {
    public static void main(String[] args) throws IOException, TimeoutException {
//1.链接
        ConnectionFactory factory= new ConnectionFactory();
        //参数
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/hcq");
        factory.setUsername("hcq");
        factory.setPassword("121056");
        //链接
        Connection connection = factory.newConnection();
        //channel
        Channel channel = connection.createChannel();
        //接受
        Consumer consumer=new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body:"+new String(body));
            }
        };
        channel.basicConsume("start-route_queue1",true,consumer);
        //释放--消费者不需要关闭资源
        //channel.close();
        //connection.close();
    }
}

topic模式

可以理解为直连模式升级版,当routingKey基于一定规则匹配上时,发送到与交换机帮顶的匹配上的队列中.

  • 生产者
public class TopicPro {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.链接
        ConnectionFactory factory= new ConnectionFactory();
        //参数
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/hcq");
        factory.setUsername("hcq");
        factory.setPassword("121056");
        //链接
        Connection connection = factory.newConnection();
        //channel
        Channel channel = connection.createChannel();
        //交换机
        channel.exchangeDeclare("start-topic", BuiltinExchangeType.TOPIC,true,true,false,null);
        //队列
        channel.queueDeclare("start-topic_queue1",true,false,false,null);
        channel.queueDeclare("start-topic_queue2",true,false,false,null);
        //帮顶
        channel.queueBind("start-topic_queue1","start-topic","#.error");
        channel.queueBind("start-topic_queue1","start-topic","order.*");
        channel.queueBind("start-topic_queue2","start-topic","*.*");
        //发送
        String body="topic:info:"+System.currentTimeMillis();
        String body2="topic:error:"+System.currentTimeMillis();
        channel.basicPublish("start-topic","order.info",null,body.getBytes());
        channel.basicPublish("start-topic","goods.error",null,body2.getBytes());
        channel.basicPublish("start-topic","order.error",null,body2.getBytes());
        channel.basicPublish("start-topic","goods.info",null,body.getBytes());
    }

}
  • 消费者
public class TopicCon1 {
    public static void main(String[] args) throws IOException, TimeoutException {
//1.链接
        ConnectionFactory factory= new ConnectionFactory();
        //参数
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/hcq");
        factory.setUsername("hcq");
        factory.setPassword("121056");
        //链接
        Connection connection = factory.newConnection();
        //channel
        Channel channel = connection.createChannel();
        //接受
        Consumer consumer=new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body:"+new String(body));
            }
        };
        channel.basicConsume("start-topic_queue1",true,consumer);
        //释放--消费者不需要关闭资源
        //channel.close();
        //connection.close();
    }
}

3. 整合springboot

整合spring在刚学习mq的时候还做过,不过现在都是springboot的项目,因此只整合一下springboot.

  • pom文件

    <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
        </dependencies>
  • yml

    server:
      port: 6700
    spring:
      rabbitmq:
        port: 5672
        host: localhost
        username: hcq
        password: 121056
        #这个配置是保证提供者确保消息推送到交换机中,不管成不成功,都会回调
        publisher-confirm-type: correlated
        #保证交换机能把消息推送到队列中
        publisher-returns: true
        virtual-host: /hcq
        #这个配置是保证消费者会消费消息,手动确认
        listener:
          simple:
            acknowledge-mode: manual
        template:
          mandatory: true

    3.1 配置类

  • RabbitConfig

    package com.guide.rabbit.config;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /*配置类*/
    @Configuration
    @Slf4j
    public class RabbitConfig {
        /**
         * MQ地址
         */
        @Value("${spring.rabbitmq.host}")
        private String host;
        /**
         * MQ端口
         */
        @Value("${spring.rabbitmq.port}")
        private int port;
    
        /**
         * 用户名
         */
        @Value("${spring.rabbitmq.username}")
        private String username;
    
        /**
         * 密码
         */
        @Value("${spring.rabbitmq.password}")
        private String password;
        /**
         * 虚拟主机
         */
        @Value("${spring.rabbitmq.virtual-host}")
        private String virtualHost;
    
        // 定义一个或多个交换机
        public static final String EXCHANGE_A = "ex-direct-guide1";
        public static final String EXCHANGE_B = "ex-topic-guide2";
    
        // 定义队列
        public static final String QUEUE_A = "queue-guide1";
        public static final String QUEUE_B = "queue-guide2";
    
        // 定义routing-key
        public static final String ROUTING_KEY_A = "routing.key.guide1";
        public static final String ROUTING_KEY_B = "routing.key.*";
    
        /**
         * 针对消费者配置
         * 1. 设置交换机类型
         * 2. 将队列绑定到交换机
         FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
         HeadersExchange :通过添加属性key-value匹配
         DirectExchange:按照routingkey分发到指定队列
         TopicExchange:多关键字匹配
         **/
        @Bean
        public DirectExchange defaultExchange() {
            return new DirectExchange(EXCHANGE_A);
        }
    
        //topic交换机
        @Bean
        public TopicExchange topicExchange(){
            return new TopicExchange(EXCHANGE_B);
        }
        /**
         * 获取队列A
         * @return
         */
        @Bean
        public Queue queueA() {
            return new Queue(QUEUE_A, true); //队列持久
        }
    
        @Bean
        public Queue queueB() {
            return new Queue(QUEUE_B, true); //队列持久
        }
        // 一个交换机可以绑定多个消息队列,也就是消息通过一个交换机,可以分发到不同的队列当中去。
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTING_KEY_A);
        }
    
        /**
         * 将queueB和ROUTING_KEY_B绑定
         * @return
         */
        @Bean
        public Binding binding1() {
            return BindingBuilder.bind(queueB()).to(topicExchange()).with(RabbitConfig.ROUTING_KEY_B);
        }
    
        /**
         * queueA和ROUTING_KEY_B绑定
         * @return
         */
        @Bean
        public Binding binding2() {
            return BindingBuilder.bind(queueA()).to(topicExchange()).with(RabbitConfig.ROUTING_KEY_B);
        }
        @Bean
        public MessageConverter messageConverter(){
            return new Jackson2JsonMessageConverter();
        }
        // 创建连接工厂,获取MQ的连接
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setVirtualHost(virtualHost);
            return connectionFactory;
        }
    
        // 创建rabbitTemplate
        @Bean(name = "rabbitTemplate")
        public RabbitTemplate rabbitTemplate() {
            RabbitTemplate template = new RabbitTemplate(connectionFactory());
            //默认使用simpleMessageConverter  在此处更改为json序列化方案
            template.setMessageConverter(messageConverter());
            return template;
        }
    
    }

    3.2 生产者

  • MsgProducer

其实就是调用rabbitTemplate来发送消息

@Component
@Slf4j
public class MsgProducer{
    @Autowired
    private RabbitTemplate rabbitTemplate;
    private ObjectMapper objectMapper=new ObjectMapper();
    public void sendMsg(String content) throws JsonProcessingException {

        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        RabbitMessage rabbitMessage=new RabbitMessage();
        rabbitMessage.setId(21323132132l);
        rabbitMessage.setMsg(content);
        rabbitMessage.setSendTime(new Date());
        log.info("【++++++++++++++++++ message :{}】", objectMapper.writeValueAsString(rabbitMessage));
        //把消息放入ROUTING_KEY_A对应的队列当中去,对应的是队列A
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTING_KEY_A, rabbitMessage, correlationId);
    }
}

3.3 消费者

  • MsgReceiver

形式上就是在类上加@RabbitListener(queues = RabbitConfig.QUEUE_A) 注解,要标注队列名.
在方法上加 @RabbitHandler注解.

@Component
@RabbitListener(queues = RabbitConfig.QUEUE_A)
public class MsgReceiver {
    @RabbitHandler
    public void process(@Payload RabbitMessage rabbitMessage) {
        //String body = new String(message.getBody());
        System.out.println("接收处理队列A当中的消息: " + rabbitMessage.toString());
    }
}

小结

  1. 总结了rabbitmq中的组件,概念,
  2. 四种常见的模式
  3. 整合springboot

Author: 向天歌
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint policy. If reproduced, please indicate source 向天歌 !
  TOC