亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何實現基于Jedis+ZK的分布式序列號生成器

發布時間:2021-10-14 14:26:55 來源:億速云 閱讀:109 作者:iii 欄目:編程語言

本篇內容主要講解“如何實現基于Jedis+ZK的分布式序列號生成器”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“如何實現基于Jedis+ZK的分布式序列號生成器”吧!

部分源碼參考Jedis實現分布式鎖博客:

package com.xxx.arch.seq.utlis;

import com.xxx.arch.seq.client.redis.RedisSEQ;
import lombok.extern.slf4j.Slf4j;


/**
 * arch-seq 唯一code 獲取客戶端
 *
 * @author jdkleo
 */
@Slf4j
public class SEQUtil {

    /**
     * 生成默認KEY的UUID規則: 日期yyMMdd 6位 + 分布式seqID 10位,總共6 + 10 = 16位
     *
     * @param
     * @return
     */
    public static long getSEQ() {
        return RedisSEQ.getSEQ();
    }

    /**
     * 生成默認KEY連續的UUID,共total個
     *
     * @param total - 連續多少個
     * @return
     */
    public static long[] getSEQ(long total) {
        long value = RedisSEQ.getSEQ(total);
        return getValueArray(value, (int) total);
    }

    /**
     * 生成指定KEY的UUID規則: 日期yyMMdd 6位 + 分布式seqID 10位,總共6 + 10 = 16位
     *
     * @param seqName
     * @return
     */
    public static long getSEQ(String seqName) {
        return RedisSEQ.getSEQ(seqName, 1);
    }

    /**
     * 生成指定KEY連續的UUID,共total個
     *
     * @param seqName
     * @param total
     * @return
     */
    public static long[] getSEQ(String seqName, long total) {
        long value = RedisSEQ.getSEQ(seqName, total);
        return getValueArray(value, (int) total);
    }


    private static long[] getValueArray(long value, int total) {
        int n = total;
        long[] ret = new long[n];
        do {
            ret[n - 1] = value--;
        } while (--n > 0);
        return ret;
    }
}
package com.xxx.arch.seq.client.redis;

import com.xxx.arch.seq.client.tool.StreamCloseAble;
import lombok.extern.slf4j.Slf4j;

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Redis版本SEQ(有序SEQ)
 *
 * @author zhangyang
 * @createDate 2019-01-22
 * @since 2.x
 */
@Slf4j
public class RedisSEQ extends StreamCloseAble {

    //默認的REDIS SEQ初始化狀態器KEY
    private static final String _DEFAULT_SEQ_INIT_KEY = "ARCH_SEQ_REDIS_SEQ_INIT";
    //默認的REDIS SEQ初始化狀態器VAL
    private static final String _DEFAULT_SEQ_INIT_PENDING = "pending";
    private static final String _DEFAULT_SEQ_INIT_READY = "ready";
    //SEQ初始化容器狀態
    private static volatile boolean _DEFAULT_SEQ_INIT_STATUS;

    //默認REDIS SEQ序列號的名稱
    private static final String _DEFAULT_SEQ_NAME = "ARCH_SEQ_REDIS_SEQ";

    //本地模式自增ID槽
    private final static AtomicInteger _LOCAL_INCR = new AtomicInteger(0);

    static {
        JedisConfig.JedisConn jedisConn = null;
        try {
            jedisConn = JedisConfig.getInstance().getConn();
            //if REDIS宕機或第一次:創建初始化狀態成功后,初始化redis keys(該方法可以恢復上次redis宕機數據)
            if (jedisConn.setnx(_DEFAULT_SEQ_INIT_KEY, _DEFAULT_SEQ_INIT_PENDING) == 1) {//搶到REDIS初始化鎖,并將其標記為pending狀態
                try {
                    RedisSEQTimer.getInstance().removeNotUsedKeys();
                    RedisSEQTimer.getInstance().initRedisKeys();//初始化REDIS,從ZK上讀取初始數據
                    jedisConn.set(_DEFAULT_SEQ_INIT_KEY, _DEFAULT_SEQ_INIT_READY);//初始化完成,標記為ready狀態
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                    //初始化arch.seq REDIS數據異常,有可能是ZK相關問題,也有可能是REDIS問題,請排查
                    log.error("Initialization of arch.seq REDIS data exceptions, may be ZK-related problems, may also be REDIS problems, please check redis key:{}", _DEFAULT_SEQ_INIT_KEY);
                    jedisConn.del(_DEFAULT_SEQ_INIT_KEY);
                }
            }
            //else{...} 沒搶到REDIS初始化鎖的話:不作任何處理
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            log.error("Initialization of arch.seq REDIS data exceptions, may be arch.seq's configuration is not ready");
        } finally {
            close(jedisConn);
        }
    }


    public static Long getSEQ() {
        return getSEQ(_DEFAULT_SEQ_NAME, 1);
    }

    public static Long getSEQ(long total) {
        return getSEQ(_DEFAULT_SEQ_NAME, total);
    }

    public static Long getSEQ(String seqName, long total) {
        Long result = null;
        JedisConfig.JedisConn jedisConn = null;
        try {
            //獲取redis連接
            jedisConn = JedisConfig.getInstance().getConn();
            //獲得REDIS初始化狀態不成功
            if (!tryInitReady(jedisConn)) {
                //arch.seq By REDIS版本不能正常初始化,請檢查REDIS服務。
                throw new RuntimeException("arch.seq By REDIS version cannot be initialized properly. Please check the REDIS service.");
            }
            //開啟分布式鎖
            //if (jedisConn.tryLock(seqName, 1000, 2000)) {
            try {
                String day = RedisSEQTimer.getInstance().getDayFormat();
                String incrVal = String.format("%010d", getIncrVal(jedisConn, day, seqName, total));
                result = Long.parseLong(day + incrVal);
            } catch (Exception e) {
                e.printStackTrace();
                log.warn("try lock failed,the arch.seq tool will be retry after sleep some times.");
                Thread.sleep(randTime());
                result = getSEQ(seqName, total);
            }
        } catch (Throwable e) {
            log.error(e.getMessage(), e);
            //redis生成失敗,返回本地ID:15位納秒+1位自然數輪詢
            //在獲取【自增序列號:{},序列號分布式鎖:{}】時發生了異常,系統返回了本地生成的自增序列號,不影響系統使用,但請管理員盡快協查!
            log.error("An exception occurred while acquiring self-incremental sequence number '{}', " +
                    "sequence number distributed lock '{}',The system returns the locally generated self-incremental " +
                    "sequence number, which does not affect the use of the system, but the administrator should check " +
                    "it as soon as possible.", seqName, seqName + "_LOCK");
            result = xUUID();
        } finally {
            //切記,一定要釋放分布式鎖(注:釋放鎖的同時jedisConn會自動釋放connection,無需再次CLOSE)
            if (jedisConn != null) {
                //jedisConn.unLock(seqName);
                jedisConn.close();
            }
            if (log.isDebugEnabled()) {
                log.debug(seqName + ":" + result + ", trace:\n" + getStackTrace());
            }
        }
        return result;
        //arch.seq發生了不可預測的異常,請聯系架構部處理!
        //throw new RuntimeException("arch.seq發生了不可預測的異常,請聯系架構部處理!");
    }

    private static String getStackTrace() {
        StringBuilder result = new StringBuilder();
        StackTraceElement[] element = Thread.currentThread().getStackTrace();
        for (int i = 0; i < element.length; i++) {
            result.append("\t").append(element[i]).append("\n");
        }
        return result.toString();
    }

    private static long randTime() {
        return new Random().nextInt(50) + 50;
    }

    private static boolean tryInitReady(JedisConfig.JedisConn jedisConn) throws InterruptedException {
        int times = 0;
        for (; times < 3; times++) {
            if (getSEQInitReady(jedisConn)) {
                break;
            }
            Thread.sleep(100);
        }
        return times < 3;
    }

    /**
     * 獲得SEQ初始化狀態
     *
     * @param jedisConn
     * @return
     */
    private static boolean getSEQInitReady(JedisConfig.JedisConn jedisConn) {
        if (!_DEFAULT_SEQ_INIT_STATUS) {
            synchronized (RedisSEQ.class) {
                if (!_DEFAULT_SEQ_INIT_STATUS) {
                    _DEFAULT_SEQ_INIT_STATUS = _DEFAULT_SEQ_INIT_READY.equals(jedisConn.get(_DEFAULT_SEQ_INIT_KEY));
                }
            }
        }
        return _DEFAULT_SEQ_INIT_STATUS;
    }

    /**
     * 獲得REDIS自增序列號最新值,并同步更新到ZK備份數據節點守護線程中
     *
     * @param jedisConn
     * @param day
     * @param seqName
     * @param total
     * @return
     */
    private static Long getIncrVal(JedisConfig.JedisConn jedisConn, String day, String seqName, long total) {
        String key = seqName + "_" + day;
        Long incrVal = total > 1 ? jedisConn.incr(key, total) : jedisConn.incr(key);
        if (incrVal > 9999999999L) {
            throw new RuntimeException("Exceed the maximum value,sequence:" + incrVal);
        }
        //塞到要更新的ZK隊列中
        RedisSEQTimer.getInstance().push(key, incrVal);
        return incrVal;
    }

    /**
     * 單機模式生成UUID
     *
     * @return
     */
    private static Long xUUID() {
        int rand = _LOCAL_INCR.incrementAndGet() % 10;
        String result = System.nanoTime() + "" + rand;
        return Long.parseLong(result);
    }

}
package com.xxx.arch.seq.client.redis;

import com.xxx.arch.seq.client.tool.StreamCloseAble;
import com.xxx.arch.seq.client.tool.ZkClient;
import com.xxx.arch.seq.client.zk.ZkClientUtil;
import org.apache.commons.lang3.time.DateUtils;

import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;


public class RedisSEQTimer extends StreamCloseAble {
    public static final String DAY_FORMAT_PATTERN = "yyMMdd";

    public static volatile RedisSEQTimer redisSEQTimer;

    private final ConcurrentHashMap<String, Long> REDIS_INCR_MAP = new ConcurrentHashMap<>();

    private final ZkClient _ZK_CLIENT = ZkClientUtil.getZkClient();

    private final String _DEFAULT_ZK_NAMESPACE = "/ARCH_SEQ_REDIS";

    //zk節點最大值每次遞增數
    private long _REDIS_MAXVALUE_INIT = 10_000L;

    private Timer _TIMER = new Timer(true);

    //是否處于清理狀態
    private volatile boolean _CLEAN_STATUS;

    //清理key
    private static final String _REMOVE_KEY = "ARCH_SEQ_REMOVE_KEY";

    private RedisSEQTimer() {
        super();
        //啟動zk巡查服務
        _TIMER.schedule(new TimerTask() {
            @Override
            public void run() {
                checkAndConfigure();
            }
        }, new Date(), 1 * 60 * 1000);

        //每天定時清理垃圾數據
        _TIMER.schedule(new TimerTask() {
            @Override
            public void run() {
                removeNotUsedKeys();
            }
        }, getFirstTime(), 24 * 60 * 60 * 1000);
    }


    public static RedisSEQTimer getInstance() {
        if (redisSEQTimer == null) {
            synchronized (RedisSEQTimer.class) {
                if (redisSEQTimer == null) {
                    redisSEQTimer = new RedisSEQTimer();
                }
            }
        }
        return redisSEQTimer;
    }

    /**
     * 定期更新ZK節點
     */
    private synchronized void checkAndConfigure() {
        if (_CLEAN_STATUS) {
            return;
        }
        if (REDIS_INCR_MAP.isEmpty()) {
            return;
        }
        String endDay = "_" + getDayFormat();
        List<String> notTodayKeys = new ArrayList<>();
        Set<Map.Entry<String, Long>> entrySet = REDIS_INCR_MAP.entrySet();
        for (Map.Entry<String, Long> entry : entrySet) {
            //不是今天的key不作處理
            if (!entry.getKey().endsWith(endDay)) {
                notTodayKeys.add(entry.getKey());
                return;
            }
            //將最新的值寫到zk節點上 節點格式:<ARCH_SEQ前綴>/KEY_yyMMdd
            String zkNode = _DEFAULT_ZK_NAMESPACE + "/" + entry.getKey();
            if (_ZK_CLIENT.exists(zkNode)) {
                _ZK_CLIENT.writeData(zkNode, entry.getValue());
            } else {
                try {
                    _ZK_CLIENT.createPersistent(zkNode, entry.getValue());
                } catch (RuntimeException e) {
                    //not to write log ,it's will be retry in next time.
                }
            }
        }
        ;
        if (!notTodayKeys.isEmpty()) {
            for (String key : notTodayKeys) {
                REDIS_INCR_MAP.remove(key);
            }
        }
    }

    /**
     * 刪除不再使用的KEY(包含redis和zk節點)
     */
    public synchronized void removeNotUsedKeys() {
        if (!_ZK_CLIENT.exists(_DEFAULT_ZK_NAMESPACE)) {
            return;
        }
        _CLEAN_STATUS = true;
        JedisConfig.JedisConn jedisConn = null;
        String requestId = UUID.randomUUID().toString();
        boolean tryLock = false;
        try {
            List<String> list = _ZK_CLIENT.getChildren(_DEFAULT_ZK_NAMESPACE);

            //保留兩天。考慮到多個機器的時間可能不一致,如果在剛過零點刪除了昨天的sequence,另一臺機器可能還需要使用它,則會出現id重復
            Date now = new Date();
            Date yesterday = DateUtils.addDays(now, -1);
            List<String> keepDays = Arrays.asList(getDayFormat(now), getDayFormat(yesterday));

            if (list != null && !list.isEmpty()) {
                jedisConn = JedisConfig.getInstance().getConn();
                if (tryLock = jedisConn.tryLock(_REMOVE_KEY, requestId, 2000)) {
                    JedisConfig.JedisConn finalJedisConn = jedisConn;
                    for (String node : list) {
                        String dayPart = node.substring(node.length() - DAY_FORMAT_PATTERN.length());
                        if (!keepDays.contains(dayPart)) {
                            REDIS_INCR_MAP.remove(node);
                            finalJedisConn.del(node);
                            removeZkNode(node);
                        }
                    }
                }
            }
        } finally {
            _CLEAN_STATUS = false;
            if (jedisConn != null) {
                if (tryLock) {
                    jedisConn.unLock(_REMOVE_KEY, requestId);
                }
                jedisConn.close();
            }
        }
    }

    /**
     * 移除ZK節點
     *
     * @param node
     */
    private void removeZkNode(String node) {
        String path = _DEFAULT_ZK_NAMESPACE + "/" + node;
        if (_ZK_CLIENT.exists(path)) {
            try {
                _ZK_CLIENT.delete(path);
            } catch (Exception e) {
            }
        }
    }


    /**
     * 獲得每天定時任務的執行時間
     *
     * @return
     */
    private Date getFirstTime() {
        Calendar calendar = Calendar.getInstance();
        calendar.set(Calendar.HOUR_OF_DAY, 24); // 24點  可以更改時間
        calendar.set(Calendar.MINUTE, getRandNum(6, 0)); // 0-5分鐘 隨機
        calendar.set(Calendar.SECOND, getRandNum(60, 0));// 0-59秒  隨機
        return calendar.getTime();
    }

    /**
     * 獲得區間隨機整數
     *
     * @param exclude - 最大數,exclude
     * @param from    - 最小數,include
     * @return
     */
    private int getRandNum(int exclude, int from) {
        return new Random().nextInt(exclude) + from;
    }


    /**
     * 將某天的KEY塞到相應隊列
     *
     * @param key - 業務KEY key_yyMMdd
     * @param val - 值
     * @return 是否成功
     */
    public synchronized void push(String key, Long val) {
        REDIS_INCR_MAP.put(key, val);
    }

    public String getDayFormat() {
        return getDayFormat(new Date());
    }

    public String getDayFormat(Date date) {
        return new SimpleDateFormat(DAY_FORMAT_PATTERN).format(date);
    }

    /**
     * 初始化redis keys
     */
    public void initRedisKeys() {
        if (!_ZK_CLIENT.exists(_DEFAULT_ZK_NAMESPACE)) {
            return;
        }
        List<String> list = _ZK_CLIENT.getChildren(_DEFAULT_ZK_NAMESPACE);
        if (list != null && !list.isEmpty()) {
            Long zkVal;
            JedisConfig.JedisConn jedisConn = null;
            for (int i = 0; i < list.size(); i++) {
                zkVal = _ZK_CLIENT.readData(_DEFAULT_ZK_NAMESPACE + "/" + list.get(i));
                if (zkVal != null) {
                    String requestId = UUID.randomUUID().toString();
                    boolean tryLock = false;
                    try {
                        jedisConn = JedisConfig.getInstance().getConn();
                        //獲得鎖才更新,沒獲得鎖就放棄更新
                        if (tryLock = jedisConn.tryLock(list.get(i), requestId, 2000)) {
                            jedisConn.set(list.get(i), String.valueOf(zkVal + _REDIS_MAXVALUE_INIT));
                        }
                    } finally {
                        if (jedisConn != null) {
                            if (tryLock) {
                                jedisConn.unLock(list.get(i), requestId);
                            }
                            jedisConn.close();
                        }
                    }
                }
            }
        }
    }


}
package com.xxx.arch.seq.client.tool;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.Collections;
import java.util.List;


@Slf4j
public class ZkClient {

    private CuratorFramework client;

    public ZkClient(String serverList, int connectionTimeoutMs, int sessionTimeout) {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
                .connectString(serverList)
                .connectionTimeoutMs(connectionTimeoutMs)
                .sessionTimeoutMs(sessionTimeout)
                .retryPolicy(retryPolicy)
                .build();
        client.start();
    }


    public boolean exists(String path) {
        try {
            return client.checkExists().forPath(path) != null;
        } catch (Exception e) {
            return false;
        }
    }

    public void writeData(String path, Long value) {
        try {
            client.setData().forPath(path, value.toString().getBytes());
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    public void createPersistent(String zkNode, Long value) {
        try {
            client.create().forPath(zkNode, value.toString().getBytes());
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    public List<String> getChildren(String path) {
        try {
            return client.getChildren().forPath(path);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        return Collections.emptyList();
    }

    public Long readData(String path) {
        try {
            byte[] data = client.getData().forPath(path);
            return Long.parseLong(new String(data));
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        return null;
    }

    public void delete(String path) {
        try {
            client.delete().forPath(path);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
}
package com.xxx.arch.seq.client.zk;

import com.xxx.arch.seq.client.tool.ZkClient;
import com.xxx.arch.seq.constant.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ZkClientUtil {

    private static final Logger logger = LoggerFactory.getLogger(ZkClientUtil.class);

    private static volatile ZkClient zkClient = null;

    public static ZkClient getZkClient() {
        if (zkClient == null) {
            synchronized (ZkClientUtil.class) {
                if (zkClient == null) {
                    initZkClient();
                }
            }
        }
        return zkClient;
    }

    private static void initZkClient() {
        try {
            String serverList = Constants.ARCH_SEQ_ZOOKEEPER_CONNECT_STRING;
            if (logger.isInfoEnabled()) {
                logger.info("zk cluster[" + serverList + "]");
            }
            if (serverList == null || serverList.trim().isEmpty()) {
                throw new RuntimeException("no \"arch.seq.zk-cluster.serverList\" config.used");
            } else {
                zkClient = new ZkClient(serverList, 15000, 60000);
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

}
package com.xxx.arch.seq.client.tool;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;

/**
 * Created by zhangyang on 2016/5/31.
 */
public class StreamCloseAble {
    private static Logger logger = LoggerFactory.getLogger(StreamCloseAble.class);

    /**
     * 關閉輸入輸出流
     *
     * @param closeAbles
     */
    public static void close(Closeable... closeAbles) {
        if (closeAbles == null || closeAbles.length <= 0) {
            return;
        }
        for (Closeable closeAble : closeAbles) {
            if (closeAble != null) {
                try {
                    closeAble.close();
                } catch (IOException e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }
}

到此,相信大家對“如何實現基于Jedis+ZK的分布式序列號生成器”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

洪雅县| 泸水县| 甘德县| 南靖县| 天柱县| 长阳| 元朗区| 孟津县| 霞浦县| 连州市| 登封市| 曲沃县| 威宁| 陆良县| 谷城县| 七台河市| 高阳县| 曲水县| 桃源县| 报价| 新化县| 武安市| 芜湖市| 郯城县| 府谷县| 新安县| 永兴县| 称多县| 策勒县| 夏河县| 图木舒克市| 静海县| 同仁县| 桂平市| 乐陵市| 沾化县| 乐业县| 榆树市| 马龙县| 文登市| 乌海市|