消息消费的方式有俩种,一种是并发消费,一种是顺序消费。
前面对于并发消费已经有一定的了解,现在了解一下顺序消费的实现。
先来看看负载服务对于顺序消费与并发消费的不同。
对于队列上的分配,其实还是依赖具体的分配策略。
然鹅,分配完策略之后,是否可以从队列拉取消息,顺序消费与并发消费还是存在着许多不同。
对于并发消费来说,生成拉取队列并没有限制,然而对于顺序消费来说,则需要上锁。
具体还是看看生成拉取队列数据的方法
顺序消费如何生成拉取队列
public abstract class RebalanceImpl {
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
--------------------------------------------------------------------------------------------------------------------------------------------
1.将旧的处理队列移除。由于重新分配队列,对于以前的队列将不再处理(拉取,或者消费消息)
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {
if (!mqSet.contains(mq)) {
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
----------------------------------------------------------------------------------------------------------------------------------------------
2.根据分配的队列,生成拉取请求队列
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
唯一不同的点,并发并消费没有特殊处理,然而顺序要求获取到这个队列的锁。
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
分发拉取请求,进行拉取
this.dispatchPullRequest(pullRequestList);
return changed;
}
}
通过上面代码来看。并发消费在进行拉取消息的时候,是不需要对消息队列上锁的,而顺序消费是需要的。
如果不上锁,在某个消费者没有消费完的时候,另外一个消费者获取到了队列,可以进行消息消费,那就无法是有序的消费。
顺序消费是如何对队列上锁的?
上面的代码片段中,有一个Lock方法,用于对队列上锁。接下来看看实现
public abstract class RebalanceImpl {
public boolean lock(final MessageQueue mq) {
找到某个Broker集群的master的地址
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.getMqSet().add(mq);
try {
向broker的master申请去锁住队列,最后返回的队列集合就是已经上锁的队列。
Set<MessageQueue> lockedMq =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
for (MessageQueue mmqq : lockedMq) {
ProcessQueue processQueue = this.processQueueTable.get(mmqq);
if (processQueue != null) {
设置标记,同时更新上锁时间,表明已经获取到该队列的锁
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}
如果返回的队列不包含此队列,直接返回false。
boolean lockOK = lockedMq.contains(mq);
return lockOK;
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mq, e);
}
}
return false;
}
}
从实现上来看,就是向队列的broker地址中去申请对队列进行上锁。
broker端如何对队列进行上锁?
还是回到BrokerController对应的处理器。再查看一下对应的RequestCode.LOCK_BATCH_MQ的处理方法。
最后定位到代码如下
public class RebalanceLockManager {
LockEntry用于记录每个客户端的上锁时间
static class LockEntry {
private String clientId;
private volatile long lastUpdateTimestamp = System.currentTimeMillis();
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public long getLastUpdateTimestamp() {
return lastUpdateTimestamp;
}
public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
this.lastUpdateTimestamp = lastUpdateTimestamp;
}
public boolean isLocked(final String clientId) {
boolean eq = this.clientId.equals(clientId);
return eq && !this.isExpired();
}
public boolean isExpired() {
boolean expired =
(System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
return expired;
}
}
用于记录每个队列的锁的持有者
private final ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);
用于判断某个客户端是否持有该队列的锁,支持重入---更新持有锁的时间
private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
if (groupValue != null) {
LockEntry lockEntry = groupValue.get(mq);
if (lockEntry != null) {
boolean locked = lockEntry.isLocked(clientId);
if (locked) {
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
}
return locked;
}
}
return false;
}
某个组的消费者尝试对队列进行上锁
public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
final String clientId) {
Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());
for (MessageQueue mq : mqs) {
if (this.isLocked(group, mq, clientId)) {
保存已经上锁的消息队列
lockedMqs.add(mq);
} else {
保存获取不到锁的消息队列
notLockedMqs.add(mq);
}
}
对获取不到锁的消息队列的处理
if (!notLockedMqs.isEmpty()) {
try {
this.lock.lockInterruptibly();
try {
如果获取不到该组的任何信息,先进行初始化
ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
if (null == groupValue) {
groupValue = new ConcurrentHashMap<>(32);
this.mqLockTable.put(group, groupValue);
}
for (MessageQueue mq : notLockedMqs) {
LockEntry lockEntry = groupValue.get(mq);
如果不存在该队列的持有者信息,直接上锁,说明没有被上锁
if (null == lockEntry) {
lockEntry = new LockEntry();
lockEntry.setClientId(clientId);
groupValue.put(mq, lockEntry);
}
已经持有了,进行重入,更新上锁时间
if (lockEntry.isLocked(clientId)) {
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
lockedMqs.add(mq);
continue;
}
String oldClientId = lockEntry.getClientId();
如果持有者id与当前消费者id不一样,但是原先持有者已经过期了,直接进行上锁
if (lockEntry.isExpired()) {
lockEntry.setClientId(clientId);
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
lockedMqs.add(mq);
continue;
}
}
} finally {
最后解锁
this.lock.unlock();
}
} catch (InterruptedException e) {
log.error("putMessage exception", e);
}
}
返回已经上锁的消息队列
return lockedMqs;
}
}
Broker中的实现是比较简单的,其实说白了就是利用一个成员变量来保持消息队列与锁持有者的关系。
锁默认是60s,可以通过配置来调整这个锁的持有时间。
那么拿到锁之后,就是进行请求的拉取。其实这一块实现上大体是相似的。
拉取消息
拉取的实现代码位置如下org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.pullMessage(PullRequest)
这里面区别最大的点。在进行消息拉取前,并发消费者要判断偏移量是否过大,过大不拉取,而顺序消费者则是需要判断锁是否已经失效。锁没有失效才可以进行消息的拉取。锁若失效了,则稍后重试。
if (!this.consumeOrderly) {
并发消费---偏移量过大,暂时不拉取消息
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
省略代码。。。
return;
}
} else {
顺序消费---锁没有失效的情况下才可以拉取
if (processQueue.isLocked()) {
第一次上锁,需要设置偏移量,才知道从队列的哪个位置开始拉取消息
if (!pullRequest.isLockedFirst()) {
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy = offset < pullRequest.getNextOffset();
。。。省略部分代码
pullRequest.setLockedFirst(true);
pullRequest.setNextOffset(offset);
}
} else {
锁若失效,晚点再拉取。
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
}
为什么这里需要这个判断,有可能之前上锁的队列,在解锁或者时间到期之后,被其他消费者获取到了,这个时候处理队列的锁就失效了。这个时候再进行消息拉取其实意义不大。所以锁失效了,会延时稍后继续尝试拉取。
那么这里有一个机制,会去定时更新锁,如果后续锁获取到了,就会更新处理队列的状态,就可以继续对消息进行拉取了。
锁的自动更新
来看看顺序消费服务的实现
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
public void start() {
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
直接看内部实现
ConsumeMessageOrderlyService.this.lockMQPeriodically();
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
}
}
从顺序消费的实现来看,会定时去更新处理队列的锁。
最后看看如何去对处理队列上锁
public abstract class RebalanceImpl {
private HashMap<String/* brokerName */, Set<MessageQueue>> buildProcessQueueTableByBrokerName() {
HashMap<String, Set<MessageQueue>> result = new HashMap<String, Set<MessageQueue>>();
for (MessageQueue mq : this.processQueueTable.keySet()) {
Set<MessageQueue> mqs = result.get(mq.getBrokerName());
if (null == mqs) {
mqs = new HashSet<MessageQueue>();
result.put(mq.getBrokerName(), mqs);
}
mqs.add(mq);
}
return result;
}
public void lockAll() {
将当前的处理队列以broker进行分组
HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
while (it.hasNext()) {
Entry<String, Set<MessageQueue>> entry = it.next();
final String brokerName = entry.getKey();
final Set<MessageQueue> mqs = entry.getValue();
if (mqs.isEmpty())
continue;
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.setMqSet(mqs);
try {
对消息队列进行上锁
Set<MessageQueue> lockOKMQSet =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
将上锁的消息队列以及对应的处理队列的状态进行更新
for (MessageQueue mq : lockOKMQSet) {
ProcessQueue processQueue = this.processQueueTable.get(mq);
if (processQueue != null) {
if (!processQueue.isLocked()) {
log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
}
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}
没有上锁的消息队列以及处理队列,则lock设置为false
for (MessageQueue mq : mqs) {
if (!lockOKMQSet.contains(mq)) {
ProcessQueue processQueue = this.processQueueTable.get(mq);
if (processQueue != null) {
processQueue.setLocked(false);
log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);
}
}
}
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mqs, e);
}
}
}
}
}
那么当获取到消息之后,顺序消费者如何消费?
这里还是比较简单的
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
class ConsumeRequest implements Runnable {
消费请求对应的处理队列
private final ProcessQueue processQueue;
消费请求对应的消息队列
private final MessageQueue messageQueue;
public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
this.processQueue = processQueue;
this.messageQueue = messageQueue;
}
public ProcessQueue getProcessQueue() {
return processQueue;
}
public MessageQueue getMessageQueue() {
return messageQueue;
}
@Override
public void run() {
如果该处理队列已经被废弃,直接返回
if (this.processQueue.isDropped()) {
return;
}
获取队列的锁,顺序消费虽然也是多线程,但是每个消息队列同一时刻只能由一个线程消费。
但是这里并没有完全锁住队列。所以后面又加了一个消费锁
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
获取到锁,进行消费。
广播模式下,并不需要对broker中的消息队列上锁。
集群模式下,则需要对Broker的消息队列上锁,同时锁未过期才可以进行消费
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
记录开始时间,每个消费者持有队列的时间不超过1分钟
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
处理队列已经废弃,跳出循环
if (this.processQueue.isDropped()) {
break;
}
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& !this.processQueue.isLocked()) {
集群模式下,没有获取到锁,待会重试消费
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& this.processQueue.isLockExpired()) {
集群模式下,锁过期了(最近1分钟没有更新锁),待会重试消费
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
long interval = System.currentTimeMillis() - beginTime;
超过1分钟,重新提交消费请求
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
省略部分代码
long beginTimestamp = System.currentTimeMillis();
默认消费结果成功
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
获取处理队列的消费锁
this.processQueue.getLockConsume().lock();
如果处理队列已经被废弃了,直接跳出循环
if (this.processQueue.isDropped()) {
break;
}
消费结果
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
hasException = true;
} finally {
解锁
this.processQueue.getLockConsume().unlock();
}
if (null == status
|| ConsumeOrderlyStatus.ROLLBACK == status
|| ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
当返回结果为以上结果,只是打印个日志,所以直接省略了
}
消费总时长
long consumeRT = System.currentTimeMillis() - beginTimestamp;
根据返回结果设置返回类型
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeOrderlyStatus.SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
省略部分代码
if (null == status) {
status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
省略部分代码
ConsumeMessageOrderlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
根据消费结果,决定是否继续消费
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
}
}
对于消费结果的处理
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
检查消费次数。一开始没有超过最大消费次数,则稍后重新消费。
超过最大消费次数进行sendback。sendback只要有一条消息失败,再次进行消费。
private boolean checkReconsumeTimes(List<MessageExt> msgs) {
boolean suspend = false;
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {
MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));
if (!sendMessageBack(msg)) {
suspend = true;
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
}
} else {
suspend = true;
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
}
}
}
return suspend;
}
public boolean processConsumeResult(
final List<MessageExt> msgs,
final ConsumeOrderlyStatus status,
final ConsumeOrderlyContext context,
final ConsumeRequest consumeRequest
) {
boolean continueConsume = true;
long commitOffset = -1L;
默认自动提交
if (context.isAutoCommit()) {
switch (status) {
case COMMIT:
case ROLLBACK:
log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
consumeRequest.getMessageQueue());
case SUCCESS:
只有返回结果为成功,才会去提交。将消费成功的消息从成员变量中移除。
commitOffset = consumeRequest.getProcessQueue().commit();
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
检查消费次数是否已经超过最大消费次数,没超过进行重试,超过进行sendback
sendback存在失败,则进行内部重试
如果所有消息都没有超过最大次数,则进行重试
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToConsumeAgain(msgs);
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
} else {
commitOffset = consumeRequest.getProcessQueue().commit();
}
break;
default:
break;
}
} else {
switch (status) {
case SUCCESS:
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case COMMIT:
将消费的消息
commitOffset = consumeRequest.getProcessQueue().commit();
break;
case ROLLBACK:
回滚,将消息放回去
consumeRequest.getProcessQueue().rollback();
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToConsumeAgain(msgs);
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
}
break;
default:
break;
}
}
if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
同步偏移量
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}
return continueConsume;
}
}
网友评论