您好,登錄後才能下訂單哦!
Nacos 中的 notifyConfigInfo 有什麼用?相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,希望通過這篇文章你能解決這個問題。
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/controller/CommunicationController.java
@Controller @RequestMapping(Constants.COMMUNICATION_CONTROLLER_PATH) public class CommunicationController { private final DumpService dumpService; private final LongPollingService longPollingService; private String trueStr = "true"; @Autowired public CommunicationController(DumpService dumpService, LongPollingService longPollingService) { this.dumpService = dumpService; this.longPollingService = longPollingService; } /** * 通知配置信息改變 */ @RequestMapping(value = "/dataChange", method = RequestMethod.GET) @ResponseBody public Boolean notifyConfigInfo(HttpServletRequest request, HttpServletResponse response, @RequestParam("dataId") String dataId, @RequestParam("group") String group, @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant, @RequestParam(value = "tag", required = false) String tag) { dataId = dataId.trim(); group = group.trim(); String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED); long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified); String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP); String isBetaStr = request.getHeader("isBeta"); if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) { dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true); } else { dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp); } return true; } //...... }
notifyConfigInfo 方法主要是執行 dumpService.dump 方法;區別只在於:請求頭 isBeta 為 "true" 時調用帶 isBeta 參數的 dump 重載,否則調用帶 tag 參數的 dump 重載。
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java
@Service public class DumpService { @Autowired private Environment env; @Autowired PersistService persistService; @PostConstruct public void init() { LogUtil.defaultLog.warn("DumpService start"); DumpProcessor processor = new DumpProcessor(this); DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this); DumpAllBetaProcessor dumpAllBetaProcessor = new DumpAllBetaProcessor(this); DumpAllTagProcessor dumpAllTagProcessor = new DumpAllTagProcessor(this); dumpTaskMgr = new TaskManager( "com.alibaba.nacos.server.DumpTaskManager"); dumpTaskMgr.setDefaultTaskProcessor(processor); dumpAllTaskMgr = new TaskManager( "com.alibaba.nacos.server.DumpAllTaskManager"); dumpAllTaskMgr.setDefaultTaskProcessor(dumpAllProcessor); //...... } /** * 全量dump間隔 */ static final int DUMP_ALL_INTERVAL_IN_MINUTE = 6 * 60; /** * 全量dump間隔 */ static final int INITIAL_DELAY_IN_MINUTE = 6 * 60; private TaskManager dumpTaskMgr; private TaskManager dumpAllTaskMgr; private static final Logger log = LoggerFactory.getLogger(DumpService.class); static final AtomicInteger FINISHED = new AtomicInteger(); static final int INIT_THREAD_COUNT = 10; int total = 0; private final static String TRUE_STR = "true"; private final static String BETA_TABLE_NAME = "config_info_beta"; private final static String TAG_TABLE_NAME = "config_info_tag"; Boolean isQuickStart = false; private int retentionDays = 30; //...... 
public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) { String groupKey = GroupKey2.getKey(dataId, group, tenant); dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, lastModified, handleIp, isBeta)); } public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp) { dump(dataId, group, tenant, tag, lastModified, handleIp, false); } public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp, boolean isBeta) { String groupKey = GroupKey2.getKey(dataId, group, tenant); dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta)); } //...... }
dump 方法最後是往 dumpTaskMgr 添加 DumpTask;dumpTaskMgr 的 defaultTaskProcessor 為 DumpProcessor 實例(即 init 方法中的 processor)。
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/manager/TaskManager.java
public final class TaskManager implements TaskManagerMBean { private static final Logger log = LogUtil.defaultLog; private final ConcurrentHashMap<String, AbstractTask> tasks = new ConcurrentHashMap<String, AbstractTask>(); private final ConcurrentHashMap<String, TaskProcessor> taskProcessors = new ConcurrentHashMap<String, TaskProcessor>(); private TaskProcessor defaultTaskProcessor; Thread processingThread; private final AtomicBoolean closed = new AtomicBoolean(true); private String name; class ProcessRunnable implements Runnable { @Override public void run() { while (!TaskManager.this.closed.get()) { try { Thread.sleep(100); TaskManager.this.process(); } catch (Throwable e) { } } } } ReentrantLock lock = new ReentrantLock(); Condition notEmpty = this.lock.newCondition(); public TaskManager() { this(null); } public AbstractTask getTask(String type) { return this.tasks.get(type); } public TaskProcessor getTaskProcessor(String type) { return this.taskProcessors.get(type); } @SuppressWarnings("PMD.AvoidManuallyCreateThreadRule") public TaskManager(String name) { this.name = name; if (null != name && name.length() > 0) { this.processingThread = new Thread(new ProcessRunnable(), name); } else { this.processingThread = new Thread(new ProcessRunnable()); } this.processingThread.setDaemon(true); this.closed.set(false); this.processingThread.start(); } //...... 
/** * 將任務加入到任務Map中 * * @param type * @param task */ public void addTask(String type, AbstractTask task) { this.lock.lock(); try { AbstractTask oldTask = tasks.put(type, task); MetricsMonitor.getDumpTaskMonitor().set(tasks.size()); if (null != oldTask) { task.merge(oldTask); } } finally { this.lock.unlock(); } } protected void process() { for (Map.Entry<String, AbstractTask> entry : this.tasks.entrySet()) { AbstractTask task = null; this.lock.lock(); try { // 獲取任務 task = entry.getValue(); if (null != task) { if (!task.shouldProcess()) { // 任務當前不需要被執行,直接跳過 continue; } // 先將任務從任務Map中刪除 this.tasks.remove(entry.getKey()); MetricsMonitor.getDumpTaskMonitor().set(tasks.size()); } } finally { this.lock.unlock(); } if (null != task) { // 獲取任務處理器 TaskProcessor processor = this.taskProcessors.get(entry.getKey()); if (null == processor) { // 如果沒有根據任務類型設置的處理器,使用默認處理器 processor = this.getDefaultTaskProcessor(); } if (null != processor) { boolean result = false; try { // 處理任務 result = processor.process(entry.getKey(), task); } catch (Throwable t) { log.error("task_fail", "處理task失敗", t); } if (!result) { // 任務處理失敗,設置最后處理時間 task.setLastProcessTime(System.currentTimeMillis()); // 將任務重新加入到任務Map中 this.addTask(entry.getKey(), task); } } } } if (tasks.isEmpty()) { this.lock.lock(); try { this.notEmpty.signalAll(); } finally { this.lock.unlock(); } } } //...... }
TaskManager 的 addTask 方法往 tasks 添加 AbstractTask;其構造器啟動了執行 ProcessRunnable 的線程,其 run 方法主要是每隔 100 毫秒執行一次 TaskManager.this.process() 方法;該方法會遍歷 tasks,取出任務,然後通過 TaskProcessor 的 process 方法來執行任務。
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpTask.java
class DumpProcessor implements TaskProcessor { DumpProcessor(DumpService dumpService) { this.dumpService = dumpService; } @Override public boolean process(String taskType, AbstractTask task) { DumpTask dumpTask = (DumpTask)task; String[] pair = GroupKey2.parseKey(dumpTask.groupKey); String dataId = pair[0]; String group = pair[1]; String tenant = pair[2]; long lastModified = dumpTask.lastModified; String handleIp = dumpTask.handleIp; boolean isBeta = dumpTask.isBeta; String tag = dumpTask.tag; if (isBeta) { // beta發布,則dump數據,更新beta緩存 ConfigInfo4Beta cf = dumpService.persistService.findConfigInfo4Beta(dataId, group, tenant); boolean result; if (null != cf) { result = ConfigService.dumpBeta(dataId, group, tenant, cf.getContent(), lastModified, cf.getBetaIps()); if (result) { ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp, ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified, cf.getContent().length()); } } else { result = ConfigService.removeBeta(dataId, group, tenant); if (result) { ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp, ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0); } } return result; } else { if (StringUtils.isBlank(tag)) { ConfigInfo cf = dumpService.persistService.findConfigInfo(dataId, group, tenant); if (dataId.equals(AggrWhitelist.AGGRIDS_METADATA)) { if (null != cf) { AggrWhitelist.load(cf.getContent()); } else { AggrWhitelist.load(null); } } if (dataId.equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) { if (null != cf) { ClientIpWhiteList.load(cf.getContent()); } else { ClientIpWhiteList.load(null); } } if (dataId.equals(SwitchService.SWITCH_META_DATAID)) { if (null != cf) { SwitchService.load(cf.getContent()); } else { SwitchService.load(null); } } boolean result; if (null != cf) { result = ConfigService.dump(dataId, group, tenant, cf.getContent(), lastModified); if (result) { 
ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp, ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified, cf.getContent().length()); } } else { result = ConfigService.remove(dataId, group, tenant); if (result) { ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp, ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0); } } return result; } else { ConfigInfo4Tag cf = dumpService.persistService.findConfigInfo4Tag(dataId, group, tenant, tag); // boolean result; if (null != cf) { result = ConfigService.dumpTag(dataId, group, tenant, tag, cf.getContent(), lastModified); if (result) { ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp, ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified, cf.getContent().length()); } } else { result = ConfigService.removeTag(dataId, group, tenant, tag); if (result) { ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp, ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0); } } return result; } } } final DumpService dumpService; }
DumpProcessor實現了TaskProcessor接口,其process方法主要是根據不同條件執行ConfigService.dump或者remove方法
看完上述內容,你們了解 nacos 中 notifyConfigInfo 有什麼用了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速雲行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發佈的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯繫站長郵箱:is@yisu.com 進行舉報,並提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。