报警

yanhom2022年6月11日
  • 报警
  • 报警
大约 4 分钟

提示

框架目前提供以下告警功能,每一个告警项都可以独立配置是否开启、告警阈值、告警间隔时间、平台等,具体代码请看 core 模块 notify 包, 告警信息同时会高亮与该项相关的字段。

1.核心参数变更通知

2.线程池活跃度告警

3.队列容量告警

4.拒绝策略告警

5.任务执行超时告警

6.任务排队超时告警

线程池活跃度告警

活跃度 = activeCount / maximumPoolSize

服务启动后会开启一个定时监控任务,每隔一定时间(可配置)去计算线程池的活跃度,达到配置的 threshold 阈值后会触发一次告警,告警间隔内多次触发不会发送告警通知

队列容量告警

容量使用率 = queueSize / queueCapacity

服务启动后会开启一个定时监控任务,每隔一定时间去计算任务队列的使用率,达到配置的 threshold 阈值后会触发一次告警,告警间隔内多次触发不会发送告警通知

拒绝策略告警

/**
 * Do sth before reject.
 * @param executor ThreadPoolExecutor instance
 */
default void beforeReject(ThreadPoolExecutor executor) {
    if (executor instanceof DtpExecutor) {
        DtpExecutor dtpExecutor = (DtpExecutor) executor;
        dtpExecutor.incRejectCount(1);
        Runnable runnable = () -> AlarmManager.doAlarm(dtpExecutor, REJECT);
        AlarmManager.triggerAlarm(dtpExecutor.getThreadPoolName(), REJECT.getValue(), runnable);
    }
}

线程池线程数达到配置的最大线程数,且任务队列已满,再提交任务会触发拒绝策略。DtpExecutor 线程池用到的 RejectedExecutionHandler 是经过动态代理包装过的,在执行具体的拒绝策略之前会执行RejectedAware类beforeReject()方法,此方法会去做拒绝数量累加(总数值累加、周期值累加)。且判断如果周期累计值达到配置的阈值,则会触发一次告警通知(同时重置周期累加值为0及上次告警时间为当前时间),告警间隔内多次触发不会发送告警通知

任务队列超时告警

重写 ThreadPoolExecutor 的 execute() 方法和 beforeExecute() 方法,如果配置了执行超时或排队超时值,则会用DtpRunnable包装任务,同时记录任务的提交时间submitTime,beforeExecute根据当前时间和submitTime的差值就可以计算到该任务在队列中的等待时间,然后判断如果差值大于配置的queueTimeout则累加排队超时任务数量(总数值累加、周期值累加)。且判断如果周期累计值达到配置的阈值,则会触发一次告警通知(同时重置周期累加值为0及上次告警时间为当前时间),告警间隔内多次触发不会发送告警通知

@Override
public void execute(Runnable command) {
    String taskName = null;
    if (command instanceof NamedRunnable) {
        taskName = ((NamedRunnable) command).getName();
    }

    if (CollUtil.isNotEmpty(taskWrappers)) {
        for (TaskWrapper t : taskWrappers) {
            command = t.wrap(command);
        }
    }

    if (runTimeout > 0 || queueTimeout > 0) {
        command = new DtpRunnable(command, taskName);
    }
    super.execute(command);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
    if (!(r instanceof DtpRunnable)) {
        super.beforeExecute(t, r);
        return;
    }
    DtpRunnable runnable = (DtpRunnable) r;
    long currTime = System.currentTimeMillis();
    if (runTimeout > 0) {
        runnable.setStartTime(currTime);
    }
    if (queueTimeout > 0) {
        long waitTime = currTime - runnable.getSubmitTime();
        if (waitTime > queueTimeout) {
            queueTimeoutCount.incrementAndGet();
            Runnable alarmTask = () -> AlarmManager.doAlarm(this, QUEUE_TIMEOUT);
            AlarmManager.triggerAlarm(this.getThreadPoolName(), QUEUE_TIMEOUT.getValue(), alarmTask);
            if (StringUtils.isNotBlank(runnable.getTaskName())) {
                log.warn("DynamicTp execute, queue timeout, poolName: {}, taskName: {}, waitTime: {}ms",
                        this.getThreadPoolName(), runnable.getTaskName(), waitTime);
            }
        }
    }

    super.beforeExecute(t, r);
}

任务执行超时告警

重写ThreadPoolExecutor的afterExecute()方法,根据当前时间和beforeExecute()中设置的startTime的差值即可算出任务的实际执行时间,然后判断如果差值大于配置的runTimeout则累加排队超时任务数量(总数值累加、周期值累加)。且判断如果周期累计值达到配置的阈值,则会触发一次告警通知(同时重置周期累加值为0及上次告警时间为当前时间),告警间隔内多次触发不会发送告警通知

@Override
protected void afterExecute(Runnable r, Throwable t) {

    if (runTimeout > 0) {
        DtpRunnable runnable = (DtpRunnable) r;
        long runTime = System.currentTimeMillis() - runnable.getStartTime();
        if (runTime > runTimeout) {
            runTimeoutCount.incrementAndGet();
            Runnable alarmTask = () -> AlarmManager.doAlarm(this, RUN_TIMEOUT);
            AlarmManager.triggerAlarm(this.getThreadPoolName(), RUN_TIMEOUT.getValue(), alarmTask);
            if (StringUtils.isNotBlank(runnable.getTaskName())) {
                log.warn("DynamicTp execute, run timeout, poolName: {}, taskName: {}, runTime: {}ms",
                        this.getThreadPoolName(), runnable.getTaskName(), runTime);
            }
        }
    }

    super.afterExecute(r, t);
}

上次编辑于: 2022/11/8 19:46:12
贡献者: yanhom