This article looks at how UpdatedServiceProcessor in nacos ServiceManager works and what it is for.
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

    private Synchronizer synchronizer = new ServiceStatusSynchronizer();

    private final Lock lock = new ReentrantLock();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private PushService pushService;

    private final Object putServiceLock = new Object();

    @PostConstruct
    public void init() {

        UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);

        UtilsAndCommons.SERVICE_UPDATE_EXECUTOR.submit(new UpdatedServiceProcessor());

        try {
            Loggers.SRV_LOG.info("listen for service meta change");
            consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
        } catch (NacosException e) {
            Loggers.SRV_LOG.error("listen for service meta change failed!");
        }
    }

    //......
}
The init method of ServiceManager submits an UpdatedServiceProcessor task to UtilsAndCommons.SERVICE_UPDATE_EXECUTOR.
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
private class UpdatedServiceProcessor implements Runnable {
    //get changed service from other server asynchronously
    @Override
    public void run() {
        ServiceKey serviceKey = null;

        try {
            while (true) {
                try {
                    serviceKey = toBeUpdatedServicesQueue.take();
                } catch (Exception e) {
                    Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque.");
                }

                if (serviceKey == null) {
                    continue;
                }
                GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey));
            }
        } catch (Exception e) {
            Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update service: {}", serviceKey, e);
        }
    }
}
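UpdatedServiceProcessor implements the Runnable interface. Its run method loops indefinitely: it takes a ServiceKey from toBeUpdatedServicesQueue (take() blocks while the queue is empty) and, for each key, submits a ServiceUpdater via GlobalExecutor.submitServiceUpdate.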
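The take-and-dispatch loop described above can be sketched outside nacos with plain java.util.concurrent classes. This is a minimal illustration, not the nacos implementation: the class QueueDispatcherSketch, its fields, the String keys, and the pool sizes are all assumptions made for the example, and the worker pool merely stands in for whatever GlobalExecutor uses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone sketch of a blocking-queue dispatcher; not nacos code.
class QueueDispatcherSketch {
    // Bounded queue of pending keys (plays the role of toBeUpdatedServicesQueue).
    private final LinkedBlockingDeque<String> pending = new LinkedBlockingDeque<>(1024);
    // One long-running thread drains the queue.
    private final ExecutorService dispatcher = Executors.newSingleThreadExecutor();
    // A separate pool does the per-key work (assumed size of 2).
    private final ExecutorService workers = Executors.newFixedThreadPool(2);
    private final List<String> processed = new CopyOnWriteArrayList<>();

    void start(CountDownLatch done) {
        dispatcher.execute(() -> {
            try {
                while (true) {
                    String key = pending.take(); // blocks while the queue is empty
                    workers.execute(() -> {
                        processed.add(key);      // placeholder for the real update
                        done.countDown();
                    });
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // leave the loop on shutdownNow()
            }
        });
    }

    // offer() returns false instead of blocking when the queue is full.
    boolean enqueue(String key) {
        return pending.offer(key);
    }

    void stop() {
        dispatcher.shutdownNow();
        workers.shutdown();
    }

    // Enqueues three keys, waits for the workers, and returns the keys sorted.
    static List<String> demo() {
        QueueDispatcherSketch sketch = new QueueDispatcherSketch();
        CountDownLatch done = new CountDownLatch(3);
        sketch.start(done);
        sketch.enqueue("svc-a");
        sketch.enqueue("svc-b");
        sketch.enqueue("svc-c");
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for workers");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            sketch.stop();
        }
        List<String> result = new ArrayList<>(sketch.processed);
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Keeping the draining thread separate from the worker pool means a slow update for one key does not stop the next key from being taken off the queue.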
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
private class ServiceUpdater implements Runnable {

    String namespaceId;
    String serviceName;
    String serverIP;

    public ServiceUpdater(ServiceKey serviceKey) {
        this.namespaceId = serviceKey.getNamespaceId();
        this.serviceName = serviceKey.getServiceName();
        this.serverIP = serviceKey.getServerIP();
    }

    @Override
    public void run() {
        try {
            updatedHealthStatus(namespaceId, serviceName, serverIP);
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("[DOMAIN-UPDATER] Exception while update service: {} from {}, error: {}",
                serviceName, serverIP, e);
        }
    }
}
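ServiceUpdater implements the Runnable interface. It copies namespaceId, serviceName and serverIP out of the ServiceKey, and its run method calls updatedHealthStatus with them, logging a warning if the call throws.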
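The shape of ServiceUpdater (capture the key fields, run one action, never let an exception escape) can be shown in isolation. The sketch below is only an illustration under assumptions: KeyedUpdateTask, the Consumer standing in for updatedHealthStatus, and the warnings list standing in for the logger are all hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-alone sketch of a task that isolates failures; not nacos code.
class KeyedUpdateTask implements Runnable {
    private final String serviceName;
    private final String serverIp;
    private final Consumer<String> action;  // stand-in for updatedHealthStatus
    final List<String> warnings = new ArrayList<>(); // stand-in for a logger

    KeyedUpdateTask(String serviceName, String serverIp, Consumer<String> action) {
        this.serviceName = serviceName;
        this.serverIp = serverIp;
        this.action = action;
    }

    @Override
    public void run() {
        try {
            action.accept(serviceName);
        } catch (Exception e) {
            // Record the failure instead of propagating it to the executor thread.
            warnings.add("update of " + serviceName + " from " + serverIp
                + " failed: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        KeyedUpdateTask task = new KeyedUpdateTask("svc-a", "10.0.0.9",
            name -> { throw new IllegalStateException("boom"); });
        task.run();
        System.out.println(task.warnings);
    }
}
```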
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

    private Synchronizer synchronizer = new ServiceStatusSynchronizer();

    private final Lock lock = new ReentrantLock();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private PushService pushService;

    private final Object putServiceLock = new Object();

    //......

    public void updatedHealthStatus(String namespaceId, String serviceName, String serverIP) {
        Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        JSONObject serviceJson = JSON.parseObject(msg.getData());

        JSONArray ipList = serviceJson.getJSONArray("ips");
        Map<String, String> ipsMap = new HashMap<>(ipList.size());
        for (int i = 0; i < ipList.size(); i++) {

            String ip = ipList.getString(i);
            String[] strings = ip.split("_");
            ipsMap.put(strings[0], strings[1]);
        }

        Service service = getService(namespaceId, serviceName);

        if (service == null) {
            return;
        }

        boolean changed = false;

        List<Instance> instances = service.allIPs();
        for (Instance instance : instances) {

            boolean valid = Boolean.parseBoolean(ipsMap.get(instance.toIPAddr()));
            if (valid != instance.isHealthy()) {
                changed = true;
                instance.setHealthy(valid);
                Loggers.EVT_LOG.info("{} {SYNC} IP-{} : {}@{}{}", serviceName,
                    (instance.isHealthy() ? "ENABLED" : "DISABLED"),
                    instance.getIp(), instance.getPort(), instance.getClusterName());
            }
        }

        if (changed) {
            pushService.serviceChanged(service);
        }

        StringBuilder stringBuilder = new StringBuilder();
        List<Instance> allIps = service.allIPs();
        for (Instance instance : allIps) {
            stringBuilder.append(instance.toIPAddr()).append("_").append(instance.isHealthy()).append(",");
        }

        if (changed && Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[HEALTH-STATUS-UPDATED] namespace: {}, service: {}, ips: {}",
                service.getNamespaceId(), service.getName(), stringBuilder.toString());
        }
    }

    //......
}
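The updatedHealthStatus method fetches a msg from the synchronizer and builds ipsMap from its "ips" array, splitting each entry on "_" into an address key and a validity flag. It then obtains the instances via service.allIPs() and, for each one, looks up its valid state in ipsMap. If that state differs from the instance's isHealthy(), it sets changed to true and updates the instance's healthy flag. When anything has changed, it publishes an event via pushService.serviceChanged(service). Finally, if changed is set and debug logging is enabled, it logs the resulting address and health list.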
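The parse-and-compare step can be tried on its own without fastjson or nacos types. The sketch below is an assumption-laden illustration: HealthSyncSketch, Inst, and the sample addresses are hypothetical, and the entries are assumed to look like "ip:port_flag". It does reproduce one consequence of the code shown above: Boolean.parseBoolean(null) returns false, so an instance that is absent from the remote list is treated as unhealthy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone sketch of the health-status comparison; not nacos code.
class HealthSyncSketch {
    // Minimal stand-in for an instance: an address and a mutable health flag.
    static final class Inst {
        final String addr;
        boolean healthy;

        Inst(String addr, boolean healthy) {
            this.addr = addr;
            this.healthy = healthy;
        }
    }

    // Splits entries such as "10.0.0.1:8080_true" on "_" into address -> flag.
    static Map<String, String> parse(List<String> entries) {
        Map<String, String> ipsMap = new HashMap<>(entries.size());
        for (String entry : entries) {
            String[] parts = entry.split("_");
            ipsMap.put(parts[0], parts[1]);
        }
        return ipsMap;
    }

    // Applies the remote view to the local instances; returns true if any flag flipped.
    static boolean apply(Map<String, String> ipsMap, List<Inst> local) {
        boolean changed = false;
        for (Inst inst : local) {
            // A missing address yields null, which parseBoolean maps to false.
            boolean valid = Boolean.parseBoolean(ipsMap.get(inst.addr));
            if (valid != inst.healthy) {
                changed = true;
                inst.healthy = valid;
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        List<Inst> local = new ArrayList<>(Arrays.asList(
            new Inst("10.0.0.1:8080", true),
            new Inst("10.0.0.2:8080", true)));
        Map<String, String> remote = parse(Arrays.asList(
            "10.0.0.1:8080_true", "10.0.0.2:8080_false"));
        System.out.println("changed: " + apply(remote, local));
    }
}
```

Running apply a second time with the same remote map returns false, which corresponds to the case where no push is triggered.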
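To summarize: the init method of ServiceManager submits an UpdatedServiceProcessor task to UtilsAndCommons.SERVICE_UPDATE_EXECUTOR.
UpdatedServiceProcessor implements the Runnable interface; its run method keeps taking elements from toBeUpdatedServicesQueue and submits a ServiceUpdater for each one via GlobalExecutor.submitServiceUpdate.
ServiceUpdater also implements Runnable; its run method calls updatedHealthStatus, which applies the health status reported by another server to the local instances.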