案例需求
项目开发中,有些查询在相同条件下每次查询结果都一样,此时可以将结果缓存起来以便下次查询时直接返回。这样能大大提升查询效率,特别是耗时很长的查询,效果更明显。在这个需求中,我们除了要求将结果进行缓存外,还要确保查询任务只执行一次。
在这篇文章中,我们使用FutureTask和ConcurrentHashMap来实现上面的需求。
代码实现
我们定义一个UserService的类,提供一个根据用户ID查询对应用户的方法。假设这个查询耗时很长(虽然一般情况下并不需要多长时间),而且相同ID的查询结果都一样。这时就可以考虑将结果缓存起来。
具体代码如下:
- 定义一个
User实体
public class User {
private Integer id;
private String name;
public User(Integer id, String name) {
this.id = id;
this.name = name;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
-
UserService服务类
public class UserService {
public User getOne(Integer id) {
// 其他耗时很长的操作
return new User(id,"user");
}
}
-
Client场景类
public class Client {
private Map<Integer, User> cache = new HashMap<>();
private UserService service = new UserService();
public static void main(String[] args) {
Client client = new Client();
int id = 1234;
// 启动五个线程并发执行
for (int i = 0; i < 5; i++) {
new Thread(() -> System.out.println(client.getUserById(id))).start();
}
}
private synchronized User getUserById(Integer id) {
User user = cache.get(id);
if (user == null) {
user = service.getOne(id);
cache.put(id, user);
}
return user;
}
}
执行结果:
D:\javasoft\jdk1.8.0_231\bin\java.exe "...
User@6706a70b
User@6706a70b
User@6706a70b
User@6706a70b
User@6706a70b
Process finished with exit code 0
通过执行结果可以看出,上述场景类可以完成我们的需求。但是这种实现方式存在不足之处。由于HashMap不是线程安全的,所以我们在getUserById方法上使用synchronized同步锁来保证线程安全。但是这样的话会造成其他线程阻塞,大大降低了查询效率。那这时候该怎么办呢,可能有人会说既然HashMap不是线程安全的,那只要使用线程安全的ConcurrentHashMap替换HashMap不就行了吗。话不多说,我们上代码一试便知。
-
Client场景类(使用ConcurrentHashMap)
改动的地方只是使用ConcurrentHashMap替换了HashMap,并去掉了getUserById方法上的synchronized关键字
public class Client {
private Map<Integer, User> cache = new ConcurrentHashMap<>();
private UserService service = new UserService();
public static void main(String[] args) {
Client client = new Client();
int id = 1234;
// 启动五个线程并发执行
for (int i = 0; i < 5; i++) {
new Thread(() -> System.out.println(client.getUserById(id))).start();
}
}
private User getUserById(Integer id) {
User user = cache.get(id);
if (user == null) {
user = service.getOne(id);
cache.put(id, user);
}
return user;
}
}
接下来看看执行结果:
D:\javasoft\jdk1.8.0_231\bin\java.exe "...
User@693e4988
User@6223c513
User@278e4dc4
User@346827f
User@76ee89dd
Process finished with exit code 0
结果好像跟我们预期的不一样,每一次调用都返回了新的对象,缓存并没有生效。为什么会这样,其实很简单。ConcurrentHashMap虽然是线程安全的,但是由于我们去掉了getUserById方法上的synchronized关键字,并发访问时就会出现多个线程进入到if (user == null)判断,从而重复执行查询操作。那如何解决这个问题呢。这就是我们本次的主题:实现任务唯一执行。
我们可以想一下,之所以出现重复执行的问题,是因为第一个查询任务执行的时候,后面的线程并不知道这回事,所以就自己执行查询操作。如果能做到让其他线程知道已有任务在执行中,然后让他们阻塞等待任务执行完成,再去获取结果即可,这样就能实现任务唯一执行了。JUC包中已经给我们提供了一个这样类:FutureTask。它表示一个异步计算,并且只有计算完成才能通过get方法获取结果,否则会一直阻塞或者抛出TimeoutException(如果get时设置了超时时间)。下面来看看具体实现方式
-
Client场景类(ConcurrentHashMap配合FutureTask使用)
public class Client {
private Map<Integer, Future<User>> cache = new ConcurrentHashMap<>(); // 1
private UserService service = new UserService();
public static void main(String[] args) {
Client client = new Client();
int id = 1234;
// 启动五个线程并发执行
for (int i = 0; i < 5; i++) {
new Thread(() -> System.out.println(client.getUserById(id))).start();
}
}
// 使用FutureTask的特性,如果已有查询任务在执行,其他线程可以
// 获取这个任务,并等待其返回结果
private User getUserById(Integer id) {
Future<User> future = cache.get(id);
if (future == null) {
FutureTask task = new FutureTask<>(() -> service.getOne(id));
future = cache.putIfAbsent(id, task); // 2
if (future == null) {
future = task;
task.run();
}
}
try {
return future.get();
} catch (Exception e) {
cache.remove(id, future); // 3
throw new RuntimeException(e.getMessage(), e);
}
}
}
执行结果:
D:\javasoft\jdk1.8.0_231\bin\java.exe "...
User@56f3ee1d
User@56f3ee1d
User@56f3ee1d
User@56f3ee1d
User@56f3ee1d
Process finished with exit code 0
结果符合我们的需求,接下来一步步分析实现原理,关键实现主要有三个地方。
首先,相比于上一个版本。ConcurrentHashMap中的value由User变为Future<User>(FutureTask是Future接口的一个实现类):private Map<Integer, **Future<User>**> cache = new ConcurrentHashMap<>();这样做的目的是将任务缓存起来,告诉其他线程我正在执行任务,你乖乖地等着我执行完成获取结果就好,不需要重复执行相同的任务了。
其次,在getUserById方法中,当缓存中还没有任务时,会创建一个FutureTask任务,并将它放入缓存中。注意这里创建任务时可能多个线程都完成了创建任务的工作,但是放进缓存的时候只能有一个线程成功。这得益于ConcurrentHashMap的线程安全机制,以及future = cache.putIfAbsent(id, task);中putIfAbsent方法的使用。这个方法只有在当前key没有对应value时才进行写入,并且返回null。反之,则不会进行写入操作,而是直接返回key对应的值。这就保证了在key相同的情况下只会有一个线程创建的任务能放进缓存中。
最后,需要考虑缓存污染的问题。假如任务执行失败了,那么其他线程通过缓存获取的FutureTask就无法获取到正确的结果。所以我们需要在get发生异常时移除对应的缓存数据。
以上是个人对ConcurrentHashMap和FutureTask使用的一些理解,如果哪里写得有问题,还望各位猿友斧正。













网友评论