/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue. At any point, at most * {@code nThreads} threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks. The threads in the pool will exist * until it is explicitly {@link ExecutorService#shutdown shutdown}. */ publicstatic ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { returnnewThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, newLinkedBlockingQueue<Runnable>(), threadFactory); }
/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available. These pools will typically improve the performance * of programs that execute many short-lived asynchronous tasks. * Calls to {@code execute} will reuse previously constructed * threads if available. If no existing thread is available, a new * thread will be created and added to the pool. Threads that have * not been used for sixty seconds are terminated and removed from * the cache. Thus, a pool that remains idle for long enough will * not consume any resources. Note that pools with similar * properties but different details (for example, timeout parameters) * may be created using {@link ThreadPoolExecutor} constructors. * * @return the newly created thread pool */ publicstatic ExecutorService newCachedThreadPool() { returnnewThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, newSynchronousQueue<Runnable>()); }
/** * Creates a work-stealing thread pool using all available processors as its target parallelism level. */ publicstatic ExecutorService newWorkStealingPool() { returnnewForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
publicinterfaceRejectedExecutionHandler { /** * Method that may be invoked by a {@link ThreadPoolExecutor} when * {@link ThreadPoolExecutor#execute execute} cannot accept a * task. This may occur when no more threads or queue slots are * available because their bounds would be exceeded, or upon * shutdown of the Executor. */ voidrejectedExecution(Runnable r, ThreadPoolExecutor executor); }
publicvoidexecute(Runnable command) { if (command == null) thrownewNullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ intc= ctl.get(); //小于核心线程数 if (workerCountOf(c) < corePoolSize) { //创建核心线程执行任务 if (addWorker(command, true)) return; c = ctl.get(); } //线程池是运行状态 && 添加任务到队列成功 if (isRunning(c) && workQueue.offer(command)) { intrecheck= ctl.get(); // 如果线程池没有运行,移除掉这个任务,执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); // 没有线程,创建线程执行任务 elseif (workerCountOf(recheck) == 0) addWorker(null, false); }// 线程池停止或任务队列满的情况 elseif (!addWorker(command, false))//创建非核心线程 // 说明线程池已经关闭或达到最大线程数,执行拒绝策略 reject(command); }
/** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 以当前 Worker 创建线程,所以线程启动后会调用 Worker#run() 方法 this.thread = getThreadFactory().newThread(this); }