您好,登錄后才能下訂單哦!
并發編程的目的是讓程序運行更快,但是使用并發并不定會使得程序運行更快,只有當程序的并發數量達到一定的量級的時候才能體現并發編程的優勢。所以談并發編程在高并發量的時候才有意義。雖然目前還沒有開發過高并發量的程序,但是學習并發是為了更好理解一些分布式架構。那么當程序的并發量不高,比如是單線程的程序,單線程的執行效率反而比多線程更高。這又是為什么呢?熟悉操作系統的應該知道,CPU是通過給每個線程分配時間片的方式實現多線程的。這樣,當CPU從一個任務切換到另一個任務的時候,會保存上一個任務的狀態,當執行完這個任務的時候CPU就會繼續上一個任務的狀態繼續執行。這個過程稱為上下文切換。
在Java多線程中,volatile關鍵字個synchronized關鍵字扮演了重要的角色,它們都可以實現線程的同步,但是在底層是如何實現的呢?
volatile
volatile只能保證變量對各個線程的可見性,但不能保證原子性。關于 Java語言 volatile 的使用方法就不多說了,我的建議是 除了 配合package java.util.concurrent.atomic 中的類庫,其他情況一概別用。更多的解釋 參見 這篇文章。
引子
參見如下代碼
package org.go; public class Go { volatile int i = 0; private void inc() { i++; } public static void main(String[] args) { Go go = new Go(); for (int i = 0; i < 10; i++) { new Thread(() -> { for (int j = 0; j < 1000; j++) go.inc(); }).start(); } while(Thread.activeCount()>1){ Thread.yield(); } System.out.println(go.i); } }
每次執行上述代碼結果都不同,輸出的數字總是小于10000.這是因為在進行inc()的時候,i++并不是原子操作。或許有些人會提議說用 synchronized 來同步inc() , 或者 用 package java.util.concurrent.locks 下的鎖去控制線程同步。但它們都不如下面的解決方案:
package org.go; import java.util.concurrent.atomic.AtomicInteger; public class Go { AtomicInteger i = new AtomicInteger(0); private void inc() { i.getAndIncrement(); } public static void main(String[] args) { Go go = new Go(); for (int i = 0; i < 10; i++) { new Thread(() -> { for (int j = 0; j < 1000; j++) go.inc(); }).start(); } while(Thread.activeCount()>1){ Thread.yield(); } System.out.println(go.i); } }
這時,如果你不了解 atomic 的實現,你一定會不屑的懷疑 說不定 AtomicInteger 底層就是使用鎖來實現的所以也未必高效。那么究竟是什么,我們來看看。
原子類的內部實現
無論是AtomicInteger 或者是 ConcurrentLinkedQueue的節點類ConcurrentLinkedQueue.Node,他們都有個靜態變量
private static final sun.misc.Unsafe UNSAFE;,這個類是實現原子語義的C++對象sun::misc::Unsafe的Java封裝。想看看底層實現,正好我手邊有gcc4.8的源代碼,對照本地路徑,很方便找到Github的路徑,看這里。
以接口 getAndIncrement()的實現舉例
AtomicInteger.java
private static final Unsafe unsafe = Unsafe.getUnsafe(); public final int getAndIncrement() { for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next)) return current; } } public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); }
留意這個for循環,只有在compareAndSet成功時才會返回。否則就一直compareAndSet。
調用了compareAndSet實現。此處,我注意到 Oracle JDK的實現是略有不同的,如果你查看JDK下的src,你可以看到Oracle JDK是調用的Unsafe的getAndIncrement(),但我相信Oracle JDK實現Unsafe.java的時候應該也是只調用compareAndSet,因為一個compareAndSet就可以實現增加、減少、設值的原子操作了。
Unsafe.java
public native boolean compareAndSwapInt(Object obj, long offset, int expect, int update);
通過JNI調用的C++的實現。
natUnsafe.cc
jboolean sun::misc::Unsafe::compareAndSwapInt (jobject obj, jlong offset, jint expect, jint update) { jint *addr = (jint *)((char *)obj + offset); return compareAndSwap (addr, expect, update); } static inline bool compareAndSwap (volatile jint *addr, jint old, jint new_val) { jboolean result = false; spinlock lock; if ((result = (*addr == old))) *addr = new_val; return result; }
Unsafe::compareAndSwapInt調用 static 函數 compareAndSwap。而compareAndSwap又使用spinlock作為鎖。這里的spinlock有LockGuard的意味,構造時加鎖,析構時釋放。
我們需要聚焦在spinlock里。這里是保證spinlock釋放之前都是原子操作的真正實現。
什么是spinlock
spinlock,即自旋鎖,一種循環等待(busy waiting)以獲取資源的鎖。不同于mutex的阻塞當前線程、釋放CPU資源以等待需求的資源,spinlock不會進入掛起、等待條件滿足、重新競爭CPU的過程。這意味著只有在 等待鎖的代價小于線程執行上下文切換的代價時,Spinlock才優于mutex。
natUnsafe.cc
class spinlock { static volatile obj_addr_t lock; public: spinlock () { while (! compare_and_swap (&lock, 0, 1)) _Jv_ThreadYield (); } ~spinlock () { release_set (&lock, 0); } };
以一個靜態變量 static volatile obj_addr_t lock; 作為標志位,通過C++ RAII實現一個Guard,所以所謂的鎖其實是 靜態成員變量obj_addr_t lock,C++中volatile 并不能保證同步,保證同步的是構造函數里調用的 compare_and_swap和一個static變量lock.這個lock變量是1的時候,就需要等;是0的時候,就通過原子操作把它置為1,表示自己獲得了鎖。
這里會用一個static變量實在是一個意外,如此相當于所有的無鎖結構都共用同一個變量(實際就是size_t)來區分是否加鎖。當這個變量置為1時,其他用到spinlock的都需要等。 為什么不在sun::misc::Unsafe添加一個私有變量 volatile obj_addr_t lock;,并作為構造參數傳給spinlock?這樣相當于每個UnSafe共享一個標志位,效果會不會好一些?
_Jv_ThreadYield在下面的文件里,通過系統調用sched_yield(man 2 sched_yield)讓出CPU資源。宏HAVE_SCHED_YIELD在configure里定義,意味著編譯時如果取消定義,spinlock就稱為真正意義的自旋鎖了。
posix-threads.h
inline void _Jv_ThreadYield (void) { #ifdef HAVE_SCHED_YIELD sched_yield (); #endif /* HAVE_SCHED_YIELD */ }
這個lock.h在不同平臺有著不同的實現,我們以ia64(Intel AMD x64)平臺舉例,其他的實現可以在 這里 看到。
ia64/locks.h
typedef size_t obj_addr_t; inline static bool compare_and_swap(volatile obj_addr_t *addr, obj_addr_t old, obj_addr_t new_val) { return __sync_bool_compare_and_swap (addr, old, new_val); } inline static void release_set(volatile obj_addr_t *addr, obj_addr_t new_val) { __asm__ __volatile__("" : : : "memory"); *(addr) = new_val; }
__sync_bool_compare_and_swap 是gcc內建函數,匯編指令"memory"完成內存屏障。
總之,硬件上保證多核CPU同步,而Unsafe的實現也是盡可能的高效。GCC-java的還算高效,相信Oracle 和 OpenJDK不會更差。
原子操作 和 GCC內建的原子操作
原子操作
Java的表達式以及C++的表達式,都不是原子操作,也就是說 你在代碼里:
//假設i是線程間共享的變量 i++;
在多線程環境下,i的訪問是非原子性的,實際分成如下三個操作數:
編譯器會改變執行的時序,因此執行結果可能并非所期望的。
GCC內建的原子操作
gcc內建了如下的原子操作,這些原子操作從4.1.2被加入。而之前,他們是使用內聯的匯編實現的。
type __sync_fetch_and_add (type *ptr, type value, ...) type __sync_fetch_and_sub (type *ptr, type value, ...) type __sync_fetch_and_or (type *ptr, type value, ...) type __sync_fetch_and_and (type *ptr, type value, ...) type __sync_fetch_and_xor (type *ptr, type value, ...) type __sync_fetch_and_nand (type *ptr, type value, ...) type __sync_add_and_fetch (type *ptr, type value, ...) type __sync_sub_and_fetch (type *ptr, type value, ...) type __sync_or_and_fetch (type *ptr, type value, ...) type __sync_and_and_fetch (type *ptr, type value, ...) type __sync_xor_and_fetch (type *ptr, type value, ...) type __sync_nand_and_fetch (type *ptr, type value, ...) bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...) type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...) __sync_synchronize (...) type __sync_lock_test_and_set (type *ptr, type value, ...) void __sync_lock_release (type *ptr, ...)
需要注意的是:
OpenJDK 的相關文件
下面列出一些Github上 OpenJDK9的原子操作實現,希望能幫助需要了解的人。畢竟OpenJDK比Gcc的實現應用更廣泛一些。————但終究沒有Oracle JDK的源碼,雖然據說OpenJDK與 Oracle的源碼差距很小。
AtomicInteger.java
Unsafe.java::compareAndExchangeObject
unsafe.cpp::Unsafe_CompareAndExchangeObject
oop.inline.hpp::oopDesc::atomic_compare_exchange_oop
atomic_linux_x86.hpp::Atomic::cmpxchg
inline jlong Atomic::cmpxchg (jlong exchange_value, volatile jlong* dest, jlong compare_value, cmpxchg_memory_order order) { bool mp = os::is_MP(); __asm__ __volatile__ (LOCK_IF_MP(%4) "cmpxchgq %1,(%3)" : "=a" (exchange_value) : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp) : "cc", "memory"); return exchange_value; }
這里需要給不熟悉C/C++的Java程序員提示一下,嵌入匯編指令的格式如下
__asm__ [__volatile__](assembly template//匯編模板 : [output operand list]//輸入列表 : [input operand list]//輸出列表 : [clobber list])//破壞列表
匯編模板中的%1,%3,%4對應于后面的參數列表{"r" (exchange_value),"r" (dest),"r" (mp)},參數列表以逗號分隔,從0排序。輸出參數放第一個冒號右邊,輸出參數放第二個冒號右邊。"r"表示放到通用寄存器,"a"表示寄存器EAX,有"="表示用于輸出(寫還)。cmpxchg指令隱含使用EAX寄存器即參數%2.
其他細節就不在此羅列了,Gcc的實現是把要交換的指針傳下來,對比成功后直接賦值(賦值非原子),原子性通過spinlock保證。
OpenJDK的實現是把要交換的指針傳下來,直接通過匯編指令cmpxchgq賦值,原子性通過匯編指令保證。當然gcc的spinlock底層也是通過cmpxchgq保證的。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持億速云。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。