【开发百宝箱系列】- 扩展 ThreadPoolExecutor 线程池

扩展我们的ThreadPoolExecutor线程池,让它在调度任务之前,先保存一下提交任务线程的堆栈信息

public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
    public TraceThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit,
                                   BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,  workQueue);
    }

    public TraceThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit,
                                   BlockingQueue<Runnable> workQueue,
                                   ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,  workQueue, threadFactory);
    }

    public TraceThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit,
                                   BlockingQueue<Runnable> workQueue,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,  workQueue, threadFactory, handler);
    }

    @Override
    public void execute(Runnable task) {
        final long startTime = System.currentTimeMillis();
        super.execute(wrap(task, clientTrace(),  Thread.currentThread().getName(), startTime));
    }

    @Override
    public Future<?> submit(Runnable task) {
        final long startTime = System.currentTimeMillis();
        return super.submit(wrap(task, clientTrace(),  Thread.currentThread().getName(), startTime));
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        System.out.println("准备执行:" +  Thread.currentThread().getName());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        System.out.println("执行完成:" +  Thread.currentThread().getName());
    }

    @Override
    protected void terminated() {
        System.out.println("线程池退出");
    }

    private Exception clientTrace() {
        return new Exception("Client stack trace ");
    }

    private Runnable wrap(final Runnable task, final Exception  clientStack, final String clientThreadName, final long startTime) {
        return () -> {
            try {
                final long queueDuration = System.currentTimeMillis() -  startTime;
                System.out.println("Task " +  Thread.currentThread().getName() + " spent " + queueDuration + "ms in  queue");
                task.run();
            } catch (Exception e) {
                clientStack.printStackTrace();
                throw e;
            }
        };
    }
}

使用 Guava 中工具类 ThreadFactoryBuilder

public static void main(String[] args) {
      int core = Runtime.getRuntime().availableProcessors();
      int max = core * 2;
      BlockingQueue<Runnable> blockingQueue = new  LinkedBlockingQueue<>(1000);
      final ThreadFactory threadFactory = new  ThreadFactoryBuilder().setNameFormat("Orders-%d").setDaemon(false).build();
      TraceThreadPoolExecutor threadPool = new  TraceThreadPoolExecutor(core, max, 10, TimeUnit.SECONDS, blockingQueue,
                  threadFactory, new CallerRunsPolicy());
      for (int i = 0; i < 10; i++) {
            threadPool.execute(() -> {
                  int a = new Random().nextInt(1000);
                  if (a > 500) {
                        a = 100/0;
                  }
                  System.out.println("====>"+ new  Random().nextInt(1000));
            });
      }
      try {
            TimeUnit.SECONDS.sleep(10);
      } catch (InterruptedException e) {
            e.printStackTrace();
      }
      threadPool.shutdown();
}

对线程池中死锁检查

public static void main(String[] args) throws InterruptedException {

  ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
  Runnable dlCheck = new Runnable() {

      @Override
      public void run() {
          long[] threadIds = mbean.findDeadlockedThreads();
          if (threadIds != null) {
                     ThreadInfo[] threadInfos = mbean.getThreadInfo(threadIds);
                     System.out.println("Detected deadlock threads:");
              for (ThreadInfo threadInfo : threadInfos) {
                  System.out.println(threadInfo.getThreadName());
              }
          }
       }
    };

       ScheduledExecutorService scheduler =Executors.newScheduledThreadPool(1);
       // 稍等5秒,然后每10秒进行一次死锁扫描
        scheduler.scheduleAtFixedRate(dlCheck, 5L, 10L, TimeUnit.SECONDS);
        // 死锁样例代码…
}

发表评论