美文网首页程序员
Exchanger类:并发任务间的数据交换

Exchanger类:并发任务间的数据交换

作者: higher2017 | 来源:发表于2017-02-28 23:11 被阅读79次

本文参考至《Java7并发编程实战手册》

Exchanger类允许在两个线程之间定义同步点,当两个线程都到达同步点时,它们交换数据。也就是第一个线程的数据进入到第二个线程中,第二线程的数据进入到第一个线程中。
这里有一个生产者和消费者的例子:生产者每次都会生产10个数据,这10个数据放在一个箩筐(List)里面,Exchanger会在:生产者生产完10个数据并且消费者消费完10个数据的时候,将两者的箩筐换过来。这样生产者的箩筐里面就没有数据(确保生产者生产的东西有地方装);消费者的箩筐里面就有10个数据(确保消费者有数据消费)。

生产者:
package Exchanger;

import java.util.List;
import java.util.concurrent.Exchanger;

/**
 * Producer:生产者
 * 
 * @author JM
 * @date 2017-2-28 下午10:33:08
 * @since JDK 1.7
 */
public class Producer implements Runnable {

    private List<String> buffer;

    private final Exchanger<List<String>> exchanger;

    public Producer(List<String> buffer, Exchanger<List<String>> exchanger) {
        super();
        this.buffer = buffer;
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            for (int j = 0; j < 10; j++) {
                String message = "Event " + ((i * 10) + j);
                System.out.printf("Producer:create---------- %s\n", message);
                buffer.add(message);
            }
            System.out.println("Before exchange  Producer:have11111111111@@@@@@@@@@ " + buffer.size());
            try {
                // 如果这个时候消费者也刚好在这里等待,那么生产者和消费者的数据产生对换(否则将等待消费者到达这里再进行数据对换)
                buffer = exchanger.exchange(buffer);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("After exchange   Producer:have222222222222@@@@@@@@@@ " + buffer.size());
        }
    }

}
消费者:
package Exchanger;

import java.util.List;
import java.util.concurrent.Exchanger;

public class Consumer implements Runnable {
    private List<String> buffer;
    private final Exchanger<List<String>> exchanger;

    public Consumer(List<String> buffer, Exchanger<List<String>> exchanger) {
        super();
        this.buffer = buffer;
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println("Before exchange Consumer:have111111111******** " + buffer.size());
            try {
                // 如果这个时候生产者也刚好在这里等待,那么生产者和消费者的数据产生对换(否则将等待生产者到达这里再进行数据对换)
                buffer = exchanger.exchange(buffer);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("After exchange Consumer:have2222222******** " + buffer.size());
            for (int j = 0; j < 10; j++) {
                String message = buffer.get(0);
                System.out.printf("Consumer:use----------- %s\n", message);
                buffer.remove(0);
            }
        }
    }

}
测试类:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

public class Main {
    public static void main(String[] args) {
        List<String> consumerBuffer = new ArrayList<String>();
        List<String> producerBuffer = new ArrayList<String>();
        Exchanger<List<String>> exchanger = new Exchanger<List<String>>();
        Producer producer = new Producer(producerBuffer, exchanger);
        Consumer consumer = new Consumer(consumerBuffer, exchanger);
        Thread producerThread = new Thread(producer);
        Thread consumerThread = new Thread(consumer);
        producerThread.start();
        consumerThread.start();
    }
}

相关文章

网友评论

    本文标题:Exchanger类:并发任务间的数据交换

    本文链接:https://www.haomeiwen.com/subject/yfifgttx.html