您好,登錄后才能下訂單哦!
本篇內容主要講解“Java并發編程在各主流框架中怎么應用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Java并發編程在各主流框架中怎么應用”吧!
JVM 規范定義了 Java 內存模型來屏蔽掉各種操作系統、虛擬機實現廠商和硬件的內存訪問差異,以確保 Java 程序在所有操作系統和平臺上能夠達到一致的內存訪問效果。
Java 內存模型規定所有的變量都存儲在主內存中,每個線程都有自己獨立的工作內存,工作內存保存了對應該線程使用的變量的主內存副本拷貝。 線程對這些變量的操作都在自己的工作內存中進行,不能直接操作主內存和其他工作內存中存儲的變量或者變量副本。線程間的變量傳遞需通過主內存來完成,三者的關系如下圖所示。
Java 內存模型定義了 8 種操作來完成主內存和工作內存的變量訪問,具體如下。
?read:把一個變量的值從主內存傳輸到線程的工作內存中,以便隨后的 load 動作使用。?load:把從主內存中讀取的變量值載入工作內存的變量副本中。?use:把工作內存中一個變量的值傳遞給 Java 虛擬機執行引擎。?assign:把從執行引擎接收到的變量的值賦值給工作內存中的變量。?store:把工作內存中一個變量的值傳送到主內存中,以便隨后的 write 操作。?write:工作內存傳遞過來的變量值放入主內存中。?lock:把主內存的一個變量標識為某個線程獨占的狀態。?unlock:把主內存中 一個處于鎖定狀態的變量釋放出來,被釋放后的變量才可以被其他線程鎖定。
這個概念與事務中的原子性大概一致,表明此操作是不可分割,不可中斷的,要么全部執行,要么全部不執行。Java 內存模型直接保證的原子性操作包括 read、load、use、assign、store、write、lock、unlock 這八個。
可見性是指當一個線程修改了共享變量的值,其他線程能夠立即得知這個修改。 Java 內存模型是通過在變量修改后將新值同步回主內存,在變量讀取前從主內存刷新變量值這種依賴主內存作為傳遞媒介的方式來實現可見性的,無論是普通變量還是 volatile 變量都是如此,普通變量與 volatile 變量的區別是,volatile 的特殊規則保證了新值能立即同步到主內存,以及每次使用前立即從主內存刷新。因此,可以說 volatile 保證了多線程操作時變量的可見性,而普通變量則不能保證這一點。除了 volatile 外,synchronized 也提供了可見性,synchronized 的可見性是由 “對一個變量執行 unlock 操作 之前,必須先把此變量同步回主內存中(執行 store、write 操作)” 這條規則獲得。
單線程環境下,程序會 “有序的”執行,即:線程內表現為串行語義。但是在多線程環境下,由于指令重排,并發執行的正確性會受到影響。在 Java 中使用 volatile 和 synchronized 關鍵字,可以保證多線程執行的有序性。volatile 通過加入內存屏障指令來禁止內存的重排序。synchronized 通過加鎖,保證同一時刻只有一個線程來執行同步代碼。
打開 NioEventLoop 的代碼中,有一個控制 IO 操作 和 其他任務運行比例的,用 volatile 修飾的 int 類型字段 ioRatio,代碼如下。
private volatile int ioRatio = 50;
這里為什么要用 volatile 修飾呢?我們首先對 volatile 關鍵字進行說明,然后再結合 Netty 的代碼進行分析。
關鍵字 volatile 是 Java 提供的最輕量級的同步機制,Java 內存模型對 volatile 專門定義了一些特殊的訪問規則。下面我們就看它的規則。當一個變量被 volatile 修飾后,它將具備以下兩種特性。
?線程可見性:當一個線程修改了被 volatile 修飾的變量后,無論是否加鎖,其他線程都可以立即看到最新的修改,而普通變量卻做不到這點。?禁止指令重排序優化:普通的變量僅僅保證在該方法的執行過程中所有依賴賦值結果的地方都能獲取正確的結果,而不能保證變量賦值操作的順序與程序代碼的執行順序一致。舉個簡單的例子說明下指令重排序優化問題,代碼如下。
public class ThreadStopExample { private static boolean stop; public static void main(String[] args) throws InterruptedException { Thread workThread = new Thread(new Runnable() { public void run() { int i= 0; while (!stop) { i++; try{ TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } }); workThread.start(); TimeUnit.SECONDS.sleep(3); stop = true; }}
我們預期程序會在 3s 后停止,但是實際上它會一直執行下去,原因就是虛擬機對代碼進行了指令重排序和優化,優化后的指令如下。
if (!stop) While(true) ......
workThread 線程在執行重排序后的代碼時,是無法發現變量 stop 被其它線程修改的,因此無法停止運行。要解決這個問題,只要將 stop 前增加 volatile 修飾符即可。volatile 解決了如下兩個問題。第一,主線程對 stop 的修改在 workThread 線程 中可見,也就是說 workThread 線程 立即看到了其他線程對于 stop 變量 的修改。第二,禁止指令重排序,防止因為重排序導致的并發訪問邏輯混亂。
一些人認為使用 volatile 可以代替傳統鎖,提升并發性能,這個認識是錯誤的。volatile 僅僅解決了可見性的問題,但是它并不能保證互斥性,也就是說多個線程并發修改某個變量時,依舊會產生多線程問題。因此,不能靠 volatile 來完全替代傳統的鎖。根據經驗總結,volatile 最適用的場景是 “ 一個線程寫,其他線程讀 ”,如果有多個線程并發寫操作,仍然需要使用鎖或者線程安全的容器或者原子變量來代替。下面我們繼續對 Netty 的源碼做分析。上面講到了 ioRatio 被定義成 volatile,下面看看代碼為什么要這樣定義。
final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
通過代碼分析我們發現,在 NioEventLoop 線程 中,ioRatio 并沒有被修改,它是只讀操作。既然沒有修改,為什么要定義成 volatile 呢?繼續看代碼,我們發現 NioEventLoop 提供了重新設置 IO 執行時間比例的公共方法。
public void setIoRatio(int ioRatio) { if (ioRatio <= 0 || ioRatio > 100) { throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)"); } this.ioRatio = ioRatio; }
首先,NioEventLoop 線程 沒有調用該 set 方法,說明調整 IO 執行時間比例 是外部發起的操作,通常是由業務的線程調用該方法,重新設置該參數。這樣就形成了一個線程寫、一個線程讀。根據前面針對 volatile 的應用總結,此時可以使用 volatile 來代替傳統的 synchronized 關鍵字,以提升并發訪問的性能。
ThreadLocal 又稱為線程本地存儲區(Thread Local Storage,簡稱為 TLS),每個線程都有自己的私有的本地存儲區域,不同線程之間彼此不能訪問對方的 TLS 區域。使用 ThreadLocal 變量 的 set(T value)
方法 可以將數據存入該線程本地存儲區,使用 get() 方法 可以獲取到之前存入的值。
不使用 ThreadLocal。
public class SessionBean { public static class Session { private String id; public String getId() { return id; } public void setId(String id) { this.id = id; } } public Session createSession() { return new Session(); } public void setId(Session session, String id) { session.setId(id); } public String getId(Session session) { return session.getId(); } public static void main(String[] args) { //沒有使用ThreadLocal,在方法間共享session需要進行session在方法間的傳遞 new Thread(() -> { SessionBean bean = new SessionBean(); Session session = bean.createSession(); bean.setId(session, "susan"); System.out.println(bean.getId(session)); }).start(); }}
上述代碼中,session 需要在方法間傳遞才可以修改和讀取,保證線程中各方法操作的是一個。下面看一下使用 ThreadLocal 的代碼。
public class SessionBean {//定義一個靜態ThreadLocal變量session,就能夠保證各個線程有自己的一份,并且方法可以方便獲取,不用傳遞 private static ThreadLocal<Session> session = new ThreadLocal<>(); public static class Session { private String id; public String getId() { return id; } public void setId(String id) { this.id = id; } } public void createSession() { session.set(new Session()); } public void setId(String id) { session.get().setId(id); } public String getId() { return session.get().getId(); } public static void main(String[] args) { new Thread(() -> { SessionBean bean = new SessionBean(); bean.createSession(); bean.setId("susan"); System.out.println(bean.getId()); }).start(); }}
在方法的內部實現中,直接可以通過 session.get() 獲取到當前線程的 session,省掉了參數在方法間傳遞的環節。
一般,類屬性中的數據是多個線程共享的,但 ThreadLocal 類型的數據 聲明為類屬性,卻可以為每一個使用它(通過 set(T value)
方法)的線程存儲線程私有的數據,通過其源碼我們可以發現其中的原理。
public class ThreadLocal<T> { /** * 下面的 getMap()方法 傳入當前線程,獲得一個ThreadLocalMap對象,說明每一個線程維護了 * 自己的一個 map,保證讀取出來的value是自己線程的。 * * ThreadLocalMap 是ThreadLocal靜態內部類,存儲value的鍵值就是ThreadLocal本身。 * * 因此可以斷定,每個線程維護一個ThreadLocalMap的鍵值對映射Map。不同線程的Map的 key值 是一樣的, * 都是ThreadLocal,但 value 是不同的。 */ public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); } public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); }}
Spring 事務處理的設計與實現中大量使用了 ThreadLocal 類,比如,TransactionSynchronizationManager 維護了一系列的 ThreadLocal 變量,用于存儲線程私有的 事務屬性及資源。源碼如下。
/** * 管理每個線程的資源和事務同步的中心幫助程序。供資源管理代碼使用,但不供典型應用程序代碼使用。 * * 資源管理代碼應該檢查線程綁定的資源,如,JDBC連接 或 Hibernate Sessions。 * 此類代碼通常不應該將資源綁定到線程,因為這是事務管理器的職責。另一個選項是, * 如果事務同步處于活動狀態,則在首次使用時延遲綁定,以執行跨任意數量資源的事務。 */public abstract class TransactionSynchronizationManager { /** * 一般是一個線程持有一個 獨立的事務,以相互隔離地處理各自的事務。 * 所以這里使用了很多 ThreadLocal對象,為每個線程綁定 對應的事務屬性及資源, * 以便后續使用時能直接獲取。 */ private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<Map<Object, Object>>("Transactional resources"); private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations"); private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal<String>("Current transaction name"); private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal<Boolean>("Current transaction read-only status"); private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal<Integer>("Current transaction isolation level"); private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal<Boolean>("Actual transaction active"); /** * 為當前線程 綁定 對應的resource資源 */ public static void bindResource(Object key, Object value) throws IllegalStateException { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Assert.notNull(value, "Value must not be null"); Map<Object, Object> map = resources.get(); // 如果當前線程的 resources中,綁定的數據map為空,則為 resources 綁定 map if (map == null) { map = new HashMap<Object, Object>(); resources.set(map); } Object oldValue = map.put(actualKey, value); if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) { oldValue = null; } if (oldValue != null) { throw new IllegalStateException("Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]"); } if (logger.isTraceEnabled()) { logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" + Thread.currentThread().getName() + "]"); } } /** * 返回當前線程綁定的所有資源 */ public static Map<Object, Object> getResourceMap() { Map<Object, Object> map = resources.get(); return (map != null ? Collections.unmodifiableMap(map) : Collections.emptyMap()); }}
Mybatis 的 SqlSession 對象 也是各線程私有的資源,所以對其的管理也使用到了 ThreadLocal 類。源碼如下。
public class SqlSessionManager implements SqlSessionFactory, SqlSession { private final ThreadLocal<SqlSession> localSqlSession = new ThreadLocal<>(); public void startManagedSession() { this.localSqlSession.set(openSession()); } public void startManagedSession(boolean autoCommit) { this.localSqlSession.set(openSession(autoCommit)); } public void startManagedSession(Connection connection) { this.localSqlSession.set(openSession(connection)); } public void startManagedSession(TransactionIsolationLevel level) { this.localSqlSession.set(openSession(level)); } public void startManagedSession(ExecutorType execType) { this.localSqlSession.set(openSession(execType)); } public void startManagedSession(ExecutorType execType, boolean autoCommit) { this.localSqlSession.set(openSession(execType, autoCommit)); } public void startManagedSession(ExecutorType execType, TransactionIsolationLevel level) { this.localSqlSession.set(openSession(execType, level)); } public void startManagedSession(ExecutorType execType, Connection connection) { this.localSqlSession.set(openSession(execType, connection)); } public boolean isManagedSessionStarted() { return this.localSqlSession.get() != null; } @Override public Connection getConnection() { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot get connection. No managed session is started."); } return sqlSession.getConnection(); } @Override public void clearCache() { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot clear the cache. No managed session is started."); } sqlSession.clearCache(); } @Override public void commit() { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot commit. No managed session is started."); } sqlSession.commit(); } @Override public void commit(boolean force) { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot commit. No managed session is started."); } sqlSession.commit(force); } @Override public void rollback() { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot rollback. No managed session is started."); } sqlSession.rollback(); } @Override public void rollback(boolean force) { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot rollback. No managed session is started."); } sqlSession.rollback(force); } @Override public List<BatchResult> flushStatements() { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot rollback. No managed session is started."); } return sqlSession.flushStatements(); } @Override public void close() { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot close. No managed session is started."); } try { sqlSession.close(); } finally { localSqlSession.set(null); } }}
首先通過 ThreadPoolExecutor 的源碼 看一下線程池的主要參數及方法。
public class ThreadPoolExecutor extends AbstractExecutorService { /** * 核心線程數 * 當向線程池提交一個任務時,若線程池已創建的線程數小于corePoolSize,即便此時存在空閑線程, * 也會通過創建一個新線程來執行該任務,直到已創建的線程數大于或等于corePoolSize */ private volatile int corePoolSize; /** * 最大線程數 * 當隊列滿了,且已創建的線程數小于maximumPoolSize,則線程池會創建新的線程來執行任務。 * 另外,對于無界隊列,可忽略該參數 */ private volatile int maximumPoolSize; /** * 線程存活保持時間 * 當線程池中線程數 超出核心線程數,且線程的空閑時間也超過 keepAliveTime時, * 那么這個線程就會被銷毀,直到線程池中的線程數小于等于核心線程數 */ private volatile long keepAliveTime; /** * 任務隊列 * 用于傳輸和保存等待執行任務的阻塞隊列 */ private final BlockingQueue<Runnable> workQueue; /** * 線程工廠 * 用于創建新線程。threadFactory 創建的線程也是采用 new Thread() 方式,threadFactory * 創建的線程名都具有統一的風格:pool-m-thread-n(m為線程池的編號,n為線程池中線程的編號 */ private volatile ThreadFactory threadFactory; /** * 線程飽和策略 * 當線程池和隊列都滿了,再加入的線程會執行此策略 */ private volatile RejectedExecutionHandler handler; /** * 構造方法提供了多種重載,但實際上都使用了最后一個重載 完成了實例化 */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } /** * 執行一個任務,但沒有返回值 */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } /** * 提交一個線程任務,有返回值。該方法繼承自其父類 AbstractExecutorService,有多種重載,這是最常用的一個。 * 通過future.get()獲取返回值(阻塞直到任務執行完) */ public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } /** * 關閉線程池,不再接收新的任務,但會把已有的任務執行完 */ public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } /** * 立即關閉線程池,已有的任務也會被拋棄 */ public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } public boolean isShutdown() { return ! isRunning(ctl.get()); }}
線程池執行流程,如下圖所示。
Executors 類 通過 ThreadPoolExecutor 封裝了 4 種常用的線程池:CachedThreadPool,FixedThreadPool,ScheduledThreadPool 和 SingleThreadExecutor。其功能如下。
1.CachedThreadPool:用來創建一個幾乎可以無限擴大的線程池(最大線程數為 Integer.MAX_VALUE),適用于執行大量短生命周期的異步任務。2.FixedThreadPool:創建一個固定大小的線程池,保證線程數可控,不會造成線程過多,導致系統負載更為嚴重。3.SingleThreadExecutor:創建一個單線程的線程池,可以保證任務按調用順序執行。4.ScheduledThreadPool:適用于執行 延時 或者 周期性 任務。
?CPU 密集型任務
盡量使用較小的線程池,一般為 CPU 核心數+1。因為 CPU 密集型任務 使得 CPU 使用率 很高,若開過多的線程數,會造成 CPU 過度切換。?IO 密集型任務
可以使用稍大的線程池,一般為 2*CPU 核心數。IO 密集型任務 CPU 使用率 并不高,因此可以讓 CPU 在等待 IO 的時候有其他線程去處理別的任務,充分利用 CPU 時間。
Tomcat 在分發 web 請求時使用了線程池來處理。
public interface BlockingQueue<E> extends Queue<E> { // 將給定元素設置到隊列中,如果設置成功返回true, 否則返回false。如果是往限定了長度的隊列中設置值,推薦使用offer()方法。 boolean add(E e); // 將給定的元素設置到隊列中,如果設置成功返回true, 否則返回false. e的值不能為空,否則拋出空指針異常。 boolean offer(E e); // 將元素設置到隊列中,如果隊列中沒有多余的空間,該方法會一直阻塞,直到隊列中有多余的空間。 void put(E e) throws InterruptedException; // 將給定元素在給定的時間內設置到隊列中,如果設置成功返回true, 否則返回false. boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; // 從隊列中獲取值,如果隊列中沒有值,線程會一直阻塞,直到隊列中有值,并且該方法取得了該值。 E take() throws InterruptedException; // 在給定的時間里,從隊列中獲取值,時間到了直接調用普通的 poll()方法,為null則直接返回null。 E poll(long timeout, TimeUnit unit) throws InterruptedException; // 獲取隊列中剩余的空間。 int remainingCapacity(); // 從隊列中移除指定的值。 boolean remove(Object o); // 判斷隊列中是否擁有該值。 public boolean contains(Object o); // 將隊列中值,全部移除,并發設置到給定的集合中。 int drainTo(Collection<? super E> c); // 指定最多數量限制將隊列中值,全部移除,并發設置到給定的集合中。 int drainTo(Collection<? super E> c, int maxElements);}
?ArrayBlockingQueue
基于數組的阻塞隊列實現,在 ArrayBlockingQueue 內部,維護了一個定長數組,以便緩存隊列中的數據對象,這是一個常用的阻塞隊列,除了一個定長數組外,ArrayBlockingQueue 內部還保存著兩個整形變量,分別標識著隊列的頭部和尾部在數組中的位置。
ArrayBlockingQueue 在生產者放入數據 和 消費者獲取數據時,都是共用同一個鎖對象,由此也意味著兩者無法真正并行運行,這點尤其不同于 LinkedBlockingQueue。ArrayBlockingQueue 和 LinkedBlockingQueue 間還有一個明顯的不同之處在于,前者在插入或刪除元素時不會產生或銷毀任何額外的對象實例,而后者則會生成一個額外的 Node 對象。這在長時間內需要高效并發地處理大批量數據的系統中,其對于 GC 的影響還是存在一定的區別。而在創建 ArrayBlockingQueue 時,我們還可以控制對象的內部鎖是否采用公平鎖,默認采用非公平鎖。
?LinkedBlockingQueue
基于鏈表的阻塞隊列,同 ArrayListBlockingQueue 類似,其內部也維持著一個數據緩沖隊列(該隊列由一個鏈表構成),當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,并緩存在隊列內部,而生產者立即返回;只有當隊列緩沖區達到最大值緩存容量時(LinkedBlockingQueue 可以通過構造函數指定該值),才會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對于消費者這端的處理也基于同樣的原理。而 LinkedBlockingQueue 之所以能夠高效的處理并發數據,還因為其對于生產者端和消費者端分別采用了獨立的鎖來控制數據同步,這也意味著在高并發的情況下生產者和消費者可以并行地操作隊列中的數據,以此來提高整個隊列的并發性能。
需要注意的是,如果構造一個 LinkedBlockingQueue 對象,而沒有指定其容量大小,LinkedBlockingQueue 會默認一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大于消費者的速度,也許還沒有等到隊列滿阻塞產生,系統內存就有可能已被消耗殆盡了。
?PriorityBlockingQueue
基于優先級的阻塞隊列(優先級的判斷通過構造函數傳入的 Compator 對象來決定),但需要注意的是 PriorityBlockingQueue 并不會阻塞數據生產者,而只會在沒有可消費的數據時,阻塞數據的消費者。因此使用的時候要特別注意,生產者生產數據的速度絕對不能快于消費者消費數據的速度,否則時間一長,會最終耗盡所有的可用堆內存空間。在實現 PriorityBlockingQueue 時,內部控制線程同步的鎖采用的是公平鎖。
互斥同步最主要的問題就是進行線程阻塞和喚醒所帶來的性能的額外損耗,因此這種同步被稱為阻塞同步,它屬于一種悲觀的并發策略,我們稱之為悲觀鎖。隨著硬件和操作系統指令集的發展和優化,產生了非阻塞同步,被稱為樂觀鎖。簡單地說,就是先進行操作,操作完成之后再判斷操作是否成功,是否有并發問題,如果有則進行失敗補償,如果沒有就算操作成功,這樣就從根本上避免了同步鎖的弊端。
目前,在 Java 中應用最廣泛的非阻塞同步就是 CAS。從 JDK1.5 以后,可以使用 CAS 操作,該操作由 sun.misc.Unsafe 類里的 compareAndSwapInt() 和 compareAndSwapLong() 等方法實現。通常情況下 sun.misc.Unsafe 類 對于開發者是不可見的,因此,JDK 提供了很多 CAS 包裝類 簡化開發者的使用,如 AtomicInteger。使用 Java 自帶的 Atomic 原子類,可以避免同步鎖帶來的并發訪問性能降低的問題,減少犯錯的機會。
到此,相信大家對“Java并發編程在各主流框架中怎么應用”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。