Pulsar源码解析-延迟队列实现

问题:

  1. 延迟队列的作用
  2. 延迟队列数据结构
  3. 如何添加到延迟队列?
  4. 如何从延迟队列取出?

问题1延迟队列的作用

延迟队列用来解决需要延迟消费的场景,例如 电商中订单超时15分钟未支付自动关闭
特点:Pulsar的延迟队列可支持任意维度的延迟
存储:堆外内存
消息数:1G堆外内存 = 1 * 1024 * 1024 * 1024 / 24 = 44,739,242条,1G支持4400万条可满足大多数业务场景,不足就加内存。(ps: 消息是持久化到磁盘,延迟消息是从磁盘读取出来放入延迟队列中,只放消息的索引:2个long类型索引+1个long类型时间=24字节,假设broker宕机,启动后消费者读取消息会先放入延迟队列校验是否到时间,然后推送给消费者)
缺点:当延迟的时间维度差距很大(有几个月的,有几分钟的)影响磁盘删除,这时需要使用者按时间维度分topic发送(分多个层级分、时、天、周、月、季度、年),为什么呢?因为pulsar的删除最小单元是Ledger,一个Ledger可以存很多消息,其中有一条未消费,整个Ledger都不会删。导致磁盘越来越大

问题2延迟队列数据结构

延迟队列的实现是TripleLongPriorityQueue,本质是ByteBuf,一次写入24字节

public class TripleLongPriorityQueue implements AutoCloseable {

    private static final int SIZE_OF_LONG = 8;
    private static final int DEFAULT_INITIAL_CAPACITY = 16;

    // Each item is composed of 3 longs
    private static final int ITEMS_COUNT = 3;

    private static final int TUPLE_SIZE = ITEMS_COUNT * SIZE_OF_LONG;

    private final ByteBuf buffer;

    private int capacity;
    private int size;
    public TripleLongPriorityQueue() {
        this(DEFAULT_INITIAL_CAPACITY);
    }

    public TripleLongPriorityQueue(int initialCapacity) {
        capacity = initialCapacity;
        buffer = PooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity * ITEMS_COUNT * SIZE_OF_LONG);
        size = 0;
    }
    ...
}

问题3如何添加到延迟队列?

public class TripleLongPriorityQueue implements AutoCloseable {

    public void add(long n1, long n2, long n3) {
    	// 扩容
        if (size == capacity) {
            increaseCapacity();
        }
        // 添加
        put(size, n1, n2, n3);
        // 排序+交换
        siftUp(size);
        // 统计个数
        ++size;
    }
}

扩容

    private void increaseCapacity() {
        // 小于256扩一倍,超过256扩一半
        this.capacity += (capacity <= 256 ? capacity : capacity / 2);
        buffer.capacity(this.capacity * TUPLE_SIZE);
    }

添加

    private void put(int idx, long n1, long n2, long n3) {
    	// 当前size * 24 = 尾部
        int i = idx * TUPLE_SIZE;
        // 追加写,比较紧凑
        buffer.setLong(i, n1);
        buffer.setLong(i + 1 * SIZE_OF_LONG, n2);
        buffer.setLong(i + 2 * SIZE_OF_LONG, n3);
    }
    private void siftUp(int idx) {
        while (idx > 0) {
        	// 折半
            int parentIdx = (idx - 1) / 2;
            // 当前数据比中间大结束
            // 有没有可能是 2 3 4 5 1 6 7
            // 不可能,因为在插入1时比4小就会换位置,并发?也没有,外层调用时加了锁
            if (compare(idx, parentIdx) >= 0) {
                break;
            }

            swap(idx, parentIdx);
            idx = parentIdx;
        }
    }

交换

    private void swap(int idx1, int idx2) {
    	// 小
        int i1 = idx1 * TUPLE_SIZE;
        // 大
        int i2 = idx2 * TUPLE_SIZE;

		// 获取小的值
        long tmp1 = buffer.getLong(i1);
        long tmp2 = buffer.getLong(i1 + 1 * SIZE_OF_LONG);
        long tmp3 = buffer.getLong(i1 + 2 * SIZE_OF_LONG);
		
		// 大小交换
        buffer.setLong(i1, buffer.getLong(i2));
        buffer.setLong(i1 + 1 * SIZE_OF_LONG, buffer.getLong(i2 + 1 * SIZE_OF_LONG));
        buffer.setLong(i1 + 2 * SIZE_OF_LONG, buffer.getLong(i2 + 2 * SIZE_OF_LONG));

        buffer.setLong(i2, tmp1);
        buffer.setLong(i2 + 1 * SIZE_OF_LONG, tmp2);
        buffer.setLong(i2 + 2 * SIZE_OF_LONG, tmp3);
    }

总结:存储使用netty堆外内存Bytebuf,每条消息是3个Long类型,第一个Long是延迟时间,即24字节,插入时自动扩容+排序,顺序从小到大。即快过期的是第一条。
ps:add调用之前的文章分析消费者服务端拉取的那篇有讲到,推给消费者之前会判断:如果有设置延迟时间则往延迟队列添加,添加成功是延迟消息,添加失败延迟时间已到。下面这段代码:

public class InMemoryDelayedDeliveryTracker {
    public boolean addMessage(long ledgerId, long entryId, long deliveryAt) {
        long now = clock.millis();
        if (deliveryAt < (now + tickTimeMillis)) {
            return false;
        }

        priorityQueue.add(deliveryAt, ledgerId, entryId);
        updateTimer();
        return true;
    }
}

问题4如何从延迟队列取出?

public class InMemoryDelayedDeliveryTracker {

public Set<PositionImpl> getScheduledMessages(int maxMessages) {
		// 读取数量
        int n = maxMessages;
        Set<PositionImpl> positions = new TreeSet<>();
        long now = clock.millis();
        long cutoffTime = now + tickTimeMillis;
		// 读取数量>0 && 队列不为空
        while (n > 0 && !priorityQueue.isEmpty()) {
        	// 读前8个字节的时间
            long timestamp = priorityQueue.peekN1();
            // 如果最近的延迟都大于当前时间,说明全都没到时间 结束
            if (timestamp > cutoffTime) {
                break;
            }
			
            long ledgerId = priorityQueue.peekN2();
            long entryId = priorityQueue.peekN3();
            // 保存消息索引
            positions.add(new PositionImpl(ledgerId, entryId));
			// 移除first
            priorityQueue.pop();
            --n;
        }
		// 找到最近过期时间的消息计算还有多久到时间,设置一个倒计时,到时间后触发dispatcher读数据
		// dispatcher读的时候会读重新投递的和延迟队列的
        updateTimer();
        return positions;
    }
}

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>