您好,登錄后才能下訂單哦!
本篇內容介紹了“zookeeper分布式鎖實現的方法是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
一。為何使用分布式鎖?
當應用服務器數量超過1臺,對相同數據的訪問可能造成訪問沖突(特別是寫沖突)。單純使用關系數據庫比如MYSQL的應用可以借助于事務來實現鎖,也可以使用版本號等實現樂觀鎖,最大的缺陷就是可用性降低(性能差)。對于GLEASY這種滿足大規模并發訪問請求的應用來說,使用數據庫事務來實現數據庫就有些捉襟見肘了。另外對于一些不依賴數據庫的應用,比如分布式文件系統,為了保證同一文件在大量讀寫操作情況下的正確性,必須引入分布式鎖來約束對同一文件的并發操作。
二。對分布式鎖的要求
1.高性能(分布式鎖不能成為系統的性能瓶頸)
2.避免死鎖(拿到鎖的結點掛掉不會導致其它結點永遠無法繼續)
3.支持鎖重入
三。方案1,基于zookeeper的分布式鎖
/** * DistributedLockUtil.java * 分布式鎖工廠類,所有分布式請求都由該工廠類負責 **/ public class DistributedLockUtil { private static Object schemeLock = new Object(); private static Object mutexLock = new Object(); private static Map<String,Object> mutexLockMap = new ConcurrentHashMap(); private String schema; private Map<String,DistributedReentrantLock> cache = new ConcurrentHashMap<String,DistributedReentrantLock>(); private static Map<String,DistributedLockUtil> instances = new ConcurrentHashMap(); public static DistributedLockUtil getInstance(String schema){ DistributedLockUtil u = instances.get(schema); if(u==null){ synchronized(schemeLock){ u = instances.get(schema); if(u == null){ u = new DistributedLockUtil(schema); instances.put(schema, u); } } } return u; } private DistributedLockUtil(String schema){ this.schema = schema; } private Object getMutex(String key){ Object mx = mutexLockMap.get(key); if(mx == null){ synchronized(mutexLock){ mx = mutexLockMap.get(key); if(mx==null){ mx = new Object(); mutexLockMap.put(key,mx); } } } return mx; } private DistributedReentrantLock getLock(String key){ DistributedReentrantLock lock = cache.get(key); if(lock == null){ synchronized(getMutex(key)){ lock = cache.get(key); if(lock == null){ lock = new DistributedReentrantLock(key,schema); cache.put(key, lock); } } } return lock; } public void reset(){ for(String s : cache.keySet()){ getLock(s).unlock(); } } /** * 嘗試加鎖 * 如果當前線程已經擁有該鎖的話,直接返回false,表示不用再次加鎖,此時不應該再調用unlock進行解鎖 * * @param key * @return * @throws InterruptedException * @throws KeeperException */ public LockStat lock(String key) throws InterruptedException, KeeperException{ if(getLock(key).isOwner()){ return LockStat.NONEED; } getLock(key).lock(); return LockStat.SUCCESS; } public void clearLock(String key) throws InterruptedException, KeeperException{ synchronized(getMutex(key)){ DistributedReentrantLock l = cache.get(key); l.clear(); cache.remove(key); } } public void unlock(String key,LockStat stat) throws InterruptedException, KeeperException{ unlock(key,stat,false); } public void unlock(String key,LockStat stat,boolean keepalive) throws InterruptedException, KeeperException{ if(stat == null) return; if(LockStat.SUCCESS.equals(stat)){ DistributedReentrantLock lock = getLock(key); boolean hasWaiter = lock.unlock(); if(!hasWaiter && !keepalive){ synchronized(getMutex(key)){ lock.clear(); cache.remove(key); } } } } public static enum LockStat{ NONEED, SUCCESS } }
/** *DistributedReentrantLock.java *本地線程之間鎖爭用,先使用虛擬機內部鎖機制,減少結點間通信開銷 */ public class DistributedReentrantLock { private static final Logger logger = Logger.getLogger(DistributedReentrantLock.class); private ReentrantLock reentrantLock = new ReentrantLock(); private WriteLock writeLock; private long timeout = 3*60*1000; private final Object mutex = new Object(); private String dir; private String schema; private final ExitListener exitListener = new ExitListener(){ @Override public void execute() { initWriteLock(); } }; private synchronized void initWriteLock(){ logger.debug("初始化writeLock"); writeLock = new WriteLock(dir,new LockListener(){ @Override public void lockAcquired() { synchronized(mutex){ mutex.notify(); } } @Override public void lockReleased() { } },schema); if(writeLock != null && writeLock.zk != null){ writeLock.zk.addExitListener(exitListener); } synchronized(mutex){ mutex.notify(); } } public DistributedReentrantLock(String dir,String schema) { this.dir = dir; this.schema = schema; initWriteLock(); } public void lock(long timeout) throws InterruptedException, KeeperException { reentrantLock.lock();//多線程競爭時,先拿到第一層鎖 try{ boolean res = writeLock.trylock(); if(!res){ synchronized(mutex){ mutex.wait(timeout); } if(writeLock == null || !writeLock.isOwner()){ throw new InterruptedException("鎖超時"); } } }catch(InterruptedException e){ reentrantLock.unlock(); throw e; }catch(KeeperException e){ reentrantLock.unlock(); throw e; } } public void lock() throws InterruptedException, KeeperException { lock(timeout); } public void destroy() throws KeeperException { writeLock.unlock(); } public boolean unlock(){ if(!isOwner()) return false; try{ writeLock.unlock(); reentrantLock.unlock();//多線程競爭時,釋放最外層鎖 }catch(RuntimeException e){ reentrantLock.unlock();//多線程競爭時,釋放最外層鎖 throw e; } return reentrantLock.hasQueuedThreads(); } public boolean isOwner() { return reentrantLock.isHeldByCurrentThread() && writeLock.isOwner(); } public void clear() { writeLock.clear(); } }
/** *WriteLock.java *基于zk的鎖實現 *一個最簡單的場景如下: *1.結點A請求加鎖,在特定路徑下注冊自己(會話自增結點),得到一個ID號1 *2.結點B請求加鎖,在特定路徑下注冊自己(會話自增結點),得到一個ID號2 *3.結點A獲取所有結點ID,判斷出來自己是最小結點號,于是獲得鎖 *4.結點B獲取所有結點ID,判斷出來自己不是最小結點,于是監聽小于自己的最大結點(結點A)變更事件 *5.結點A拿到鎖,處理業務,處理完,釋放鎖(刪除自己) *6.結點B收到結點A變更事件,判斷出來自己已經是最小結點號,于是獲得鎖。 */ public class WriteLock extends ZkPrimative { private static final Logger LOG = Logger.getLogger(WriteLock.class); private final String dir; private String id; private LockNode idName; private String ownerId; private String lastChildId; private byte[] data = {0x12, 0x34}; private LockListener callback; public WriteLock(String dir,String schema) { super(schema,true); this.dir = dir; } public WriteLock(String dir,LockListener callback,String schema) { this(dir,schema); this.callback = callback; } public LockListener getLockListener() { return this.callback; } public void setLockListener(LockListener callback) { this.callback = callback; } public synchronized void unlock() throws RuntimeException { if(zk == null || zk.isClosed()){ return; } if (id != null) { try { zk.delete(id, -1); } catch (InterruptedException e) { LOG.warn("Caught: " + e, e); //set that we have been interrupted. Thread.currentThread().interrupt(); } catch (KeeperException.NoNodeException e) { // do nothing } catch (KeeperException e) { LOG.warn("Caught: " + e, e); throw (RuntimeException) new RuntimeException(e.getMessage()). initCause(e); }finally { if (callback != null) { callback.lockReleased(); } id = null; } } } private class LockWatcher implements Watcher { public void process(WatchedEvent event) { LOG.debug("Watcher fired on path: " + event.getPath() + " state: " + event.getState() + " type " + event.getType()); try { trylock(); } catch (Exception e) { LOG.warn("Failed to acquire lock: " + e, e); } } } private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir) throws KeeperException, InterruptedException { List<String> names = zookeeper.getChildren(dir, false); for (String name : names) { if (name.startsWith(prefix)) { id = dir + "/" + name; if (LOG.isDebugEnabled()) { LOG.debug("Found id created last time: " + id); } break; } } if (id == null) { id = zookeeper.create(dir + "/" + prefix, data, acl, EPHEMERAL_SEQUENTIAL); if (LOG.isDebugEnabled()) { LOG.debug("Created id: " + id); } } } public void clear() { if(zk == null || zk.isClosed()){ return; } try { zk.delete(dir, -1); } catch (Exception e) { LOG.error("clear error: " + e,e); } } public synchronized boolean trylock() throws KeeperException, InterruptedException { if(zk == null){ LOG.info("zk 是空"); return false; } if (zk.isClosed()) { LOG.info("zk 已經關閉"); return false; } ensurePathExists(dir); LOG.debug("id:"+id); do { if (id == null) { long sessionId = zk.getSessionId(); String prefix = "x-" + sessionId + "-"; idName = new LockNode(id); LOG.debug("idName:"+idName); } if (id != null) { List<String> names = zk.getChildren(dir, false); if (names.isEmpty()) { LOG.warn("No children in: " + dir + " when we've just " + "created one! Lets recreate it..."); id = null; } else { SortedSet<LockNode> sortedNames = new TreeSet<LockNode>(); for (String name : names) { sortedNames.add(new LockNode(dir + "/" + name)); } ownerId = sortedNames.first().getName(); LOG.debug("all:"+sortedNames); SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName); LOG.debug("less than me:"+lessThanMe); if (!lessThanMe.isEmpty()) { LockNode lastChildName = lessThanMe.last(); lastChildId = lastChildName.getName(); if (LOG.isDebugEnabled()) { LOG.debug("watching less than me node: " + lastChildId); } Stat stat = zk.exists(lastChildId, new LockWatcher()); if (stat != null) { return Boolean.FALSE; } else { LOG.warn("Could not find the" + " stats for less than me: " + lastChildName.getName()); } } else { if (isOwner()) { if (callback != null) { callback.lockAcquired(); } return Boolean.TRUE; } } } } } while (id == null); return Boolean.FALSE; } public String getDir() { return dir; } public boolean isOwner() { return id != null && ownerId != null && id.equals(ownerId); } public String getId() { return this.id; } }
使用本方案實現的分布式鎖,可以很好地解決鎖重入的問題,而且使用會話結點來避免死鎖;性能方面,根據筆者自測結果,加鎖解鎖各一次算是一個操作,本方案實現的分布式鎖,TPS大概為2000-3000,性能比較一般
“zookeeper分布式鎖實現的方法是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。