topics模式适合订阅者按需索取。
发布者发布消息时使用的routing key带有不同的特征,举例如下:
"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
"lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"
中间用点分隔
订阅者按照需要进行筛选,筛选规则如下:
* (star) can substitute for exactly one word.
# (hash) can substitute for zero or more words.
订阅者就可以按照比如 *.orange.*、*.*.rabbit 等等条件来过滤消息
那么代码如下
Tut5Config.java
/**
 * Bean definitions for the topic-exchange example. The class is registered
 * under the "tut5" or "topics" profile; the receiver-side and sender-side
 * beans are further gated by the "receiver" and "sender" profiles.
 */
@Profile({"tut5","topics"})
@Configuration
public class Tut5Config {

    /**
     * Declares the topic exchange used by both sides of the example.
     *
     * @return a topic exchange named "tut.topic"
     */
    @Bean
    public TopicExchange topic() {
        return new TopicExchange("tut.topic");
    }

    /**
     * Receiver-side beans: the listener object, two anonymous queues, and the
     * three bindings that attach those queues to the topic exchange.
     */
    @Profile("receiver")
    private static class ReceiverConfig {

        /** @return the listener object that consumes from both queues */
        @Bean
        public Tut5Receiver receiver() {
            return new Tut5Receiver();
        }

        /** @return the first anonymous queue */
        @Bean
        public Queue autoDeleteQueue1() {
            return new AnonymousQueue();
        }

        /** @return the second anonymous queue */
        @Bean
        public Queue autoDeleteQueue2() {
            return new AnonymousQueue();
        }

        // NOTE(review): the Queue parameter names below mirror the queue bean
        // method names; presumably Spring uses them to choose between the two
        // Queue beans, so keep them in sync -- confirm before renaming.

        /** Binds queue 1 with the pattern "*.orange.*". */
        @Bean
        public Binding binding1a(TopicExchange topic,
                                 Queue autoDeleteQueue1) {
            return bindToTopic(autoDeleteQueue1, topic, "*.orange.*");
        }

        /** Binds queue 1 with the pattern "*.*.rabbit". */
        @Bean
        public Binding binding1b(TopicExchange topic,
                                 Queue autoDeleteQueue1) {
            return bindToTopic(autoDeleteQueue1, topic, "*.*.rabbit");
        }

        /** Binds queue 2 with the pattern "lazy.#". */
        @Bean
        public Binding binding2a(TopicExchange topic,
                                 Queue autoDeleteQueue2) {
            return bindToTopic(autoDeleteQueue2, topic, "lazy.#");
        }

        /** Builds a binding of {@code queue} to {@code exchange} using {@code pattern} as the routing key. */
        private static Binding bindToTopic(Queue queue, TopicExchange exchange, String pattern) {
            return BindingBuilder.bind(queue)
                    .to(exchange)
                    .with(pattern);
        }
    }

    /** @return the sender bean, only defined when the "sender" profile is active */
    @Profile("sender")
    @Bean
    public Tut5Sender sender() {
        return new Tut5Sender();
    }
}
Tut5Sender.java
/**
 * Publishes a text message to the injected topic exchange each time the
 * scheduler invokes {@link #send()}, cycling through a fixed list of routing
 * keys.
 */
public class Tut5Sender {

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private TopicExchange topic;

    // Position in `keys` of the routing key used by the most recent send.
    AtomicInteger index = new AtomicInteger(0);

    // Number of messages sent so far; appended to every message body.
    AtomicInteger count = new AtomicInteger(0);

    private final String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
            "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};

    /**
     * Sends one message of the form {@code "Hello to <key> <n>"} using the next
     * routing key in the cycle, then logs it to standard output.
     *
     * <p>The index is advanced and wrapped in one atomic update. The previous
     * increment / compare / reset / re-read sequence was several separate
     * operations, so overlapping invocations could push the index past the end
     * of the key array. The resulting key order is unchanged: because the index
     * is advanced before it is read, the first call uses {@code keys[1]} and
     * {@code keys[0]} follows {@code keys[5]}.
     */
    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        int position = this.index.updateAndGet(i -> (i + 1) % keys.length);
        String key = keys[position];
        String message = "Hello to " + key + ' ' + this.count.incrementAndGet();
        template.convertAndSend(topic.getName(), key, message);
        System.out.println(" [x] Sent '" + message + "'");
    }
}
Tut5Receiver.java
/**
 * Consumes messages from the two queues declared for the receiver profile and
 * simulates work whose duration depends on the number of dots in the payload.
 */
public class Tut5Receiver {

    /** Listener for the first queue; delegates to {@link #receive(String, int)} as instance 1. */
    @RabbitListener(queues = "#{autoDeleteQueue1.name}")
    public void receive1(String in) throws InterruptedException {
        receive(in, 1);
    }

    /** Listener for the second queue; delegates to {@link #receive(String, int)} as instance 2. */
    @RabbitListener(queues = "#{autoDeleteQueue2.name}")
    public void receive2(String in) throws InterruptedException {
        receive(in, 2);
    }

    /**
     * Logs the received payload, performs the simulated work, and logs the
     * elapsed time reported by the stop watch.
     *
     * @param in       the message payload
     * @param receiver the instance number printed in the log lines
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public void receive(String in, int receiver) throws
            InterruptedException {
        StopWatch timer = new StopWatch();
        timer.start();

        String receivedLine = "instance " + receiver + " [x] Received '" + in + "'";
        System.out.println(receivedLine);

        doWork(in);
        timer.stop();

        String doneLine = "instance " + receiver + " [x] Done in " + timer.getTotalTimeSeconds() + "s";
        System.out.println(doneLine);
    }

    /**
     * Sleeps once, for 1000 ms, for every '.' character found in the payload.
     *
     * @param in the message payload to scan
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private void doWork(String in) throws InterruptedException {
        int dotAt = in.indexOf('.');
        while (dotAt >= 0) {
            Thread.sleep(1000);
            dotAt = in.indexOf('.', dotAt + 1);
        }
    }
}
网友评论