使用阿里云rocketMq:
依赖:
groupId:com.aliyun.openservices
artifactId:ons-client
version:1.2.7.Final
生产者代码:
public classMessageQueryConfiguration {
private String ons_addr = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet";
private String producerId =xxx;
private String consumerId=xxx;
private String accessKey=xxx;
private String secretKey=xxx;
private String tag=xxx;
private Producer producer;
privateStringmessageModel= PropertyValueConst.BROADCASTING;
@Bean
Producer getProducer(){
return ONSFactory.createProducer(properties());
}
publicProperties properties() {
Properties properties =newProperties();
properties.put(PropertyKeyConst.ProducerId,producerId);
properties.put(PropertyKeyConst.ConsumerId,consumerId);
properties.put(PropertyKeyConst.AccessKey,accessKey);
properties.put(PropertyKeyConst.SecretKey,secretKey);
properties.put(PropertyKeyConst.ONSAddr,syncApplication);
properties.put(PropertyKeyConst.MessageModel,messageModel);
return properties;
}
// 默认触发入口
private voidinit() {
producer= ONSFactory.createProducer(properties());
start();
}
public voidstart() {
producer.start();
}
public voidshutdown() {
producer.shutdown();
}
// 遍历发送的信息-push 消息
public Message msg(Object object) {
try{
return newMessage(
"topic",
tag,
newObjectMapper().writeValueAsBytes(object)
);
}catch(JsonProcessingException e) {
e.printStackTrace();
}
return null;
}
// 开始写入消息
public void Thread(Object object) {
init();
send(msg(object));
}
// 发送消息
public void send(finalMessage message) {
producer.send(message);
}
}
两种消费模式:
集群消费:处于同一个topic和tag下时的任意一条消息只会被同一逻辑下消费节点的某一个消费者消费。
集群:MQ 约定使用相同 Consumer ID 的订阅者属于同一个集群,同一个集群下的订阅者消费逻辑必须完全一致(包括 Tag 的使用),这些订阅者在逻辑上可以认为是一个消费节点。
集群消费适用场景&注意事项:
*:消费端集群化部署,每条消息只需要被处理一次。
*:由于消费进度在服务端维护,可靠性更高。
*:集群消费模式下,每一条消息都只会被分发到一台机器上处理,如果需要被集群下的每一台机器都处理,请使用广播模式。
*:集群消费模式下,不保证消息的每一次失败重投等逻辑都能路由到同一台机器上,因此处理消息时不应该做任何确定性假设。
注:如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
广播消费:处于同一个topic和tag下时的任意一条消息只会被同一逻辑下消费节点的每一个消费者消费。
*:广播消费适用场景&注意事项:
*:顺序消息暂不支持广播消费模式。
*:每条消息都需要被相同逻辑的多台机器处理。
*:消费进度在客户端维护,出现重复的概率稍大于集群模式。
*:广播模式下,MQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
*:广播模式下,第一次启动时默认从最新消息消费,客户端的消费进度是被持久化在客户端本地的隐藏文件中,因此不建议删除该隐藏文件,否则会丢失部分消息。
*:广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
目前仅 Java 客户端支持广播模式。
*:广播模式下服务端不维护消费进度,所以 MQ 控制台不支持消息堆积查询和堆积报警功能。














网友评论