美文网首页
内核无锁队列 -- kfifo

内核无锁队列 -- kfifo

作者: faithzhou | 来源:发表于2019-02-22 18:44 被阅读0次

理论证明,在一个生产者和一个消费者的情况下,两者之间的同步无需加锁,即可并发访问。Linux内核无锁队列kfifo完美践行了该理论,性能得到极大提升。工作中如果能使用到该定论,可以有效的提升程序的性能。

内核代码处处有惊喜,往往让人叹为观止,犹如桃花源,让你发现别有洞天的美景。kfifo的实现中使用如下几个trick:

  1. 一个整数如果是2的整数幂,那么对齐取余操作%,可以转化为与其&操作,提升效率
  2. 使用整数自动溢出构成的一个环来替代循环数组,简化判空、判满及各种长度运算
  3. 先操作buffer,再计数。实现无锁队列

1. kfifo结构体信息

kfifo是内核中一个First In First Out数据结构,循坏队列实现。

struct __kfifo {
    unsigned int    in;
    unsigned int    out;
    unsigned int    mask;
    unsigned int    esize;
    void        *data;
};

data:用于存放数据的缓存区

esize:缓存区每个元素的size(element size)

mask:缓冲区元素个数(size) - 1;使用&mask,替换%size,提升效率

in:队尾下标,入队列的offset为(in % size 或者 in & mask)

out:队首下标,出队列的offset为(out % size 或者 out & mask)

队尾下标in在有数据入队的时候,一直自增;队首下标out在有数据出队的时候,一直自增。内核使用了unsigned int溢出的特性,来实现循环队列,即in - out不管任何情况都为队列的长度,即使in < out,这里可以画图或者自己编程理解一下。

2. kfifo内存分配

int __kfifo_alloc(struct __kfifo *fifo, unsigned int size,
        size_t esize, gfp_t gfp_mask)
{
    /*
     * round down to the next power of 2, since our 'let the indices
     * wrap' technique works only in this case.
     */
    size = roundup_pow_of_two(size);

    fifo->in = 0;
    fifo->out = 0;
    fifo->esize = esize;

    if (size < 2) {
        fifo->data = NULL;
        fifo->mask = 0;
        return -EINVAL;
    }

    fifo->data = kmalloc(size * esize, gfp_mask);

    if (!fifo->data) {
        fifo->mask = 0;
        return -ENOMEM;
    }
    fifo->mask = size - 1;

    return 0;
}

内存分配时,需要将size向上取2的幂,同时置fifo->mask=size - 1。

3. kfifo初始化

int __kfifo_init(struct __kfifo *fifo, void *buffer,
        unsigned int size, size_t esize)
{
    size /= esize;

    size = roundup_pow_of_two(size);

    fifo->in = 0;
    fifo->out = 0;
    fifo->esize = esize;
    fifo->data = buffer;

    if (size < 2) {
        fifo->mask = 0;
        return -EINVAL;
    }
    fifo->mask = size - 1;

    return 0;
}

4. 入队列__kfifo_in

unsigned int __kfifo_in(struct __kfifo *fifo,
        const void *buf, unsigned int len)
{
    unsigned int l;

    l = kfifo_unused(fifo);
    if (len > l)
        len = l;

    kfifo_copy_in(fifo, buf, len, fifo->in);
    fifo->in += len;
    return len;
}

可以看到,先将元素入队列:kfifo_copy_in,然后再计数fifo->in += len。

其中kfifo_unused判断队列中还有多少空间,代码如下:

static inline unsigned int kfifo_unused(struct __kfifo *fifo)
{
    return (fifo->mask + 1) - (fifo->in - fifo->out);
}

kfifo_copy_in为入队列的实体,实现如下:

static void kfifo_copy_in(struct __kfifo *fifo, const void *src,
        unsigned int len, unsigned int off)
{
    unsigned int size = fifo->mask + 1;
    unsigned int esize = fifo->esize;
    unsigned int l;

    off &= fifo->mask;
    if (esize != 1) {
        off *= esize;
        size *= esize;
        len *= esize;
    }
    l = min(len, size - off);

    memcpy(fifo->data + off, src, l);
    memcpy(fifo->data, src + l, len - l);
    /*
     * make sure that the data in the fifo is up to date before
     * incrementing the fifo->in index counter
     */
    smp_wmb();
}

5. 出队列__kfifo_out

unsigned int __kfifo_out(struct __kfifo *fifo,
        void *buf, unsigned int len)
{
    len = __kfifo_out_peek(fifo, buf, len);
    fifo->out += len;
    return len;
}

同样可以看出,先出队,再计数。如果先计数,在还没有出队前,被其他入队操作覆盖。

__kfifo_out_peek如下:

unsigned int __kfifo_out_peek(struct __kfifo *fifo,
        void *buf, unsigned int len)
{
    unsigned int l;

    l = fifo->in - fifo->out;
    if (len > l)
        len = l;

    kfifo_copy_out(fifo, buf, len, fifo->out);
    return len;
}

每次去元素时,需要判断长度是否越界,防止破坏空间。

kfifo_copy_out是出队的实体,试下如下:

static void kfifo_copy_out(struct __kfifo *fifo, void *dst,
        unsigned int len, unsigned int off)
{
    unsigned int size = fifo->mask + 1;
    unsigned int esize = fifo->esize;
    unsigned int l;

    off &= fifo->mask;
    if (esize != 1) {
        off *= esize;
        size *= esize;
        len *= esize;
    }
    l = min(len, size - off);

    memcpy(dst, fifo->data + off, l);
    memcpy(dst + l, fifo->data, len - l);
    /*
     * make sure that the data is copied before
     * incrementing the fifo->out index counter
     */
    smp_wmb();
}

6. 总结

上面分析基于linux 3.10.107内核。工程中,如果遇到1:N,或者N:1的情况,都可以转换为N个1:1的情况,将无锁队列的核心原理(先操作再计数)运用其中,来优化程序,提升逼格。

相关文章

  • 内核无锁队列 -- kfifo

    理论证明,在一个生产者和一个消费者的情况下,两者之间的同步无需加锁,即可并发访问。Linux内核无锁队列kfifo...

  • 基于Linux的kfifo移植到STM32(支持os的互斥访问)

    基于Linux的kfifo移植到STM32(支持os的互斥访问) 关于kfifo kfifo是内核里面的一个Fir...

  • 基于Linux的kfifo移植到STM32(支持os的互斥访问)

    基于Linux的kfifo移植到STM32(支持os的互斥访问) 关于kfifo kfifo是内核里面的一个Fir...

  • 无锁数组队列

    无锁数组队列

  • 无锁队列

    简介 无锁队列是lock-free中最基本的数据结构,一般应用场景是资源分配,比如TimerId的分配,Worke...

  • 无锁队列

    rte_ring 关键点 无锁: rte_atomic32_cmpset 直到成功(CAS)环: 总长度c...

  • 无锁环形队列

    并发编程中,经常会遇到资源竞争问题,而保持竞争资源的正确使用,可以通过锁的方式,但synchronized blo...

  • 竞态与同步(1)

    内核里处理的竞态主要通过以下方法处理: 信号量(互斥量)、自旋锁、读写信号量、读写自旋锁、等待队列、完成量。 信号...

  • JUC并发集合总结

    ConcurrentLinkedQueue 线程安全的支持高并发的队列,使用链表实现。非阻塞,无锁,无界。该队列也...

  • 多线程之非阻塞队列

    ConcurrentLinkedQueue 相对于阻塞队列加锁实现阻塞,非阻塞队列采用无锁CAS的方式来实现。

网友评论

      本文标题:内核无锁队列 -- kfifo

      本文链接:https://www.haomeiwen.com/subject/ytqyyqtx.html