亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java怎么自定義線程池中隊列

發布時間:2022-07-08 10:01:16 來源:億速云 閱讀:143 作者:iii 欄目:開發技術

本篇內容介紹了“Java怎么自定義線程池中隊列”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

背景

業務交互的過程中涉及到了很多關于SFTP下載的問題,因此在代碼中定義了一些線程池,使用中發現了一些問題

代碼類似如下所示:

public class ExecutorTest {
    private static ExecutorService es = new ThreadPoolExecutor(2,
            100, 1000, TimeUnit.MILLISECONDS
            , new ArrayBlockingQueue<>(10));
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            es.submit(new MyThread());
        }
    }
    static class MyThread implements Runnable {
        @Override
        public void run() {
            for (; ; ) {
                System.out.println("Thread name=" + Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

如上面的代碼所示,定義了一個初始容量為2,最大容量為100,隊列長度為10的線程池,期待的運行結果為:

Thread name=pool-1-thread-1
Thread name=pool-1-thread-2
Thread name=pool-1-thread-3
Thread name=pool-1-thread-4
Thread name=pool-1-thread-5
Thread name=pool-1-thread-6
Thread name=pool-1-thread-7
Thread name=pool-1-thread-8
Thread name=pool-1-thread-9
Thread name=pool-1-thread-10
Thread name=pool-1-thread-3
Thread name=pool-1-thread-5
Thread name=pool-1-thread-2
Thread name=pool-1-thread-1
Thread name=pool-1-thread-4
Thread name=pool-1-thread-10
Thread name=pool-1-thread-7
Thread name=pool-1-thread-6
Thread name=pool-1-thread-9
Thread name=pool-1-thread-8
Thread name=pool-1-thread-3
Thread name=pool-1-thread-4
Thread name=pool-1-thread-1
Thread name=pool-1-thread-5
Thread name=pool-1-thread-2
Thread name=pool-1-thread-8
Thread name=pool-1-thread-6
Thread name=pool-1-thread-7
Thread name=pool-1-thread-9
Thread name=pool-1-thread-10

期待十個線程都可以運行,但實際的執行效果如下:

Thread name=pool-1-thread-1
Thread name=pool-1-thread-2
Thread name=pool-1-thread-2
Thread name=pool-1-thread-1
Thread name=pool-1-thread-1
Thread name=pool-1-thread-2
Thread name=pool-1-thread-1
Thread name=pool-1-thread-2
Thread name=pool-1-thread-2
Thread name=pool-1-thread-1
Thread name=pool-1-thread-2
Thread name=pool-1-thread-1
Thread name=pool-1-thread-2
Thread name=pool-1-thread-1

對比可以看出,用上面的方式定義線程池,最終只有兩個線程可以運行,即線程池的初始容量大小。其余線程都被阻塞到了隊列ArrayBlockingQueue<>(10)

問題分析

我們知道,Executors框架提供了幾種常見的線程池分別為:

  • newCachedThreadPool創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。

  • newFixedThreadPool 創建一個定長線程池,可控制線程最大并發數,超出的線程會在隊列中等待。

  • newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行。

  • newSingleThreadExecutor 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。

如果將代碼中自定義的線程池改為 :

private static ExecutorService es = Executors.newCachedThreadPool();

運行發現,提交的十個線程都可以運行

Executors.newCachedThreadPool()的源碼如下:

/**
 * Creates a thread pool that creates new threads as needed, but
 * will reuse previously constructed threads when they are
 * available.  These pools will typically improve the performance
 * of programs that execute many short-lived asynchronous tasks.
 * Calls to {@code execute} will reuse previously constructed
 * threads if available. If no existing thread is available, a new
 * thread will be created and added to the pool. Threads that have
 * not been used for sixty seconds are terminated and removed from
 * the cache. Thus, a pool that remains idle for long enough will
 * not consume any resources. Note that pools with similar
 * properties but different details (for example, timeout parameters)
 * may be created using {@link ThreadPoolExecutor} constructors.
 *
 * @return the newly created thread pool
 */
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

通過對比發現,newCachedThreadPool使用的是 SynchronousQueue<>()而我們使用的是ArrayBlockingQueue<>(10) 因此可以很容易的發現問題出在隊列上。

問題解決

將ArrayBlockingQueue改為SynchronousQueue 問題解決,代碼如下:

public class ExecutorTest {
    private static ExecutorService es = new ThreadPoolExecutor(2,
            100, 1000, TimeUnit.MILLISECONDS
            , new SynchronousQueue<>());
    private static ExecutorService es2 = Executors.newCachedThreadPool();
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            es.submit(new MyThread());
        }
    }
    static class MyThread implements Runnable {
        @Override
        public void run() {
            for (; ; ) {
                System.out.println("Thread name=" + Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

總結

兩個隊列的UML關系圖

Java怎么自定義線程池中隊列

從圖上我們可以看到,兩個隊列都繼承了AbstractQueue實現了BlockingQueue接口,因此功能應該相似

SynchronousQueue的定義

* <p>Synchronous queues are similar to rendezvous channels used in
* CSP and Ada. They are well suited for handoff designs, in which an
* object running in one thread must sync up with an object running
* in another thread in order to hand it some information, event, or
* task.

SynchronousQueue類似于一個傳遞通道,只是通過他傳遞某個元素,并沒有任何容量,只有當第一個元素被取走,才能在給隊列添加元素。

ArrayBlockingQueue的定義

* A bounded {@linkplain BlockingQueue blocking queue} backed by an
* array.  This queue orders elements FIFO (first-in-first-out).  The
* <em>head</em> of the queue is that element that has been on the
* queue the longest time.  The <em>tail</em> of the queue is that
* element that has been on the queue the shortest time. New elements
* are inserted at the tail of the queue, and the queue retrieval
* operations obtain elements at the head of the queue.

ArrayBlockingQueue從定義來看就是一個普通的隊列,先入先出,當隊列為空時,獲取數據的線程會被阻塞,當隊列滿時,添加隊列的線程會被阻塞,直到隊列可用。

分析

從上面隊列的定義中可以看出,導致線程池沒有按照預期運行的原因不是因為隊列的問題,應該是關于線程池在提交任務時,從隊列取數據的方式不同導致的。

jdk源碼中關于線程池隊列的說明

* <dt>Queuing</dt>
*
* <dd>Any {@link BlockingQueue} may be used to transfer and hold
* submitted tasks.  The use of this queue interacts with pool sizing:
*
* <ul>
*
* <li> If fewer than corePoolSize threads are running, the Executor
* always prefers adding a new thread
* rather than queuing.</li>
*
* <li> If corePoolSize or more threads are running, the Executor
* always prefers queuing a request rather than adding a new
* thread.</li>
*
* <li> If a request cannot be queued, a new thread is created unless
* this would exceed maximumPoolSize, in which case, the task will be
* rejected.</li>

從說明中可以看到,如果正在運行的線程數必初始容量corePoolSize小,那么Executor會從創建一個新線程去執行任務,如果正在執行的線程數必corePoolSize大,那么Executor會將新提交的任務放到阻塞隊列,除非當隊列的個數超過了隊列的最大長度maxmiumPooSize。

從源碼中找到關于提交任務的方法:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

從源碼中看到 subimit實際上是調用了execute方法

execute方法的源碼:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

源碼中可以看出,提交任務時,首先會判斷正在執行的線程數是否小于corePoolSize,如果條件成立那么會直接創建線程并執行任務。如果條件不成立,且隊列沒有滿,那么將任務放到隊列,如果條件不成立但是隊列滿了,那么同樣也新創建線程并執行任務。

“Java怎么自定義線程池中隊列”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

中方县| 博客| 阿拉善右旗| 平阳县| 漯河市| 溆浦县| 广昌县| 开阳县| 漳州市| 精河县| 南雄市| 霍州市| 枣阳市| 横山县| 芷江| 海宁市| 仁寿县| 馆陶县| 鄢陵县| 江川县| 元阳县| 区。| 喀什市| 南京市| 苏尼特左旗| 南平市| 郎溪县| 巴林右旗| 陈巴尔虎旗| 泌阳县| 米易县| 谷城县| 民县| 新沂市| 修文县| 淮滨县| 深泽县| 乐昌市| 富蕴县| 徐闻县| 山丹县|