小强哥博客

小强哥,小强哥博客,技术大咖

Java构建高效可伸缩的结果缓存

image

几乎所有的服务器应用程序都会使用某种形式的缓存。中用之前的计算结果能降低延迟,提高吞吐量,但却会消耗更多的内存。 这博客记录一下如果开发一个高效可伸缩的缓存,将从简单的hashmap开始,然后一步步分析他的性能缺陷,并记录如何修复这些缺陷。

目标需求

在Computable<A,V>接口中声明一个函数,Computable,其输入类型为A,输出类型为V。在ExpensiveFunction中实现的Computable需要很长的时间结算结果,我们将创建一个Computable的包装器,帮助记住之前的计算结果,并将缓存过程封装起来。

实现方式1

import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

public class CacheTest1 {

}

class ExpensiveFunction implements Computable<String, BigInteger>{
	@Override
	public BigInteger comput(String args) throws InterruptedException {
		// 这里是一个计算很长的过程
		return new BigInteger(args);
	}
}

class Memoizer1<A, V> implements Computable<A, V>{

	private final Map<A,V> cache=new HashMap<A,V>();
	private final Computable<A,V> c;
	
	public Memoizer1(Computable<A,V> _c){
		this.c=_c;
	}
	
	@Override
	public synchronized V comput(A arg) throws InterruptedException {
		V result=cache.get(arg);
		if(result==null){
			result=c.comput(arg);
		}
		return result;
	}
	
}

在上面程序,Memoizer1给出了第一种尝试,使用HashMap保存之前的计算结果。compute方法检查需要的结果是否在缓存中,如果存在则把结果返回,否则重新计算然后把结果放到HashMap中。

上面的代码存在2个问题,1、因为HashMap不是线程安全的,因此需要synchronized进行方法同步,但是这样做的话同时只能有一个线程执行compute,如果某个线程执行计算的时间很长,那么将会出超时阻塞。2、会造成重复计算,由于计算时间过长,其他的线程并不知道这个计算正在进行,因此会继续重复计算。

实现方式2

import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CacheTest2 {

}

class ExpensiveFunction implements Computable<String, BigInteger>{
	@Override
	public BigInteger comput(String args) throws InterruptedException {
		// do something
		return new BigInteger(args);
	}
}

class Memoizer1<A, V> implements Computable<A, V>{

	private final Map<A,V> cache=new ConcurrentHashMap<A,V>();
	private final Computable<A,V> c;
	
	public Memoizer1(Computable<A,V> _c){
		this.c=_c;
	}
	
	@Override
	public V comput(A arg) throws InterruptedException {
		V result=cache.get(arg);
		if(result==null){
			result=c.comput(arg);
		}
		return result;
	}
	
}

在上面代码中,将hashmap替换成了concurrnethashmap,由于concurrenthashmap类本身是线程安全的因此在访问底层map就不需要增加synchronized,因此就避免了compute方法的串行性,提高了并发行为。

虽然上面代码解决了并发问题,但是依然存在一个问题就是会存在重复计算问题,这种方案依然不够完美,我们继续完善,继续向下看。

实现方式3

import java.math.BigInteger;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class CacheTest3 {

}

class ExpensiveFunction implements Computable<String, BigInteger> {
	@Override
	public BigInteger comput(String args) throws InterruptedException {
		// do something
		return new BigInteger(args);
	}
}

class Memoizer1<A, V> implements Computable<A, V> {

	private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();
	private final Computable<A, V> c;

	public Memoizer1(Computable<A, V> _c) {
		this.c = _c;
	}

	@Override
	public V comput(A arg) throws InterruptedException {
		Future<V> f = cache.get(arg);
		if (f == null) {
			Callable<V> eval = new Callable<V>() {
				@Override
				public V call() throws Exception {
					return c.comput(arg);
				}
			};
			FutureTask<V> ft = new FutureTask<V>(eval);
			f = ft;
			cache.put(arg, ft);
			ft.run();
		}
		V v=null;
		try {
			v=f.get();
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		return v;
	}

}

上面代码使用了futuretask,futuretask表示一个计算的过程,这个过程可能已经计算完成,也可能正在进行。如果结果可用,那么futuretask.get将立刻返回结果,否则它会一直阻塞,知道有结果在返回。

在comput函数中,会先从cache中获得当前计算内容是否在计算,如果没有在计算,那么创建一个FutureTask放到map中,然后启动这个计算;如果有在计算则直接调用get等待返回结果。

上面的方案其实已经很完美了,它表现出了非常好的并发特性,但是还是存在一点小小的缺陷,在高并发状态下,仍然会存在重复计算问题。问题的根本原因就是在if的代码块中是非原子的“先检查在执行”操作,因此两个线程仍有可能在同一时间调用compute计算相同的值。

最终实现

import java.math.BigInteger;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class CacheTest3 {

}

class ExpensiveFunction implements Computable<String, BigInteger> {
	@Override
	public BigInteger comput(String args) throws InterruptedException {
		// do something
		return new BigInteger(args);
	}
}

class Memoizer1<A, V> implements Computable<A, V> {

	private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();
	private final Computable<A, V> c;

	public Memoizer1(Computable<A, V> _c) {
		this.c = _c;
	}

	@Override
	public V comput(A arg) throws InterruptedException {
		Future<V> f = cache.get(arg);
		if (f == null) {
			Callable<V> eval = new Callable<V>() {
				@Override
				public V call() throws Exception {
					return c.comput(arg);
				}
			};
			FutureTask<V> ft = new FutureTask<V>(eval);
			f=cache.putIfAbsent(arg, ft);
			if(f==null){
				f=ft;
				ft.run();
			}
		}
		V v=null;
		try {
			v=f.get();
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		return v;
	}

}

实现方式3中存在的问题是复合操作实在底层的map对象上执行的,而这个对象无法通过加锁来保持原子性。在最终实现中使用了concurrnethashmap中的原子方法putifabsent,避免了重复计算问题。