您好,登錄后才能下訂單哦!
這篇文章主要講解了“如何理解多線程的并發問題”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“如何理解多線程的并發問題”吧!
為什么多線程同時訪問(讀寫)同個變量,會有并發問題?
Java 內存模型規定了所有的變量都存儲在主內存中,每條線程有自己的工作內存。
線程的工作內存中保存了該線程中用到的變量的主內存副本拷貝,線程對變量的所有操作都必須在工作內存中進行,而不能直接讀寫主內存。
線程訪問一個變量,首先將變量從主內存拷貝到工作內存,對變量的寫操作,不會馬上同步到主內存。
不同的線程之間也無法直接訪問對方工作內存中的變量,線程間變量的傳遞均需要自己的工作內存和主存之間進行數據同步進行。
Java 內存模型(JMM) 作用于工作內存(本地內存)和主存之間數據同步過程,它規定了如何做數據同步以及什么時候做數據同步,如下圖。
原子性:在一個操作中,CPU 不可以在中途暫停然后再調度,即不被中斷操作,要么執行完成,要么就不執行。
可見性:多個線程訪問同一個變量時,一個線程修改了這個變量的值,其他線程能夠立即看得到修改的值。
有序性:程序執行的順序按照代碼的先后順序執行。
下面結合不同場景分析解決并發問題的處理方式。
保證可見性,不保證原子性
當寫一個volatile變量時,JVM會把本地內存的變量強制刷新到主內存中
這個寫操作導致其他線程中的緩存無效,其他線程讀,會從主內存讀。volatile的寫操作對其它線程實時可見。
禁止指令重排序指令重排序是指編譯器和處理器為了優化程序性能對指令進行排序的一種手段,需要遵守一定規則:
不會對存在依賴關系的指令重排序,例如 a = 1;b = a; a 和b存在依賴關系,不會被重排序
不能影響單線程下的執行結果。比如:a=1;b=2;c=a+b這三個操作,前兩個操作可以重排序,但是c=a+b不會被重排序,因為要保證結果是3
對于一個變量,只有一個線程執行寫操作,其它線程都是讀操作,這時候可以用 volatile 修飾這個變量。
public class TestInstance { private static volatile TestInstance mInstance; public static TestInstance getInstance(){ //1 if (mInstance == null){ //2 synchronized (TestInstance.class){ //3 if (mInstance == null){ //4 mInstance = new TestInstance(); //5 } } } return mInstance; } 復制代碼
}
假如沒有用volatile,并發情況下會出現問題,線程A執行到注釋5 new TestInstance() 的時候,分為如下幾個幾步操作:
分配內存
初始化對象
mInstance 指向內存
這時候如果發生指令重排,執行順序是132,執行到第3的時候,線程B剛好進來了,并且執行到注釋2,這時候判斷mInstance 不為空,直接使用一個未初始化的對象。所以使用volatile關鍵字來禁止指令重排序。
在JVM底層volatile是采用 內存屏障 來實現的,內存屏障會提供3個功能:
它確保指令重排序時不會把其后面的指令排到內存屏障之前的位置,也不會把前面的指令排到內存屏障的后面;即在執行到內存屏障這句指令時,在它前面的操作已經全部完成;
它會強制將緩存的修改操作立即寫到主內存
寫操作會導致其它CPU中的緩存行失效,寫之后,其它線程的讀操作會從主內存讀。
**volatile 只能保證可見性,不能保證原子性。**寫操作對其它線程可見,但是不能解決多個線程同時寫的問題。
多個線程同時寫一個變量。
例如售票,余票是100張,窗口A和窗口B同時各賣出一張票, 假如余票變量用 volatile 修飾,是有問題的。
A窗口獲取余票是100,B窗口獲取余票也是100,A賣出一張變成99,刷新回主內存,同時B賣出一張變成99,也刷新回主內存,會導致最終主內存余票是99而不是98。
前面說到 volatile 的局限性,就是多個線程同時寫的情況,這種情況一般可以使用 Synchronized 。
Synchronized 可以保證同一時刻,只有一個線程可執行某個方法或某個代碼塊。
public class SynchronizedTest { public static void main(String[] args) { synchronized (SynchronizedTest.class) { System.out.println("123"); } method(); } private static void method() { } } 復制代碼
將這段代碼先用 javac 命令編譯,再 java p -v SynchronizedTest.class 命令查看字節碼,部分字節碼如下
public static void main(java.lang.String[]); descriptor: ([Ljava/lang/String;)V flags: ACC_PUBLIC, ACC_STATIC Code: stack=2, locals=3, args_size=1 0: ldc #2 // class com/lanshifu/opengldemo/test/SynchronizedTest 2: dup 3: astore_1 4: monitorenter 5: getstatic #3 // Field java/lang/System.out:Ljava/io/PrintStream; 8: ldc #4 // String 123 10: invokevirtual #5 // Method java/io/PrintStream.println:(Ljava/lang/String;)V 13: aload_1 14: monitorexit 15: goto 23 18: astore_2 19: aload_1 20: monitorexit 21: aload_2 22: athrow 23: invokestatic #6 // Method method:()V 26: return 復制代碼
可以看到 4: monitorenter 和 14: monitorexit ,中間是打印的語句。
執行同步代碼塊,首先會執行 monitorenter 指令,然后執行同步代碼塊中的代碼,退出同步代碼塊的時候會執行 monitorexit 指令 。
使用Synchronized進行同步,其關鍵就是必須要對對象的監視器monitor進行獲取,當線程獲取monitor后才能繼續往下執行,否則就進入同步隊列,線程狀態變成BLOCK,同一時刻只有一個線程能夠獲取到monitor,當監聽到monitorexit被調用,隊列里就有一個線程出隊,獲取monitor。
每個對象擁有一個計數器,當線程獲取該對象鎖后,計數器就會加一,釋放鎖后就會將計數器減一,所以只要這個鎖的計數器大于0,其它線程訪問就只能等待。
大家對Synchronized的理解可能就是重量級鎖,但是Java1.6對 Synchronized 進行了各種優化之后,有些情況下它就并不那么重,Java1.6 中為了減少獲得鎖和釋放鎖帶來的性能消耗而引入的偏向鎖和輕量級鎖。
偏向鎖:大多數情況下,鎖不僅不存在多線程競爭,而且總是由同一線程多次獲得,為了讓線程獲得鎖的代價更低而引入了偏向鎖。
當一個線程A訪問加了同步鎖的代碼塊時,會在對象頭中存 儲當前線程的id,后續這個線程進入和退出這段加了同步鎖的代碼塊時,不需要再次加鎖和釋放鎖。
輕量級鎖:在偏向鎖情況下,如果線程B也訪問了同步代碼塊,比較對象頭的線程id不一樣,會升級為輕量級鎖,并且通過自旋的方式來獲取輕量級鎖。
重量級鎖:如果線程A和線程B同時訪問同步代碼塊,則輕量級鎖會升級為重量級鎖,線程A獲取到重量級鎖的情況下,線程B只能入隊等待,進入BLOCK狀態。
不能設置鎖超時時間
不能通過代碼釋放鎖
容易造成死鎖
上面說到 Synchronized 的缺點,不能設置鎖超時時間和不能通過代碼釋放鎖, ReentranLock就可以解決這個問題。
在多個條件變量和高度競爭鎖的地方,用ReentrantLock更合適,ReentrantLock還提供了 Condition ,對線程的等待和喚醒等操作更加靈活,一個ReentrantLock可以有多個Condition實例,所以更有擴展性。
lock 和 unlock
ReentrantLock reentrantLock = new ReentrantLock(); System.out.println("reentrantLock->lock"); reentrantLock.lock(); try { System.out.println("睡眠2秒..."); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally { reentrantLock.unlock(); System.out.println("reentrantLock->unlock"); } 復制代碼
實現可定時的鎖請求:tryLock
public static void main(String[] args) { ReentrantLock reentrantLock = new ReentrantLock(); Thread thread1 = new Thread_tryLock(reentrantLock); thread1.setName("thread1"); thread1.start(); Thread thread2 = new Thread_tryLock(reentrantLock); thread2.setName("thread2"); thread2.start(); } static class Thread_tryLock extends Thread { ReentrantLock reentrantLock; public Thread_tryLock(ReentrantLock reentrantLock) { this.reentrantLock = reentrantLock; } @Override public void run() { try { System.out.println("try lock:" + Thread.currentThread().getName()); boolean tryLock = reentrantLock.tryLock(3, TimeUnit.SECONDS); if (tryLock) { System.out.println("try lock success :" + Thread.currentThread().getName()); System.out.println("睡眠一下:" + Thread.currentThread().getName()); Thread.sleep(5000); System.out.println("醒了:" + Thread.currentThread().getName()); } else { System.out.println("try lock 超時 :" + Thread.currentThread().getName()); } } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("unlock:" + Thread.currentThread().getName()); reentrantLock.unlock(); } } } 復制代碼
打印的日志:
try lock:thread1 try lock:thread2 try lock success :thread2 睡眠一下:thread2 try lock 超時 :thread1 unlock:thread1 Exception in thread "thread1" java.lang.IllegalMonitorStateException at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:151) at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261) at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:457) at com.lanshifu.demo_module.test.lock.ReentranLockTest$Thread_tryLock.run(ReentranLockTest.java:60) 醒了:thread2 unlock:thread2 復制代碼
上面演示了 trtLock 的使用, trtLock 設置獲取鎖的等待時間,超過3秒直接返回失敗,可以從日志中看到結果。 有異常是因為thread1獲取鎖失敗,不應該調用unlock。
public static void main(String[] args) { Thread_Condition thread_condition = new Thread_Condition(); thread_condition.setName("測試Condition的線程"); thread_condition.start(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } thread_condition.singal(); } static class Thread_Condition extends Thread { @Override public void run() { await(); } private ReentrantLock lock = new ReentrantLock(); public Condition condition = lock.newCondition(); public void await() { try { System.out.println("lock"); lock.lock(); System.out.println(Thread.currentThread().getName() + ":我在等待通知的到來..."); condition.await();//await 和 signal 對應 //condition.await(2, TimeUnit.SECONDS); //設置等待超時時間 System.out.println(Thread.currentThread().getName() + ":等到通知了,我繼續執行>>>"); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("unlock"); lock.unlock(); } } public void singal() { try { System.out.println("lock"); lock.lock(); System.out.println("我要通知在等待的線程,condition.signal()"); condition.signal();//await 和 signal 對應 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("unlock"); lock.unlock(); } } } 復制代碼
運行打印日志
lock 測試Condition的線程:我在等待通知的到來... lock 我要通知在等待的線程,condition.signal() unlock 測試Condition的線程:等到通知了,我繼續執行>>> unlock 復制代碼
上面演示了 Condition的 await 和 signal 使用,前提要先lock。
ReentrantLock 構造函數傳true表示公平鎖。
公平鎖表示線程獲取鎖的順序是按照線程加鎖的順序來分配的,即先來先得的順序。而非公平鎖就是一種鎖的搶占機制,是隨機獲得鎖的,可能會導致某些線程一致拿不到鎖,所以是不公平的。
ReentrantLock使用lock和unlock來獲得鎖和釋放鎖
unlock要放在finally中,這樣正常運行或者異常都會釋放鎖
使用condition的await和signal方法之前,必須調用lock方法獲得對象監視器
通過上面分析,并發嚴重的情況下,使用鎖顯然效率低下,因為同一時刻只能有一個線程可以獲得鎖,其它線程只能乖乖等待。
Java提供了并發包解決這個問題,接下來介紹并發包里一些常用的數據結構。
我們都知道HashMap是線程不安全的數據結構,HashTable則在HashMap基礎上,get方法和put方法加上Synchronized修飾變成線程安全,不過在高并發情況下效率底下,最終被 ConcurrentHashMap 替代。
ConcurrentHashMap 采用分段鎖,內部默認有16個桶,get和put操作,首先將key計算hashcode,然后跟16取余,落到16個桶中的一個,然后每個桶中都加了鎖(ReentrantLock),桶中是HashMap結構(數組加鏈表,鏈表過長轉紅黑樹)。
所以理論上最多支持16個線程同時訪問。
鏈表結構的阻塞隊列,內部使用多個ReentrantLock
/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition(); private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } /** * Signals a waiting put. Called only from take/poll. */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } 復制代碼
源碼不貼太多,簡單說一下 LinkBlockingQueue 的邏輯:
從隊列獲取數據,如果隊列中沒有數據,會調用 notEmpty.await(); 進入等待。
在放數據進去隊列的時候會調用 notEmpty.signal(); ,通知消費者,1中的等待結束,喚醒繼續執行。
從隊列里取到數據的時候會調用 notFull.signal(); ,通知生產者繼續生產。
在put數據進入隊列的時候,如果判斷隊列中的數據達到最大值,那么會調用 notFull.await(); ,等待消費者消費掉,也就是等待3去取數據并且發出 notFull.signal(); ,這時候生產者才能繼續生產。
LinkBlockingQueue 是典型的生產者消費者模式,源碼細節就不多說。
內部采用CAS(compare and swap)保證原子性
舉一個int自增的例子
AtomicInteger atomicInteger = new AtomicInteger(0); atomicInteger.incrementAndGet();//自增 復制代碼
源碼看一下
/** * Atomically increments by one the current value. * * @return the updated value */ public final int incrementAndGet() { return U.getAndAddInt(this, VALUE, 1) + 1; } 復制代碼
U 是 Unsafe,看下 Unsafe#getAndAddInt
public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; } 復制代碼
通過 compareAndSwapInt 保證原子性。
面試中問到多線程并發問題,可以這么答:
當只有一個線程寫,其它線程都是讀的時候,可以用 volatile 修飾變量
當多個線程寫,那么一般情況下并發不嚴重的話可以用 Synchronized ,Synchronized并不是一開始就是重量級鎖,在并發不嚴重的時候,比如只有一個線程訪問的時候,是偏向鎖;當多個線程訪問,但不是同時訪問,這時候鎖升級為輕量級鎖;當多個線程同時訪問,這時候升級為重量級鎖。所以在并發不是很嚴重的情況下,使用Synchronized是可以的。不過Synchronized有局限性,比如不能設置鎖超時,不能通過代碼釋放鎖。
ReentranLock 可以通過代碼釋放鎖,可以設置鎖超時。
高并發下,Synchronized、ReentranLock 效率低,因為同一時刻只有一個線程能進入同步代碼塊,如果同時有很多線程訪問,那么其它線程就都在等待鎖。這個時候可以使用并發包下的數據結構,例如 ConcurrentHashMap , LinkBlockingQueue ,以及原子性的數據結構如: AtomicInteger 。
感謝各位的閱讀,以上就是“如何理解多線程的并發問題”的內容了,經過本文的學習后,相信大家對如何理解多線程的并發問題這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。