运行报警

yanhom运行报警dynamictp大约 3 分钟

告警类型

框架目前提供以下告警功能,每一个告警项都可以独立配置是否开启、告警阈值、告警间隔时间、平台等,具体代码请看 core 模块 notifier 包, 告警信息同时会高亮与该告警项相关的字段。

  • 线程池活跃度告警
  1. 活跃度 = (activeCount / maximumPoolSize) * 100

  2. 比如 threshold 阈值配置 80,表示活跃度达到 80% 时触发告警

  3. 服务启动后会开启一个定时监控任务,每隔一定时间(可配置)去计算线程池的活跃度,达到配置的 threshold 阈值后会触发一次告警,告警间隔内多次触发不会发送告警通知

  • 队列容量告警
  1. 容量使用率 = (queueSize / queueCapacity) * 100

  2. 比如 threshold 阈值配置 80,表示容量使用率达到 80% 时触发告警

  3. 服务启动后会开启一个定时监控任务,每隔一定时间去计算任务队列的使用率,达到配置的 threshold 阈值后会触发一次告警,告警间隔内多次触发不会发送告警通知

  • 拒绝策略告警

    线程池线程数达到配置的最大线程数,且任务队列已满,再提交任务会触发拒绝策略。DtpExecutor 线程池用到的 RejectedExecutionHandler 是经过动态代理包装过的, 在执行具体的拒绝策略之前会执行 RejectedAware 类 beforeReject() 方法,此方法会去做拒绝数量累加(总数值累加、周期值累加)。且判断如果周期累计值达到配置的阈值, 则会触发一次告警通知(同时重置周期累加值为 0 及上次告警时间为当前时间),告警间隔内多次触发不会发送告警通知

    /**
     * Do sth before reject.
     *
     * @param runnable Runnable instance
     * @param executor ThreadPoolExecutor instance
     * @param log      logger
     */
    default void beforeReject(Runnable runnable, ThreadPoolExecutor executor, Logger log) {
        if (executor instanceof DtpExecutor) {
            val dtpRunnable = (DtpRunnable) runnable;
            dtpRunnable.cancelQueueTimeoutTask();
            DtpExecutor dtpExecutor = (DtpExecutor) executor;
            dtpExecutor.incRejectCount(1);
            AlarmManager.doAlarmAsync(dtpExecutor, REJECT);
            log.warn("DynamicTp execute, thread pool is exhausted, tpName: {}, taskName: {}, traceId: {}, " +
                            "poolSize: {} (active: {}, core: {}, max: {}, largest: {}), " +
                            "task: {} (completed: {}), queueCapacity: {}, (currSize: {}, remaining: {}), " +
                            "executorStatus: (isShutdown: {}, isTerminated: {}, isTerminating: {})",
                    dtpExecutor.getThreadPoolName(), dtpRunnable.getTaskName(), MDC.get(TRACE_ID), executor.getPoolSize(),
                    executor.getActiveCount(), executor.getCorePoolSize(), executor.getMaximumPoolSize(),
                    executor.getLargestPoolSize(), executor.getTaskCount(), executor.getCompletedTaskCount(),
                    dtpExecutor.getQueueCapacity(), dtpExecutor.getQueue().size(), executor.getQueue().remainingCapacity(),
                    executor.isShutdown(), executor.isTerminated(), executor.isTerminating());
        }
    }
    
  • 任务排队超时告警

    重写 ThreadPoolExecutor 的 execute() 方法和 beforeExecute() 方法,采用时间轮处理任务排队超时逻辑。 任务提交时用 DtpRunnable 包装任务, 并根据 queueTimeout 开启一个处理排队超时的 TimerTask, 排队超时后执行 TimerTask 的 run 方法,累加排队超时任务数量(总数值累加、周期值累加)。 且判断如果周期累计值达到配置的阈值, 则会触发一次告警通知(同时重置周期累加值为 0 及上次告警时间为当前时间),告警间隔内多次触发不会发送告警通知。

    public void execute(Runnable command) {
        DtpRunnable dtpRunnable = (DtpRunnable) wrapTasks(command);
        dtpRunnable.startQueueTimeoutTask(this);
        super.execute(dtpRunnable);
    }
    
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        DtpRunnable runnable = (DtpRunnable) r;
        runnable.cancelQueueTimeoutTask();
        runnable.startRunTimeoutTask(this, t);
    }
    
  • 任务执行超时告警

    重写 ThreadPoolExecutor 的 beforeExecute() 和 afterExecute() 方法,采用时间轮处理任务执行超时逻辑。 beforeExecute() 阶段根据 runTimeout 开启一个处理执行超时的 TimerTask,超时后执行 TimerTask 的 run 方法, 累加执行超时任务数量(总数值累加、周期值累加)。且判断如果周期累计值达到配置的阈值,则会触发一次告警通知 (同时重置周期累加值为 0 及上次告警时间为当前时间),告警间隔内多次触发不会发送告警通知。

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        ((DtpRunnable) r).cancelRunTimeoutTask();
        tryPrintError(r, t);
        clearContext();
    }
    
上次编辑于:
贡献者: yanhom