Spring TaskScheduler实现定时任务分析

Spring TaskScheduler实现定时任务分析

Scroll Down

实现类

实际上还是ThreadPoolTaskScheduler的方法,只是封装了一层更适合项目使用

@Component
public class Schedule {
	@Autowired
	private TaskScheduler taskScheduler;

	/**
	 * 每x毫秒钟执行(如果时间已过立即执行一次)
	 * @param task 任务
	 * @param startSeconds 执行周期(毫秒)
	 */
	public void addEveryMillisecond(Runnable task, long startSeconds) {
		taskScheduler.scheduleAtFixedRate(task, startSeconds);
	}

	/**
	 * 每x秒钟执行(如果时间已过立即执行一次)
	 * @param task 任务
	 * @param startSeconds 执行周期(秒)
	 */
	public void addEverySecond(Runnable task, int startSeconds) {
		int millSeconds = startSeconds * TimeConstant.ONE_SECOND_MILLISECOND;
		Date startDate = DateUtils.getDelayDate(millSeconds, TimeUnit.MILLISECONDS);
		taskScheduler.scheduleAtFixedRate(task, startDate, millSeconds);
	}

	/**
	 * 每x秒钟执行(如果时间已过立即执行一次)
	 * @param task 任务
	 * @param startSeconds 延时启动(秒) 
	 * @param intervalSeconds 执行周期(秒)
	 */
	public void addEverySecond(Runnable task, int startSeconds, int intervalSeconds) {
		int millSeconds = intervalSeconds * TimeConstant.ONE_SECOND_MILLISECOND;
		Date startDate = DateUtils.getDelayDate(startSeconds * TimeConstant.ONE_SECOND_MILLISECOND, TimeUnit.MILLISECONDS);
		taskScheduler.scheduleAtFixedRate(task, startDate, millSeconds);
	}

	/**
	 * 每x分钟执行 (如果时间已过立即执行一次)
	 * @param task			runnable对象
	 * @param startMinute	执行周期时间(分钟)
	 */
	public void addEveryMinute(Runnable task, int startMinute) {
		int millSeconds = startMinute * 60 * TimeConstant.ONE_SECOND_MILLISECOND;
		Date startDate = DateUtils.getDelayDate(millSeconds, TimeUnit.MILLISECONDS);
		taskScheduler.scheduleAtFixedRate(task, startDate, millSeconds);
	}

	/**
	 * 每x分钟执行(整数倍时间)
	 * @param task			runnable对象
	 * @param startMinute	执行周期时间(分钟) 
	 */
	public void addEveryMinuteZeroStart(Runnable task, int startMinute) {
		int millSeconds = startMinute * 60 * TimeConstant.ONE_SECOND_MILLISECOND;
		Date startDate = DateUtils.getDelayMinuteDate(startMinute);
		taskScheduler.scheduleAtFixedRate(task, startDate, millSeconds);
	}

	/**
	 * 每小时整点触发(每天24次) 重复执行
	 * @param task	任务
	 */
	public void addEveryHour(Runnable task) {
		long oneHourMillisecond = TimeConstant.ONE_HOUR_MILLISECOND;
		long nextHourTime = TimeUtils.getNextHourTime();
		// 调度首次触发时间为下个小时整点+1000ms(防止在前一个小时的59秒就开始执行任务)
		Date startDate = new Date(nextHourTime + 1000);

		taskScheduler.scheduleAtFixedRate(task, startDate, oneHourMillisecond);
	}

	/**
	 * 
	 * 每天x点执行.(每天一次) (如果时间已过立即执行一次),然后延迟一天, 重复执行
	 * @param task
	 * @param hour  1-24 小时定时执行
	 */
	public void addFixedTime(Runnable task, int hour) {
		if (hour == 0) {
			hour = 24;
		}
		addFixedTime(task, hour, 0, 0);
	}

	/**
	 * 每天x点执行.(每天一次) (如果时间已过立即执行一次),然后延迟一天, 重复执行
	 * @param task
	 * @param hour
	 * @param minutes
	 * @param seconds
	 */
	public void addFixedTime(Runnable task, int hour, int minutes, int seconds) {
		if (hour == 0) {
			hour = 24;
		}
		long oneDayMillisecond = TimeConstant.ONE_DAY_MILLISECOND;
		Calendar c = Calendar.getInstance();
		c.set(Calendar.HOUR_OF_DAY, hour);
		c.set(Calendar.MINUTE, minutes);
		c.set(Calendar.SECOND, seconds);
		c.set(Calendar.MILLISECOND, 0);
		// 如果调度时间小于当前时间,则调度时间往后延时一天
		if (c.getTimeInMillis() < System.currentTimeMillis()) {
			c.setTimeInMillis(c.getTimeInMillis() + oneDayMillisecond);
		}
		Date startDate;
		if (hour == 24 && minutes == 0 && seconds == 0) {
			// 对于跨天的任务延时2秒执行,防止在前一天的59秒就开始执行任务
			startDate = new Date(TimeUtils.setFixTime(hour) + 2000);
		} else {
			// 普通定时任务延时1秒执行
			startDate = new Date(c.getTimeInMillis() + 1000);
		}
		taskScheduler.scheduleAtFixedRate(task, startDate, oneDayMillisecond);
	}

	/**
	 * 延迟执行 
	 * @param task 任务
	 * @param seconds 延迟时间(秒)
	 */
	public void addDelaySeconds(Runnable task, int seconds) {
		long millSeconds = TimeUnit.SECONDS.toMillis(seconds);
		taskScheduler.schedule(task, new Date(System.currentTimeMillis() + millSeconds));
	}

	/**
	 * 定时任务
	 * @param task
	 * @param time
	 */
	public void addDelayTask(Runnable task, long time) {
		taskScheduler.schedule(task, new Date(time));
	}
}

使用方法

新建定时任务扫描配置文件,使用起来很简单,schedule调度方法,设置执行方法类和时间

	/**
	 * 扫描配置文件变更间隔(毫秒)
	 */
	@Autowired(required = false)
	@Qualifier("dataconfig.flush_time")
	private long flushTime = 10000l;

	@Autowired
	private Schedule schedule;

	private static File dir;

	@Override
	public void afterPropertiesSet() throws Exception {
		checkFolderExist();
		dir = new File(ClassLoader.getSystemResource("").getPath() + path);
		schedule.addEveryMillisecond(new Runnable() {
			@Override
			public void run() {
				reloadConfig();
			}
		}, flushTime);
	}

ThreadPoolTaskScheduler实现

	@Override
	public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period) {
		ScheduledExecutorService executor = getScheduledExecutor();
		try {
			return executor.scheduleAtFixedRate(errorHandlingTask(task, true), 0, period, TimeUnit.MILLISECONDS);
		}
		catch (RejectedExecutionException ex) {
			throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
		}
	}

ScheduledExecutorService

ScheduledThreadPoolExecutor实现

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     * @throws IllegalArgumentException   {@inheritDoc}
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

ScheduledFutureTask重复任务

任务封装成ScheduledFutureTask(sft)对象
delayedExecute替换(sft)为RunnableScheduledFuture(t)执行方法,设置周期任务(sft)重复任务为(t)

        /**
         * Creates a periodic action with given nano time and period.
         */
        ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;//下次执行任务的时间
            this.period = period;//任务执行的周期
            this.sequenceNumber = sequencer.getAndIncrement();
        }

delayedExecute执行任务

    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())//当前线程池是否已执行
            reject(task);//拒绝策略
        else {
            super.getQueue().add(task);////加入任务执行队列
            if (isShutdown() &&//线程池是否关闭&&是否可以执行定期任务&&移除任务
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))//移除任务,终止线程池
                task.cancel(false);//以不可中断方式执行完成执行中的调度任务

            else
                ensurePrestart();//预先启动工作线程确保执行
        }
    }

ensurePrestart确保执行

void ensurePrestart() {
        int wc = workerCountOf(ctl.get());//获取有效线程数
        //添加一个任务为空的线程,执行时都会从任务队列中获取任务
        if (wc < corePoolSize)//有效线程数小于核心线程数
            addWorker(null, true);//有效线程最大值为核心线程数
        else if (wc == 0)//有效线程数为0
            addWorker(null, false);//有效线程最大值为线程池中允许存在的最大线程数
    }

remove移除任务

public boolean remove(Runnable task) {
        boolean removed = workQueue.remove(task);//移除任务
        tryTerminate(); //尝试终止线程池
        return removed;
    }