业务背景
我所在公司系统要升级到jdk8, 其中一个工程需要从IBM MQ 切换到ActiveMQ。 消息发送方的消息队列名称是配置在数据库中的,不同的环境有不同的配置,且同一个消息可能有不同的消息队列名称(因为是很古老的项目,所以谁也说不清楚为什么有这么多不同的队列),但是为了在升级过程中不会让客户修改任何数据,我们决定从程序入手解决不同对列名称问题。我们的消息消费端是基于SpringBoot的工程。
思路
虽然消息生产方在数据库中配置了多个消息队列,但是我们的消费逻辑是一样的。所以,我们就要用同一个消息消费者去消费这些队列中的消息,消息消费流程如下图。如何注册达到这样的效果呢?
activemq.png
注册bean
我们使用spring常用的注册bean的方式要么是通过xml文件配置,在启动过程中加载解析,最终成为容器中的bean, 或者我们使用注解的方式,例如@Controller,@Service, @Resource,@Component,@Bean 等,上面是我们常用的方式。现在我们要通过java代码的方式去生成bean.
- new一个对象,放到容器中
程序初始化完成后,获取上下文,直接把实例添加到容器中,然后给这个实例起一个名字。
AnnotationConfigServletWebServerApplicationContext beanFactory = (AnnotationConfigServletWebServerApplicationContext)context;
RootBeanDefinition beanDefinition = new RootBeanDefinition(Person.class);
beanDefinition.setAttribute("name","张老三");
beanFactory.registerBeanDefinition("person",beanDefinition);
- 继承 BeanDefinitionRegistryPostProcessor
@Configuration
public class DefinitionRegistryPostProcessor implements BeanDefinitionRegistryPostProcessor {
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
registerListener("1,2,3,4,5,6,7,8,9",new CustomerListener(),beanDefinitionRegistry);
/* this.registerListener("test1",new RSSQLInvoiceUploadAgent(),beanDefinitionRegistry);
this.registerListener("test2",new GPRetrievePaymentAgent(),beanDefinitionRegistry);
this.registerListener("test3",new GPCustomerAgent(),beanDefinitionRegistry);*/
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
}
private void registerListener(String destination, MessageListener queueListener,BeanDefinitionRegistry beanDefinitionRegistry){
if (StringUtils.isEmpty(destination)){
return;
}
String[] jmsQueues = destination.split(",");
String[] beans = beanDefinitionRegistry.getBeanDefinitionNames();
for(int m=0;m<beans.length;m++){
System.out.println(beans[m]);
}
for(int i=0;i<jmsQueues.length;i++){
BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(DefaultMessageListenerContainer.class);
beanDefinitionBuilder.addPropertyValue("cacheLevel",Integer.valueOf("1"));
beanDefinitionBuilder.addPropertyValue("concurrentConsumers",Integer.valueOf("1"));
beanDefinitionBuilder.addPropertyValue("connectionFactory",beanDefinitionRegistry.getBeanDefinition("cachingConnectionFactoryConfig"));
beanDefinitionBuilder.addPropertyValue("messageListener",queueListener);
beanDefinitionBuilder.addPropertyValue("destinationName",jmsQueues[i]);
beanDefinitionBuilder.addPropertyValue("receiveTimeout",Long.valueOf("3000"));
beanDefinitionRegistry.registerBeanDefinition(queueListener.getClass().getSimpleName()+i,beanDefinitionBuilder.getBeanDefinition());
}
}
但是最终没有使用这种方法在我们实际的开发过程中,因为@JmsListener 是支持通过"," 分隔监听我们需要的消息队列的。










网友评论