使用场景
-
环境隔离需求
- 当需要在同一个Kafka集群上同时运行生产环境和灰度环境的消息队列
- 避免不同环境的消息互相干扰
- 方便进行灰度测试和验证
-
动态Topic路由
- 无需修改业务代码
- 通过配置实现消息的自动路由
- 支持灵活切换环境
实现原理
-
生产者拦截器(KafkaProducerInterceptor)
- 实现了 ProducerInterceptor 接口
- 在消息发送前通过 onSend 方法拦截消息
- 根据配置的前缀( AppConst.KAFKA_PREFIX_KEY )动态修改目标Topic
- 例如:原始topic为"order",配置前缀为"grey_",最终发送到"grey_order"队列
2.监听器配置
- 需要配合 KafkaListenerFactoryBeanPostProcessor 使用
- 用于修改消费者监听的队列名称
- 确保消费者监听的队列与生产者发送的队列相匹配
实现代码
import cn.hutool.core.util.StrUtil;
import cn.example.common.AppConst;
import com.google.common.collect.Lists;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.reflections.Reflections;
import org.reflections.scanners.FieldAnnotationsScanner;
import org.reflections.scanners.MethodAnnotationsScanner;
import org.reflections.scanners.MethodParameterScanner;
import org.reflections.scanners.SubTypesScanner;
import org.reflections.util.ConfigurationBuilder;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.boot.autoconfigure.AutoConfigurationPackages;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* 需要配合cn.example.interceptor.KafkaProducerInterceptor
* KafkaListenerFactoryBeanPostProcessor:更改监听队列
* KafkaProducerInterceptor:更改消息发送队列
*/
@Component
@Slf4j
public class KafkaListenerFactoryBeanPostProcessor implements BeanFactoryPostProcessor, EnvironmentAware {
private Environment env;
@SneakyThrows
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
if (StrUtil.isNotBlank(env.getProperty(AppConst.KAFKA_PREFIX_KEY))) {
List<String> packageNames = AutoConfigurationPackages.get(beanFactory);
for (String packageName : packageNames) {
Reflections reflections = new Reflections(new ConfigurationBuilder()
// 指定路径URL
.forPackages(packageName)
// 添加子类扫描工具
.addScanners(new SubTypesScanner())
// 添加 属性注解扫描工具
.addScanners(new FieldAnnotationsScanner())
// 添加 方法注解扫描工具
.addScanners(new MethodAnnotationsScanner())
// 添加方法参数扫描工具
.addScanners(new MethodParameterScanner())
);
Set<Method> methodSet = reflections.getMethodsAnnotatedWith(KafkaListener.class);
if (!CollectionUtils.isEmpty(methodSet)) {
for (Method method : methodSet) {
KafkaListener kafkaListener = method.getAnnotation(KafkaListener.class);
changeTopics(kafkaListener);
}
}
}
}
}
private void changeTopics(KafkaListener kafkaListener) throws Exception {
InvocationHandler invocationHandler = Proxy.getInvocationHandler(kafkaListener);
Field memberValuesField = invocationHandler.getClass().getDeclaredField("memberValues");
memberValuesField.setAccessible(true);
Map<String, Object> memberValues = (Map<String, Object>) memberValuesField.get(invocationHandler);
String[] topics = (String[]) memberValues.get("topics");
log.info("修改前topics:{}", Lists.newArrayList(topics));
for (int i = 0; i < topics.length; i++) {
topics[i] = env.getProperty(AppConst.KAFKA_PREFIX_KEY) + topics[i];
}
memberValues.put("topics", topics);
log.info("修改后topics:{}", Lists.newArrayList(kafkaListener.topics()));
}
@Override
public void setEnvironment(Environment environment) {
env = environment;
}
}
import cn.hutool.core.util.StrUtil;
import cn.example.common.AppConst;
import cn.example.common.utils.ApplicationContextUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import java.util.Map;
import java.util.Objects;
/**
* 需要配合cn.example.beanPostProcesser.KafkaListenerFactoryBeanPostProcessor
* KafkaListenerFactoryBeanPostProcessor:更改监听队列
* KafkaProducerInterceptor:更改消息发送队列
*/
@Slf4j
public class KafkaProducerInterceptor implements ProducerInterceptor<String, String> {
/**
* 运行在用户主线程中,在消息被序列化之前调用
*
* @param record
* @return
*/
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
Environment environment = ApplicationContextUtil.getEnvironment();
String prefixKey = environment.getProperty(AppConst.KAFKA_PREFIX_KEY);
if (StrUtil.isNotBlank(prefixKey)) {
log.info("原始topic:{}", record.topic());
String targetTopic = prefixKey + record.topic();
log.info("修改后的topic:{}",prefixKey+record.topic());
return new ProducerRecord<String, String>( targetTopic,
record.partition(), record.timestamp(), record.key(), record.value());
}
return record;
}
/**
* 在消息被应答之前或者消息发送失败时调用,通常在producer回调逻辑触发之前,运行在produer的io线程中
*
* @param metadata
* @param exception
*/
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
log.info("实际topic:{}", metadata.topic());
}
/**
* 清理工作
*/
@Override
public void close() {
}
/**
* 初始化工作
*
* @param configs
*/
@Override
public void configure(Map<String, ?> configs) {
}
}
配置方式
- 在配置文件中设置前缀
# 生产环境不设置前缀
kafka.prefix.key=
# 灰度环境设置前缀
kafka.prefix.key=grey_









网友评论