SpringBoot整合RabbitMQ

继上一篇《Spring整合RabbitMQ实例》之后,本文再介绍一个SpringBoot整合RabbitMQ的实例。

一、创建Spring Starter工程

二、配置文件
application.properties

1
2
3
4
5
6
7
8
9
10
11
12
# RabbitMQ broker connection settings (Spring Boot spring.rabbitmq.* keys).
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# Queue names; read through ${my.*.queue} placeholders by RabbitConfig,
# HelloProvider (first queue only) and the @RabbitListener consumers.
my.first.queue=BOOT_FIRST_QUEUE
my.second.queue=BOOT_SECOND_QUEUE
my.third.queue=BOOT_THIRD_QUEUE

# Exchange names; read through ${my.*.exchange} placeholders by RabbitConfig and HelloProvider.
my.topic.exchange=BOOT_TOPIC_EXCHANGE
my.fanout.exchange=BOOT_FANOUT_EXCHANGE

log4j.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# Root logger: INFO level, writing to the console and to the daily rolling file appender.
log4j.rootLogger=INFO,consoleAppender,fileAppender
# Dedicated "ETTAppLogger" category: DEBUG level, writing to its own file appender.
log4j.category.ETTAppLogger=DEBUG, ettAppLogFile

# Console appender.
# NOTE(review): the appender thresholds below are TRACE, so filtering presumably
# comes only from the logger levels declared above - confirm this is intended.
log4j.appender.consoleAppender=org.apache.log4j.ConsoleAppender
log4j.appender.consoleAppender.Threshold=TRACE
log4j.appender.consoleAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.consoleAppender.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss SSS} ->[%t]--[%-5p]--[%c{1}]--%m%n

# Main file appender; DatePattern supplies the '_yyyy-MM-dd.log' suffix used for rolled files.
# NOTE(review): the path is an absolute Windows drive path - adjust on other platforms.
log4j.appender.fileAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.fileAppender.File=d:/log/rabbitmq/boot-debug1.log
log4j.appender.fileAppender.DatePattern='_'yyyy-MM-dd'.log'
log4j.appender.fileAppender.Threshold=TRACE
# NOTE(review): BIG5 is a Traditional Chinese charset; confirm it is intended rather than UTF-8.
log4j.appender.fileAppender.Encoding=BIG5
log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.fileAppender.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss SSS}-->[%t]--[%-5p]--[%c{1}]--%m%n

# File appender for the ETTAppLogger category (threshold DEBUG, no explicit encoding).
log4j.appender.ettAppLogFile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.ettAppLogFile.File=d:/log/rabbitmq/boot-ettdebug.log
log4j.appender.ettAppLogFile.DatePattern='_'yyyy-MM-dd'.log'
log4j.appender.ettAppLogFile.Threshold=DEBUG
log4j.appender.ettAppLogFile.layout=org.apache.log4j.PatternLayout
# The "\:" sequences are properties-file escapes for ':'.
log4j.appender.ettAppLogFile.layout.ConversionPattern=%-d{yyyy-MM-dd HH\:mm\:ss SSS}-->[%t]--[%-5p]--[%c{1}]--%m%n

三、MQ配置类
队列、交换机、绑定关系都在这里配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package com.jianyu.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ配置类
 *
 * @author jianyu.bai
 *
 */

@Configuration
public class RabbitConfig {

    @Value("${my.first.queue}")
    private String firstQueue;

    @Value("${my.second.queue}")
    private String secondQueue;
   
    @Value("${my.third.queue}")
    private String thirdQueue;

    @Value("${my.topic.exchange}")
    private String topicExchange;

    @Value("${my.fanout.exchange}")
    private String fanoutExchange;

    @Bean(name = "firstQueue")
    public Queue getFirstQueue() {
        return new Queue(firstQueue);
    }

    @Bean(name = "secondQueue")
    public Queue getSecondQueue() {
        return new Queue(secondQueue);
    }
   
    @Bean(name = "thirdQueue")
    public Queue getThirdQueue() {
        return new Queue(thirdQueue);
    }

    @Bean(name = "topicExchange")
    public TopicExchange getTopicExchange() {
        return new TopicExchange(topicExchange);
    }
   
    @Bean(name = "fanoutExchange")
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(fanoutExchange);
    }
   
    @Bean
    Binding bindingSecond(@Qualifier("secondQueue") Queue queue, @Qualifier("topicExchange")TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("#.food.#");
    }
   
    @Bean
    Binding bindingThird(@Qualifier("thirdQueue") Queue queue, @Qualifier("fanoutExchange")FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

}

四、生产者
分别以三种方式发送消息:Direct(使用默认交换机,路由键为队列名)、Topic交换机、Fanout交换机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.jianyu.provider;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class HelloProvider {

    private Logger log = Logger.getLogger(HelloProvider.class);

    @Value("${my.topic.exchange}")
    private String topicExchange;
   
    @Value("${my.fanout.exchange}")
    private String fanoutExchange;
   
    @Value("${my.first.queue}")
    private String firstQueue;

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        // String context = "hello " + new Date();
        // log.info("Provider send message : " + context);
       
        // Direct模式
        // firstQueue会收到
        rabbitTemplate.convertAndSend(firstQueue,"a direct message");
       
        // Topic模式
        // secondQueue会收到
        rabbitTemplate.convertAndSend(topicExchange, "changsha.food.tomorrow", "a topic msg : changsha.food.tomorrow");
        rabbitTemplate.convertAndSend(topicExchange, "yongzhou.food.today", "a topic msg : yongzhou.food.today");

        // Fanout模式
        // thirdQueue会收到
        rabbitTemplate.convertAndSend(fanoutExchange, "", "a fanout message");
    }

}

五、消费者
第一个队列,通过默认交换机(Direct类型,按队列名路由)获取消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.jianyu.consumer;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "${my.first.queue}")
public class FirstConsumer {
    private Logger log = Logger.getLogger(FirstConsumer.class);

    @RabbitHandler
    public void process(String msg) {
        log.info("First queue received message: " + msg);
    }
 
}

第二个队列,从Topic交换机获取消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.jianyu.consumer;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "${my.second.queue}")
public class SecondConsumer {
    private Logger log = Logger.getLogger(SecondConsumer.class);

    @RabbitHandler
    public void process(String msg) {
        log.info("Second queue received message: " + msg);
    }
 
}

第三个队列,从Fanout交换机获取消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.jianyu.consumer;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "${my.third.queue}")
public class ThirdConsumer {
    private Logger log = Logger.getLogger(ThirdConsumer.class);

    @RabbitHandler
    public void process(String msg) {
        log.info("Third queue received message: " + msg);
    }
}

六、测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.jianyu;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.jianyu.provider.HelloProvider;

@RunWith(SpringRunner.class)
@SpringBootTest
public class MqAppTests {
   
    @Autowired
    private HelloProvider helloProvider;
   
    @Test
    public void hello()  throws Exception {
        helloProvider.send();
    }
}

代码:https://github.com/baijy/springboot-rabbitmq