队列的特点是先进先出,栈的特点是后进先出。Queue继承Collection接口,Stack继承Vector容器类,最顶层接口也是Collection。在Java中容器分为Collection和Map两大类。 Collection家族中除了常见的List、Set,现在又新增一个Queue、Stack。
1 public interface Queue <E> extends Collection <E> {}
BlockingQueue 的四组API
方式
抛出异常
不抛异常,有返回值
阻塞等待
超时等待
入队列
add
offer
put
offer(Element,Time,TimeUnit)
出队列
remove
poll
take
poll(,)
返回头部元素
element
peek
-
-
BlockingQueue主要有两个实现:ArrayBlockingQueue、LinkedBlockingQueue。
BlockingQueue的其中一组核心方法(只介绍其中一组,其它三组类似):
方法名
描述
offer(anObject)
表示如果可以的话,将anObject加到BlockingQueue里,放入成功返回true,否则返回false。
offer(E o, long timeout, TimeUnit unit)
可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
put(anObject)
把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续。
poll(time)
取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null。
poll(long timeout, TimeUnit unit)
从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回失败。
take()
取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;。
drainTo()
一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
在ArrayBlockingQueue中对poll(long timeout, TimeUnit unit) 方法的具体实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public E poll (long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) { if (nanos <= 0 ) return null ; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } }
应用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 BlockingQueue<String> queue = new ArrayBlockingQueue <>(3 ); BlockingQueue<String> syncQueue = new SynchronousQueue <>(); public void test1 () { queue.add("1" ); queue.add("2" ); queue.add("3" ); queue.add("4" ); System.out.println(queue.element()); System.out.println(queue.remove()); System.out.println(queue.remove()); System.out.println(queue.remove()); System.out.println(queue.remove()); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public void test2 () { queue.offer("1" ); queue.offer("2" ); queue.offer("3" ); System.out.println(queue.offer("4" )); System.out.println(queue.peek()); queue.poll(); queue.poll(); queue.poll(); System.out.println(queue.size()); System.out.println(queue.poll()); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public void test3 () throws InterruptedException { queue.put("1" ); queue.put("2" ); queue.put("3" ); queue.put("4" ); queue.take(); queue.take(); queue.take(); System.out.println(queue.size()); queue.take(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public void test4 () throws InterruptedException { queue.offer("1" ); queue.offer("2" ); queue.offer("3" ); System.out.println(queue.offer("4" , 2 , TimeUnit.SECONDS)); queue.poll(); queue.poll(); queue.poll(); System.out.println(queue.size()); System.out.println(queue.poll(2 , TimeUnit.SECONDS)); }
SynchronousQueue 同步队列,不存储元素,一个元素进出完成后,下一个元素才能进出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public void test5 () { new Thread (() -> { try { System.out.println(Thread.currentThread().getName() + " put 1" ); syncQueue.put("1" ); System.out.println(Thread.currentThread().getName() + " put 2" ); syncQueue.put("2" ); System.out.println(Thread.currentThread().getName() + " put 3" ); syncQueue.put("3" ); } catch (InterruptedException e) { e.printStackTrace(); } }, "A" ).start(); new Thread (() -> { try { TimeUnit.SECONDS.sleep(2 ); System.out.println(Thread.currentThread().getName() + "=>" + syncQueue.take()); TimeUnit.SECONDS.sleep(2 ); System.out.println(Thread.currentThread().getName() + "=>" + syncQueue.take()); TimeUnit.SECONDS.sleep(2 ); System.out.println(Thread.currentThread().getName() + "=>" + syncQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }, "B" ).start(); }
PriorityQueue 优先级队列 PriorityQueue是一个带有优先级的队列,而不是先进先出队列,元素按优先级顺序被移除,该队列也没有上限(即 Integer.MAX_VALUE),无容量限制,自动扩容。
此队列虽然没有容量限制,但是会由于服务器资源耗尽抛OutOfMemoryError异常。
如果队列为空,那么取元素的操作take就会阻塞,所以检索操作take是受阻的。
放入PriorityQueue中的元素需要具有比较能力。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class PriorityQueueDemo { public static void main (String[] args) { PriorityQueue<String> priorityQueue = new PriorityQueue <>(new Comparator <String>() { @Override public int compare (String o1, String o2) { return 0 ; } }); priorityQueue.add("c" ); priorityQueue.add("a" ); priorityQueue.add("b" ); System.out.println(priorityQueue.poll()); System.out.println(priorityQueue.poll()); System.out.println(priorityQueue.poll()); PriorityQueue<MessageObject> MessageObjectQueue = new PriorityQueue <>(new Comparator <MessageObject>() { @Override public int compare (MessageObject o1, MessageObject o2) { return o1.order > o2.order ? -1 : 1 ; } }); } static class MessageObject { String content; int order; } }
下面探寻一下延时队列的实现原理。
1 2 3 4 5 6 public class DelayQueue <E extends Delayed > extends AbstractQueue <E> implements BlockingQueue <E> { private final PriorityQueue<E> q = new PriorityQueue <E>(); ... }
DelayQueue的泛型必须实现Delayed接口。
1 2 3 4 5 6 7 8 9 10 11 12 public interface Delayed extends Comparable <Delayed> { long getDelay (TimeUnit unit) ; }
应用示例
线程池中的定时调度就是使用这样的方法实现的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public class DelayQueueDemo { public static void main (String[] args) throws InterruptedException { DelayQueue<Message> delayQueue = new DelayQueue <Message>(); Message message = new Message ("message - 00001" , new Date (System.currentTimeMillis() + 5000L )); delayQueue.add(message); while (true ) { System.out.println(delayQueue.poll()); Thread.sleep(1000L ); } } static class Message implements Delayed { String content; Date sendTime; public Message (String content, Date sendTime) { this .content = content; this .sendTime = sendTime; } @Override public long getDelay (TimeUnit unit) { long duration = sendTime.getTime() - System.currentTimeMillis(); return TimeUnit.NANOSECONDS.convert(duration, TimeUnit.MILLISECONDS); } @Override public int compareTo (Delayed o) { return o.getDelay(TimeUnit.NANOSECONDS) > this .getDelay(TimeUnit.NANOSECONDS) ? 1 : -1 ; } } }