【开发百宝箱系列】- 扩展 ThreadPoolExecutor 线程池

扩展我们的ThreadPoolExecutor线程池,让它在调度任务之前,先保存一下提交任务线程的堆栈信息

public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
    public TraceThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit,
                                   BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,  workQueue);
    }

    public TraceThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit,
                                   BlockingQueue<Runnable> workQueue,
                                   ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,  workQueue, threadFactory);
    }

    public TraceThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit,
                                   BlockingQueue<Runnable> workQueue,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,  workQueue, threadFactory, handler);
    }

    @Override
    public void execute(Runnable task) {
        final long startTime = System.currentTimeMillis();
        super.execute(wrap(task, clientTrace(),  Thread.currentThread().getName(), startTime));
    }

    @Override
    public Future<?> submit(Runnable task) {
        final long startTime = System.currentTimeMillis();
        return super.submit(wrap(task, clientTrace(),  Thread.currentThread().getName(), startTime));
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        System.out.println("准备执行:" +  Thread.currentThread().getName());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        System.out.println("执行完成:" +  Thread.currentThread().getName());
    }

    @Override
    protected void terminated() {
        System.out.println("线程池退出");
    }

    private Exception clientTrace() {
        return new Exception("Client stack trace ");
    }

    private Runnable wrap(final Runnable task, final Exception  clientStack, final String clientThreadName, final long startTime) {
        return () -> {
            try {
                final long queueDuration = System.currentTimeMillis() -  startTime;
                System.out.println("Task " +  Thread.currentThread().getName() + " spent " + queueDuration + "ms in  queue");
                task.run();
            } catch (Exception e) {
                clientStack.printStackTrace();
                throw e;
            }
        };
    }
}

使用 Guava 中工具类 ThreadFactoryBuilder

public static void main(String[] args) {
      int core = Runtime.getRuntime().availableProcessors();
      int max = core * 2;
      BlockingQueue<Runnable> blockingQueue = new  LinkedBlockingQueue<>(1000);
      final ThreadFactory threadFactory = new  ThreadFactoryBuilder().setNameFormat("Orders-%d").setDaemon(false).build();
      TraceThreadPoolExecutor threadPool = new  TraceThreadPoolExecutor(core, max, 10, TimeUnit.SECONDS, blockingQueue,
                  threadFactory, new CallerRunsPolicy());
      for (int i = 0; i < 10; i++) {
            threadPool.execute(() -> {
                  int a = new Random().nextInt(1000);
                  if (a > 500) {
                        a = 100/0;
                  }
                  System.out.println("====>"+ new  Random().nextInt(1000));
            });
      }
      try {
            TimeUnit.SECONDS.sleep(10);
      } catch (InterruptedException e) {
            e.printStackTrace();
      }
      threadPool.shutdown();
}

对线程池中死锁检查

public static void main(String[] args) throws InterruptedException {

  ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
  Runnable dlCheck = new Runnable() {

      @Override
      public void run() {
          long[] threadIds = mbean.findDeadlockedThreads();
          if (threadIds != null) {
                     ThreadInfo[] threadInfos = mbean.getThreadInfo(threadIds);
                     System.out.println("Detected deadlock threads:");
              for (ThreadInfo threadInfo : threadInfos) {
                  System.out.println(threadInfo.getThreadName());
              }
          }
       }
    };

       ScheduledExecutorService scheduler =Executors.newScheduledThreadPool(1);
       // 稍等5秒,然后每10秒进行一次死锁扫描
        scheduler.scheduleAtFixedRate(dlCheck, 5L, 10L, TimeUnit.SECONDS);
        // 死锁样例代码…
}

【开发百宝箱系列】- FutrueTask 使用

/**
* 当一个线程需要等待另一个线程把某个任务执行完后它才能继续执行,此时可以使用FutureTask。
* 假设有多个线程执行若干任务,每个任务最多只能被执行一次。
* 当多个线程试图同时执行同一个任务时,只允许一个线程执行任务,其他线程需要等待这个任务执行完后才能继续执行
*
* @author gerry pang
* @since 2020年10月21日 下午4:33:09
*/
public class FutureTaskDemo {

      private final Map<String, Future<String>> taskCache = new  ConcurrentHashMap<>();
      public String executeTask(String taskName) {
            Future<String> future = taskCache.get(taskName);
            if (future == null) {
                  Callable<String> task = new Callable<String>() {
                        @Override
                        public String call() throws Exception {
                              System.out.println("==> start  taskName:"+taskName);
                              Thread.sleep(new Random().nextInt(1000));
                              System.out.println("==> end  taskName:"+taskName);
                              return taskName;
                        }
                  };

                  FutureTask<String> futureTask = new  FutureTask<String>(task);
                  future = taskCache.putIfAbsent(taskName, futureTask);
                  if (future == null) {
                        future = futureTask;
                        futureTask.run();
                  }
            }
            try {
                  return future.get();
            } catch (InterruptedException e) {
                  e.printStackTrace();
            } catch (ExecutionException e) {
                  e.printStackTrace();
            }
            return taskName;
      }

      public static void main(String[] args) {
            FutureTaskDemo futureTaskDemo = new FutureTaskDemo();
            futureTaskDemo.executeTask("123");
            futureTaskDemo.executeTask("123");
            futureTaskDemo.executeTask("456");
            futureTaskDemo.executeTask("456");
            futureTaskDemo.executeTask("000");
            futureTaskDemo.executeTask("123");
      }
}

执行结果

==> start taskName:123
==> end taskName:123
123
123
==> start taskName:456
==> end taskName:456
456
456
==> start taskName:000
==> end taskName:000
000
123

参考资料

  • 《并发编程的艺术》 10.4.2 FutrueTask的使用

【开发百宝箱系列】- CountDownLunch标准用法

CountDownLunch 标准用法1:开始+结束信号

class Driver {
  void main() throws InterruptedException {
    CountDownLatch startSignal = new CountDownLatch(1);
    CountDownLatch doneSignal = new CountDownLatch(N);

    for (int i = 0; i < N; ++i) {
      // create and start threads
      new Thread(new Worker(startSignal, doneSignal)).start();
    }

    doSomethingElse();            // don&#039;t let run yet
    startSignal.countDown();      // let all threads proceed
    doSomethingElse();

    doneSignal.await();           // wait for all to finish
  }
}

class Worker implements Runnable {

  private final CountDownLatch startSignal;
  private final CountDownLatch doneSignal;

  Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
    this.startSignal = startSignal;
    this.doneSignal = doneSignal;
  }

  public void run() {
    try {
      startSignal.await();
      doWork();
      doneSignal.countDown();
    } catch (InterruptedException ex) {
        // error
    } // return;
  }

  void doWork() { ... }

}

CountDownLatch 标准用法2:基于线程池

class Driver2 {
   void main() throws InterruptedException {
     CountDownLatch doneSignal = new CountDownLatch(N);
     ExecutorService e = Executors.newFixedThreadPool(N);

     for (int i = 0; i < N; ++i) {
       // create and start threads
       e.execute(new WorkerRunnable(doneSignal, i));
     }

     doneSignal.await();           // wait for all to finish
   }
}

class WorkerRunnable implements Runnable {
   private final CountDownLatch doneSignal;
   private final int i;

   WorkerRunnable(CountDownLatch doneSignal, int i) {
     this.doneSignal = doneSignal;
     this.i = i;
   }

   public void run() {
     try {
       doWork(i);
       doneSignal.countDown();
     } catch (InterruptedException ex) {
        // error
     } // return;
   }

   void doWork() { ... }
}

【开发百宝箱系列】- 安全地终止线程

public class Shutdown {
    private static class Runner implements Runnable {
        private long i;
        private volatile boolean on = true;

        @Override
        public void run() {
            while (on && !Thread.currentThread().isInterrupted()) {
               i++;
            }
            System.out.println("Count i = " + i);
        }

        public void cancel() {
            on = false;
        }
    }

    public static void main(String[] args) throws Exception {
        Runner one = new Runner();
        Thread countThread = new Thread(one, "CountThread");
        countThread.start();
        // 睡眠1秒,main线程对CountThread进行中断,使CountThread能够感知中断而结束
        TimeUnit.SECONDS.sleep(1);
        countThread.interrupt();
        Runner two = new Runner();
        countThread = new Thread(two, "CountThread");
        countThread.start();
        // 睡眠1秒,main线程对Runner two进行取消,使CountThread能够感知on为false而结束
        TimeUnit.SECONDS.sleep(1);
        two.cancel();
    }
}

结构化思维

步骤

对某一具体问题建立结构化思维主要分为两个大的步骤:
1、建立中心(问题)
2、对中心(问题)进行分解

第一步:建立中心

建立中心: 建立中心也就是要定义清楚要解决的问题,要明确目标,且不是一次就能建立成型的,需要多次

方式:
  1. 自上而下,适用于问题明确
  2. 自下而上,适用于不够明确复杂的问题,需要分类、剪枝、归纳汇总成中心

第二步:结构分解

主要分解方法

1) 演绎(因果)顺序:“大前提、小前提、结论”的演绎推理方式就是演绎顺序。
比如,经典三段论:所有人都要死,苏格拉底是人,苏格拉底要死。

2) 时间(步骤)顺序:“第一、第二、第三”,“首先、然后、再者”等,很多的时
间顺序同时也是因果顺序。

3) 空间(结构)顺序:“前端、后端、数据”,“波士顿、纽约、华盛顿”,化整为
零(将整体分解为部分)等都是空间顺序。

4) 程度(重要性)顺序: 比如“最重要、次重要、不重要”等。

注意: 满足 MECE(Mutually Exclusive Collectively Exhaustive,相互独立,完全穷尽)原则

其他工具:5W2H

参考资料

  • 阿里程序员的自我修养