threadpool


线程池

// 创建线程池的工具类Executors
public class Executors {
// 创建一个只有一个工作线程的线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

// 创建一个固定数目线程的线程池(核心线程数和最大线程数相同)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

// 创建一个cache线程池,核心线程大小为0
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}


// 线程工厂,负责为线程池中的Worker创建线程-》可以创建个性化的线程
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}

public Thread newThread(Runnable r) {
// 返回的线程又将Worker做了一层封装,当该线程执行strart方法时,将会调用Worker的run方法
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
}


// jdk定义的线程池
public class ThreadPoolExecutor extends AbstractExecutorService {
// 核心线程数
private volatile int corePoolSize;
// 最大线程数
private volatile int maximumPoolSize;
// 阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 非工作线程存活时间
private volatile long keepAliveTime;
// 创建线程的工厂
private volatile ThreadFactory threadFactory;
// 线程池的拒绝策略
private volatile RejectedExecutionHandler handler;
// 存储创建好的Worker线程
private final HashSet<Worker> workers = new HashSet<Worker>();
// 主锁,对workers进行操作时会上锁
private final ReentrantLock mainLock = new ReentrantLock();

// 定义线程池内部的工作线程
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
// 当前Worker持有的线程
final Thread thread;
// 当前Worker执行的第一个任务
Runnable firstTask;
// 记录当前Worker已经完成的任务数量
volatile long completedTasks;

Worker(Runnable firstTask) {
// 将aqs中的state值由0设置为-1,禁止中断(上锁了)
setState(-1);
this.firstTask = firstTask;
// 调用工厂为当前的Worker创建一个Thread—>传入的是当前this对象
this.thread = getThreadFactory().newThread(this);
}

// 当前Worker线程启动需要执行的方法,该方法会由内部属性thread调用start方法时触发。
public void run() {
runWorker(this);
}

// 判断线程是否被独占
protected boolean isHeldExclusively() {
return getState() != 0;
}

// 尝试获取锁
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

// Worker线程执行的真正逻辑
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();//执行任务之前允许被打断
boolean completedAbruptly = true;
try {
// 当前worker的task不为null或者是阻塞队列不为null,worker线程会一直运行
while (task != null || (task = getTask()) != null) {
// 上锁
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 调用worker的run方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}







// 初始化一个线程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

// 1.若是当前工作线程数小于核心线程数, 会去创建一个核心线程。
// 2.若是当前工作线程数大于等于核心线程数,将任务放入到阻塞队列。
// 3.若是阻塞队列已满,会去创建非核心线程。
// 4.若是创建非核心线程也失败,执行拒绝策略。
public void execute(Runnable command) {
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

// 1.尝试着将工作线程数加1,workers的个数。
// 2.创建一个Worker线程,并将其加入到workers数组中。
// 3.若是加入到workers数组成功,调用worker的thread的start方法,启动线程。
// 4.线程启动之后,会调用worker的run方法,而run方法又是调用runWorker(this)方法来执行的。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 自旋
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
// 将创建好的Worker加入到workers数组中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}









}

// spring线程池->ThreadPoolTaskExecutor
public class ThreadPoolTaskExecutor {
private final Object poolSizeMonitor = new Object();
// 核心线程数
private int corePoolSize = 1;
// 最大线程数
private int maxPoolSize = 2147483647;
// 非核心线程空闲最大存活时间
private int keepAliveSeconds = 60;
// 缓存队列容量
private int queueCapacity = 2147483647;
// 是否允许核心线程池超时等待(设置为false,当到达一定时间没有任务,线程池会自动关闭)
private boolean allowCoreThreadTimeOut = false;

private TaskDecorator taskDecorator;
private ThreadPoolExecutor threadPoolExecutor;

public ThreadPoolTaskExecutor() {

}

// 因为spring线程池提供了更加灵活的配置,项目中一般使用的是spring的线程池。