java多线程之线程池(4)_线程池 lyzxii-程序员宅基地

技术标签: 多线程  线程池  

1.Executor框架浅析 

首先我们得明白一个问题,为什么需要线程池?在java中,使用线程来执行异步任务时,线程的创建和销毁需要一定的开销,如果我们为每一个任务创建一个新的线程来执行的话,那么这些线程的创建与销毁将消耗大量的计算资源。同时为每一个任务创建一个新线程来执行,这样的方式可能会使处于高负荷状态的应用最终崩溃。所以线程池的出现为解决这个问题带来曙光。我们将在线程池中创建若干条线程,当有任务需要执行时就从该线程池中获取一条线程来执行任务,如果一时间任务过多,超出线程池的线程数量,那么后面的线程任务就进入一个等待队列进行等待,直到线程池有线程处于空闲时才从等待队列获取要执行的任务进行处理,以此循环.....这样就大大减少了线程创建和销毁的开销,也会缓解我们的应用处于超负荷时的情况。

1.1 Executor框架的两级调度模型

在java线程启动时会创建一个本地操作系统线程,当该java线程终止时,这个操作系统线程也会被回收。而每一个java线程都会被一对一映射为本地操作系统的线程,操作系统会调度所有的线程并将它们分别给可用的CPU。而所谓的映射方式是这样实现的,在上层,java多线程程序通过把应用分为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。这样种两级调度模型如下图所示:

从图中我们可以看出,应用程序通过Executor框架控制上层的调度,而下层的调度由操作系统内核控制,下层的调度不受应用程序的控制。

 

1.2 Executor框架的结构

Executor框架的结构主要包括3个部分

  • 任务:包括被执行任务需要实现的接口:Runnable接口或Callable接口
  • 任务的执行:包括任务执行机制的核心接口Executor,以及继承自Executor的EexcutorService接口。Exrcutor有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。
  • 异步计算的结果:包括接口Future和实现Future接口的FutureTask类

UML关系图:

下面我们通过一个UML图来认识一下这些类间的关系:

Extecutor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。

ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。

ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令,比Timer更灵活强大

 

Future接口和实现Future接口的FutureTask类,代表异步计算的结果

Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或者ScheduledThreadPoolExecutor执行。区别就是Runnable无法返回执行结果,而Callable可以返回执行结果。

 

1.3 Executor框架各个接口的解读

 

Runnable接口和Callable接口具体区别:

Runnable是java.lang包下,是一个接口,里面只声明了一个run()方法: 由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。

Callable位于java.util.concurrent包下,也是一个接口,里面也只声明了一个方法叫做call(),可以看到,这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。

public interface Runnable{

    public abstract void run();

}

public interface Callable<V>{

    /**

     *Computes a result, or throws an exception if unable to do so.

     *

     *@return computed result

     *@throws Exception if unable to compute a result

     */

    Vcall() throws Exception;

}

我们通过一张图来理解它们间的执行关系

分析说明:

1.主线程首先创建实现Runnable或Callable接口的任务对象,在重写的方法里实现具体的任务细节

    此外我们可以利用工具类Executors把一个Runnable对象封装为一个Callable对象,具体如下面的两种方式:

Executors.callable(Runnabletask)或者Executors.callable(Runnabletask,Object result)

2.然后可以把Runnable对象或者Callbale对象直接提交给ExecutorService执行,在ExecutorService接口中有以下若干个方法。

  • submit()方法可以执行Runnable对象或者Callbale对象,
  • execute()方法则是执行Runnable对象的

这里需要注意的是如果执行submit()方法将返回一个实现Future接口的对象(其实就是FutureTask)。当然由于FutureTask实现了Runnable接口,我们也可以直接创建FutureTask,然后提交给ExecutorService执行。

<T>Future<T> submit(Callable<T> task);

<T>Future<T> submit(Runnable task, T result);(不常用)

Future<?>submit(Runnable task);

Void execute(Runnabletask)

Future接口:

Future就是对于具体的Runnable或者Callable任务的执行进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

Future类位于java.util.concurrent包下,它是一个接口:

       
public interface Future<V>{

    /**cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。*/

    boolean cancel(boolean mayInterruptIfRunning);


    //isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。

    boolean isCancelled();

    
    //isDone方法表示任务是否已经完成,若任务完成,则返回true;
    boolean isDone();

    
    //get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
    Vget() throws InterruptedException, ExecutionException;

    
    //get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

    Vget(long timeout, TimeUnit unit)

        throws InterruptedException,ExecutionException, TimeoutException;

}

也就是说Future提供了三种功能:

  1)判断任务是否完成;

  2)能够中断任务;

  3)能够获取任务执行结果。

  因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。

 

FutureTask类:

FutureTask类实现了RunnableFuture接口,而RunnableFuture接口继承了Runnable接口和Future接口,因此FutureTask既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

public class FutureTask<V> implements RunnableFuture<V>

public interface RunnableFuture<V> extends Runnable,Future<V> {

    void run();

}

FutureTask是Future的唯一实现类,构造方法如下:

public FutureTask(Callable<V>callable) {

}

public FutureTask(Runnablerunnable, V result) {

}

Future和FutureTask的使用案列

1.使用Callable+Future获取执行结果

public class Test {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<Integer> result = executor.submit(task);
        executor.shutdown();
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主线程在执行任务");
         
        try {
            System.out.println("task运行结果"+result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}

2.使用Callable+FutureTask获取执行结果

public class Test {
    public static void main(String[] args) {
        //第一种方式
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        executor.submit(futureTask);
        executor.shutdown();
         
        //第二种方式,注意这种方式和第一种方式效果是类似的,只不过一个使用的是ExecutorService,一个使用的是Thread
        /*Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        Thread thread = new Thread(futureTask);
        thread.start();*/
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主线程在执行任务");
         
        try {
            System.out.println("task运行结果"+futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}

2.ThreadPoolExecutor浅析 

ThreadPoolExecutor是线程的真正实现,通常使用工厂类Executors来创建,但它的构造方法提供了一系列参数来配置线程池,下面我们就先介绍ThreadPoolExecutor的构造方法中各个参数的含义。

	public ThreadPoolExecutor(int corePoolSize,  
	                          int maximumPoolSize,  
	                          long keepAliveTime,  
	                          TimeUnit unit,  
	                          BlockingQueue<Runnable> workQueue,  
	                          ThreadFactory threadFactory) {  
	        					this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,  
	            					 threadFactory, defaultHandler);  
	    }  

corePoolSize:线程池的核心线程数,默认情况下,核心线程数会一直在线程池中存活,即使它们处理闲置状态。如果将ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true,那么闲置的核心线程在等待新任务到来时会执行超时策略,这个时间间隔由keepAliveTime所指定,当等待时间超过keepAliveTime所指定的时长后,核心线程就会被终止。

maximumPoolSize:线程池所能容纳的最大线程数量,当活动线程数到达这个数值后,后续的新任务将会被阻塞。

keepAliveTime:非核心线程闲置时的超时时长,超过这个时长,非核心线程就会被回收。当ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true时,keepAliveTime同样会作用于核心线程。

unit:用于指定keepAliveTime参数的时间单位,这是一个枚举,常用的有TimeUnit.MILLISECONDS(毫秒),TimeUnit.SECONDS(秒)以及TimeUnit.MINUTES(分钟)等。

workQueue:线程池中的任务队列,通过线程池的execute方法提交Runnable对象会存储在这个队列中。

threadFactory:线程工厂,为线程池提供创建新线程的功能。ThreadFactory是一个接口,它只有一个方法:Thread newThread(Runnable r)。

除了上面的参数外还有个不常用的参数,RejectExecutionHandler,这个参数表示当ThreadPoolExecutor已经关闭或者ThreadPoolExecutor已经饱和时(达到了最大线程池大小而且工作队列已经满),execute方法将会调用Handler的rejectExecution方法来通知调用者,默认情况 下是抛出一个RejectExecutionException异常。

RejectExecutionException是一个接口,它有4个实现类,代表4种饱和策略,当有界队列被填满之后,饱和策略开始发挥作用,ThreadPoolExecutor的饱和策略可以通过setRejectedExecutionHandler来修改(如果某个任务被提交到一个已被关闭的Executor时,也会用到饱和策略),JDK提供几种不同的RejectedExecutionHandler实现,每种实现都包含不同的饱和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy

public interface RejectedExecutionHandler {

    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     *
     * <p>In the absence of other alternatives, the method may throw
     * an unchecked {@link RejectedExecutionException}, which will be
     * propagated to the caller of {@code execute}.
     *
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

中止(Abort)策略是 默认的饱和策略:AbortPolicy,直接抛出未检查RejectedExecutionExecption,调用者可以捕获这个异常,根据需求处理代码

    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         * 始终抛出异常,交由调用者处理
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

抛弃(Discard)策略会悄悄抛弃该任务,方法体什么都不做

    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

抛弃最旧的(Discard-Oldest)策略会抛弃下一个将被执行的任务(任务队列的头部的任务,也就是s“最旧的”任务),然后尝试重新提交新的任务,(如果工作队列是一个优先队列,那么抛弃最旧的策略抛弃优先级最高的任务,因此最好不要将抛弃最旧的饱和策略和优先级队列放在一起使用)

   public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

调用者运行(Caller-Runs)策略实现了一种调节机制,该策略既不会抛弃任务也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量,我们看到这个策略显然不想放弃执行任务。那么就用当前的Executor进行执行。不过,这样也有弊端,那就是阻塞当前Executor线程,造成该线程池无法调度任务。

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

 

了解完相关构造函数的参数,我们再来看看ThreadPoolExecutor执行任务时的大致规则:

(1)如果线程池的数量还未达到核心线程的数量,那么会直接启动一个核心线程来执行任务

(2)如果线程池中的线程数量已经达到或者超出核心线程的数量,那么任务会被插入到任务队列中排队等待执行。

(3)如果在步骤(2)中无法将任务插入到任务队列中,这往往是由于任务队列已满,这个时候如果线程数量未达到线程池规定的最大值,那么会立刻启动一个非核心线程来执行任务。

(4)如果在步骤(3)中线程数量已经达到线程池规定的最大值,那么就会拒绝执行此任务,ThreadPoolExecutor会调用RejectExecutionHandler的rejectExecution方法来通知调用者。

到此ThreadPoolExecutor的详细配置了解完了,ThreadPoolExecutor的执行规则也了解完了,那么接下来我们就来介绍3种常见的线程池,它们都直接或者间接地通过配置ThreadPoolExecutor来实现自己的功能特性,这个3种线程池分别是FixedThreadPool,CachedThreadPool,ScheduledThreadPool以及SingleThreadExecutor。

 

2.1 FixedThreadPool

 FixedThreadPool模式会使用一个优先固定数目的线程来处理若干数目的任务。规定数目的线程处理所有任务,一旦有线程处理完了任务就会被用来处理新的任务(如果有的话)。FixedThreadPool模式下最多的线程数目是一定的。创建FixedThreadPool对象代码如下:

ExecutorService fixedThreadPool=Executors.newFixedThreadPool(5);

我们来看看FixedThreadPool创建方法源码:

public static ExecutorService newFixedThreadPool(int nThreads) {  
        return new ThreadPoolExecutor(nThreads, nThreads,  
                                      0L, TimeUnit.MILLISECONDS,  
                                      new LinkedBlockingQueue<Runnable>());  
    }

FixedThreadPool的corePoolSize和maximumPoolSize参数都被设置为nThreads。当线程池中的线程数量大于corePoolSize时,keepAliveTime为非核心空闲线程等待新任务的最长时间,超过这个时间后非核心线程将被终止,这里keepAliveTime设置为0L,就说明非核心线程会立即被终止。事实上这里也没有非核心线程创建,因为核心线程数和最大线程数都一样的。下面我们来看看FixedThreadPool的execute()方法的运行流程

分析:

(1)如果当前运行线程数少corePoolSize,则创建一个新的线程来执行任务。

(2)如果当前线程池的运行线程数等于corePoolSize,那么后面提交的任务将加入LinkedBlockingQueue。

(3)线程在执行完图中的1后,会在循环中反复从LinkedBlockingQueue获取任务来执行。

这里还有点要说明的是FixedThreadPool使用的是无界队列LinkedBlockingQueue作为线程池的工作队列(队列容量为Integer.MAX_VALUE)。使用该队列作为工作队列会对线程池产生如下影响

(1)当前线程池中的线程数量达到corePoolSize后,新的任务将在无界队列中等待。

(2)由于我们使用的是无界队列,所以参数maximumPoolSize和keepAliveTime无效。

(3)由于使用无界队列,运行中的FixedThreadPool不会拒绝任务(当然此时是未执行shutdown和shutdownNow方法),所以不会去调用RejectExecutionHandler的rejectExecution方法抛出异常。

下面我们给出案例,该案例来自java编程思想一书:

	public class LiftOff implements Runnable{     
	    protected int countDown = 10; //Default     
	    private static int taskCount = 0;     
	    private final int id = taskCount++;      
	    public LiftOff() {}     
	    public LiftOff(int countDown) {     
	        this.countDown = countDown;     
	    }     
	    public String status() {     
        return "#" + id + "(" +     
	            (countDown > 0 ? countDown : "LiftOff!") + ") ";     
	    }     
	    @Override     
	    public void run() {     
        while(countDown-- > 0) {     
	            System.out.print(status());     
	            Thread.yield();     
	        }     
	             
    }        
} 

 

声明一个Runnable对象,使用FixedThreadPool执行任务如下:

	public class FixedThreadPool {     
	    public static void main(String[] args) {    
	        //三个线程来执行五个任务     
        ExecutorService exec = Executors.newFixedThreadPool(3);        
	        for(int i = 0; i < 5; i++) {     
	            exec.execute(new LiftOff());     
	        }    
	        exec.shutdown();     
	    }     
	} 

2.2 CachedThreadPool

CachedThreadPool首先会按照需要创建足够多的线程来执行任务(Task)。随着程序执行的过程,有的线程执行完了任务,可以被重新循环使用时,才不再创建新的线程来执行任务。创建方式:

ExecutorService cachedThreadPool=Executors.newCachedThreadPool();

我们来看看CachedThreadPool创建方法源码:

public static ExecutorService newCachedThreadPool() {  
	   return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
	                                 60L, TimeUnit.SECONDS,  
	                                 new SynchronousQueue<Runnable>());  
	}  

从该静态方法,我们可以看到CachedThreadPool的corePoolSize被设置为0,而maximumPoolSize被设置Integer.MAX_VALUE,即maximumPoolSize是无界的,而keepAliveTime被设置为60L,单位为妙。也就是空闲线程等待时间最长为60秒,超过该时间将会被终止。而且在这里CachedThreadPool使用的是没有容量的SynchronousQueue作为线程池的工作队列,但其maximumPoolSize是无界的,也就是意味着如果主线程提交任务的速度高于maximumPoolSize中线程处理任务的速度时CachedThreadPool将会不断的创建新的线程,在极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。CachedThreadPool的execute()方法的运行流程


 

 

分析:

(1)首先执行SynchronousQueue.offer(Runnable task),添加一个任务。如果当前CachedThreadPool中有空闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),其中NANOSECONDS是毫微秒即十亿分之一秒(就是微秒/1000),那么主线程执行offer操作与空闲线程执行poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成,否则进入第(2)步。

(2)当CachedThreadPool初始线程数为空时,或者当前没有空闲线程,将没有线程去执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这样的情况下,步骤(1)将会失败,此时CachedThreadPool会创建一个新的线程来执行任务,execute()方法执行完成。

(3)在步骤(2)中创建的新线程将任务执行完成后,会执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),这个poll操作会让空闲线程最多在SynchronousQueue中等待60秒,如果60秒内主线程提交了一个新任务,那么这个空闲线程将会执行主线程提交的新任务,否则,这个空闲线程将被终止。由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的 CachedThreadPool是不会使用任何资源的。

根据前面的分析我们知道SynchronousQueue是一个没有容量的阻塞队列(其实个人认为是相对应时间而已的没有容量,因为时间到空闲线程就会被移除)。每个插入操作必须等到一个线程与之对应。CachedThreadPool使用SynchronousQueue,把主线程的任务传递给空闲线程执行。流程如下:

CachedThreadPool使用的案例代码如下

	public class CachedThreadPool {     
	    public static void main(String[] args) {     
	        ExecutorService exec = Executors.newCachedThreadPool();     
	        for(int i = 0; i < 10; i++) {     
	            exec.execute(new LiftOff());     
	        }     
	        exec.shutdown();         
	}

2.3 SingleThreadExecutor

SingleThreadExecutor模式只会创建一个线程。它和FixedThreadPool比较类似,不过线程数是一个。如果多个任务被提交给SingleThreadExecutor的话,那么这些任务会被保存在一个队列中,并且会按照任务提交的顺序,一个先执行完成再执行另外一个线程。SingleThreadExecutor模式可以保证只有一个任务会被执行。这种特点可以被用来处理共享资源的问题而不需要考虑同步的问题。

创建方式:

ExecutorService singleThreadExecutor=Executors.newSingleThreadExecutor();

构造方法如下:

public static ExecutorService newSingleThreadExecutor() {  
	 return new FinalizableDelegatedExecutorService  
	         (new ThreadPoolExecutor(1, 1,  
	                                 0L, TimeUnit.MILLISECONDS,  
                                        new LinkedBlockingQueue<Runnable>()));  
	          }  

从静态方法可以看出SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1,其他参数则与FixedThreadPool相同。SingleThreadExecutor使用的工作队列也是无界队列LinkedBlockingQueue。由于SingleThreadExecutor采用无界队列的对线程池的影响与FixedThreadPool一样,这里就不过多描述了。同样的我们先来看看其运行流程:

 

分析:

 

(1)如果当前线程数少于corePoolSize即线程池中没有线程运行,则创建一个新的线程来执行任务。

(2)在线程池的线程数量等于corePoolSize时,将任务加入到LinkedBlockingQueue。

(3)线程执行完成(1)中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行。

SingleThreadExecutor使用的案例代码如下:

public class SingleThreadExecutor {     
	 public static void main(String[] args) {     
	      ExecutorService exec = Executors.newSingleThreadExecutor();     
	        for (int i = 0; i < 2; i++) {     
            	    exec.execute(new LiftOff());     
	        }     
	    }     
	}

2.4 各自的适用场景

FixedThreadPool:适用于为了满足资源管理需求,而需要限制当前线程的数量的应用场景,它适用于负载比较重的服务器。

SingleThreadExecutor:适用于需要保证执行顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的场景。

CachedThreadPool:大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者负载较轻的服务器。

3.ScheduledThreadPoolExecutor浅析 

3.1 ScheduledThreadPoolExecutor执行机制分析

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后执行任务,或者定期执行任务。ScheduledThreadPoolExecutor的功能与Timer类似,但比Timer更强大,更灵活,Timer对应的是单个后台线程,而ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。接下来我们先来了解一下ScheduledThreadPoolExecutor的运行机制:

 

 

 

 

分析:DelayQueue是一个无界队列,所以ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中无意义。ScheduledThreadPoolExecutor的执行主要分为以下两个部分

(1)当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduledFuture接口的ScheduleFutureTask。

(2)线程池中的线程从DelayQueue中获取ScheduleFutureTask,然后执行任务。

3.2 如何创建ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor通常使用工厂类Executors来创建,Executors可以创建两种类型的ScheduledThreadPoolExecutor,如下:

(1)ScheduledThreadPoolExecutor:可以执行并行任务也就是多条线程同时执行。

(2)SingleThreadScheduledExecutor:可以执行单条线程。

创建ScheduledThreadPoolExecutor的方法构造如下:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)  
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)  

创建SingleThreadScheduledExecutor的方法构造如下

 

	public static ScheduledExecutorService newSingleThreadScheduledExecutor()  
	public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)  

3.3 ScheduledThreadPoolExecutorSingleThreadScheduledExecutor的适用场景

ScheduledThreadPoolExecutor:适用于多个后台线程执行周期性任务,同时为了满足资源管理的需求而需要限制后台线程数量的应用场景。

SingleThreadScheduledExecutor:适用于需要单个后台线程执行周期任务,同时需要保证任务顺序执行的应用场景。

3.4 ScheduledThreadPoolExecutor使用案例

我们创建一个Runnable的对象,然后使用ScheduledThreadPoolExecutor的Scheduled()来执行延迟任务,输出执行时间即可:

我们先来介绍一下该类延迟执行的方法:

public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);

参数解析:

command:就是一个实现Runnable接口的类

delay:延迟多久后执行。

unit:用于指定keepAliveTime参数的时间单位,这是一个枚举,常用的有TimeUnit.MILLISECONDS(毫秒),TimeUnit.SECONDS(秒)以及TimeUnit.MINUTES(分钟)等。

这里要注意这个方法会返回ScheduledFuture实例,可以用于获取线程状态信息和延迟时间。

public class WorkerThread implements Runnable{  
   		 @Override  
    public void run() {  
         System.out.println(Thread.currentThread().getName()+" Start. Time = "+getNowDate());  
	         threadSleep();  
	         System.out.println(Thread.currentThread().getName()+" End. Time = "+getNowDate());  
	          
	    }  
    /** 
	     * 睡3秒 
     */  
    public void threadSleep(){  
	        try {  
            Thread.sleep(3000);  
	        } catch (InterruptedException e) {  
	            // TODO Auto-generated catch block  
	            e.printStackTrace();  
	        }  
	    }  
     /** 
      * 获取现在时间 
	      *  
      * @return 返回时间类型 yyyy-MM-dd HH:mm:ss 
      */  
    public static String getNowDate() {  
	          Date currentTime = new Date();  
          SimpleDateFormat formatter;   
	            formatter = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss");   
            String ctime = formatter.format(currentTime);   
	          return ctime;  
	         }  
	}

执行类:

public class ScheduledThreadPoolTest {

		public static void main(String[] args) {  
			        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);  
			         try {  
			        //schedule to run after sometime  
	        System.out.println("Current Time = "+getNowDate());  
			        for(int i=0; i<3; i++){  
		            Thread.sleep(1000);  
			            WorkerThread worker = new WorkerThread();  
			            //延迟10秒后执行  
			            scheduledThreadPool.schedule(worker, 10, TimeUnit.SECONDS);  
			        }  
			            Thread.sleep(3000);  
		        } catch (InterruptedException e) {  
		            e.printStackTrace();  
		        scheduledThreadPool.shutdown();  
		        while(!scheduledThreadPool.isTerminated()){  
		            //wait for all tasks to finish  
		        }  
		        System.out.println("Finished all threads");  
		    }		/**
		 * 获取现在时间
		 * 
		 * @return 返回时间类型 yyyy-MM-dd HH:mm:ss
		 */
		public static String getNowDate() {
			Date currentTime = new Date();
			SimpleDateFormat formatter;
			formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
			String ctime = formatter.format(currentTime);
			return ctime;
		}
	}

 

线程任务确实在10秒延迟后才开始执行。这就是schedule()方法的使用。下面我们再介绍2个可用于周期性执行任务的方法。

 

 

scheduleAtFixedRate方法的作用是预定在初始的延迟结束后,周期性地执行给定的任务,周期长度为period,其中initialDelay为初始延迟。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

scheduleWithFixedDelay方法的作用是预定在初始的延迟结束后周期性地执行给定任务,在一次调用完成和下一次调用开始之间有长度为delay的延迟,其中initialDelay为初始延迟(简单说是是等上一个任务结束后,在等固定的时间,然后执行。即:执行完上一个任务后再执行)。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit); 

实例代码:

public class ScheduledTask {
		public ScheduledThreadPoolExecutor se = new ScheduledThreadPoolExecutor(
				5);

		public static void main(String[] args) {
			new ScheduledTask();
		}

		public void fixedPeriodSchedule() {
			// 设定可以循环执行的runnable,初始延迟为0,这里设置的任务的间隔为5秒
			for (int i = 0; i < 5; i++) {
				se.scheduleAtFixedRate(new FixedSchedule(), 0, 5,
						TimeUnit.SECONDS);
			}
		}

		public ScheduledTask() {
			fixedPeriodSchedule();
		}

		class FixedSchedule implements Runnable {
			public void run() {
				System.out.println("当前线程:" + Thread.currentThread().getName()
						+ "  当前时间:" + new Date(System.currentTimeMillis()));
			}
		}
	}

运行结果:

至于scheduleWithFixedDelay方法,大家就把代码稍微修改一下执行试试就行,这里就不重复了。而SingleThreadScheduledExecutor的使用的方法基本是类似,只不过是单线程罢了,这里也不再描述了。

如何捕获线程池中线程运行时的异常

主要有下面几个解决方案

  • 在我们提供的Runnable的run方法中捕获任务代码可能抛出的所有异常,包括未检测异常
  • 使用ExecutorService.submit执行任务,利用返回的Future对象的get方法接收抛出的异常,然后进行处理
  • 重写ThreadPoolExecutor.afterExecute方法,处理传递到afterExecute方法中的异常
  • 为工作者线程设置UncaughtExceptionHandler,在uncaughtException方法中处理异常 (只能够捕获execute()方法)

线程代码不能抛出任何checked异常。所有的线程中的checked异常都只能被线程本身消化掉。 这样本身也是符合线程的设计理念的,线程本身就是被看作独立的执行片断,它应该对自己负责,所以由它来消化所有的checked异常是很正常的。

但是,线程代码中是可以抛出错误(Error)和运行级别异常(RuntimeException)的。Error俺们可以忽略,因为通常Error是应该留给vm的,而RuntimeException确是比较正常的,如果在运行过程中满足了某种条件导致线程必须中断,可以选择使用抛出运行级别异常来处理,当线程代码抛出运行级别异常之后,线程会中断。这点java中解释得很清楚:
@see Thread
All threads that are not daemon threads have died, either by returning from the call to the run method or “by throwing an exception that propagates beyond the run method”.
但是对于invoke此线程的主线程会产生什么影响呢?主线程不受这个影响,不会处理这个RuntimeException,而且根本不能catch到这个异常。会继续执行自己的代码 :)
所以得到结论:线程方法的异常只能自己来处理。

 

1.通过给thread设置一个UncaughtExceptionHandler,可以确保在该线程出现异常时能通过回调UncaughtExceptionHandler接口的public void uncaughtException(Thread t, Throwable e) 方法来处理异常,

@FunctionalInterface
public interface UncaughtExceptionHandler {
    /**
     * Method invoked when the given thread terminates due to the
     * given uncaught exception.
     * <p>Any exception thrown by this method will be ignored by the
     * Java Virtual Machine.
     * @param t the thread
     * @param e the exception
     */
    void uncaughtException(Thread t, Throwable e);
}

UncaughtExceptionHandler是一个FunctionalInterface ,只有一个抽象方法,该回调接口会被Thread中的dispatchUncaughtException调用

/**
 * Dispatch an uncaught exception to the handler. This method is
 * intended to be called only by the JVM.
 */
private void dispatchUncaughtException(Throwable e) {
    getUncaughtExceptionHandler().uncaughtException(this, e);
}

当线程在运行过程中出现异常时,JVM会调用dispatchUncaughtException方法,该方法会将对应的线程实例以及异常信息传递给回调接口

如在使用线程池的时候,我们可以通过自定义的线程工厂来实现处理异常,具体代码如下:

//自定义线程工厂类,创建线程的时候设置线程的UncaughtExeceptionHandler即可
public class HandlerThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread( r,"Thread" + threadNumber.getAndIncrement());
        t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                System.out.println("线程名字===="+t.getName());
                System.out.println("捕获异常" + e.toString());
            }
        });
        return t;
    }

}

在使用线程池的时候,传入我们自定义的ThreadFactory
public class ExeceptionTest implements Runnable {
    @Override
    public void run() {
        throw new RuntimeException("抛异常");

    }


    public static void main(String[] args){
        //创建线程池 指定线程池创建线程的 ThreadFactory 并设置线程名字
        ExecutorService service = Executors.newCachedThreadPool(new HandlerThreadFactory());
        service.execute(new ExeceptionTest());

    }

}

执行结果:
线程名字====Thread1
捕获异常java.lang.RuntimeException: 抛异常

2.直接重写线程池中的protected void afterExecute(Runnable r, Throwable t) { }方法

  • 在我们提供的Runnable的run方法中捕获任务代码可能抛出的所有异常,包括未检测异常
  • 使用ExecutorService.submit执行任务,利用返回的Future对象的get方法接收抛出的异常,然后进行处理
  • 重写ThreadPoolExecutor.afterExecute方法,处理传递到afterExecute方法中的异常
  • 为工作者线程设置UncaughtExceptionHandler,在uncaughtException方法中处理异常 (不推荐)
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_37598682/article/details/80166017

智能推荐

解决Mac下MX4手机无法连接adb问题之解决方案-程序员宅基地

文章浏览阅读46次。一般的android连接mac 很方便不用安装驱动就可以啦,可是不知道为什么特殊情况下有的android手机(小米2,华为等)就是连接不上,下来就说说特殊情况下如何连接。使用USB连接安卓手机后可以做2件事情:1.关于本机-->更多信息->概系统览->系统报告->usb->你所连接的device-->供应商ID(Vendor ID)2..打开终端,输..._macpro adb连接mix4

老花眼:男女的“更年期”-程序员宅基地

文章浏览阅读671次。What do you think, how many operations can be done on one eye? A clinical case of one of my patients confirms that more than 20 operations of various kinds are not the limit. Although, no doubt, thi..._更年期 眼压高

MongoDB索引详解_mongodb索引的数据结构-程序员宅基地

文章浏览阅读1.2k次,点赞15次,收藏9次。MongoDB索引详解_mongodb索引的数据结构

SWAN之ikev2/acert-inline测试_ikev2测试向量-程序员宅基地

文章浏览阅读369次。本测试中远程用户(roadwarrior)carol和dave与网关moon建立连接。认证方式基于X.509证书,为了对远程用户进行授权,moon网关期望用户在IKEv2报文的CERT载荷中带有属性证书。carol主机具有合法的证书,但是dave提供的是两个无效属性证书:一个证书不是用于sales组;另外一个是由已过期的AA所签发。以下启动ikev2/acert-inline测试用例,注意在启动..._ikev2测试向量

威雅学校:2024威雅校友英国首聚!共同编织校友网络,共创威雅美好未来-程序员宅基地

文章浏览阅读826次,点赞24次,收藏20次。校长先生还表示:“孩子们的热情和激情充分证明了在常州威雅求学之际所获得的人生观和价值观,影响是十分深远的,我很高兴地得知他们充分拥抱了‘终身学习’的教育理念,很高兴地听到他们计划在不久的将来创立自己的企业,学习更多的语言,继续开展研究生阶段的学习,或申请攻读博士。我们可以确信——这群杰出的年轻人一定会拥有无比光明的未来,尤其在看到他们可以在不同的语言之间自如地转换,比如与服务员用粤语交谈,用普通话谈论菜肴的可口和美味,随即迅速转换为英语,描述目前的住宿情况和个人烹饪技能的时候。

JS对象详解_js撖寡情-程序员宅基地

文章浏览阅读1w次,点赞8次,收藏37次。JS对象详解js的对象是什么?js的对象类型有哪些?具体实例实例是什么?ECMA-262对JS对象的定义:属性的无序集合,每个属性存放一个原始值、对象或函数对象是无特定顺序的值的数组对象是一种特殊的数据类型,可以包含多个成员对象成员:Object=Property+Method属性(Property):封装对象的数据,表示与对象有关的值;对象名、属性名方法(Method):封装对象的行为,表示对象可以执行的行为或可以完成的功能;对象名、方法名JS的对象类型:内部对象:原生对象/内置对象_js撖寡情

随便推点

微信小程序蓝牙功能_微信小程序蓝牙连接成功之后-程序员宅基地

文章浏览阅读1.3k次,点赞4次,收藏10次。wx.openSetting(Object object) | 微信开放文档 (qq.com)微信官方文档小程序相关API。_微信小程序蓝牙连接成功之后

【Unity3D小技巧】Unity3D中Animation和Animator动画的播放、暂停、倒放控制_unity 动画倒放-程序员宅基地

文章浏览阅读1.6w次,点赞23次,收藏116次。在日常开发中,常常会遇到要控制动画的播放、暂停和倒放的情况。这篇文章就总结一下,Animation和Animator动画播放系统的控制播放、暂停、倒放的代码。首先,来了解一下Animation和Animator的区别和联系。_unity 动画倒放

vue + el-calendar 日历考勤对接后台数据_el-calendar跟后台返回数据结合-程序员宅基地

文章浏览阅读999次。HTML部分<template> <div class="container"> <!-- <el-row> <el-col :span="24" style="margin-bottom: 20px;text-align: right;"> <el-date-picker v-model="queryParams.month" type="month" placeholder="请选择月份" valu_el-calendar跟后台返回数据结合

(Java)面向对象编程六大基本原则_java 面向对象 6大原则 详解-程序员宅基地

文章浏览阅读1.9k次。《Android源码设计模式解析与实战》面向对象六大原则_java 面向对象 6大原则 详解

关于Word中使用公式3.0编辑的中括号无法完全显示的解决办法_word中括号公式显示不完全-程序员宅基地

文章浏览阅读5.1k次,点赞6次,收藏5次。在Word中,使用公式编辑器3.0输入中括号的时候,会出现中括号的上半部分没办法完全显示的问题,也就是下图所示的情况。至于为什么会出现这个情况,我也不清楚,以前在打公式的时候基本上没出现过,后来慢慢出现了,网上有说改行距的,我试过了,这是公式本身的尺寸限制,改行距不可以,如果在Word里面显示不了的话,打印出来肯定也是不行的,现在说明一种能解决这个问题的方法。1. 双击公式,进入公..._word中括号公式显示不完全

滤除50HZ工频干的滤波电路及其仿真_50hz工频干扰的消除电路-程序员宅基地

文章浏览阅读6.1k次,点赞4次,收藏28次。工频干扰:工频干扰是由电力系统引起的50HZ的正弦波对测量过程的干扰。实现对50HZ正弦波的滤波可以采用带阻滤波器(陷波器),电路图及其交流分析如下:仿真电路图:仿真效果图:..._50hz工频干扰的消除电路

推荐文章

热门文章

相关标签