批量消息

作者: 念䋛 | 来源:发表于2021-06-19 09:09 被阅读0次

批量消息
批量消息是指将多条消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞 吐量。
相信大家在官网以及测试代码中都看到了关键的注释:如果批量消息大于1MB就不要用一个批次 发送,而要拆分成多个批次消息发送。也就是说,一个批次消息的大小不要超过1MB
实际使用时,这个1MB的限制可以稍微扩大点,实际大的限制是4194304字节,大概4MB。但 是使用批量消息时,这个消息长度确实是必须考虑的一个问题。而且批量消息的使用是有一定限 制的,这些消息应该有相同的Topic,相同的waitStoreMsgOK。而且不能是延迟消息、事务消息等。
简单的发送

public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
    producer.start();
    String topic = "BatchTest";
    List<Message> messages = new ArrayList<>();
    messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
    messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
    messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));

    producer.send(messages);
    producer.shutdown();
}

如果不能确定发送的消息是否大于1M,可以把list分为若干个小于1兆的list

public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
        producer.start();

        //large batch
        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>(100 * 1000);
        for (int i = 0; i < 100 * 1000; i++) {
            messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
        }
        //把大的list 分为小于1000*1000的若干个list
        ListSplitter splitter = new ListSplitter(messages);
        while (splitter.hasNext()) {
            List<Message> listItem = splitter.next();
            producer.send(listItem);
        }
        producer.shutdown();
    }

}

class ListSplitter implements Iterator<List<Message>> {
    private int sizeLimit = 1000 * 1000;
    private final List<Message> messages;
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            //计算每个消息的大小,topic名称+消息体+属性
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            //日志的开销
            tmpSize = tmpSize + 20;
            //如果大于定义的大小
            if (tmpSize > sizeLimit) {
                //单个消息超过了最大的限制
                //忽略,否则会阻塞分裂的进程
                if (nextIndex - currIndex == 0) {
                    //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > sizeLimit) {
                break;
            } else {
                totalSize += tmpSize;
            }

        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("Not allowed to remove");
    }

相关文章

  • 消费消息(二)

    批量消息发送 批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitSt...

  • 批量消息

    批量消息批量消息是指将多条消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞 吐量。相信大...

  • 第五章 RocketMQ 批量消息

    批量发送消息:能显著提高传递小消息的性能。限制是这些批量消息应该有相同的 topic,相同的 waitStoreM...

  • 批量发送消息

    rocketmq提供批量发送消息的机制,但消息发送的数据包大小不能超过1M 示例代码: String topic ...

  • store模块阅读20:CommitLog(4):Message

    说明 本节讲解批量消息(MessageExtBatch)的编码器,以及上一节没讲完了批量消息的doAppend方法...

  • Springboot注解@KafkaListener实现Kafk

    在使用时Kafka时,经常遇到大批量消息在队列中,如果一个消息一个消息的消费的话效率太低下了,所以批量消费消息是很...

  • 批量生产消息

    1、简单的生产消息 public class SimpleBatchProducer {public static...

  • RocketMQ批量消息机制

    不支持的操作 Producer 从客户端日志中看出,rocketmq 消息体大小有限制,CODE: 13 DE...

  • 91 RocketMQ

    RocketMQ 作为一种纯java,分布式,队列模型的开源消息中间件,支持事务消息,顺序消息,批量消息,定时消息...

  • RocketMQ-Producer生产者解析

    Producer 概念说明* 初始化流程&流程图&相关类关系说明* 消息发送过程* 批量消息发送* 发送顺序消息...

网友评论

    本文标题:批量消息

    本文链接:https://www.haomeiwen.com/subject/wfthyltx.html