亚洲必赢手机ScheduledThreadPoolExecutor详解。ScheduledThreadPoolExecutor详解。

by admin on 2018年10月5日

       本文主要分为两个部分,第一部分首先会见指向ScheduledThreadPoolExecutor进行简短的牵线,并且会介绍其重要性API的使办法,然后介绍了彼动时的注意点,第二局部则根本对ScheduledThreadPoolExecutor的兑现细节进行介绍。

       本文主要分为两只有,第一有些首先会见针对ScheduledThreadPoolExecutor进行简易的介绍,并且会介绍其根本API的行使方式,然后介绍了那以时之注意点,第二片则根本针对ScheduledThreadPoolExecutor的贯彻细节进行介绍。

1. 用到简介

       ScheduledThreadPoolExecutor是一个使用线程池执行定时任务的接近,相较于Java被提供的别一个执行定时任务之类Timer,其要有如下两单亮点:

  • 采用多线程执行任务,不用操心任务尽时间过长使造成任务相互阻塞的景,Timer是单线程执行的,因而会冒出是问题;
  • 无须操心任务执行进程中,如果线程失活,其会新建线程执行任务,Timer类的单线程挂掉后是勿会见再度创设线程执行后续任务的。

       除去上述两单亮点外,ScheduledThreadPoolExecutor还提供了非常灵活的API,用于执行任务。其职责的履行政策要分为两要命类:①以必延迟之后就实行同样次于有任务;②在早晚延迟之后周期性的实践某任务。如下是那要API:

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay, long period, TimeUnit unit);

       上述四个章程中,第一单同第二单道属于第一好像,即于delay指定的延期之后执行第一独参数所指定的职责,区别在,第二个方法执行后会出返值,而首先只法子执行下是未曾返回值的。第三独与季个点子则属第二类似,即在第二单参数(initialDelay)指定的时间之后开周期性的实践任务,执行周期间隔也老三个参数指定的时日,但是及时简单单艺术的区别在第三只方式执行任务的间距是一贯的,无论上一个任务是否履就,而第四单办法的执行时间隔是不定点的,其会当周期任务之达成一个任务尽就后才开始算计时,并于指定时间距离之后才起实施任务。如下是使scheduleWithFixedDelay()和scheduleAtFixedRate()方法修的测试用例:

public class ScheduledThreadPoolExecutorTest {
  private ScheduledThreadPoolExecutor executor;
  private Runnable task;

  @Before
  public void before() {
    executor = initExecutor();
    task = initTask();
  }

  private ScheduledThreadPoolExecutor initExecutor() {
    return new ScheduledThreadPoolExecutor(2);;
  }

  private Runnable initTask() {
    long start = System.currentTimeMillis();
    return () -> {
      print("start task: " + getPeriod(start, System.currentTimeMillis()));
      sleep(SECONDS, 10);
      print("end task: " + getPeriod(start, System.currentTimeMillis()));
    };
  }

  @Test
  public void testFixedTask() {
    print("start main thread");
    executor.scheduleAtFixedRate(task, 15, 30, SECONDS);
    sleep(SECONDS, 120);
    print("end main thread");
  }

  @Test
  public void testDelayedTask() {
    print("start main thread");
    executor.scheduleWithFixedDelay(task, 15, 30, SECONDS);
    sleep(SECONDS, 120);
    print("end main thread");
  }

  private void sleep(TimeUnit unit, long time) {
    try {
      unit.sleep(time);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  private int getPeriod(long start, long end) {
    return (int)(end - start) / 1000;
  }

  private void print(String msg) {
    System.out.println(msg);
  }
}

       可以见到,上述两单测试用例代码块为主是一律的,区别在于第一独用例调用的是scheduleAtFixedRate()方法,而第二单用例调用的凡scheduleWithFixedDelay()。这里少只用例都是设置的当推15s后每个30s执行同一潮指定的任务,而拖欠任务履行时长为10s。如下分别是立简单只测试用例的执行结果:

start main thread
start task: 15
end task: 25
start task: 45
end task: 55
start task: 75
end task: 85
start task: 105
end task: 115
end main thread

start main thread
start task: 15
end task: 25
start task: 55
end task: 65
start task: 95
end task: 105
end main thread

      对比上述执行结果可以看看,对于scheduleAtFixedRate()方法,其每次执行任务的始时距离都为稳不换的30s,与职责执行时长无关,而于scheduleWithFixedDelay()方法,其每次执行任务之初步时间距离都为上次职责执行时长指定的岁月距离。

       这里关于ScheduledThreadPoolExecutor的采取有三接触需要说明如下:

  • ScheduledThreadPoolExecutor继承自ThreadPoolExecutor(ThreadPoolExecutor详解),因而也起连续而来之execute()和submit()方法,但是ScheduledThreadPoolExecutor重写了及时简单只主意,重写的主意是直开立两个就实施并且就实行同样蹩脚的职责;
  • ScheduledThreadPoolExecutor使用ScheduledFutureTask封装每个需要实行的任务,而任务都是加大入DelayedWorkQueue队列中的,该队是一个采取数组实现之预队列,在调用ScheduledFutureTask::cancel()方法时,其见面根据removeOnCancel变量的安装来确认是不是要拿当前任务真正的于队列中移除,而无单独是标识其为都删除状态;
  • ScheduledThreadPoolExecutor提供了一个钩子方法decorateTask(Runnable,
    RunnableScheduledFuture)用于对实行的职责进展装点,该法第一独参数是调用方传入的天职实例,第二个参数则是行使ScheduledFutureTask对用户传入任务实例进行打包后的实例。这里要留意的凡,在ScheduledFutureTask对象中有一个heapIndex变量,该变量用于记录时实例处于队列数组中之下标位置,该变量可以用诸如contains(),remove()等办法的时光复杂度从O(N)降低至O(logN),因而效率提升是比大的,但是只要此用户还写decorateTask()方法封装了班中的天职实例,那么heapIndex的优化就未在了,因而这里强烈建议是尽量不要还写该措施,或者重写时为要复用ScheduledFutureTask类。

1. 用到简介

       ScheduledThreadPoolExecutor是一个使用线程池执行定时任务之好像,相较于Java被提供的旁一个行定时任务之类Timer,其关键出如下两单亮点:

  • 下多线程执行任务,不用顾虑任务尽时了长如果造成任务相互阻塞的情,Timer是单线程执行之,因而会冒出是问题;
  • 永不操心任务执行过程遭到,如果线程失活,其见面新建线程执行任务,Timer类的单线程挂掉后是不会见再也创设线程执行后续任务之。

       除去上述两单优点外,ScheduledThreadPoolExecutor还提供了非常灵活的API,用于实践任务。其职责之行策略要分为两格外接近:①在早晚延迟之后才实行同一软之一任务;②于必然延迟之后周期性的履行某任务。如下是其利害攸关API:

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay, long period, TimeUnit unit);

       上述四单办法被,第一单跟亚只点子属于第一看似,即当delay指定的推之后执行第一个参数所指定的任务,区别在,第二只措施执行后会生出返值,而首先个方式执行下是没返回值的。第三单和季只法子则属第二好像,即当亚独参数(initialDelay)指定的工夫过后开周期性的执行任务,执行周期间隔为老三只参数指定的光阴,但是及时片独道的区分在第三个措施执行任务的距离是定位的,无论上一个任务是否执行好,而第四独方法的实行时间隔是休定点的,其会面在周期任务的上一个任务执行好之后才起算时,并当指定时间间隔之后才开履行任务。如下是采用scheduleWithFixedDelay()和scheduleAtFixedRate()方法编的测试用例:

public class ScheduledThreadPoolExecutorTest {
  private ScheduledThreadPoolExecutor executor;
  private Runnable task;

  @Before
  public void before() {
    executor = initExecutor();
    task = initTask();
  }

  private ScheduledThreadPoolExecutor initExecutor() {
    return new ScheduledThreadPoolExecutor(2);;
  }

  private Runnable initTask() {
    long start = System.currentTimeMillis();
    return () -> {
      print("start task: " + getPeriod(start, System.currentTimeMillis()));
      sleep(SECONDS, 10);
      print("end task: " + getPeriod(start, System.currentTimeMillis()));
    };
  }

  @Test
  public void testFixedTask() {
    print("start main thread");
    executor.scheduleAtFixedRate(task, 15, 30, SECONDS);
    sleep(SECONDS, 120);
    print("end main thread");
  }

  @Test
  public void testDelayedTask() {
    print("start main thread");
    executor.scheduleWithFixedDelay(task, 15, 30, SECONDS);
    sleep(SECONDS, 120);
    print("end main thread");
  }

  private void sleep(TimeUnit unit, long time) {
    try {
      unit.sleep(time);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  private int getPeriod(long start, long end) {
    return (int)(end - start) / 1000;
  }

  private void print(String msg) {
    System.out.println(msg);
  }
}

       可以视,上述两只测试用例代码块为主是同样的,区别在第一个用例调用的凡scheduleAtFixedRate()方法,而第二单用例调用的是scheduleWithFixedDelay()。这里少独用例都是安的当缓15s晚每个30s执行同一不行指定的天职,而该任务尽时长为10s。如下分别是即时半独测试用例的实行结果:

start main thread
start task: 15
end task: 25
start task: 45
end task: 55
start task: 75
end task: 85
start task: 105
end task: 115
end main thread

start main thread
start task: 15
end task: 25
start task: 55
end task: 65
start task: 95
end task: 105
end main thread

      对比上述执行结果可以见到,对于scheduleAtFixedRate()方法,其每次执行任务之开端时间距离都为固定不变换的30s,与任务执行时长无关,而于scheduleWithFixedDelay()方法,其每次执行任务之起时间距离都为上次职责执行时累加指定的日间隔。

       这里关于ScheduledThreadPoolExecutor的以有三接触需要征如下:

  • ScheduledThreadPoolExecutor继承自ThreadPoolExecutor(ThreadPoolExecutor详解),因而为出延续而来之execute()和submit()方法,但是ScheduledThreadPoolExecutor重写了当时点儿只方式,重写的法门是直创造两独就执行并且就实行同一次等的天职;
  • ScheduledThreadPoolExecutor使用ScheduledFutureTask封装每个需要实行的任务,而任务还是加大入DelayedWorkQueue队列中之,该队是一个采取数组实现的先期队列,在调用ScheduledFutureTask::cancel()方法时,其会面根据removeOnCancel变量的安装来确认是否要用当前任务真正的自队列中移除,而无单单是标识其也曾经去状态;
  • ScheduledThreadPoolExecutor提供了一个钩子方法decorateTask(Runnable,
    RunnableScheduledFuture)用于对履行之任务进行装饰,该法第一单参数是调用方传入的天职实例,第二独参数则是使ScheduledFutureTask对用户传入任务实例进行包装后的实例。这里用留意的是,在ScheduledFutureTask对象吃有一个heapIndex变量,该变量用于记录时实例处于队列数组中的下标位置,该变量可以用诸如contains(),remove()等办法的年华复杂度从O(N)降低到O(logN),因而效率提升是于大之,但是倘若这里用户更写decorateTask()方法封装了排中之任务实例,那么heapIndex的优化就未有了,因而此强烈建议是拼命三郎不要还写该方式,或者重写时也要复用ScheduledFutureTask类。

2. 源码详解

2. 源码详解

2.1 主要性能

       ScheduledThreadPoolExecutor主要出四只属性,分别如下:

private volatile boolean continueExistingPeriodicTasksAfterShutdown;

private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

private volatile boolean removeOnCancel = false;

private static final AtomicLong sequencer = new AtomicLong();
  • continueExistingPeriodicTasksAfterShutdown:用于标识当前Executor对象shutdown时,是否继续执行已经存在被任务队列中之定时任务(调用scheduleAtFixedRate()方法变的任务);
  • executeExistingDelayedTasksAfterShutdown:用于标识当前Executor对象shutdown时,是否继续执行已经在于任务队列中的定时任务(调用scheduleWithFixedDelay()方法变的天职);
  • removeOnCancel:用于标识如果当前任务已经撤回了,是否拿其自任务队列中确确实实的移除,而未就是标识其为除去状态;
  • sequencer:其为一个AtomicLong类型的变量,该变量记录了即任务为创造时凡第几独任务之一个序号,这个序号的首要用以确认当半单任务开始实践时同一时具体哪个任务先实行,比如简单独任务之开端推行时都为1515847881158,那么序号小的天职将先实施。

2.1 主要性能

       ScheduledThreadPoolExecutor主要有四单特性,分别如下:

private volatile boolean continueExistingPeriodicTasksAfterShutdown;

private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

private volatile boolean removeOnCancel = false;

private static final AtomicLong sequencer = new AtomicLong();
  • continueExistingPeriodicTasksAfterShutdown:用于标识当前Executor对象shutdown时,是否继续执行已经存在让任务队列中之定时任务(调用scheduleAtFixedRate()方法变的任务);
  • executeExistingDelayedTasksAfterShutdown:用于标识当前Executor对象shutdown时,是否继续执行已经有让任务队列中之定时任务(调用scheduleWithFixedDelay()方法变的天职);
  • removeOnCancel:用于标识如果当前任务已经撤了,是否将该从任务队列中确实的移除,而非特是标识其为除去状态;
  • sequencer:其也一个AtomicLong类型的变量,该变量记录了当下任务为创造时凡第几单任务的一个序号,这个序号的重要用以确认当半独任务开始实施时一模一样时具体哪个任务先实行,比如简单单任务之开实行时都也1515847881158,那么序号小的天职将优先实施。

2.2 ScheduledFutureTask

       在ScheduledThreadPoolExecutor中,主要采用ScheduledFutureTask封装需要实践之职责,该类的要声明如下:

private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {

  private final long sequenceNumber;    // 记录当前实例的序列号
  private long time;    // 记录当前任务下次开始执行的时间

  // 记录当前任务执行时间间隔,等于0则表示当前任务只执行一次,大于0表示当前任务为fixedRate类型的任务,
  // 小于0则表示其为fixedDelay类型的任务
  private final long period;

  RunnableScheduledFuture<V> outerTask = this;  // 记录需要周期性执行的任务的实例
  int heapIndex;    // 记录当前任务在队列数组中位置的下标

  ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();  // 序号在创建任务实例时指定,且后续不会变化
  }

  public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), NANOSECONDS);
  }

  // 各个任务在队列中的存储方式是一个基于时间和序号进行比较的优先队列,当前方法定义了优先队列中两个
  // 任务执行的先后顺序。这里先对两个任务开始执行时间进行比较,时间较小者优先执行,若开始时间相同,
  // 则比较两个任务的序号,序号小的任务先执行
  public int compareTo(Delayed other) {
    if (other == this)
      return 0;
    if (other instanceof ScheduledFutureTask) {
      ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
      long diff = time - x.time;
      if (diff < 0)
        return -1;
      else if (diff > 0)
        return 1;
      else if (sequenceNumber < x.sequenceNumber)
        return -1;
      else
        return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
  }

  public boolean isPeriodic() { // 判断是否为周期性任务
    return period != 0;
  }

  // 当前任务执行之后,会判断当前任务是否为周期性任务,如果为周期性任务,那么就调用当前方法计算
  // 当前任务下次开始执行的时间。这里如果当前任务是fixedRate类型的任务(p > 0),那么下次执行时间
  // 就是此次执行的开始时间加上时间间隔,如果当前任务是fixedDelay类型的任务(p < 0),那么下次执行
  // 时间就是当前时间(triggerTime()方法会获取系统当前时间)加上任务执行时间间隔。可以看到,定频率
  // 和定延迟的任务的执行时间区别就在当前方法中进行了指定,因为调用当前方法时任务已经执行完成了,
  // 因而triggerTime()方法中获取的时间就是任务执行完成之后的时间点
  private void setNextRunTime() {
    long p = period;
    if (p > 0)
      time += p;
    else
      time = triggerTime(-p);
  }

  // 取消当前任务的执行,super.cancel(boolean)方法也即FutureTask.cancel(boolean)方法。该方法传入
  // true表示如果当前任务正在执行,那么立即终止其执行;传入false表示如果当前方法正在执行,那么等待其
  // 执行完成之后再取消当前任务。
  public boolean cancel(boolean mayInterruptIfRunning) {
    boolean cancelled = super.cancel(mayInterruptIfRunning);
    // 判断是否设置了取消后移除队列中当前任务,是则移除当前任务
    if (cancelled && removeOnCancel && heapIndex >= 0)  
      remove(this);
    return cancelled;
  }

  public void run() {
    boolean periodic = isPeriodic();    // 判断是否为周期性任务
    if (!canRunInCurrentRunState(periodic)) // 判断是否能够在当前状态下执行该任务
      cancel(false);
    else if (!periodic) // 如果能执行当前任务,但是任务不是周期性的,那么就立即执行该任务一次
      ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) { // 是周期性任务,则立即执行当前任务并且重置
      setNextRunTime(); // 在当前任务执行完成后调用该方法计算当前任务下次执行的时间
      reExecutePeriodic(outerTask); // 将当前任务放入任务队列中以便下次执行
    }
  }
}

       在ScheduledFutureTask中,主要发生三独点用强调:

  • 对run()方法的率先独分支,canRunInCurrentRunState()方法的声明如下所示,可以看看,该措施是用于判断当前任务如果也周期性任务,那么其是否允许在shutdown状态下继续执行已经是的周期性任务,是虽然象征即状态下是好尽当前任务的,这里isRunningOrShutdown()方法继承自ThreadPoolExecutor;

    boolean canRunInCurrentRunState(boolean periodic) {
    return isRunningOrShutdown(periodic ?

                             continueExistingPeriodicTasksAfterShutdown :
                             executeExistingDelayedTasksAfterShutdown);
    

    }

  • 每当run()方法的末尾一个if分支中,其首先会履行当前任务,在履行得时才见面调用setNextRunTime()方法设置下次任务履行时,也就是说对于fixedRate和fixedDelay类型的天职还是于这个时间接触才装的,因而虽然fixedRate类型的职责,即使该任务下次执行时较目前时如果早,其也止会于当前任务执行好后旋即施行,而不见面以及当前任务还非执行完时就算行;对于fixedDelay任务虽然无见面满怀于拖欠问题,因为那是盖任务就后的时空接触啊底蕴测算下次执行之时间点;

  • 于run()方法的结尾一个分层中之reExecutePeriodic()方法,其会面以当前任务加入到任务队列中,并且调用父类的ensurePrestart()方法确保发生可用之线程来施行当前任务,如下是拖欠方式的现实性实现:

    void reExecutePeriodic(RunnableScheduledFuture task) {
    if (canRunInCurrentRunState(true)) { // 判断当前任务是否可继续执行

    super.getQueue().add(task); // 将当前任务加入到任务队列中
    if (!canRunInCurrentRunState(true) && remove(task)) // 双检查法判断任务在加入过程中是否取消了
      task.cancel(false);
    else
      ensurePrestart(); // 初始化核心线程等确保任务可以被执行
    

    }
    }

       从ScheduledFutureTask的实现总结来拘禁,当各国创建一个此类实例时,会初始化该类的组成部分首要性能,如下次上马实践之光阴与实行的周期。当某个线程调用该任务,即执行该任务的run()方法时,如果该任务不呢周期性任务,那么执行该任务之后就是非会见有任何的动作,如果该任务吗周期性任务,那么当将当前任务执行了后,还会重置当前任务的状态,并且计算下次实施当前任务的时间,然后拿其推广入行中以便下次执行。

2.2 ScheduledFutureTask

       在ScheduledThreadPoolExecutor中,主要利用ScheduledFutureTask封装需要实践之天职,该类的第一声明如下:

private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {

  private final long sequenceNumber;    // 记录当前实例的序列号
  private long time;    // 记录当前任务下次开始执行的时间

  // 记录当前任务执行时间间隔,等于0则表示当前任务只执行一次,大于0表示当前任务为fixedRate类型的任务,
  // 小于0则表示其为fixedDelay类型的任务
  private final long period;

  RunnableScheduledFuture<V> outerTask = this;  // 记录需要周期性执行的任务的实例
  int heapIndex;    // 记录当前任务在队列数组中位置的下标

  ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();  // 序号在创建任务实例时指定,且后续不会变化
  }

  public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), NANOSECONDS);
  }

  // 各个任务在队列中的存储方式是一个基于时间和序号进行比较的优先队列,当前方法定义了优先队列中两个
  // 任务执行的先后顺序。这里先对两个任务开始执行时间进行比较,时间较小者优先执行,若开始时间相同,
  // 则比较两个任务的序号,序号小的任务先执行
  public int compareTo(Delayed other) {
    if (other == this)
      return 0;
    if (other instanceof ScheduledFutureTask) {
      ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
      long diff = time - x.time;
      if (diff < 0)
        return -1;
      else if (diff > 0)
        return 1;
      else if (sequenceNumber < x.sequenceNumber)
        return -1;
      else
        return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
  }

  public boolean isPeriodic() { // 判断是否为周期性任务
    return period != 0;
  }

  // 当前任务执行之后,会判断当前任务是否为周期性任务,如果为周期性任务,那么就调用当前方法计算
  // 当前任务下次开始执行的时间。这里如果当前任务是fixedRate类型的任务(p > 0),那么下次执行时间
  // 就是此次执行的开始时间加上时间间隔,如果当前任务是fixedDelay类型的任务(p < 0),那么下次执行
  // 时间就是当前时间(triggerTime()方法会获取系统当前时间)加上任务执行时间间隔。可以看到,定频率
  // 和定延迟的任务的执行时间区别就在当前方法中进行了指定,因为调用当前方法时任务已经执行完成了,
  // 因而triggerTime()方法中获取的时间就是任务执行完成之后的时间点
  private void setNextRunTime() {
    long p = period;
    if (p > 0)
      time += p;
    else
      time = triggerTime(-p);
  }

  // 取消当前任务的执行,super.cancel(boolean)方法也即FutureTask.cancel(boolean)方法。该方法传入
  // true表示如果当前任务正在执行,那么立即终止其执行;传入false表示如果当前方法正在执行,那么等待其
  // 执行完成之后再取消当前任务。
  public boolean cancel(boolean mayInterruptIfRunning) {
    boolean cancelled = super.cancel(mayInterruptIfRunning);
    // 判断是否设置了取消后移除队列中当前任务,是则移除当前任务
    if (cancelled && removeOnCancel && heapIndex >= 0)  
      remove(this);
    return cancelled;
  }

  public void run() {
    boolean periodic = isPeriodic();    // 判断是否为周期性任务
    if (!canRunInCurrentRunState(periodic)) // 判断是否能够在当前状态下执行该任务
      cancel(false);
    else if (!periodic) // 如果能执行当前任务,但是任务不是周期性的,那么就立即执行该任务一次
      ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) { // 是周期性任务,则立即执行当前任务并且重置
      setNextRunTime(); // 在当前任务执行完成后调用该方法计算当前任务下次执行的时间
      reExecutePeriodic(outerTask); // 将当前任务放入任务队列中以便下次执行
    }
  }
}

       在ScheduledFutureTask中,主要有三单点需要强调:

  • 对于run()方法的第一只支行,canRunInCurrentRunState()方法的宣示如下所示,可以看看,该措施是用以判断当前任务如果也周期性任务,那么其是否允许在shutdown状态下继续执行已经在的周期性任务,是虽然象征手上状态下是好实施当前任务的,这里isRunningOrShutdown()方法继承自ThreadPoolExecutor;

boolean canRunInCurrentRunState(boolean periodic) {
  return isRunningOrShutdown(periodic ?
                             continueExistingPeriodicTasksAfterShutdown :
                             executeExistingDelayedTasksAfterShutdown);
}
  • 每当run()方法的终极一个if分支中,其首先会尽当前任务,在实行就时才见面调用setNextRunTime()方法设置下次任务尽时间,也就是说对于fixedRate和fixedDelay类型的职责都是以这个时刻点才装的,因而虽然fixedRate类型的任务,即使该任务下次执行时间比目前时若是早,其也止会在当前任务执行就后立刻实施,而非会见暨当前任务还非实行完时即令尽;对于fixedDelay任务虽然无会见满怀于拖欠问题,因为那个是盖任务完成后底岁月接触呢底蕴测算下次执行的时间点;
  • 对于run()方法的末梢一个分层中的reExecutePeriodic()方法,其会拿当前任务加入到任务队列中,并且调用父类的ensurePrestart()方法确保发生可用之线程来施行当前任务,如下是该方式的具体贯彻:

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
  if (canRunInCurrentRunState(true)) {  // 判断当前任务是否可以继续执行
    super.getQueue().add(task); // 将当前任务加入到任务队列中
    if (!canRunInCurrentRunState(true) && remove(task)) // 双检查法判断任务在加入过程中是否取消了
      task.cancel(false);
    else
      ensurePrestart(); // 初始化核心线程等确保任务可以被执行
  }
}

       从ScheduledFutureTask的实现总结来拘禁,当各国创建一个此类实例时,会初始化该类的有些重点性能,如下次初始施行之辰及施行之周期。当某个线程调用该任务,即实行该任务之run()方法时,如果该任务不也周期性任务,那么执行该任务之后就是不见面来其它的动作,如果该任务也周期性任务,那么以将当前任务执行了后,还见面重置当前任务的状态,并且计算下次实行当前任务的流年,然后用那放入行中以便下次执行。

2.3 DelayedWorkQueue

       DelayedWorkQueue的贯彻与DelayQueue以及PriorityQueue的落实中心相似,形式都也一个先行队列,并且底层是行使堆结构来实现优先队列的功力,在数量存储方达成,其动的是累组来落实。这里DelayedWorkQueue与DelayQueue以及PriorityQueue不同的接触在DelayedWorkQueue中举足轻重囤积ScheduledFutureTask类型的天职,该任务中生一个heapIndex属性保存了即任务在眼前行数组中的职下标,其重要性提升的是对准队列的诸如contains()和remove()等要稳定当前任务位置的方法的效率,时间复杂度可以从O(N)提升至O(logN)。如下是DelayedWorkQueue的兑现代码(这里只排有了此类的基本点性能与及贯彻ScheduledThreadPoolExecutor功能有关的法门,关于如何以数组实现优先队列请读者查阅有关文档):

static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {

  private static final int INITIAL_CAPACITY = 16;   // 数组初始化大小
  private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
  private final ReentrantLock lock = new ReentrantLock();   // 对添加和删除元素所使用的锁
  private int size = 0; // 当前队列中有效任务的个数

  private Thread leader = null; // 执行队列头部任务的线程
  private final Condition available = lock.newCondition();  // 除leader线程外其余线程的等待队列

  // 在对任务进行移动时,判断其是否为ScheduledFutureTask实例,如果是则维护其heapIndex属性
  private void setIndex(RunnableScheduledFuture<?> f, int idx) {
    if (f instanceof ScheduledFutureTask)
      ((ScheduledFutureTask)f).heapIndex = idx;
  }

  private void siftUp(int k, RunnableScheduledFuture<?> key) {/* 省略 */}

  private void siftDown(int k, RunnableScheduledFuture<?> key) {/* 省略 */}

  private int indexOf(Object x) {
    if (x != null) {
      if (x instanceof ScheduledFutureTask) {   // 如果为ScheduledFutureTask则可返回其heapIndex属性
        int i = ((ScheduledFutureTask) x).heapIndex;
        if (i >= 0 && i < size && queue[i] == x)
          return i;
      } else {  // 如果不为ScheduledFutureTask实例,则需要遍历队列查询当前元素的位置
        for (int i = 0; i < size; i++)
          if (x.equals(queue[i]))
            return i;
      }
    }
    return -1;
  }

  public boolean offer(Runnable x) {
    if (x == null)
      throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      int i = size;
      if (i >= queue.length)
        grow(); // 队列容量不足,对其进行扩容
      size = i + 1;
      if (i == 0) { // 如果其为队列第一个元素,则将其放入队列头部
        queue[0] = e;
        setIndex(e, 0);
      } else {  //如果不为第一个元素,则通过堆的上移元素操作移动当前元素至合适的位置
        siftUp(i, e);
      }
      if (queue[0] == e) {  // 如果被更新的是队列头部元素,则更新记录的执行头部任务的线程
        leader = null;
        available.signal();
      }
    } finally {
      lock.unlock();
    }
    return true;
  }

  // 完成从队列拉取元素操作,并且将其从队列中移除
  private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    int s = --size;
    RunnableScheduledFuture<?> x = queue[s];
    queue[s] = null;    // 将队列最尾部的元素置空
    if (s != 0) // 将最后一个元素放入第一个位置,并且将其下推至合适的位置
      siftDown(0, x);   // 这里idx置为0是因为当前方法的入参f都为队列的第一个元素
    setIndex(f, -1);
    return f;
  }

  // 尝试从队列(堆)中获取元素,如果没有元素或者元素的延迟时间还未到则返回空
  public RunnableScheduledFuture<?> poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      RunnableScheduledFuture<?> first = queue[0];
      // 在此处代码控制了当从堆顶拉取元素时,如果元素的延迟时间还未达到,则不返回当前元素
      if (first == null || first.getDelay(NANOSECONDS) > 0)
        return null;
      else
        return finishPoll(first);   // 返回堆顶元素
    } finally {
      lock.unlock();
    }
  }

  // 通过无限for循环获取堆顶的元素,这里take()方法会阻塞当前线程,直至获取到了可执行的任务。
  // 可以看到,在第一次for循环中,如果堆顶不存在任务,则其会加入阻塞队列中,如果存在任务,但是
  // 其延迟时间还未到,那么当前线程会等待该延迟时间长的时间,然后查看任务是否可用,当获取到任务
  // 之后,其会将其从队列中移除,并且唤醒等待队列中其余等待的线程执行下一个任务
  public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
      for (;;) {
        RunnableScheduledFuture<?> first = queue[0];
        if (first == null)
          available.await();    // 堆内没有元素,当前线程进入等待队列中
        else {
          long delay = first.getDelay(NANOSECONDS);
          if (delay <= 0)   // 堆顶元素延迟时间小于0,可立即获取任务
            return finishPoll(first);
          first = null;
          if (leader != null)
            available.await();  // 已经有线程在等待堆顶元素,则当前线程进入等待队列中
          else {
            Thread thisThread = Thread.currentThread();
            leader = thisThread;
            try {
              available.awaitNanos(delay);  // 当前线程等待一定时长后获取任务并执行
            } finally {
              if (leader == thisThread)
                leader = null;
            }
          }
        }
      }
    } finally {
      if (leader == null && queue[0] != null)
        available.signal(); // 当前线程获取完任务之后唤醒等待队列中的下一个线程执行下一个任务
      lock.unlock();
    }
  }
}

       从DelayedWorkQueue的take()和poll()方法可关押出来,对于队列中任务之等时之界定重点是在即时简单单艺术中落实之,如果任务之等候时还不到,那么该办法就会见阻塞线程池中之线程,直至任务可以执行。

2.3 DelayedWorkQueue

       DelayedWorkQueue的实现同DelayQueue以及PriorityQueue的实现核心相似,形式都也一个预先队列,并且底层是运用堆结构来落实优先队列的效果,在数码存储方及,其用的是反复组来兑现。这里DelayedWorkQueue与DelayQueue以及PriorityQueue不同的触及在DelayedWorkQueue中最主要囤积ScheduledFutureTask类型的职责,该任务中有一个heapIndex属性保存了手上任务在现阶段班数组中的职位下标,其重大提升的是本着队列的诸如contains()和remove()等得一定当前任务位置的法的效率,时间复杂度可以从O(N)提升到O(logN)。如下是DelayedWorkQueue的落实代码(这里就排有了此类的重中之重性能与与实现ScheduledThreadPoolExecutor功能有关的不二法门,关于什么下数组实现优先队列请读者查阅有关文档):

static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {

  private static final int INITIAL_CAPACITY = 16;   // 数组初始化大小
  private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
  private final ReentrantLock lock = new ReentrantLock();   // 对添加和删除元素所使用的锁
  private int size = 0; // 当前队列中有效任务的个数

  private Thread leader = null; // 执行队列头部任务的线程
  private final Condition available = lock.newCondition();  // 除leader线程外其余线程的等待队列

  // 在对任务进行移动时,判断其是否为ScheduledFutureTask实例,如果是则维护其heapIndex属性
  private void setIndex(RunnableScheduledFuture<?> f, int idx) {
    if (f instanceof ScheduledFutureTask)
      ((ScheduledFutureTask)f).heapIndex = idx;
  }

  private void siftUp(int k, RunnableScheduledFuture<?> key) {/* 省略 */}

  private void siftDown(int k, RunnableScheduledFuture<?> key) {/* 省略 */}

  private int indexOf(Object x) {
    if (x != null) {
      if (x instanceof ScheduledFutureTask) {   // 如果为ScheduledFutureTask则可返回其heapIndex属性
        int i = ((ScheduledFutureTask) x).heapIndex;
        if (i >= 0 && i < size && queue[i] == x)
          return i;
      } else {  // 如果不为ScheduledFutureTask实例,则需要遍历队列查询当前元素的位置
        for (int i = 0; i < size; i++)
          if (x.equals(queue[i]))
            return i;
      }
    }
    return -1;
  }

  public boolean offer(Runnable x) {
    if (x == null)
      throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      int i = size;
      if (i >= queue.length)
        grow(); // 队列容量不足,对其进行扩容
      size = i + 1;
      if (i == 0) { // 如果其为队列第一个元素,则将其放入队列头部
        queue[0] = e;
        setIndex(e, 0);
      } else {  //如果不为第一个元素,则通过堆的上移元素操作移动当前元素至合适的位置
        siftUp(i, e);
      }
      if (queue[0] == e) {  // 如果被更新的是队列头部元素,则更新记录的执行头部任务的线程
        leader = null;
        available.signal();
      }
    } finally {
      lock.unlock();
    }
    return true;
  }

  // 完成从队列拉取元素操作,并且将其从队列中移除
  private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    int s = --size;
    RunnableScheduledFuture<?> x = queue[s];
    queue[s] = null;    // 将队列最尾部的元素置空
    if (s != 0) // 将最后一个元素放入第一个位置,并且将其下推至合适的位置
      siftDown(0, x);   // 这里idx置为0是因为当前方法的入参f都为队列的第一个元素
    setIndex(f, -1);
    return f;
  }

  // 尝试从队列(堆)中获取元素,如果没有元素或者元素的延迟时间还未到则返回空
  public RunnableScheduledFuture<?> poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      RunnableScheduledFuture<?> first = queue[0];
      // 在此处代码控制了当从堆顶拉取元素时,如果元素的延迟时间还未达到,则不返回当前元素
      if (first == null || first.getDelay(NANOSECONDS) > 0)
        return null;
      else
        return finishPoll(first);   // 返回堆顶元素
    } finally {
      lock.unlock();
    }
  }

  // 通过无限for循环获取堆顶的元素,这里take()方法会阻塞当前线程,直至获取到了可执行的任务。
  // 可以看到,在第一次for循环中,如果堆顶不存在任务,则其会加入阻塞队列中,如果存在任务,但是
  // 其延迟时间还未到,那么当前线程会等待该延迟时间长的时间,然后查看任务是否可用,当获取到任务
  // 之后,其会将其从队列中移除,并且唤醒等待队列中其余等待的线程执行下一个任务
  public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
      for (;;) {
        RunnableScheduledFuture<?> first = queue[0];
        if (first == null)
          available.await();    // 堆内没有元素,当前线程进入等待队列中
        else {
          long delay = first.getDelay(NANOSECONDS);
          if (delay <= 0)   // 堆顶元素延迟时间小于0,可立即获取任务
            return finishPoll(first);
          first = null;
          if (leader != null)
            available.await();  // 已经有线程在等待堆顶元素,则当前线程进入等待队列中
          else {
            Thread thisThread = Thread.currentThread();
            leader = thisThread;
            try {
              available.awaitNanos(delay);  // 当前线程等待一定时长后获取任务并执行
            } finally {
              if (leader == thisThread)
                leader = null;
            }
          }
        }
      }
    } finally {
      if (leader == null && queue[0] != null)
        available.signal(); // 当前线程获取完任务之后唤醒等待队列中的下一个线程执行下一个任务
      lock.unlock();
    }
  }
}

       从DelayedWorkQueue的take()和poll()方法可以拘留出来,对于队列中任务之等待时之限定重点是当及时片独道被贯彻之,如果任务之守候时还未及,那么该方式就见面阻塞线程池中之线程,直至任务可以履。

2.4 scheduleAtFixedRate()和scheduleWithFixedDelay()方法

       前面我们对ScheduledThreadPoolExecutor的基本点性能与重要内部类都进行了详尽的任课,基本上都可以看那是哪实现定时执行任务之意义的,接下去我们要针对客户端可调用的要害方式开展简要介绍,这里scheduleAtFixedRate()和scheduleWithFixedDelay()方法的兑现核心是同的,两独道极其细微之分在ScheduledFutureTask的setNextRunTime()方法的实现,该措施的兑现前面都拓展了讲学,我们这里虽然因为scheduleAtFixedRate()方法的落实为例对拖欠办法进行讲解。如下是拖欠法的现实性实现:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, 
                                              long period, TimeUnit unit) {
  if (command == null || unit == null)
    throw new NullPointerException();
  if (period <= 0)
    throw new IllegalArgumentException();
  ScheduledFutureTask<Void> sft =   // 封装客户端的任务实例
    new ScheduledFutureTask<Void>(command, null, 
                                  triggerTime(initialDelay, unit),unit.toNanos(period));
  RunnableScheduledFuture<Void> t = decorateTask(command, sft); // 对客户端任务实例进行装饰
  sft.outerTask = t;    // 初始化周期任务属性outerTask
  delayedExecute(t);    // 执行该任务
  return t;
}

       从上述代码可以拘留出来,scheduleAtFixedRate()首先对客户端任务实例进行了打包,装饰,并且初始化了包后底职责实例的outerTask属性,最后调用delayedExecute()方法执行任务。如下是delayedExecute()方法的贯彻:

private void delayedExecute(RunnableScheduledFuture<?> task) {
  if (isShutdown())
    reject(task);
  else {
    super.getQueue().add(task); // 添加当前任务到任务队列中
    if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
      task.cancel(false);   // 双检查法再次判断当前线程池是否处于可用状态,不是则移除当前任务
    else
      ensurePrestart(); // 若线程池没有初始化,则进行一些初始化工作
  }
}

       上述方式吧重中之重的尽任务的点子,该措施首先会用任务在到任务队列中,如果线程池已经初始化过,那么该任务就是见面发等的线程执行该任务。在加入到任务队列之后经过对检查法检查线程池是否业已shutdown了,如果是则将该任务由任务队列中移除。如果手上线程池没有shutdown,就调用继承自ThreadPoolExecutor的ensurePrestart()方法,该方法会对线程池进行局部初始化工作,如初始化核心线程,然后挨家挨户线程会调用上述等待队列的take()方法取得任务履行。

2.4 scheduleAtFixedRate()和scheduleWithFixedDelay()方法

       前面我们针对ScheduledThreadPoolExecutor的要性能与重要性内部类都进展了详实的教授,基本上已经得以见见其是安促成定时执行任务的效应的,接下我们任重而道远对客户端好调用的重中之重方法进行简短介绍,这里scheduleAtFixedRate()和scheduleWithFixedDelay()方法的贯彻基本是平等的,两个章程极其微薄之别在于ScheduledFutureTask的setNextRunTime()方法的兑现,该法的兑现前面已经展开了讲课,我们这里虽然因scheduleAtFixedRate()方法的落实为条例针对该方法进行讲解。如下是欠办法的实际贯彻:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, 
                                              long period, TimeUnit unit) {
  if (command == null || unit == null)
    throw new NullPointerException();
  if (period <= 0)
    throw new IllegalArgumentException();
  ScheduledFutureTask<Void> sft =   // 封装客户端的任务实例
    new ScheduledFutureTask<Void>(command, null, 
                                  triggerTime(initialDelay, unit),unit.toNanos(period));
  RunnableScheduledFuture<Void> t = decorateTask(command, sft); // 对客户端任务实例进行装饰
  sft.outerTask = t;    // 初始化周期任务属性outerTask
  delayedExecute(t);    // 执行该任务
  return t;
}

       从上述代码可以扣押下,scheduleAtFixedRate()首先针对客户端任务实例进行了包,装饰,并且初始化了包后的天职实例的outerTask属性,最后调用delayedExecute()方法执行任务。如下是delayedExecute()方法的贯彻:

private void delayedExecute(RunnableScheduledFuture<?> task) {
  if (isShutdown())
    reject(task);
  else {
    super.getQueue().add(task); // 添加当前任务到任务队列中
    if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
      task.cancel(false);   // 双检查法再次判断当前线程池是否处于可用状态,不是则移除当前任务
    else
      ensurePrestart(); // 若线程池没有初始化,则进行一些初始化工作
  }
}

       上述方式吗重要的实践任务的章程,该法首先会用任务在到任务队列中,如果线程池已经初始化过,那么该任务就会见生出等的线程执行该任务。在参加到任务队列之后通过对检查法检查线程池是否业已shutdown了,如果是则将拖欠任务由任务队列中移除。如果手上线程池没有shutdown,就调用继承自ThreadPoolExecutor的ensurePrestart()方法,该方法会对线程池进行局部初始化工作,如初始化核心线程,然后挨家挨户线程会调用上述等待队列的take()方法取得任务执行。

相关文章

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图