Java 并发工具箱之concurrent包

 

Java.util.concurrent  包是专为 Java并发编程而设计的包。

BlockingQueue

此接口是一个线程安全的 存取实例的队列。

使用场景

BlockingQueue通常用于一个线程生产对象,而另外一个线程消费这些对象的场景。

BlockingQueue

注意事项:

  • 此队列是有限的,如果队列到达临界点,Thread1就会阻塞,直到Thread2从队列中拿走一个对象。

  • 若果队列是空的,Thread2会阻塞,直到Thread1把一个对象丢进队列。

相关方法

BlockingQueue中包含了如下操作方法:

Throws ExceptionSpecial ValueBlocksTimes Out
Insertadd(o)offer(o)put(o)offer(o, timeout, timeunit)
Removeremove(o)poll()take()poll(timeout, timeunit)
Examineelement()peek()

名词解释:

  • Throws Exception: 如果试图的操作无法立即执行,抛一个异常。

  • Special Value: 如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。

  • Blocks: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。

  • Times Out: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

注意事项:

  • 无法插入 null,否则会抛出一个 NullPointerException。

  • 队列这种数据结构,导致除了获取开始和结尾位置的其他对象的效率都不高,虽然可通过remove(o)来移除任一对象。

实现类

因为是一个接口,所以我们必须使用一个实现类来使用它,有如下实现类:

  • ArrayBlockingQueue: 数组阻塞队列

  • DelayQueue: 延迟队列

  • LinkedBlockingQueue:  链阻塞队列

  • PriorityBlockingQueue:  具有优先级的阻塞队列

  • SynchronousQueue: 同步队列

使用示例:

见: BlockingQueue

ArrayBlockingQueue

ArrayBlockingQueue 是一个有界的阻塞队列

  • 内部实现是将对象放到一个数组里。数组有个特性:一旦初始化,大小就无法修改。因此无法修改ArrayBlockingQueue初始化时的上限。

  • ArrayBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。

DelayQueue

DelayQueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现 java.util.concurrent.Delayed 接口:

publicinterfaceDelayedextendsComparable<Delayed< {publiclonggetDelay(TimeUnit timeUnit);  // 返回将要延迟的时间段}
  • 1

  • 2

  • 3

  • 1

  • 2

  • 3

  • 在每个元素的 getDelay() 方法返回的值的时间段之后才释放掉该元素。如果返回的是 0 或者负值,延迟将被认为过期,该元素将会在 DelayQueue 的下一次 take  被调用的时候被释放掉。

  • Delayed 接口也继承了 java.lang.Comparable 接口,Delayed对象之间可以进行对比。这对DelayQueue 队列中的元素进行排序时有用,因此它们可以根据过期时间进行有序释放。

LinkedBlockingQueue

内部以一个链式结构(链接节点)对其元素进行存储 。

  • 可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。

  • 内部以 FIFO(先进先出)的顺序对元素进行存储。

PriorityBlockingQueue

一个无界的并发队列,它使用了和类 java.util.PriorityQueue 一样的排序规则。

  • 无法向这个队列中插入 null 值。

  • 插入到 其中的元素必须实现 java.lang.Comparable 接口。

  • 对于具有相等优先级(compare() == 0)的元素并不强制任何特定行为。

  • 从一个 PriorityBlockingQueue 获得一个 Iterator 的话,该 Iterator 并不能保证它对元素的遍历是以优先级为序的。

SynchronousQueue

一个特殊的队列,它的内部同时只能够容纳单个元素。

  • 如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。

  • 如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。

BlockingDeque

此接口表示一个线程安全放入和提取实例的双端队列

使用场景

通常用在一个线程既是生产者又是消费者的时候。 
BlockingDeque

注意事项

  • 如果双端队列已满,插入线程将被阻塞,直到一个移除线程从该队列中移出了一个元素。

  • 如果双端队列为空,移除线程将被阻塞,直到一个插入线程向该队列插入了一个新元素。

相关方法

Throws ExceptionSpecial ValueBlocksTimes Out
InsertaddFirst(o)offerFirst(o)putFirst(o)offerFirst(o, timeout, timeunit)
RemoveremoveFirst(o)pollFirst(o)takeFirst(o)pollFirst(timeout, timeunit)
ExaminegetFirst(o)peekFirst(o)
Throws ExceptionSpecial ValueBlocksTimes Out
InsertaddLast(o)offerLast(o)putLast(o)offerLast(o, timeout, timeunit)
RemoveremoveLast(o)pollLast(o)takeLast(o)pollLast(timeout, timeunit)
ExaminegetLast(o)peekLast(o)

注意事项

  • 关于方法的处理方式和上节一样。

  • BlockingDeque 接口继承自 BlockingQueue 接口,可以用其中定义的方法。

实现类

  • LinkedBlockingDeque : 链阻塞双端队列

LinkedBlockingDeque

LinkedBlockingDeque 是一个双端队列,可以从任意一端插入或者抽取元素的队列。

  • 在它为空的时候,一个试图从中抽取数据的线程将会阻塞,无论该线程是试图从哪一端抽取数据。

ConcurrentMap

一个能够对别人的访问(插入和提取)进行并发处理的 java.util.Map接口。 
ConcurrentMap 除了从其父接口 java.util.Map 继承来的方法之外还有一些额外的原子性方法。

实现类

因为是接口,必须用实现类来使用它,其实现类为

  • ConcurrentHashMap

ConcurrentHashMap与HashTable比较

  • 更好的并发性能,在你从中读取对象的时候 ConcurrentHashMap 并不会把整个 Map 锁住,只是把 Map 中正在被写入的部分进行锁定。

  • 在被遍历的时候,即使是 ConcurrentHashMap 被改动,它也不会抛 ConcurrentModificationException。

ConcurrentNavigableMap

一个支持并发访问的 java.util.NavigableMap,它还能让它的子 map 具备并发访问的能力。

headMap

headMap(T toKey) 方法返回一个包含了小于给定 toKey 的 key 的子 map。

tailMap

tailMap(T fromKey) 方法返回一个包含了不小于给定 fromKey 的 key 的子 map。

subMap

subMap() 方法返回原始 map 中,键介于 from(包含) 和 to (不包含) 之间的子 map。

更多方法

  • descendingKeySet()

  • descendingMap()

  • navigableKeySet()

CountDownLatch

CountDownLatch 是一个并发构造,它允许一个或多个线程等待一系列指定操作的完成。

  • CountDownLatch 以一个给定的数量初始化。countDown() 每被调用一次,这一数量就减一。

  • 通过调用 await() 方法之一,线程可以阻塞等待这一数量到达零。

CyclicBarrier

CyclicBarrier 类是一种同步机制,它能够对处理一些算法的线程实现同步。

更多实例参考:  CyclicBarrier

Exchanger

Exchanger 类表示一种两个线程可以进行互相交换对象的会和点。

更多实例参考:  Exchanger

Semaphore

Semaphore 类是一个计数信号量。具备两个主要方法:

  • acquire()

  • release()

  • 每调用一次 acquire(),一个许可会被调用线程取走。

  • 每调用一次 release(),一个许可会被返还给信号量。

Semaphore 用法

  • 保护一个重要(代码)部分防止一次超过 N 个线程进入。

  • 在两个线程之间发送信号。

保护重要部分

如果你将信号量用于保护一个重要部分,试图进入这一部分的代码通常会首先尝试获得一个许可,然后才能进入重要部分(代码块),执行完之后,再把许可释放掉。

Semaphore semaphore = new Semaphore(1);  //critical section  semaphore.acquire();  ...  semaphore.release();
  • 1

  • 2

  • 3

  • 4

  • 5

  • 1

  • 2

  • 3

  • 4

  • 5

在线程之间发送信号

如果你将一个信号量用于在两个线程之间传送信号,通常你应该用一个线程调用 acquire() 方法,而另一个线程调用 release() 方法。

  • 如果没有可用的许可,acquire() 调用将会阻塞,直到一个许可被另一个线程释放出来。

  • 如果无法往信号量释放更多许可时,一个 release() 调用也会阻塞。

公平性

无法担保掉第一个调用 acquire() 的线程会是第一个获得一个许可的线程。

可以通过如下来强制公平:

Semaphore semaphore = new Semaphore(1true); 
  • 1

  • 1

  • 需要注意,强制公平会影响到并发性能,建议不使用。

ExecutorService

这里之前有过简单的总结:  Java 中几种常用的线程池 

存在于 java.util.concurrent 包里的 ExecutorService 实现就是一个线程池实现。

实现类

此接口实现类包括:

  • ScheduledThreadPoolExecutor : 通过 Executors.newScheduledThreadPool(10)创建的

  • ThreadPoolExecutor: 除了第一种的其他三种方式创建的

相关方法

  • execute(Runnable): 
    无法得知被执行的 Runnable 的执行结果

  • submit(Runnable): 
    返回一个 Future 对象,可以知道Runnable 是否执行完毕。

  • submit(Callable): 
    Callable 实例除了它的 call() 方法能够返回一个结果,通过Future可以获取。

  • invokeAny(…): 
    传入一系列的 Callable 或者其子接口的实例对象,无法保证返回的是哪个 Callable 的结果 ,只能表明其中一个已执行结束。 
    如果其中一个任务执行结束(或者抛了一个异常),其他 Callable 将被取消。

  • invokeAll(…): 
    返回一系列的 Future 对象,通过它们你可以获取每个 Callable 的执行结果。

关闭ExecutorService

  • shutdown() :&

50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信