问题引入:多线程并发安全引起的思考
首先我们通过引入一段示例进入我们今天的主题。先来看下面一段生产者消费者多线程并发的代码示例
public class ProducerAndConsumerV1 {
static class DataBuffer<T> {
private Queue<T> queue = new LinkedList();
private static final Integer QUEUE_MAX_LENGTH = 10000;
private Integer length = 0;
private final Integer MAX_LENGTH = 10;
/**
* 消息生产
* @param message
*/
public void produceMessage(T message) {
if(length < QUEUE_MAX_LENGTH){
queue.add(message);
length++;
System.out.println("生产消息,队列长度:"+length);
}
}
/**
* 消费消息
*/
public void consumerMessage() {
System.out.println("消费消息,队列长度:"+length);
if (length > 0) {
queue.poll();
length -- ;
}
}
}
public static void main(String[] args) {
ProducerAndConsumerV1.DataBuffer dataBuffer = new ProducerAndConsumerV1.DataBuffer();
Runnable produceAction = () -> {
while (true) {
try {
dataBuffer.produceMessage("cs");
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
}
}
};
Runnable consumerAction = () -> {
while (true) {
try {
dataBuffer.consumerMessage();
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}
};
// 同时并发执行的线程数
final int THREAD_TOTAL = 200;
//线程池,用于多线程模拟测试
ExecutorService threadPool =
Executors.newFixedThreadPool(THREAD_TOTAL);
//假定共 200 条线程,其中有 100 个消费者和 100 生产者
//100个生产者,每隔200ms生产消息
for (int i = 0; i < 100; I++)
{
threadPool.submit(produceAction);
}
//100个消费者,每隔100ms消费消息
for (int i = 0; i < 100; I++)
{
threadPool.submit(consumerAction);
}
}
}
在上面的代码中,我们做了一个简单的消息队列,100个生产者每隔200ms像消息队列中生产消息,100个消费者每隔100ms消费消息。如果对并发安全稍微有一点了解的同学应该能看出来,上述的实例代码是存在多线程安全问题的。多个线程间同时去操作队列和长度,是会产生线程不安全的情况的。那么如何解决呢,大部分人可能会直接上sychroized java的内置锁,或者用java的显示锁Lock去保证线程安全。那么问题来了,如果让你自己实现一把锁解决这个问题,你该如何设计呢?或者换个问法,如果不让你使用java的内置锁或者显示锁,你该如果解决这个线程安全的问题呢?
AQS原理
思考:我们先抛开解决这个问题的各种技术手段不谈,单纯想想如何解决这个问题。其实上述示例代码中的核心问题是要保证临界资源的访问安全问题,说通俗点,就是同一时刻就只能有一个生产者或者一个消费者去操作队列。我们知道,在一个进程中,多个线程是共享同一个进程的内存、cpu等资源的,也正是因为这样,所以才会有多个线程并发访问的问题。但是也正是因为这个特性,我们想到,如果想让多个线程顺序执行,我们是不是可以先定义一个中间变量 state ,初始化值为0(软件工程学中的一个重要思想,往往在解决一个比较复杂的问题时,我们只要多引入一层或者一个中间变量,就可以解决问题),让多个线程同时去修改这个中间变量state = 1,如果修改成功,就表示这个线程获得了生产消息或者消费消息的权利。而修改失败的线程则继续去修改这个变量,直到修改成功的一刻,则获取了生产消息和消费消息的权利。我们姑且把这个state 叫做锁,state =1 时就表示加锁成功,就可以操作消息队列。上面我们说的这种方法,要保证的前提时,修改state = 1 这个操作要是原子性的操作,也就是同一时刻只有一个线程可以修改成功,cpu其实支持这样的指令:cmpxchg,而在java应该层面也封装好了这样的操作,在UnSafe类中。这样的操作有一个专业名词,叫做CAS(Compare and Swap)
操作系统层面的 CAS 是一条 CPU 的原子指令(cmpxchg 指令),正是由于该指令具备了原子性,所以使用 CAS 操作数据时不会造成数据不一致问题,Unsafe 提供的 CAS 方法,直接通过native 方式(封装 C++代码)调用了底层的 CPU 指令 cmpxchg。
利用上面说到的CAS,我们可以写出以下代码:
public class MyLock {
//使用volatile关键字,保证有序性和可见性
private volatile int state;
//不安全类
private static final Unsafe unsafe = getUnsafe();;
//value 的内存偏移(相对与对象头部的偏移,不是绝对偏移)
private static final long valueOffset;
static {
try {
//取得 value 属性的内存偏移
valueOffset = unsafe.objectFieldOffset(MyLock.class.getDeclaredField("state"));
} catch (Exception ex) {
throw new Error(ex);
}
}
/**
* 加锁
* @return
*/
public boolean lock() {
int oldValue = state;
//通过 CAS 原子操作,如果操作失败,则自旋,一直到操作成功
do {
oldValue = state;
} while (!unSafeCompareAndSet(oldValue, oldValue + 1));
return true;
}
/**
* 解锁
* @return
*/
public boolean unlock() {
int oldValue = state;
//通过 CAS 原子操作,如果操作失败,则自旋,一直到操作成功
do {
oldValue = state;
} while (!unSafeCompareAndSet(oldValue, oldValue - 1));
return true;
}
public final boolean unSafeCompareAndSet(int oldValue, int
newValue) {
//原子操作:使用 unsafe 的“比较并交换方法”,进行 value 属性的交换
return unsafe.compareAndSwapInt(
this, valueOffset,oldValue ,newValue );
}
public static Unsafe getUnsafe() {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
return (Unsafe) theUnsafe.get(null);
} catch (Exception e) {
throw new AssertionError(e);
}
}
public static void main(String[] args) {
MyLock myLock = new MyLock();
System.out.println(myLock.lock());
}
}
lock方法大概解释一下:CAS 是一种无锁算法,该算法关键依赖两个值——期望值(就值)和新值,底层 CPU 利用原子操作,判断内存原值与期望值是否相等,如果相等则给内存地址赋新值,否则不做任何操作。那么以上代码的含义就是:
(1)获得字段的期望值(oldValue)。
(2)计算出需要替换的新值(newValue)。
(3)通过 CAS 将新值(newValue)放在字段的内存地址上,如果 CAS 失败则重复第 1 步到第 2 步,一直到 CAS 成功,这种重复俗称 CAS 自旋。
以上cas的方法保证了原子性,volatile 关键字保证了有序性和可见性(基于篇幅关系,本文不再讲解volatile的原理,有兴趣的小伙伴可以去了解一下)。
通过以上lock方法,可以多个线程去同时修改state字段的属性值,修改成功,则返回true,表示获取到锁,修改失败,则一直在自旋尝试去修改。获取到锁的线程,生产或者消费消息结束后,调用unLock方法,修改state状态为0,释放锁。此时在一直不断自旋的线程则获取到锁继续执行。
以上是我们基于cas自旋加volitale关键字实现的一把锁,当然这把锁还有很不足的地方,比如抢锁过程中修改state值失败的线程,一直自旋去修改,这种在竞争激烈的情况下,是非常消耗cpu性能的,我们可以考虑加一个队列,这个队列可以用来保存抢锁失败的线程,入队列成功之后,暂停抢锁失败的线程,从而让出cpu。等到持有锁的线程释放锁时,唤醒队列中头结点中的线程,被唤醒的线程加锁成功之后,重复以上流程,释放锁后唤醒队列中的下一个节点。那么基于以上想法,MyLock 中几个重要的属性就出来了
MyLock{
private volitale int state = 0;
private volitale Queue queue = new LinkedList();
}
说到这里,其实在java中,我们以上所说的逻辑就是 AQS的逻辑,只不过在AQS中,我们定义的队列,是用一个FIFO先进先出的双向链表来实现的,除此之外,它还支持很多的功能,让我们对锁的使用更为优雅,比如还支持共享锁,可中断,条件锁等等。下面我们讲的线程间的通信就是基于AQS的条件等待。因此,如果我们想要使用锁,可以直接用java中的ReentrantLock,它就是基于AQS实现的一个排它锁。
线程间的通信
文章开头给出的示例,其实除了线程安全的问题,还有一个问题就是:生产消息时即使队列已经满了,还是会一直无效的循环判断队列是否可以加入消息,同样消费者也是这样的问题,即使消息队列已经空了,还是会无效循环队列是否有消息可以消费。我们可以想到的处理办法就是,是否可以在多线程间通信,比如生产者的线程在消息队列已经满了的情况下,直接休眠,等到消费者的线程消费了消息,队列消息不满的情况下再来唤醒生产者线程生产消息。或者从消费者角度来看,队列为空的情况,线程暂停,等到队列有消息时,生产者线程来唤醒消费者线程进行消费消息。这就涉及到了线程间的通信,目前java中线程通信有两种机制,一个是通过Object.wait,Object.notify 来实现(要配合sychroized来使用),另一个就是通过Lock的条件等待。下面我们就这两种方式来阐述其使用方式和原理。
Object.wait,notify方式
底层实现原理的数据结构
{
entryList
Ower
WaitSet
}
entryList:有资格成为竞争候选的线程
Ower: 拥有monitor的线程
waitSet:处于等待状态的线程
下面以此代码示例讲解
Object full = new Object()
A线程:
synchronized (full) {
full.wait();
}
B线程:
synchronized (full) {
full.notify();
}
我们知道sychroized 锁的原理是通过对象的mointor监视器实现的。
对象的 wait 方法的核心原理,大致如下:
(1)当线程调用了 locko(某个同步锁对象)的 wait 方法后,JVM 会将当前线程加入 locko
监视器的 WaitSet(等待集),等待被其他线程唤醒。
(2)当前线程会释放 locko 对象监视器的 Owner 权利,让其他线程可以抢夺 locko 对象的监
视器。
(3)让当前线程等待,其状态变成 WAITING。
对象的 notify(或者 notifyAll)方法的核心原理,大致如下:
(1)当线程调用了 locko(某个同步锁对象)的 notify 方法后,JVM 会唤醒 locko 监视器
WaitSet 中的第一条等待线程。
(2)当线程调用了 locko 的 notifyAll 方法后,JVM 会唤醒 locko 监视器 WaitSet 中的所有等
待线程。
(3)等待线程被唤醒后,会从监视器的 WaitSet 移动到 EntryList,线程具备了排队抢夺监视
器 Owner 权利的资格,其状态从 WAITING 变成 BLOCKED。 (4)EntryList 中的线程抢夺到监视器 Owner 权利之后,线程的其状态从 BLOCKED 变成,
Runnable,具备重新执行的资格。
下面我们给出用sychroized 和 wait,notify方式的一版代码:
public class ProducerAndConsumer<T> {
static class DataBuffer<T> {
private Queue<T> queue = new LinkedList();
private Integer length = 0;
private final Integer MAX_LENGTH = 10;
private Object syncObject = new Object();
/**
* 当队列不满的时候,向队列发送消息
*/
private Object NOT_FULL = new Object();
/**
* 当队列不为空时,向队列发送消息
*/
private Object NOT_EMPTY = new Object();
/**
* 消息生产
* @param message
*/
public void produceMessage(T message) throws InterruptedException {
while (length > MAX_LENGTH) {
synchronized (NOT_FULL) {
NOT_FULL.wait();
System.out.println("-------队列已满-------");
}
}
synchronized (syncObject) {
queue.add(message);
length++;
}
synchronized (NOT_EMPTY) {
System.out.println("-------队列不为空-------");
NOT_EMPTY.notify();
}
}
/**
* 消费消息
*/
public void consumerMessage() throws InterruptedException {
while (length <= 0) {
synchronized (NOT_EMPTY) {
System.out.println("消费者-------队列已空-------");
NOT_EMPTY.wait();
}
}
synchronized (syncObject) {
queue.poll();
length -- ;
}
synchronized (NOT_FULL) {
NOT_FULL.notify();
}
}
}
public static void main(String[] args) throws Exception{
DataBuffer dataBuffer = new DataBuffer();
Callable produceAction = () -> {
try {
dataBuffer.produceMessage(1001L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
};
Callable consumerAction = () -> {
try {
dataBuffer.consumerMessage();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
};
// 同时并发执行的线程数
final int THREAD_TOTAL = 20;
//线程池,用于多线程模拟测试
ExecutorService threadPool =
Executors.newFixedThreadPool(THREAD_TOTAL);
//假定共 11 条线程,其中有 10 个消费者,但是只有 1 个生产者;
final int CONSUMER_TOTAL = 3;
final int PRODUCE_TOTAL = 1;
for (int i = 0; i < PRODUCE_TOTAL; i++)
{
//生产者线程每生产一个商品,间隔 50ms
threadPool.submit(new Producer(produceAction, 30000));
}
for (int i = 0; i < CONSUMER_TOTAL; i++)
{
//消费者线程每消费一个商品,间隔 100ms
threadPool.submit(new Consumer(consumerAction, 15000));
}
}
}
Condition的 await,signal方式
Condition 与 Object 的 wait()/notify()作用是相似的:都是使得一个线程等待某个条件
(Condition),只有当该条件具备(signal 或者 signalAll 方法被调用)时等待线程才会被唤醒,
从而重新争夺锁。不同的是:Object 的 wait()/notify()由 JVM 底层的实现,而 Condition 接口与实
现类完全使用Java代码实现。当需要进行线程间的通信时,建议结合使用 ReetrantLock与Condition,
通过 Condition 的 await()和 signal()方法进行线程间的阻塞与唤醒。
ConditionObject 类是实现条件队列的关键,每个 ConditionObject 对象都维护一个单独的条件
等待对列。每个 ConditionObject 对应一个条件队列,它记录该队列的头节点和尾节点。
public class ConditionObject implements Condition, java.io.Serializable {
//记录该队列的头节点
private transient Node firstWaiter;
//记录该队列的尾节点
private transient Node lastWaiter;
}
await()等待方法原理:
image.png
signal()唤醒方法原理
image.png
最后给出我们用java显示锁和条件等待的一版代码:
public class ProducerAndConsumerV3 {
static class DataBuffer<T> {
private Queue<T> queue = new LinkedList();
private static final Integer QUEUE_MAX_LENGTH = 1;
private Integer length = 0;
private final Integer MAX_LENGTH = 10;
private Lock syncObject = new ReentrantLock(true);
private Condition NOT_FULL = syncObject.newCondition();
private Condition NOT_EMPTY = syncObject.newCondition();
/**
* 消息生产
* @param message
*/
public void produceMessage(T message) throws InterruptedException {
syncObject.lock();
if (length < QUEUE_MAX_LENGTH) {
queue.add(message);
length++;
System.out.println("生产消息,队列长度:" + length);
NOT_EMPTY.signal();
} else {
NOT_FULL.await();
}
syncObject.unlock();
}
/**
* 消费消息
*/
public void consumerMessage() throws InterruptedException {
syncObject.lock();
System.out.println("消费消息,队列长度:"+length);
if (length > 0) {
queue.poll();
length--;
NOT_FULL.signal();
} else {
NOT_EMPTY.await();
}
syncObject.unlock();
}
}
public static void main(String[] args) {
ProducerAndConsumerV3.DataBuffer dataBuffer = new ProducerAndConsumerV3.DataBuffer();
for (int i=0;i<3;i++) {
Runnable produceAction = () -> {
try {
dataBuffer.produceMessage("cs");
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
}
};
new Thread(produceAction).start();
}
for (int j=0;j<3;j++) {
Runnable consumerAction = () -> {
try {
dataBuffer.consumerMessage();
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
};
new Thread(consumerAction).start();
}
}
}
我们可以再扩展一下,我们这里考虑的是单机情况下,可以通过线程间的通信去完成,如果是放在分布式的环境下,比如最常见的消息队列中间件的实现,消费者是怎么监听的broker中有消息的呢,和单机情况下一样,不可能是所有消息者循环自旋请求broker是否有消息产生,实现方式也是消费者注册了消息监听的接口,当broker有消息时,然后从众多消费者中,利用调度算法选出一台消费节点,然后回调推送消息给消费者。不同的只是单机情况下,是多线程间的通信,而在分布式的环境下,是多个进程间通过http请求互相通信罢了。










网友评论