美文网首页
AQS原理、多线程通信

AQS原理、多线程通信

作者: ershuai8614 | 来源:发表于2022-08-13 20:59 被阅读0次

问题引入:多线程并发安全引起的思考

首先我们通过引入一段示例进入我们今天的主题。先来看下面一段生产者消费者多线程并发的代码示例

public class ProducerAndConsumerV1 {
    static class DataBuffer<T> {
        private Queue<T> queue = new LinkedList();

        private static final Integer QUEUE_MAX_LENGTH = 10000;

        private Integer length = 0;

        private final Integer MAX_LENGTH = 10;

        /**
         * 消息生产
         * @param message
         */
        public void produceMessage(T message)  {
                if(length < QUEUE_MAX_LENGTH){
                    queue.add(message);
                    length++;
                    System.out.println("生产消息,队列长度:"+length);
                }
        }
        /**
         * 消费消息
         */
        public void consumerMessage() {
                System.out.println("消费消息,队列长度:"+length);
                if (length > 0) {
                    queue.poll();
                    length -- ;
                }
        }
    }

    public static void main(String[] args) {
        ProducerAndConsumerV1.DataBuffer dataBuffer = new ProducerAndConsumerV1.DataBuffer();

        Runnable produceAction = () -> {
            while (true) {
                try {
                    dataBuffer.produceMessage("cs");
                    Thread.sleep(200);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        Runnable consumerAction = () -> {
            while (true) {
                try {
                    dataBuffer.consumerMessage();
                    Thread.sleep(100);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };


        // 同时并发执行的线程数
        final int THREAD_TOTAL = 200;
        //线程池,用于多线程模拟测试
        ExecutorService threadPool =
                Executors.newFixedThreadPool(THREAD_TOTAL);
        //假定共 200 条线程,其中有 100 个消费者和 100 生产者

        //100个生产者,每隔200ms生产消息
        for (int i = 0; i < 100; I++)
        {
            threadPool.submit(produceAction);
        }
        //100个消费者,每隔100ms消费消息
        for (int i = 0; i < 100; I++)
        {
            threadPool.submit(consumerAction);
        }
    }
}

在上面的代码中,我们做了一个简单的消息队列,100个生产者每隔200ms像消息队列中生产消息,100个消费者每隔100ms消费消息。如果对并发安全稍微有一点了解的同学应该能看出来,上述的实例代码是存在多线程安全问题的。多个线程间同时去操作队列和长度,是会产生线程不安全的情况的。那么如何解决呢,大部分人可能会直接上sychroized java的内置锁,或者用java的显示锁Lock去保证线程安全。那么问题来了,如果让你自己实现一把锁解决这个问题,你该如何设计呢?或者换个问法,如果不让你使用java的内置锁或者显示锁,你该如果解决这个线程安全的问题呢?

AQS原理

思考:我们先抛开解决这个问题的各种技术手段不谈,单纯想想如何解决这个问题。其实上述示例代码中的核心问题是要保证临界资源的访问安全问题,说通俗点,就是同一时刻就只能有一个生产者或者一个消费者去操作队列。我们知道,在一个进程中,多个线程是共享同一个进程的内存、cpu等资源的,也正是因为这样,所以才会有多个线程并发访问的问题。但是也正是因为这个特性,我们想到,如果想让多个线程顺序执行,我们是不是可以先定义一个中间变量 state ,初始化值为0(软件工程学中的一个重要思想,往往在解决一个比较复杂的问题时,我们只要多引入一层或者一个中间变量,就可以解决问题),让多个线程同时去修改这个中间变量state = 1,如果修改成功,就表示这个线程获得了生产消息或者消费消息的权利。而修改失败的线程则继续去修改这个变量,直到修改成功的一刻,则获取了生产消息和消费消息的权利。我们姑且把这个state 叫做锁,state =1 时就表示加锁成功,就可以操作消息队列。上面我们说的这种方法,要保证的前提时,修改state = 1 这个操作要是原子性的操作,也就是同一时刻只有一个线程可以修改成功,cpu其实支持这样的指令:cmpxchg,而在java应该层面也封装好了这样的操作,在UnSafe类中。这样的操作有一个专业名词,叫做CAS(Compare and Swap)

操作系统层面的 CAS 是一条 CPU 的原子指令(cmpxchg 指令),正是由于该指令具备了原子性,所以使用 CAS 操作数据时不会造成数据不一致问题,Unsafe 提供的 CAS 方法,直接通过native 方式(封装 C++代码)调用了底层的 CPU 指令 cmpxchg。

利用上面说到的CAS,我们可以写出以下代码:

public class MyLock {
    //使用volatile关键字,保证有序性和可见性
    private volatile int state;

    //不安全类
    private static final Unsafe unsafe = getUnsafe();;
    //value 的内存偏移(相对与对象头部的偏移,不是绝对偏移)
    private static final long valueOffset;

    static {
        try {
            //取得 value 属性的内存偏移
            valueOffset = unsafe.objectFieldOffset(MyLock.class.getDeclaredField("state"));
        } catch (Exception ex) {
            throw new Error(ex);
        }
    }

    /**
     * 加锁
     * @return
     */
    public boolean lock() {
        int oldValue = state;
        //通过 CAS 原子操作,如果操作失败,则自旋,一直到操作成功
        do {
            oldValue = state;
        } while (!unSafeCompareAndSet(oldValue, oldValue + 1));
        return true;
    }

    /**
     * 解锁
     * @return
     */
    public boolean unlock() {
        int oldValue = state;
        //通过 CAS 原子操作,如果操作失败,则自旋,一直到操作成功
        do {
            oldValue = state;
        } while (!unSafeCompareAndSet(oldValue, oldValue - 1));
        return true;
    }

    public final boolean unSafeCompareAndSet(int oldValue, int
            newValue) {
        //原子操作:使用 unsafe 的“比较并交换方法”,进行 value 属性的交换
        return unsafe.compareAndSwapInt(
                this, valueOffset,oldValue ,newValue );
    }


    public static Unsafe getUnsafe() {
        try {
            Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            return (Unsafe) theUnsafe.get(null);
        } catch (Exception e) {
            throw new AssertionError(e);
        }
    }


    public static void main(String[] args) {
        MyLock myLock = new MyLock();
        System.out.println(myLock.lock());
    }
}

lock方法大概解释一下:CAS 是一种无锁算法,该算法关键依赖两个值——期望值(就值)和新值,底层 CPU 利用原子操作,判断内存原值与期望值是否相等,如果相等则给内存地址赋新值,否则不做任何操作。那么以上代码的含义就是:
(1)获得字段的期望值(oldValue)。
(2)计算出需要替换的新值(newValue)。
(3)通过 CAS 将新值(newValue)放在字段的内存地址上,如果 CAS 失败则重复第 1 步到第 2 步,一直到 CAS 成功,这种重复俗称 CAS 自旋。
以上cas的方法保证了原子性,volatile 关键字保证了有序性和可见性(基于篇幅关系,本文不再讲解volatile的原理,有兴趣的小伙伴可以去了解一下)。

通过以上lock方法,可以多个线程去同时修改state字段的属性值,修改成功,则返回true,表示获取到锁,修改失败,则一直在自旋尝试去修改。获取到锁的线程,生产或者消费消息结束后,调用unLock方法,修改state状态为0,释放锁。此时在一直不断自旋的线程则获取到锁继续执行。

以上是我们基于cas自旋加volitale关键字实现的一把锁,当然这把锁还有很不足的地方,比如抢锁过程中修改state值失败的线程,一直自旋去修改,这种在竞争激烈的情况下,是非常消耗cpu性能的,我们可以考虑加一个队列,这个队列可以用来保存抢锁失败的线程,入队列成功之后,暂停抢锁失败的线程,从而让出cpu。等到持有锁的线程释放锁时,唤醒队列中头结点中的线程,被唤醒的线程加锁成功之后,重复以上流程,释放锁后唤醒队列中的下一个节点。那么基于以上想法,MyLock 中几个重要的属性就出来了
MyLock{
private volitale int state = 0;
private volitale Queue queue = new LinkedList();
}
说到这里,其实在java中,我们以上所说的逻辑就是 AQS的逻辑,只不过在AQS中,我们定义的队列,是用一个FIFO先进先出的双向链表来实现的,除此之外,它还支持很多的功能,让我们对锁的使用更为优雅,比如还支持共享锁,可中断,条件锁等等。下面我们讲的线程间的通信就是基于AQS的条件等待。因此,如果我们想要使用锁,可以直接用java中的ReentrantLock,它就是基于AQS实现的一个排它锁。

线程间的通信

文章开头给出的示例,其实除了线程安全的问题,还有一个问题就是:生产消息时即使队列已经满了,还是会一直无效的循环判断队列是否可以加入消息,同样消费者也是这样的问题,即使消息队列已经空了,还是会无效循环队列是否有消息可以消费。我们可以想到的处理办法就是,是否可以在多线程间通信,比如生产者的线程在消息队列已经满了的情况下,直接休眠,等到消费者的线程消费了消息,队列消息不满的情况下再来唤醒生产者线程生产消息。或者从消费者角度来看,队列为空的情况,线程暂停,等到队列有消息时,生产者线程来唤醒消费者线程进行消费消息。这就涉及到了线程间的通信,目前java中线程通信有两种机制,一个是通过Object.wait,Object.notify 来实现(要配合sychroized来使用),另一个就是通过Lock的条件等待。下面我们就这两种方式来阐述其使用方式和原理。

Object.wait,notify方式

底层实现原理的数据结构

{
    entryList
    Ower
    WaitSet
}

entryList:有资格成为竞争候选的线程
Ower: 拥有monitor的线程
waitSet:处于等待状态的线程

下面以此代码示例讲解

Object full = new Object()
A线程:
      synchronized (full) {
         full.wait();           
      }
B线程:
      synchronized (full) {
         full.notify();           
      }

我们知道sychroized 锁的原理是通过对象的mointor监视器实现的。

对象的 wait 方法的核心原理,大致如下:
(1)当线程调用了 locko(某个同步锁对象)的 wait 方法后,JVM 会将当前线程加入 locko
监视器的 WaitSet(等待集),等待被其他线程唤醒。
(2)当前线程会释放 locko 对象监视器的 Owner 权利,让其他线程可以抢夺 locko 对象的监
视器。
(3)让当前线程等待,其状态变成 WAITING。

对象的 notify(或者 notifyAll)方法的核心原理,大致如下:
(1)当线程调用了 locko(某个同步锁对象)的 notify 方法后,JVM 会唤醒 locko 监视器
WaitSet 中的第一条等待线程。
(2)当线程调用了 locko 的 notifyAll 方法后,JVM 会唤醒 locko 监视器 WaitSet 中的所有等
待线程。
(3)等待线程被唤醒后,会从监视器的 WaitSet 移动到 EntryList,线程具备了排队抢夺监视
器 Owner 权利的资格,其状态从 WAITING 变成 BLOCKED。 (4)EntryList 中的线程抢夺到监视器 Owner 权利之后,线程的其状态从 BLOCKED 变成,
Runnable,具备重新执行的资格。
下面我们给出用sychroized 和 wait,notify方式的一版代码:

public class ProducerAndConsumer<T> {
    static class DataBuffer<T> {
        private Queue<T> queue = new LinkedList();

        private Integer length = 0;

        private final Integer MAX_LENGTH = 10;

        private Object syncObject = new Object();

        /**
         * 当队列不满的时候,向队列发送消息
         */
        private Object NOT_FULL = new Object();

        /**
         * 当队列不为空时,向队列发送消息
         */
        private Object NOT_EMPTY = new Object();


        /**
         * 消息生产
         * @param message
         */
        public void produceMessage(T message) throws InterruptedException {
            while (length > MAX_LENGTH) {
                synchronized (NOT_FULL) {
                    NOT_FULL.wait();
                    System.out.println("-------队列已满-------");
                }
            }
            synchronized (syncObject) {
                queue.add(message);
                length++;
            }
            synchronized (NOT_EMPTY) {
                System.out.println("-------队列不为空-------");
                NOT_EMPTY.notify();
            }
        }

        /**
         * 消费消息
         */
        public void consumerMessage() throws InterruptedException {
            while (length <= 0) {
                synchronized (NOT_EMPTY) {
                    System.out.println("消费者-------队列已空-------");
                    NOT_EMPTY.wait();
                }
            }
            synchronized (syncObject) {
                queue.poll();
                length -- ;
            }
            synchronized (NOT_FULL) {
                NOT_FULL.notify();
            }
        }
    }
    public static void main(String[] args) throws Exception{
        DataBuffer dataBuffer = new DataBuffer();

        Callable produceAction = () -> {
            try {
                dataBuffer.produceMessage(1001L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return null;
        };

        Callable consumerAction = () -> {
            try {
                dataBuffer.consumerMessage();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return null;
        };


        // 同时并发执行的线程数
        final int THREAD_TOTAL = 20;
        //线程池,用于多线程模拟测试
        ExecutorService threadPool =
                Executors.newFixedThreadPool(THREAD_TOTAL);
        //假定共 11 条线程,其中有 10 个消费者,但是只有 1 个生产者;
        final int CONSUMER_TOTAL = 3;
        final int PRODUCE_TOTAL = 1;

        for (int i = 0; i < PRODUCE_TOTAL; i++)
        {
            //生产者线程每生产一个商品,间隔 50ms
            threadPool.submit(new Producer(produceAction, 30000));
        }
        for (int i = 0; i < CONSUMER_TOTAL; i++)
        {
            //消费者线程每消费一个商品,间隔 100ms
            threadPool.submit(new Consumer(consumerAction, 15000));
        }
    }
}

Condition的 await,signal方式

Condition 与 Object 的 wait()/notify()作用是相似的:都是使得一个线程等待某个条件
(Condition),只有当该条件具备(signal 或者 signalAll 方法被调用)时等待线程才会被唤醒,
从而重新争夺锁。不同的是:Object 的 wait()/notify()由 JVM 底层的实现,而 Condition 接口与实
现类完全使用Java代码实现。当需要进行线程间的通信时,建议结合使用 ReetrantLock与Condition,
通过 Condition 的 await()和 signal()方法进行线程间的阻塞与唤醒。

ConditionObject 类是实现条件队列的关键,每个 ConditionObject 对象都维护一个单独的条件
等待对列。每个 ConditionObject 对应一个条件队列,它记录该队列的头节点和尾节点。

public class ConditionObject implements Condition, java.io.Serializable {
     //记录该队列的头节点
    private transient Node firstWaiter;
   //记录该队列的尾节点
   private transient Node lastWaiter;
}

await()等待方法原理:


image.png

signal()唤醒方法原理

image.png

最后给出我们用java显示锁和条件等待的一版代码:

public class ProducerAndConsumerV3 {
    static class DataBuffer<T> {
        private Queue<T> queue = new LinkedList();

        private static final Integer QUEUE_MAX_LENGTH = 1;

        private Integer length = 0;

        private final Integer MAX_LENGTH = 10;

        private Lock syncObject = new ReentrantLock(true);

        private Condition NOT_FULL = syncObject.newCondition();

        private Condition NOT_EMPTY = syncObject.newCondition();


        /**
         * 消息生产
         * @param message
         */
        public void produceMessage(T message) throws InterruptedException {
            syncObject.lock();
            if (length < QUEUE_MAX_LENGTH) {
                queue.add(message);
                length++;
                System.out.println("生产消息,队列长度:" + length);

                NOT_EMPTY.signal();

            } else {
                NOT_FULL.await();
            }
            syncObject.unlock();
        }
        /**
         * 消费消息
         */
        public void consumerMessage() throws InterruptedException {
            syncObject.lock();
            System.out.println("消费消息,队列长度:"+length);
            if (length > 0) {
                queue.poll();
                length--;

                NOT_FULL.signal();
            } else {
                NOT_EMPTY.await();
            }

            syncObject.unlock();
        }
    }

    public static void main(String[] args) {
        ProducerAndConsumerV3.DataBuffer dataBuffer = new ProducerAndConsumerV3.DataBuffer();

        for (int i=0;i<3;i++) {
            Runnable produceAction = () -> {
                try {
                    dataBuffer.produceMessage("cs");
                    Thread.sleep(200);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            };
            new Thread(produceAction).start();
        }


        for (int j=0;j<3;j++) {
            Runnable consumerAction = () -> {
                try {
                    dataBuffer.consumerMessage();
                    Thread.sleep(100);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            };
            new Thread(consumerAction).start();
        }
    }
}

我们可以再扩展一下,我们这里考虑的是单机情况下,可以通过线程间的通信去完成,如果是放在分布式的环境下,比如最常见的消息队列中间件的实现,消费者是怎么监听的broker中有消息的呢,和单机情况下一样,不可能是所有消息者循环自旋请求broker是否有消息产生,实现方式也是消费者注册了消息监听的接口,当broker有消息时,然后从众多消费者中,利用调度算法选出一台消费节点,然后回调推送消息给消费者。不同的只是单机情况下,是多线程间的通信,而在分布式的环境下,是多个进程间通过http请求互相通信罢了。

相关文章

网友评论

      本文标题:AQS原理、多线程通信

      本文链接:https://www.haomeiwen.com/subject/bklqgrtx.html