【开发百宝箱系列】- 扩展 ThreadPoolExecutor 线程池

扩展我们的ThreadPoolExecutor线程池,让它在调度任务之前,先保存一下提交任务线程的堆栈信息

/**
 * A {@link ThreadPoolExecutor} that captures the stack of the submitting thread for every task.
 *
 * <p>When a task fails on a worker thread, the captured "client" stack trace is printed so the
 * failure can be traced back to the place where the task was handed to the pool. The time each
 * task spent waiting before it started running is printed as well.
 */
public class TraceThreadPoolExecutor extends ThreadPoolExecutor {

    /** Creates a pool with the default thread factory and rejection handler. */
    public TraceThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit,
                                   BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /** Creates a pool with a custom thread factory and the default rejection handler. */
    public TraceThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit,
                                   BlockingQueue<Runnable> workQueue,
                                   ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    /** Creates a pool with a custom thread factory and a custom rejection handler. */
    public TraceThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit,
                                   BlockingQueue<Runnable> workQueue,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * Runs the task on the pool, wrapped so that a failure also prints the submitter's stack.
     *
     * @param task the task to run
     * @throws NullPointerException if {@code task} is null
     */
    @Override
    public void execute(Runnable task) {
        // Reject null here: once wrapped, a null task would only fail later on a worker thread.
        if (task == null) {
            throw new NullPointerException("task");
        }
        final long startTime = System.currentTimeMillis();
        super.execute(wrap(task, clientTrace(), startTime));
    }

    /**
     * Submits the task and returns a future for it; the task is wrapped exactly once.
     *
     * @param task the task to run
     * @return a future that completes when the task has run
     * @throws NullPointerException if {@code task} is null
     */
    @Override
    public Future<?> submit(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        final long startTime = System.currentTimeMillis();
        // The inherited submit() calls the overridden execute(), which would wrap the task a second
        // time and log its queue time twice. Build the future here and bypass the override instead.
        final java.util.concurrent.RunnableFuture<Void> future =
                newTaskFor(wrap(task, clientTrace(), startTime), null);
        super.execute(future);
        return future;
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        System.out.println("准备执行:" + Thread.currentThread().getName());
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        System.out.println("执行完成:" + Thread.currentThread().getName());
    }

    @Override
    protected void terminated() {
        System.out.println("线程池退出");
        super.terminated();
    }

    /** Captures the current (submitting) thread's stack inside an exception object. */
    private Exception clientTrace() {
        return new Exception("Client stack trace ");
    }

    /**
     * Decorates a task with queue-time logging and submitter-stack reporting on failure.
     *
     * @param task the task to decorate
     * @param clientStack exception holding the submitter's stack trace
     * @param startTime submission time in milliseconds, used to compute the time spent queued
     * @return the decorated task
     */
    private Runnable wrap(final Runnable task, final Exception clientStack, final long startTime) {
        return () -> {
            try {
                final long queueDuration = System.currentTimeMillis() - startTime;
                System.out.println("Task " + Thread.currentThread().getName() + " spent " + queueDuration + "ms in  queue");
                task.run();
            } catch (RuntimeException | Error e) {
                // Print where the task came from, then let the failure propagate unchanged.
                clientStack.printStackTrace();
                throw e;
            }
        };
    }
}

使用 Guava 中工具类 ThreadFactoryBuilder

/**
 * Demo: runs ten tasks on a {@code TraceThreadPoolExecutor} whose threads are named by Guava's
 * {@code ThreadFactoryBuilder}. Roughly half of the tasks fail on purpose with a division by zero.
 *
 * @param args unused
 */
public static void main(String[] args) {
    int core = Runtime.getRuntime().availableProcessors();
    int max = core * 2;
    BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>(1000);
    final ThreadFactory threadFactory =
            new ThreadFactoryBuilder().setNameFormat("Orders-%d").setDaemon(false).build();
    TraceThreadPoolExecutor threadPool = new TraceThreadPoolExecutor(core, max, 10, TimeUnit.SECONDS,
            blockingQueue, threadFactory, new CallerRunsPolicy());
    try {
        for (int i = 0; i < 10; i++) {
            threadPool.execute(() -> {
                int a = new Random().nextInt(1000);
                if (a > 500) {
                    // Deliberate ArithmeticException to trigger the failure path.
                    a = 100 / 0;
                }
                System.out.println("====>" + new Random().nextInt(1000));
            });
        }
        // Give the tasks time to run before shutting the pool down.
        TimeUnit.SECONDS.sleep(10);
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of swallowing it.
        Thread.currentThread().interrupt();
    } finally {
        // The worker threads are non-daemon, so always shut down or the JVM will not exit.
        threadPool.shutdown();
    }
}

对线程池中的死锁进行检查

/**
 * Schedules a periodic scan that prints the names of any deadlocked threads.
 *
 * @param args unused
 * @throws InterruptedException declared for the deadlock sample code that follows the scheduling
 */
public static void main(String[] args) throws InterruptedException {
    final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();

    // Scan task: ask the JVM for deadlocked thread ids and print each thread's name.
    final Runnable deadlockScan = () -> {
        final long[] deadlockedIds = threadBean.findDeadlockedThreads();
        if (deadlockedIds == null) {
            return;
        }
        final ThreadInfo[] deadlocked = threadBean.getThreadInfo(deadlockedIds);
        System.out.println("Detected deadlock threads:");
        for (ThreadInfo info : deadlocked) {
            System.out.println(info.getThreadName());
        }
    };

    final ScheduledExecutorService scanner = Executors.newScheduledThreadPool(1);
    // Wait 5 seconds, then scan for deadlocks every 10 seconds.
    scanner.scheduleAtFixedRate(deadlockScan, 5L, 10L, TimeUnit.SECONDS);
    // Deadlock sample code goes here…
}

【开发百宝箱系列】- FutureTask 使用

/**
 * Shows how {@link FutureTask} lets one thread wait for a task that another thread is running.
 *
 * <p>Each named task is executed at most once. When several threads ask for the same task at the
 * same time, only one of them runs it; the others wait for that run and share its result.
 *
 * @author gerry pang
 * @since 2020-10-21 16:33:09
 */
public class FutureTaskDemo {

    /** Task name to the future of its single execution. */
    private final Map<String, Future<String>> taskCache = new ConcurrentHashMap<>();

    /**
     * Runs the named task if nobody has started it yet, otherwise waits for the existing run.
     *
     * @param taskName name of the task, also the value the task produces
     * @return the task's result; {@code taskName} is also returned as a fallback when waiting is
     *     interrupted or the task itself failed
     */
    public String executeTask(String taskName) {
        Future<String> future = taskCache.get(taskName);
        if (future == null) {
            Callable<String> task = () -> {
                System.out.println("==> start taskName:" + taskName);
                Thread.sleep(new Random().nextInt(1000));
                System.out.println("==> end taskName:" + taskName);
                return taskName;
            };

            FutureTask<String> futureTask = new FutureTask<>(task);
            // putIfAbsent decides the winner: only the thread that installs its future runs it.
            future = taskCache.putIfAbsent(taskName, futureTask);
            if (future == null) {
                future = futureTask;
                futureTask.run();
            }
        }
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return taskName;
    }

    /**
     * Runs a few tasks, some repeatedly, and prints each result.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        FutureTaskDemo futureTaskDemo = new FutureTaskDemo();
        System.out.println(futureTaskDemo.executeTask("123"));
        System.out.println(futureTaskDemo.executeTask("123"));
        System.out.println(futureTaskDemo.executeTask("456"));
        System.out.println(futureTaskDemo.executeTask("456"));
        System.out.println(futureTaskDemo.executeTask("000"));
        System.out.println(futureTaskDemo.executeTask("123"));
    }
}

执行结果

==> start taskName:123
==> end taskName:123
123
123
==> start taskName:456
==> end taskName:456
456
456
==> start taskName:000
==> end taskName:000
000
123

参考资料

  • 《并发编程的艺术》 10.4.2 FutureTask的使用

【开发百宝箱系列】- CountDownLatch 标准用法

CountDownLatch 标准用法1:开始+结束信号

/**
 * Coordinates N worker threads with two latches: a start signal that holds the workers back
 * until the driver is ready, and a done signal the driver waits on until every worker finishes.
 *
 * <p>NOTE(review): {@code N} and {@code doSomethingElse()} are not defined in this snippet;
 * they are presumably supplied by the surrounding code.
 */
class Driver {
  /**
   * Starts the workers, releases them together, then blocks until all have counted down.
   *
   * @throws InterruptedException if interrupted while waiting on the done signal
   */
  void main() throws InterruptedException {
    CountDownLatch startSignal = new CountDownLatch(1);
    CountDownLatch doneSignal = new CountDownLatch(N);

    for (int i = 0; i < N; ++i) {
      // create and start threads
      new Thread(new Worker(startSignal, doneSignal)).start();
    }

    doSomethingElse();            // don't let run yet
    startSignal.countDown();      // let all threads proceed
    doSomethingElse();

    doneSignal.await();           // wait for all to finish
  }
}

/**
 * Worker that waits for a shared start signal, does its work, then reports completion.
 */
class Worker implements Runnable {

  private final CountDownLatch startSignal;
  private final CountDownLatch doneSignal;

  /**
   * @param startSignal latch the worker waits on before starting
   * @param doneSignal latch the worker counts down once its work is finished
   */
  Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
    this.startSignal = startSignal;
    this.doneSignal = doneSignal;
  }

  /**
   * Blocks until the start signal is released, runs {@link #doWork()}, then counts down the done
   * signal. If interrupted while waiting, returns without doing the work or counting down.
   */
  @Override
  public void run() {
    try {
      startSignal.await();
      doWork();
      doneSignal.countDown();
    } catch (InterruptedException ex) {
      // Restore the interrupt flag so the owner of this thread can still observe it.
      Thread.currentThread().interrupt();
    }
  }

  /** Placeholder for the actual work of this worker. */
  void doWork() { /* ... */ }

}

CountDownLatch 标准用法2:基于线程池

/**
 * Splits a job into N tasks, hands them to a fixed-size thread pool and waits on a latch until
 * every task has counted down.
 *
 * <p>NOTE(review): {@code N} is not defined in this snippet, and the executor is never shut
 * down here — confirm whether the surrounding code takes care of that.
 */
class Driver2 {
   /**
    * Submits the N tasks and blocks until all of them have finished.
    *
    * @throws InterruptedException if interrupted while waiting on the done signal
    */
   void main() throws InterruptedException {
     CountDownLatch doneSignal = new CountDownLatch(N);
     ExecutorService e = Executors.newFixedThreadPool(N);

     for (int i = 0; i < N; ++i) {
       // hand one task per index to the pool
       e.execute(new WorkerRunnable(doneSignal, i));
     }

     doneSignal.await();           // wait for all to finish
   }
}

/**
 * Pool task that processes one indexed part of a job and then counts down the shared latch.
 */
class WorkerRunnable implements Runnable {
   private final CountDownLatch doneSignal;
   private final int i;

   /**
    * @param doneSignal latch counted down once this part is finished
    * @param i index of the part this task processes
    */
   WorkerRunnable(CountDownLatch doneSignal, int i) {
     this.doneSignal = doneSignal;
     this.i = i;
   }

   /**
    * Processes part {@code i} and counts down the done signal. If the work is interrupted,
    * returns without counting down.
    */
   @Override
   public void run() {
     try {
       doWork(i);
       doneSignal.countDown();
     } catch (InterruptedException ex) {
       // Restore the interrupt flag so the pool thread can still observe it.
       Thread.currentThread().interrupt();
     }
   }

   /**
    * Placeholder for the actual work on one part.
    *
    * @param i index of the part to process
    * @throws InterruptedException if the work is interrupted
    */
   void doWork(int i) throws InterruptedException { /* ... */ }
}

【开发百宝箱系列】- 安全地终止线程

/**
 * Demonstrates two cooperative ways of stopping a thread: interrupting it, and flipping a
 * volatile flag that the thread polls.
 */
public class Shutdown {

    /** Counts in a busy loop until its thread is interrupted or {@link #cancel()} is called. */
    private static class Runner implements Runnable {
        private volatile boolean running = true;
        private long counter;

        @Override
        public void run() {
            for (;;) {
                if (!running || Thread.currentThread().isInterrupted()) {
                    break;
                }
                counter++;
            }
            System.out.println("Count i = " + counter);
        }

        /** Asks the loop to stop at its next check. */
        public void cancel() {
            running = false;
        }
    }

    public static void main(String[] args) throws Exception {
        // Case 1: let the thread run for a second, then interrupt it; the loop notices the
        // interrupt status and ends.
        Thread interruptedWorker = new Thread(new Runner(), "CountThread");
        interruptedWorker.start();
        TimeUnit.SECONDS.sleep(1);
        interruptedWorker.interrupt();

        // Case 2: let the thread run for a second, then cancel its Runner; the loop notices the
        // flag is false and ends.
        Runner cancellable = new Runner();
        Thread cancelledWorker = new Thread(cancellable, "CountThread");
        cancelledWorker.start();
        TimeUnit.SECONDS.sleep(1);
        cancellable.cancel();
    }
}

JDK动态代理源码分析

环境支持

JDK代理是不需要第三方库支持的,只需要 JDK 环境就可以进行代理

使用条件:

  • 必须实现 InvocationHandler 接口;
  • 使用 Proxy.newProxyInstance 产生代理对象;
  • 被代理的对象必须要实现接口;

使用 JDK 动态代理 5 大步骤

  • 通过实现 InvocationHandler 接口来自定义自己的 InvocationHandler;
  • 通过 Proxy.getProxyClass 获得动态代理类;
  • 通过反射机制获得代理类的构造方法,方法签名为 getConstructor(InvocationHandler.class);
  • 通过构造函数获得代理对象,并将自定义的 InvocationHandler 实例对象作为参数传入;
  • 通过代理对象调用目标方法;

案例

IHello 接口

/**
 * Single-method interface used as the proxied interface in the dynamic proxy example.
 */
public interface IHello {
    /** Emits a greeting; what is emitted is up to the implementation. */
    void sayHello();
}

IHello 接口实现类

/**
 * Plain {@code IHello} implementation that prints a fixed greeting to standard output.
 */
public class HelloImpl implements IHello {

    /** Prints the greeting line to standard output. */
    @Override
    public void sayHello() {
        System.out.println("hello world !!! ");
    }
}

通过实现 InvocationHandler 接口来自定义 InvocationHandler;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/**
 * Invocation handler that prints a line before and after delegating each call to a target object.
 */
public class MyInvocationHandler implements InvocationHandler {

    /** The real object that proxied calls are forwarded to. */
    private final Object target;

    /**
     * @param target object that receives the forwarded calls
     */
    public MyInvocationHandler(Object target) {
        this.target = target;
    }

    /**
     * Forwards the call to the target by reflection, printing a marker before and after it.
     *
     * @param proxy the proxy instance the method was invoked on (unused)
     * @param method the interface method being invoked
     * @param args the call arguments, or {@code null} when the method takes none
     * @return whatever the target method returns
     * @throws Throwable the exception thrown by the target method itself, unwrapped
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Enhancement logic goes around the reflective call to the proxied object.
        System.out.println("----- before invoke ------");
        final Object result;
        try {
            result = method.invoke(target, args);
        } catch (InvocationTargetException e) {
            // Method.invoke wraps the target's exception; rethrow the original so callers of the
            // proxy see it directly instead of an UndeclaredThrowableException.
            final Throwable cause = e.getCause();
            throw cause != null ? cause : e;
        }
        System.out.println("----- after invoke ------");
        return result;
    }
}

测试

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

public class MyProxyTest {

    public static void main(String[] args) {
        // 第一种方法
        demoOne();
        // 第二种方法
        demoTwo();
    }

    private static void demoTwo() {
        // 1、生成$Proxy0的class文件
        System.getProperties().put(&quot;sun.misc.ProxyGenerator.saveGeneratedFiles&quot;, &quot;true&quot;);
        // 2、Proxy.getProxyClass 获取动态代理类
        Class proxyClass = Proxy.getProxyClass(IHello.class.getClassLoader(), IHello.class);
        try {
            // 3、通过反射机制获得代理类的构造方法
            Constructor constructor = proxyClass.getConstructor(InvocationHandler.class);
            // 4、通过代理类构造函数来创建动态代理对象,将自定义的InvocationHandler实例传入
            IHello hello = (IHello) constructor.newInstance(new MyInvocationHandler(new HelloImpl()));
            // 5、通过代理对象调用目标方法
            hello.sayHello();
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InstantiationException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        }
    }

    private static void demoOne() {
        IHello target = new HelloImpl();
        // 将2~4步骤封装好的简便方法来创建动态代理对象 $Proxy0
        // 注意:只能强转为接口类,不能是具体的实现类型
        IHello hello = (IHello) Proxy.newProxyInstance(IHello.class.getClassLoader(), 
            new Class[]{IHello.class}, 
            new MyInvocationHandler(target));
        hello.sayHello();
    }
}

源码分析

Proxy.newProxyInstance

    public static Object newProxyInstance(ClassLoader loader,  Class&lt;?&gt;[] interfaces,  InvocationHandler h)
        throws IllegalArgumentException {
        // 检查是否为null
        Objects.requireNonNull(h);
        // 拷贝所有接口类
        final Class&lt;?&gt;[] intfs = interfaces.clone();
        // 获取系统安全管理器
        final SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
           // Reflection.getCallerClass返回调用该方法的方法的调用类;loader:接口的类加载器
           // 进行包访问权限、类加载器权限等检查
            checkProxyAccess(Reflection.getCallerClass(), loader, intfs);
        }

        /*
         * Look up or generate the designated proxy class.
         * 查找所有接口类的代理类
         */
        Class&lt;?&gt; cl = getProxyClass0(loader, intfs);

        /*
         * Invoke its constructor with the designated invocation handler.
         */
        try {
            if (sm != null) {
                checkNewProxyPermission(Reflection.getCallerClass(), cl);
            }
            // 获取代理构造对象
            final Constructor&lt;?&gt; cons = cl.getConstructor(constructorParams);
            final InvocationHandler ih = h;
            // 检查代理构造对象构造方法权限,如果过不是public,设置为可访问
            if (!Modifier.isPublic(cl.getModifiers())) {
                AccessController.doPrivileged(new PrivilegedAction&lt;Void&gt;() {
                    public Void run() {
                        cons.setAccessible(true);
                        return null;
                    }
                });
            }
            // 通过代理构造对象,创建代理类
            return cons.newInstance(new Object[]{h});
        } catch (IllegalAccessException|InstantiationException e) {
            throw new InternalError(e.toString(), e);
        } catch (InvocationTargetException e) {
            Throwable t = e.getCause();
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new InternalError(t.toString(), t);
            }
        } catch (NoSuchMethodException e) {
            throw new InternalError(e.toString(), e);
        }
    }

newProxyInstance()方法主要执行逻辑

  • 生成代理类
    getProxyClass0(loader, intfs);
  • 获取构造器
    final Constructor<?> cons = cl.getConstructor(constructorParams);
  • 生成代理对象
    cons.newInstance(new Object[]{h});

获取代理对象类

/**
 * Quoted from the JDK's {@code java.lang.reflect.Proxy}: returns the proxy class for the given
 * loader and interfaces from {@code proxyClassCache}.
 *
 * @throws IllegalArgumentException if more than 65535 interfaces are given
 */
private static Class<?> getProxyClass0(ClassLoader loader, Class<?>... interfaces) {
    if (interfaces.length > 65535) {
        throw new IllegalArgumentException("interface limit exceeded");
    }

    // If the proxy class defined by the given loader implementing
    // the given interfaces exists, this will simply return the cached copy;
    // otherwise, it will create the proxy class via the ProxyClassFactory
    // In other words: a performance optimisation, so the proxy class is only created on a cache miss.
    return proxyClassCache.get(loader, interfaces);
}

会在项目的根目录下的/com/sun/proxy/下生成名为$Proxy0.class 文件

package com.sun.proxy;

import com.gerry.pang.lettucedemo.service.IHello;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;

/**
 * Generated proxy class for {@code IHello}, shown as decompiled output. Every method forwards to
 * the invocation handler {@code h} held by the {@code Proxy} superclass.
 *
 * <p>NOTE(review): the empty {@code throws} clauses look like a decompiler artifact; the listing
 * is illustrative and is not valid Java source as written.
 */
public final class $Proxy0 extends Proxy implements IHello {
    private static Method m1;
    private static Method m3;
    private static Method m2;
    private static Method m0;

    public $Proxy0(InvocationHandler var1) throws  {
        super(var1);
    }

    public final boolean equals(Object var1) throws  {
        try {
            return ((Boolean)super.h.invoke(this, m1, new Object[]{var1})).booleanValue();
        } catch (RuntimeException | Error var3) {
            throw var3;
        } catch (Throwable var4) {
            throw new UndeclaredThrowableException(var4);
        }
    }

    // Key point: the proxied interface method as implemented by the proxy class.
    public final void sayHello() throws  {
        try {
            // h is the InvocationHandler passed to the constructor.
            // Arguments: this proxy instance, the Method object for sayHello, the call arguments.
            super.h.invoke(this, m3, (Object[])null);
        } catch (RuntimeException | Error var2) {
            throw var2;
        } catch (Throwable var3) {
            throw new UndeclaredThrowableException(var3);
        }
    }

    public final String toString() throws  {
        try {
            return (String)super.h.invoke(this, m2, (Object[])null);
        } catch (RuntimeException | Error var2) {
            throw var2;
        } catch (Throwable var3) {
            throw new UndeclaredThrowableException(var3);
        }
    }

    public final int hashCode() throws  {
        try {
            return ((Integer)super.h.invoke(this, m0, (Object[])null)).intValue();
        } catch (RuntimeException | Error var2) {
            throw var2;
        } catch (Throwable var3) {
            throw new UndeclaredThrowableException(var3);
        }
    }

    // Resolve the Method objects once, when the proxy class is initialised.
    static {
        try {
            m1 = Class.forName("java.lang.Object").getMethod("equals", Class.forName("java.lang.Object"));
            // Look up the sayHello Method object declared on the IHello interface.
            m3 = Class.forName("com.gerry.pang.lettucedemo.service.IHello").getMethod("sayHello");
            m2 = Class.forName("java.lang.Object").getMethod("toString");
            m0 = Class.forName("java.lang.Object").getMethod("hashCode");
        } catch (NoSuchMethodException var2) {
            throw new NoSuchMethodError(var2.getMessage());
        } catch (ClassNotFoundException var3) {
            throw new NoClassDefFoundError(var3.getMessage());
        }
    }
}

总结

动态代理执行流程

JDK 动态代理基于拦截器和反射来实现。通过 Proxy.newProxyInstance 会基于接口自动生成代理类 $Proxy0,这个类继承了 Proxy 类,同时实现了 IHello 接口(即被代理的接口),并覆写了 sayHello 方法。代理方法内部只是调用 InvocationHandler 的 invoke 方法,再由 invoke 通过反射调用被代理对象的 sayHello 方法,也就是说代理类本身只是个壳子。