If you have run into the KubernetesClientException "too old resource version" error and are not sure how to analyze it, this article summarizes why the problem occurs and how to solve it.
Our team currently does scheduling on top of Kubernetes (using io.fabric8:kubernetes-client:4.2.0), and at runtime we hit the following problem:
```
DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - WebSocket close received. code: 1000, reason:
DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - Submitting reconnect task to the executor [scheduleReconnect|Executor for Watch 1880052106]
DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - Scheduling reconnect task [reconnectAttempt|Executor for Watch 1880052106]
DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - Connecting websocket ... io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@700f518a
199 - 2020-11-17T06:39:13.874Z -[merlion-k8s-backend]-[merlion-k8s-backend-6b4cc44855-s6wnq]: 06:39:13.873 [OkHttp https://10.25.61.82:6443/...] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - WebSocket successfully opened
WARN PodsWatchSnapshotSource: Kubernetes client has been closed (this is expected if the application is shutting down.)
io.fabric8.kubernetes.client.KubernetesClientException: too old resource version: 135562761 (135563127)
    at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:254) [kubernetes-client-4.2.2.jar:?]
    at okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323) [okhttp-3.12.0.jar:?]
    at okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219) [okhttp-3.12.0.jar:?]
    at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105) [okhttp-3.12.0.jar:?]
    at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274) [okhttp-3.12.0.jar:?]
    at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214) [okhttp-3.12.0.jar:?]
    at okhttp3.RealCall$AsyncCall.execute(RealCall.java:206) [okhttp-3.12.0.jar:?]
    at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [okhttp-3.12.0.jar:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_191]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_191]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]
```
On its own this error would not be surprising, except that our code looks like this:
```java
watchConnection = kubernetesClient.pods()
    .withLabel(MERLION_TASK_LABEL, applicationId)
    // .withResourceVersion(resourceVersion)
    .watch(new TaskPodsWatcher())
```
We have already commented out `withResourceVersion(resourceVersion)` (if it were not commented out, it would mean the resourceVersion set in our code was too small), yet we still get "too old resource version".
Jumping straight to WatchConnectionManager's onClosed:
```java
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
  logger.debug("WebSocket close received. code: {}, reason: {}", code, reason);
  if (forceClosed.get()) {
    logger.debug("Ignoring onClose for already closed/closing websocket");
    return;
  }
  if (currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0) {
    closeEvent(new KubernetesClientException("Connection unexpectedly closed"));
    return;
  }
  scheduleReconnect();
}
```
The documentation for onClosed reads:
```java
/**
 * Invoked when both peers have indicated that no more messages will be transmitted and the
 * connection has been successfully released. No further calls to this listener will be made.
 */
public void onClosed(WebSocket webSocket, int code, String reason) {
}
```
In other words, because no events were transmitted for a long time, the connection was released and the WebSocket was closed (this is quite likely to happen when there are not many tasks). That triggers the reconnect logic, scheduleReconnect(), which in turn calls runWatch():
```java
executor.schedule(new NamedRunnable("reconnectAttempt") {
  @Override
  public void execute() {
    try {
      runWatch();
      reconnectPending.set(false);
    } catch (Exception e) {
      // An unexpected error occurred and we didn't even get an onFailure callback.
      logger.error("Exception in reconnect", e);
      webSocketRef.set(null);
      closeEvent(new KubernetesClientException("Unhandled exception in reconnect attempt", e));
      close();
    }
  }
}, nextReconnectInterval(), TimeUnit.MILLISECONDS);
```
Inside runWatch(), the client then does:
```java
if (this.resourceVersion.get() != null) {
  httpUrlBuilder.addQueryParameter("resourceVersion", this.resourceVersion.get());
}
```
and `this.resourceVersion` is set in the `onMessage(WebSocket webSocket, String message)` method:
```java
WatchEvent event = readWatchEvent(message);
Object object = event.getObject();
if (object instanceof HasMetadata) {
  @SuppressWarnings("unchecked")
  T obj = (T) object; // Dirty cast - should always be valid though
  resourceVersion.set(((HasMetadata) obj).getMetadata().getResourceVersion());
  Watcher.Action action = Watcher.Action.valueOf(event.getType());
  watcher.eventReceived(action, obj);
} else if (object instanceof KubernetesResourceList) {
  @SuppressWarnings("unchecked")
  KubernetesResourceList list = (KubernetesResourceList) object; // Dirty cast - should always be valid though
  resourceVersion.set(list.getMetadata().getResourceVersion());
  Watcher.Action action = Watcher.Action.valueOf(event.getType());
  List<HasMetadata> items = list.getItems();
  if (items != null) {
    for (HasMetadata item : items) {
      watcher.eventReceived(action, (T) item);
    }
  }
}
```
That is, if at reconnect time the last recorded resourceVersion has fallen behind the minimum resourceVersion that etcd still retains (etcd compacts old history), the server responds with the "too old resource version" error.
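To make the failure mode concrete, here is a minimal, self-contained Java sketch (not fabric8 code; the class, fields, and the concrete numbers are illustrative): the server can only resume a watch from a resourceVersion that is still inside its retained history window, and a reconnect from an older version fails exactly like the log above.

```java
// Hypothetical sketch: why a watch reconnect fails once etcd has
// compacted history past the client's last-seen resourceVersion.
public class CompactionSketch {

  /** Oldest resourceVersion the (simulated) API server still retains. */
  static long oldestRetained = 135563127L;

  /** A watch can only resume from a version inside the retained window. */
  static boolean canResumeFrom(long resourceVersion) {
    return resourceVersion >= oldestRetained;
  }

  static void watch(long resourceVersion) {
    if (!canResumeFrom(resourceVersion)) {
      // Mirrors the HTTP 410 Gone / "too old resource version" error.
      throw new IllegalStateException(
          "too old resource version: " + resourceVersion + " (" + oldestRetained + ")");
    }
    // ...would stream events from resourceVersion onwards...
  }

  public static void main(String[] args) {
    long lastSeen = 135562761L; // version recorded before the idle disconnect
    try {
      watch(lastSeen); // reconnecting after etcd compacted past lastSeen
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```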
Searching online for kubernetes-too-old-resource-version, we found a Kubernetes Client team member commenting:
Fabric8 does not handle it with plain watch. But it is handling it in SharedInformer API, see ReflectorWatcher. I would recommend using informer API when writing operators since it's better than plain list and watch
In other words, the plain watch mechanism cannot handle this situation, but the SharedInformer API can, so we switched to it. As of November 16, 2020, the latest release is kubernetes-client:4.13.0; our implementation:
```scala
val sharedInformerFactory = kubernetesClient.informers()
val podInformer = sharedInformerFactory
  .sharedIndexInformerFor(classOf[Pod], classOf[PodList],
    new OperationContext().withNamespace("test"), 30 * 1000L)
podInformer.addEventHandler(new ResourceEventHandler[Pod] {
  override def onAdd(obj: Pod): Unit = {
    eventReceived(obj, "ADD")
  }

  override def onDelete(obj: Pod, deletedFinalStateUnknown: Boolean): Unit = {
    eventReceived(obj, "DELETE")
  }

  override def onUpdate(oldObj: Pod, newObj: Pod): Unit = {
    eventReceived(newObj, "UPDATE")
  }

  private def idShouldUpdate(pod: Pod): Boolean = {
    pod.getMetadata.getLabels.getOrDefault(MERLION_TASK_LABEL, "") == applicationId
  }

  private def eventReceived(pod: Pod, action: String): Unit = {
    if (idShouldUpdate(pod)) {
      val podName = pod.getMetadata.getName
      logger.info(s"Received job pod update for pod named $podName, action ${action}")
      snapshotsStore.updatePod(pod)
    }
  }
})
sharedInformerFactory.startAllRegisteredInformers()
```
SharedInformerFactory works the same way as the Kubernetes Informer mechanism and can guarantee reliable event delivery. The key pieces are ReflectorWatcher, Reflector, and DefaultSharedIndexInformer; let's walk through them briefly:
```java
public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> listerWatcher, long resyncPeriod,
    OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners) {
  this.resyncCheckPeriodMillis = resyncPeriod;
  this.defaultEventHandlerResyncPeriod = resyncPeriod;

  this.processor = new SharedProcessor<>();
  this.indexer = new Cache();

  DeltaFIFO<T> fifo = new DeltaFIFO<>(Cache::metaNamespaceKeyFunc, this.indexer);

  this.controller = new Controller<>(apiTypeClass, fifo, listerWatcher, this::handleDeltas,
      processor::shouldResync, resyncCheckPeriodMillis, context, eventListeners);
  controllerThread = new Thread(controller::run, "informer-controller-" + apiTypeClass.getSimpleName());
}
```
DefaultSharedIndexInformer uses a DeltaFIFO as the event store, and `this::handleDeltas` is invoked inside the Controller as the `processFunc` argument of `this.queue.pop`; in other words, this function is what consumes the events in the FIFO:
```java
private void processLoop() throws Exception {
  while (true) {
    try {
      this.queue.pop(this.processFunc);
    } catch (InterruptedException t) {
      log.error("DefaultController#processLoop got interrupted {}", t.getMessage(), t);
      return;
    } catch (Exception e) {
      log.error("DefaultController#processLoop recovered from crashing {} ", e.getMessage(), e);
      throw e;
    }
  }
}
```
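The pop-with-processFunc pattern above can be sketched in a few lines of self-contained Java (MiniDeltaQueue is a made-up stand-in, not the real DeltaFIFO): instead of returning the popped element, the queue hands it directly to a caller-supplied handler, which is exactly how handleDeltas gets driven by the process loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch of the pop(processFunc) pattern used by DeltaFIFO:
// the consumer function is applied to each popped element.
public class MiniDeltaQueue<T> {
  private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();

  public void add(T item) {
    queue.add(item);
  }

  /** Blocks until an element is available, then applies processFunc to it. */
  public boolean pop(Consumer<T> processFunc) {
    try {
      T item = queue.take();
      processFunc.accept(item);
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    MiniDeltaQueue<String> q = new MiniDeltaQueue<>();
    q.add("ADDED pod-a");
    q.add("MODIFIED pod-a");
    List<String> processed = new ArrayList<>();
    q.pop(processed::add); // consumes "ADDED pod-a"
    q.pop(processed::add); // consumes "MODIFIED pod-a"
    System.out.println(processed); // prints [ADDED pod-a, MODIFIED pod-a]
  }
}
```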
The queue itself is passed in as a constructor argument, i.e. `queue` is the DeltaFIFO. So where does the data in the FIFO come from? In the `controller::run` function:
```java
if (fullResyncPeriod > 0) {
  reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext, fullResyncPeriod);
} else {
  reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext, DEFAULT_PERIOD);
}
reflector.listAndWatch();
```
This calls reflector.listAndWatch(), which implements a list-watch pattern much like Kubernetes itself:
```java
public void listAndWatch() throws Exception {
  try {
    log.info("Started ReflectorRunnable watch for {}", apiTypeClass);
    reListAndSync();
    resyncExecutor.scheduleWithFixedDelay(this::reListAndSync, 0L, resyncPeriodMillis, TimeUnit.MILLISECONDS);
    startWatcher();
  } catch (Exception exception) {
    store.isPopulated(false);
    throw new RejectedExecutionException("Error while starting ReflectorRunnable watch", exception);
  }
}
```
reListAndSync pulls the full event data (a complete list), while startWatcher watches for incremental events. So what is this watch? As follows:
```java
watch.set(
  listerWatcher.watch(new ListOptionsBuilder()
      .withWatch(Boolean.TRUE)
      .withResourceVersion(lastSyncResourceVersion.get())
      .withTimeoutSeconds(null)
      .build(),
    operationContext.getNamespace(), operationContext, watcher)
);
```
The watcher here is initialized in the Reflector constructor:
```java
watcher = new ReflectorWatcher<>(store, lastSyncResourceVersion, this::startWatcher, this::reListAndSync);
```
ReflectorWatcher extends Watcher, so it has the corresponding eventReceived and onClose methods:
```java
@Override
public void eventReceived(Action action, T resource) {
  if (action == null) {
    final String errorMessage = String.format("Unrecognized event %s", resource.getMetadata().getName());
    log.error(errorMessage);
    throw new KubernetesClientException(errorMessage);
  }
  log.info("Event received {}", action.name());
  switch (action) {
    case ERROR:
      final String errorMessage = String.format("ERROR event for %s", resource.getMetadata().getName());
      log.error(errorMessage);
      throw new KubernetesClientException(errorMessage);
    case ADDED:
      store.add(resource);
      break;
    case MODIFIED:
      store.update(resource);
      break;
    case DELETED:
      store.delete(resource);
      break;
  }
  lastSyncResourceVersion.set(resource.getMetadata().getResourceVersion());
  log.debug("{}#Receiving resourceVersion {}", resource.getKind(), lastSyncResourceVersion.get());
}

@Override
public void onClose(KubernetesClientException exception) {
  log.error("Watch closing");
  Optional.ofNullable(exception)
    .map(e -> {
      log.debug("Exception received during watch", e);
      return exception;
    })
    .map(KubernetesClientException::getStatus)
    .map(Status::getCode)
    .filter(c -> c.equals(HttpURLConnection.HTTP_GONE))
    .ifPresent(c -> onHttpGone.run());
  onClose.run();
}
```
In eventReceived, every event is added to the store, i.e. the FIFO queue. In onClose, we can see that on HTTP_GONE, which is exactly the "too old resource version" case, it runs onHttpGone.run() followed by onClose.run(). onHttpGone is the Reflector's reListAndSync function and onClose is the Reflector's startWatcher function; in other words, whenever the watcher is closed, the informer re-lists and re-establishes the watch.
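That recovery pattern can be sketched as a self-contained simulation (the class below is illustrative, not fabric8 code): a full LIST always succeeds and returns a fresh resourceVersion, so when the WATCH resume point has been compacted away, re-listing once gives the informer a valid point to re-watch from.

```java
// Hypothetical sketch of the informer's recovery on HTTP 410 (Gone):
// re-list to obtain a fresh resourceVersion, then resume watching.
public class RelistOnGone {
  long oldestRetained; // simulated etcd compaction boundary
  long latest;         // simulated newest resourceVersion
  int relists = 0;

  RelistOnGone(long oldestRetained, long latest) {
    this.oldestRetained = oldestRetained;
    this.latest = latest;
  }

  /** Full LIST (reListAndSync): always succeeds, returns the latest version. */
  long reListAndSync() {
    relists++;
    return latest;
  }

  /** WATCH: succeeds only if the resume point has not been compacted away. */
  boolean watchFrom(long resourceVersion) {
    return resourceVersion >= oldestRetained;
  }

  /** Resume from lastSeen, re-listing once if the server would answer Gone. */
  long resume(long lastSeen) {
    if (!watchFrom(lastSeen)) {
      lastSeen = reListAndSync(); // the onHttpGone path
    }
    return lastSeen;
  }

  public static void main(String[] args) {
    RelistOnGone informer = new RelistOnGone(135563127L, 135563500L);
    long resumedFrom = informer.resume(135562761L); // stale version -> relist
    System.out.println("resumed from " + resumedFrom
        + " after " + informer.relists + " relist(s)");
  }
}
```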
Note that in kubernetes-client:4.6.2, WatchConnectionManager's onMessage handles HTTP_GONE differently:
```java
if (status.getCode() == HTTP_GONE) {
  logger.info("The resource version {} no longer exists. Scheduling a reconnect.", resourceVersion.get());
  resourceVersion.set(null);
  scheduleReconnect();
} else {
  logger.error("Error received: {}", status.toString());
}
```
Once HTTP_GONE occurs, it sets resourceVersion to null (i.e. fetches the newest events) and reconnects immediately, whereas in the 4.13.0 and 4.2.0 versions the plain watch does not reconnect immediately and instead leaves the handling to the user.
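On those versions, the application has to perform that recovery itself. A minimal, self-contained sketch of the user-side pattern (the static methods below are stand-ins, not the fabric8 Watcher API): when the watch closes with status 410, restart it without a resourceVersion so it begins from the newest state.

```java
import java.net.HttpURLConnection;

// Hypothetical stand-in for the user-side recovery that the plain watch
// leaves to the application: on close with HTTP 410, restart the watch
// from "latest" (i.e. with no resourceVersion).
public class UserSideRecovery {
  static String requestedResourceVersion; // what the last (re)watch asked for
  static int watchStarts = 0;

  /** Stand-in for kubernetesClient.pods()...watch(new TaskPodsWatcher()). */
  static void startWatch(String resourceVersion) {
    watchStarts++;
    requestedResourceVersion = resourceVersion;
  }

  /** Stand-in for Watcher.onClose: restart from latest when the version is gone. */
  static void onClose(int statusCode) {
    if (statusCode == HttpURLConnection.HTTP_GONE) {
      startWatch(null); // null resourceVersion = start from the newest state
    }
  }

  public static void main(String[] args) {
    startWatch("135562761");              // initial watch from a recorded version
    onClose(HttpURLConnection.HTTP_GONE); // server compacted that version away
    System.out.println("watch restarted, resourceVersion=" + requestedResourceVersion);
  }
}
```

Note that restarting from the newest state can skip events that occurred in between; the informer avoids this gap by re-listing first, which is another reason to prefer the SharedInformer API.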