一、Callable
Callable
接口类似于Runnable
,因为它们都是为其实例可能由另一个线程执行的类设计的。 然而Runnable
不返回结果,也不能抛出被检查的异常
Callable是创建线程的第三种方式,是一个函数式接口
/**
* A task that returns a result and may throw an exception.
* Implementors define a single method with no arguments called
* {@code call}.
*
* <p>The {@code Callable} interface is similar to {@link
* java.lang.Runnable}, in that both are designed for classes whose
* instances are potentially executed by another thread. A
* {@code Runnable}, however, does not return a result and cannot
* throw a checked exception.
*
* <p>The {@link Executors} class contains utility methods to
* convert from other common forms to {@code Callable} classes.
*
* @see Executor
* @since 1.5
* @author Doug Lea
* @param <V> the result type of method {@code call}
*/
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
使用Callable接口创建线程时,发现Thread中构造方法并没有Callable参数
但是Java中提供了一实现Runnable接口的实现类 FutureTask
1、 Future接口;
- Future是一个接口,代表了一个异步计算的结果。接口中的方法用来检查计算是否完成、等待完成和得到计算的结果。
- 当计算完成后,只能通过get()方法得到结果,get方法会阻塞直到结果准备好了。
- 如果想取消,那么调用cancel()方法。其他方法用于确定任务是正常完成还是取消了。
- 一旦计算完成了,那么这个计算就不能被取消。
2、 FutureTask类;
- FutureTask类实现了RunnableFuture接口,而RunnnableFuture接口继承了Runnable和Future接口,所以说FutureTask是一个提供异步计算的结果的任务。
- FutureTask可以用来包装Callable或者Runnbale对象。因为FutureTask实现了Runnable接口,所以FutureTask也可以被提交给Executor(如上面例子那样)
FutureTask 实现了 RunnableFuture 接口,RunnableFuture 接口继承自 Runnable 接口
public class FutureTask<V> implements RunnableFuture<V> {
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
因此我们可以先创建一个 FutureTask 类,将 Callable 参数传进去,再将 FutureTask 作为参数传入创建 Thread类中
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// new Thread(new Runnable()).start();
// new Thread(new FutureTask<V>()).start();
// new Thread(new FutureTask<V>(Callable)).start();
MyCallable myCallable = new MyCallable();
FutureTask futureTask = new FutureTask(myCallable);
new Thread(futureTask,"A").start();
new Thread(futureTask,"B").start();
Integer o = (Integer) futureTask.get(); //这个get 方法可能会产生阻塞!把他放到最后
// 或者使用异步通信来处理!
System.out.println(o);
}
}
class MyCallable implements Callable<Integer>{
@Override
public Integer call() throws Exception {
// 耗时操作
System.out.println("call()");
return 200;
}
}
输出
call()
200
注:
1、 Callable接口会有缓存,开启多个线程但是只返回一个输出;
2、 返回结果会阻塞线程,比较耗时,尽量放在代码最后或者使用异步通信处理;
Callable 和 Runnable 区别
//Callable 接口
public interface Callable<V> {
V call() throws Exception;
}
// Runnable 接口
public interface Runnable {
public abstract void run();
}
1、 Callable规定的方法是call(),Runnable规定的方法是run().;
2、 Callable的任务执行后可返回值,而Runnable的任务是不能返回值的;
3、 call方法可以抛出异常,run方法不可以;
4、 运行Callable任务可以拿到一个Future对象,Future表示异步计算的结果(executorService.submit(Runnabletask)也会返回future,但是没有future的效果);
二、集合类线程不安全
(一)List
一般在多线程下,使用List
ArrayList是非线程安全的,在多线程的情况下,向list插入数据的时候,可能会造成数据丢失的情况。并且一个线程在遍历List,另一个线程修改List,会报ConcurrentModificationException(并发修改异常)错误
java.util.ConcurrentModificationException 并发修改异常
一般有三种解决方案
1、使用 Ventor
Vector是一个线程安全的List,但是它的线程安全实现方式是对所有操作都加上了synchronized关键字,这种方式严重影响效率.所以并不推荐使用Vector
2、使用 Collections.synchronizedList(List list)
首先Collections.synchronizedList(new ArrayList<>());
public static <T> List<T> synchronizedList(List<T> list) {
return (list instanceof RandomAccess ?
new SynchronizedRandomAccessList<>(list) :
new SynchronizedList<>(list));
}
这个方法回根据你传入的List是否实现RandomAccess这个接口来返回的SynchronizedRandomAccessList还是SynchronizedList
SynchronizedList源码
static class SynchronizedList<E>
extends SynchronizedCollection<E>
implements List<E> {
private static final long serialVersionUID = -7754090372962971524L;
final List<E> list;
SynchronizedList(List<E> list) {
super(list);
this.list = list;
}
SynchronizedList(List<E> list, Object mutex) {
super(list, mutex);
this.list = list;
}
public boolean equals(Object o) {
if (this == o)
return true;
synchronized (mutex) {return list.equals(o);}
}
public int hashCode() {
synchronized (mutex) {return list.hashCode();}
}
public E get(int index) {
synchronized (mutex) {return list.get(index);}
}
public E set(int index, E element) {
synchronized (mutex) {return list.set(index, element);}
}
public void add(int index, E element) {
synchronized (mutex) {list.add(index, element);}
}
public E remove(int index) {
synchronized (mutex) {return list.remove(index);}
}
public int indexOf(Object o) {
synchronized (mutex) {return list.indexOf(o);}
}
public int lastIndexOf(Object o) {
synchronized (mutex) {return list.lastIndexOf(o);}
}
public boolean addAll(int index, Collection<? extends E> c) {
synchronized (mutex) {return list.addAll(index, c);}
}
public ListIterator<E> listIterator() {
return list.listIterator(); // Must be manually synched by user
}
public ListIterator<E> listIterator(int index) {
return list.listIterator(index); // Must be manually synched by user
}
public List<E> subList(int fromIndex, int toIndex) {
synchronized (mutex) {
return new SynchronizedList<>(list.subList(fromIndex, toIndex),
mutex);
}
}
@Override
public void replaceAll(UnaryOperator<E> operator) {
synchronized (mutex) {list.replaceAll(operator);}
}
@Override
public void sort(Comparator<? super E> c) {
synchronized (mutex) {list.sort(c);}
}
... ...
}
执行add()等方法的时候加了synchronized关键字,但是listIterator(),iterator()方法却没有加
3、使用 CopyOnWriteArrayList
CopyOnWriteArrayList 底层是数组实现的,主要有以下两个变量
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
/** The lock protecting all mutators */
final transient ReentrantLock lock = new ReentrantLock();
/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
}
1、 lock:ReentrantLock,独占锁,多线程运行的情况下,只有一个线程会获得这个锁,只有释放锁后其他线程才能获得;
2、 array:存放数据的数组,关键是被volatile修饰了,被volatile修饰,就保证了可见性,也就是一个线程修改后,其他线程立即可见;
CopyOnWriteArrayList原理
1、CopyOnWriteArrayList实现了List接口,因此它是一个队列
2、CopyOnWriteArrayList包含了成员lock。每一个CopyOnWriteArrayList都和一个监视器锁lock绑定,通过lock,实现了对CopyOnWriteArrayList的互斥访问
3、CopyOnWriteArrayList包含了成员array数组,这说明CopyOnWriteArrayList本质上通过数组实现的
4、CopyOnWriteArrayList的“动态数组”机制 -- 它内部有个“volatile数组”(array)来保持数据。在“添加/修改/删除”数据时,都会新建一个数组,并将更新后的数据拷贝到新建的数组中,最后再将该数组赋值给“volatile数组”。这就是它叫做CopyOnWriteArrayList的原因!CopyOnWriteArrayList就是通过这种方式实现的动态数组;不过正由于它在“添加/修改/删除”数据时,都会新建数组,所以涉及到修改数据的操作,CopyOnWriteArrayList效率很 低;但是单单只是进行遍历查找的话,效率比较高
5、CopyOnWriteArrayList的“线程安全”机制 -- 是通过volatile和监视器锁Synchrnoized来实现的
6、CopyOnWriteArrayList是通过“volatile数组”来保存数据的。一个线程读取volatile数组时,总能看到其它线程对该volatile变量最后的写入;就这样,通过volatile提供了“读取到的数据总是最新的”这个机制的 保证
7、CopyOnWriteArrayList通过监视器锁Synchrnoized来保护数据。在“添加/修改/删除”数据时,会先“获取监视器锁”,再修改完毕之后,先将数据更新到“volatile数组”中,然后再“释放互斥锁”;这样,就达到了保护数据的目的
可以看到 add()方法使用 lock进行加锁
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
CopyOnWriteArrayList 添加数组的步骤如下:
1、获得独占锁,将添加功能加锁
2、获取原来的数组,并得到其长度
3、创建一个长度为原来数组长度+1的数组,并拷贝原来的元素给新数组
4、追加元素到新数组末尾
5、指向新数组
6、释放锁
这个过程是线程安全的,**写入时复制(COW)**的核心思想就是每次修改的时候拷贝一个新的资源去修改,add()方法再拷贝新资源的时候将数组容量+1,这样虽然每次添加元素都会浪费一定的空间,但是数组的长度正好是元素的长度,也在一定程度上节省了扩容的开销
/**
* List 线程不安全
* java.util.ConcurrentModificationException 并发修改异常
*/
public class ListTest {
public static void main(String[] args) {
// 单线程下安全;并发下 线程不安全
//List<String> list = new ArrayList<>();
/* 解决方案
1、List<String> list = new Vector<>(); // 线程安全,源码中方法都有 synchronized 修饰
2、List<String> list = Collections.synchronizedList(new ArrayList<>());
3、List<String> list = new CopyOnWriteArrayList<>();
CopyOnWrite 写入时复制(COW);计算机程序设计领域的一种优化策略
多个线程调用的时候,list,读取的时候,固定的,写入(覆盖);在写入的时候避免覆盖,造成数据问题
读写分离
* */
//List<String> list = new Vector<>(); // 线程安全,源码中方法都有 synchronized 修饰
//List<String> list = Collections.synchronizedList(new ArrayList<>());
List<String> list = new CopyOnWriteArrayList<>();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
(二)Set
/**
* java.util.ConcurrentModificationException
*/
public class SetTest {
public static void main(String[] args) {
//Set<String> set = new HashSet<>();
//Set<String> set = Collections.synchronizedSet(new HashSet<>());
Set<String> set = new CopyOnWriteArraySet<>();
for (int i = 1; i <=10 ; i++) {
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(set);
},String.valueOf(i)).start();
}
}
}
(三)Map
// java.util.ConcurrentModificationException
public class MapTest {
public static void main(String[] args) {
// 默认等价于 new HashMap<>(16,0.75);
// Map<String, String> map = new HashMap<>();
Map<String, String> map = new ConcurrentHashMap<>();
for (int i = 1; i <=10; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
},String.valueOf(i)).start();
}
}
}
ConcurrentHashMap源码分析
1、 添加元素put/putVal方法;
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
//如果有空值或者空键,直接抛异常
if (key == null || value == null) throw new NullPointerException();
//基于key计算hash值,并进行一定的扰动
int hash = spread(key.hashCode());
//记录某个桶上元素的个数,如果超过8个,会转成红黑树
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果数组还未初始化,先对数组进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//如果hash计算得到的桶位置没有元素,利用cas将元素添加
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//cas+自旋(和外侧的for构成自旋循环),保证元素添加安全
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//如果hash计算得到的桶位置元素的hash值为MOVED,证明正在扩容,那么协助扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
//hash计算的桶位置元素不为空,且当前没有处于扩容操作,进行元素添加
V oldVal = null;
//对当前桶进行加锁,保证线程安全,执行元素添加操作
synchronized (f) {
if (tabAt(tab, i) == f) {
//普通链表节点
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//树节点,将元素添加到红黑树中
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//链表长度大于/等于8,将链表转成红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
//如果是重复键,直接将旧值返回
if (oldVal != null)
return oldVal;
break;
}
}
}
//添加的是新元素,维护集合长度,并判断是否要进行扩容操作
addCount(1L, binCount);
return null;
}
需要添加元素时,会针对当前元素所对应的桶位进行加锁操作,这样一方面保证元素添加时,多线程的安全,同时对某个桶位加锁不会影响其他桶位的操作,进一步提升多线程的并发效率
2、 数组初始化,initTable方法;
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//cas+自旋,保证线程安全,对数组进行初始化操作
while ((tab = table) == null || tab.length == 0) {
//如果sizeCtl的值(-1)小于0,说明此时正在初始化, 让出cpu
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//cas修改sizeCtl的值为-1,修改成功,进行数组初始化,失败,继续自旋
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
//sizeCtl为0,取默认长度16,否则去sizeCtl的值
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//基于初始长度,构建数组对象
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//计算扩容阈值,并赋值给sc
sc = n - (n >>> 2);
}
} finally {
//将扩容阈值,赋值给sizeCtl
sizeCtl = sc;
}
break;
}
}
return tab;
}
3、 数组扩容;
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//如果是多cpu,那么每个线程划分任务,最小任务量是16个桶位的迁移
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//如果是扩容线程,此时新数组为null
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
//两倍扩容创建新数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//记录线程开始迁移的桶位,从后往前迁移
transferIndex = n;
}
//记录新数组的末尾
int nextn = nextTab.length;
//已经迁移的桶位,会用这个节点占位(这个节点的hash值为-1--MOVED)
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
//i记录当前正在迁移桶位的索引值
//bound记录下一次任务迁移的开始桶位
//--i >= bound 成立表示当前线程分配的迁移任务还没有完成
if (--i >= bound || finishing)
advance = false;
//没有元素需要迁移 -- 后续会去将扩容线程数减1,并判断扩容是否完成
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//计算下一次任务迁移的开始桶位,并将这个值赋值给transferIndex
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//如果没有更多的需要迁移的桶位,就进入该if
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//扩容结束后,保存新数组,并重新计算扩容阈值,赋值给sizeCtl
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//扩容任务线程数减1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//判断当前所有扩容任务线程是否都执行完成
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
//所有扩容线程都执行完,标识结束
finishing = advance = true;
i = n; // recheck before commit
}
}
//当前迁移的桶位没有元素,直接在该位置添加一个fwd节点
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//当前节点已经被迁移
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//当前节点需要迁移,加锁迁移,保证多线程安全
//此处迁移逻辑和jdk7的ConcurrentHashMap相同,不再赘述
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}