您好,登錄后才能下訂單哦!
preface
流式數據的監控,以下主要是從算法的呈現出發,提供一種python的實現思路
其中:
1.python是2.X版本
2.提供兩種實現思路,一是基于matplotlib的animation,一是基于matplotlib的ion
話不多說,先了解大概的效果,如下:
一、一點構思
在做此流數據輸出可視化前,一直在搗鼓nupic框架,其內部HTM算法主要是一種智能的異常檢測算法,是目前AI框架中垂直領域下的一股清流,但由于其實現的例子對應的流數據展示并非我想要的,故此借鑒后自己重新寫了一個,主要是達到三個目的,一是展示真實數據的波動,二是展示各波動的異常得分,三是羅列異常的點。
上述的輸出結構并非重點,重點是其實時更新的機制,了解后即可自行定義。另,js對于這種流數據展示應該不難,所以本文主要立足的是算法的呈現角度以及python的實現。
二、matplotlib animation實現思路
http://matplotlib.org/api/animation_api.html 鏈接是matplotlib animation的官方api文檔
(一)、骨架與實時更新
animation翻譯過來就是動畫,其動畫展示核心主要有三個:1是動畫的骨架先搭好,就是圖像的邊邊框框這些,2是更新的過程,即傳入實時數據時圖形的變化方法,3是FuncAnimation方法結尾。
下面以一個小例子做進一步說明:
1.對于動畫的骨架:
# initial the figure. x = [] y = [] fig = plt.figure(figsize=(18, 8), facecolor="white") ax1 = fig.add_subplot(111) p1, = ax1.plot(x, y, line, color="red")
以上分別對應初始化空數據,初始化圖形大小和背景顏色,插入子圖(三個數字分別表示幾行幾列第幾個位置),初始化圖形(數據為空)。
import numpy as np x = np.arange(0, 1000, 1) y = np.random.normal(100, 10, 1000)
隨機生成一些作圖數據,下面定義update過程。
2.對于更新過程:
def update(i): x.append(xs[i]) y.append(ys[i]) ax1.set_xlim(min(x),max(x)+1) ax1.set_ylim(min(y),max(y)+1) p1.set_data(x,y) ax1.figure.canvas.draw() return p1
上述定義更新函數,參數i為每輪迭代從FuncAnimation方法frames參數傳進來的數值,frames參數的指定下文會進一步說,x/y通過相應更新之后,對圖形的x/y軸大小做相應的重設,再把數據通過set_data傳進圖形,注意ax1和p1的區別,最后再把上述的變化通過draw()方法繪制到界面上,返回p1給FuncAnimation方法。
3.對于FuncAnimation方法:
ani = FuncAnimation(fig=fig,func=update,frames=len(xs),interval=1) plt.show()
FuncAnimation方法主要是與update函數做交互,將frames參數對應的數據逐條傳進update函數,再由update函數返回的圖形覆蓋FuncAnimation原先的圖形,fig參數即為一開始對應的參數,interval為每次更新的時間間隔,還有其他一些參數如blit=True控制圖形精細,當界面較多子圖時,為True可以使得看起來不會太卡,關鍵是frames參數,下面是官方給出的注釋:
可為迭代數,可為函數,也可為空,上面我指定為數組的長度,其迭代則從0開始到最后該數值停止。
該例子最終呈現的效果如下:
了解大概的實現,細節就不在這里多說了。
(二)、animation的優缺點
animation的繪制的結果相比于下文的ion會更加的細膩,主要體現在FuncAnimation方法的一些參數的控制上。但是缺點也是明顯,就是必須先有指定的數據或者指定的數據大小,顯然這樣對于預先無法知道數據的情況沒法處理。所以換一種思路,在matplotlib ion打開的模式下,每次往模板插入數據都會進行相應的更新,具體看第二部分。
三、matplotlib ion實現思路
(一)、實時更新
matplotlib ion的實現也主要是三個核心,1是打開ion,2是實時更新機制,3是呈現在界面上。
1.對于打開ion:
ion全稱是 interactive on(交互打開),其意為打開一個圖形的交互接口,之后每次繪圖都在之前打開的面板上操作,舉個例子:
import matplotlib.pyplot as plt plt.ion() fig = plt.figure() ax1 = fig.add_subplot(111) line, = ax1.plot(t, v, line, color="r")
打開交互接口,初始化圖形。
2.對于實時更新機制:
import numpy as np ys = np.random.normal(100, 10, 1000) def p(a, b): t.append(a) v.append(b) ax1.set_xlim(min(t), max(t) + 1) ax1.set_ylim(min(v), max(v) + 1) line.set_data(t, v) plt.pause(0.001) ax1.figure.canvas.draw() for i in xrange(len(ys)): p(i, ys[i])
隨機生成一組數據,定義作圖函數p(包含pause表示暫定時延,最好有,防止界面卡死),傳入數據實時更新。
3.對于界面最終呈現
plt.ioff() plt.show()
ioff是關閉交互模式,就像open打開文件產生的句柄,最好也有個close關掉。
最終效果如下:
(二)、ion的優缺點
animation可以在細節上控制比ion更加細膩,這也是ion沒有的一點,但是單就無需預先指定數據這一點,ion也無疑是能把流數據做得更加好。
四、最后
貼一下兩種方法在最開始那種圖的做法,ion我定義成類,這樣每次調用只需穿入參數就可以。
animation版本
# _*_ coding:utf-8 _*_ import os import csv import datetime import matplotlib.pyplot as plt from matplotlib.animation import FuncAnimation from matplotlib.dates import DateFormatter import matplotlib.ticker as ticker # read the file filePath = os.path.join(os.getcwd(), "data/anomalyDetect_output.csv") file = open(filePath, "r") allData = csv.reader(file) # skip the first three columns allData.next() allData.next() allData.next() # cache the data data = [line for line in allData] # for i in data: print i # take out the target value timestamp = [line[0] for line in data] value = [line[1:] for line in data] # format the time style 2016-12-01 00:00:00 def timestampFormat(t): result = datetime.datetime.strptime(t, "%Y-%m-%d %H:%M:%S") return result # take out the data timestamp = map(timestampFormat, timestamp) value_a = [float(x[0]) for x in value] predict_a = [float(x[1]) for x in value] anomalyScore_a = [float(x[2]) for x in value] # initial the size of the figure fig = plt.figure(figsize=(18, 8), facecolor="white") fig.subplots_adjust(left=0.06, right=0.70) ax1 = fig.add_subplot(2, 1, 1) ax2 = fig.add_subplot(2, 1, 2) ax3 = fig.add_axes([0.8, 0.1, 0.2, 0.8], frameon=False) # initial plot p1, = ax1.plot_date([], [], fmt="-", color="red", label="actual") ax1.legend(loc="upper right", frameon=False) ax1.grid(True) p2, = ax2.plot_date([], [], fmt="-", color="red", label="anomaly score") ax2.legend(loc="upper right", frameon=False) ax2.axhline(0.8, color='black', lw=2) # add the x/y label ax2.set_xlabel("date time") ax2.set_ylabel("anomaly score") ax1.set_ylabel("value") # add the table in ax3 col_labels = ["date time", 'actual value', 'predict value', 'anomaly score'] ax3.text(0.05, 0.99, "anomaly value table", size=12) ax3.set_xticks([]) ax3.set_yticks([]) # axis format dateFormat = DateFormatter("%m/%d %H:%M") ax1.xaxis.set_major_formatter(ticker.FuncFormatter(dateFormat)) ax2.xaxis.set_major_formatter(ticker.FuncFormatter(dateFormat)) # define the initial function def init(): p1.set_data([], []) p2.set_data([], []) return p1, p2 # initial data for the update function x1 = [] x2 = [] x1_2 = [] y1_2 = [] x1_3 = [] y1_3 = [] y1 = [] y2 = [] highlightList = [] turnOn = True tableValue = [[0, 0, 0, 0]] # update function def stream(i): # update the main graph(contains actual value and predicted value) # add the data global turnOn, highlightList, ax3 x1.append(timestamp[i]) y1.append(value_a[i]) # update the axis minAxis = max(x1) - datetime.timedelta(days=1) ax1.set_xlim(minAxis, max(x1)) ax1.set_ylim(min(y1), max(y1)) ax1.figure.canvas.draw() p1.set_data(x1, y1) # update the anomaly graph(contains anomaly score) x2.append(timestamp[i]) y2.append(anomalyScore_a[i]) ax2.set_xlim(minAxis, max(x2)) ax2.set_ylim(min(y2), max(y2)) # update the scatter if anomalyScore_a[i] >= 0.8: x1_3.append(timestamp[i]) y1_3.append(value_a[i]) ax1.scatter(x1_3, y1_3, s=50, color="black") # update the high light if anomalyScore_a[i] >= 0.8: highlightList.append(i) turnOn = True else: turnOn = False if len(highlightList) != 0 and turnOn is False: ax2.axvspan(timestamp[min(highlightList)] - datetime.timedelta(minutes=10), timestamp[max(highlightList)] + datetime.timedelta(minutes=10), color='r', edgecolor=None, alpha=0.2) highlightList = [] turnOn = True p2.set_data(x2, y2) # add the table in ax3 # update the anomaly tabel if anomalyScore_a[i] >= 0.8: ax3.remove() ax3 = fig.add_axes([0.8, 0.1, 0.2, 0.8], frameon=False) ax3.text(0.05, 0.99, "anomaly value table", size=12) ax3.set_xticks([]) ax3.set_yticks([]) tableValue.append([timestamp[i].strftime("%Y-%m-%d %H:%M:%S"), value_a[i], predict_a[i], anomalyScore_a[i]]) if len(tableValue) >= 40: tableValue.pop(0) ax3.table(cellText=tableValue, colWidths=[0.35] * 4, colLabels=col_labels, loc=1, cellLoc="center") return p1, p2 # main animated function anim = FuncAnimation(fig, stream, init_func=init, frames=len(timestamp), interval=0) plt.show() file.close()
ion版本
#! /usr/bin/python import os import csv import datetime import matplotlib.pyplot as plt from matplotlib.animation import FuncAnimation from matplotlib.dates import DateFormatter import matplotlib.ticker as ticker class streamDetectionPlot(object): """ Anomaly plot output. """ # initial the figure parameters. def __init__(self): # Turn matplotlib interactive mode on. plt.ion() # initial the plot variable. self.timestamp = [] self.actualValue = [] self.predictValue = [] self.anomalyScore = [] self.tableValue = [[0, 0, 0, 0]] self.highlightList = [] self.highlightListTurnOn = True self.anomalyScoreRange = [0, 1] self.actualValueRange = [0, 1] self.predictValueRange = [0, 1] self.timestampRange = [0, 1] self.anomalyScatterX = [] self.anomalyScatterY = [] # initial the figure. global fig fig = plt.figure(figsize=(18, 8), facecolor="white") fig.subplots_adjust(left=0.06, right=0.70) self.actualPredictValueGraph = fig.add_subplot(2, 1, 1) self.anomalyScoreGraph = fig.add_subplot(2, 1, 2) self.anomalyValueTable = fig.add_axes([0.8, 0.1, 0.2, 0.8], frameon=False) # define the initial plot method. def initPlot(self): # initial two lines of the actualPredcitValueGraph. self.actualLine, = self.actualPredictValueGraph.plot_date(self.timestamp, self.actualValue, fmt="-", color="red", label="actual value") self.predictLine, = self.actualPredictValueGraph.plot_date(self.timestamp, self.predictValue, fmt="-", color="blue", label="predict value") self.actualPredictValueGraph.legend(loc="upper right", frameon=False) self.actualPredictValueGraph.grid(True) # initial two lines of the anomalyScoreGraph. self.anomalyScoreLine, = self.anomalyScoreGraph.plot_date(self.timestamp, self.anomalyScore, fmt="-", color="red", label="anomaly score") self.anomalyScoreGraph.legend(loc="upper right", frameon=False) self.baseline = self.anomalyScoreGraph.axhline(0.8, color='black', lw=2) # set the x/y label of the first two graph. self.anomalyScoreGraph.set_xlabel("datetime") self.anomalyScoreGraph.set_ylabel("anomaly score") self.actualPredictValueGraph.set_ylabel("value") # configure the anomaly value table. self.anomalyValueTableColumnsName = ["timestamp", "actual value", "expect value", "anomaly score"] self.anomalyValueTable.text(0.05, 0.99, "Anomaly Value Table", size=12) self.anomalyValueTable.set_xticks([]) self.anomalyValueTable.set_yticks([]) # axis format. self.dateFormat = DateFormatter("%m/%d %H:%M") self.actualPredictValueGraph.xaxis.set_major_formatter(ticker.FuncFormatter(self.dateFormat)) self.anomalyScoreGraph.xaxis.set_major_formatter(ticker.FuncFormatter(self.dateFormat)) # define the output method. def anomalyDetectionPlot(self, timestamp, actualValue, predictValue, anomalyScore): # update the plot value of the graph. self.timestamp.append(timestamp) self.actualValue.append(actualValue) self.predictValue.append(predictValue) self.anomalyScore.append(anomalyScore) # update the x/y range. self.timestampRange = [min(self.timestamp), max(self.timestamp)+datetime.timedelta(minutes=10)] self.actualValueRange = [min(self.actualValue), max(self.actualValue)+1] self.predictValueRange = [min(self.predictValue), max(self.predictValue)+1] # update the x/y axis limits self.actualPredictValueGraph.set_ylim( min(self.actualValueRange[0], self.predictValueRange[0]), max(self.actualValueRange[1], self.predictValueRange[1]) ) self.actualPredictValueGraph.set_xlim( self.timestampRange[1] - datetime.timedelta(days=1), self.timestampRange[1] ) self.anomalyScoreGraph.set_xlim( self.timestampRange[1]- datetime.timedelta(days=1), self.timestampRange[1] ) self.anomalyScoreGraph.set_ylim( self.anomalyScoreRange[0], self.anomalyScoreRange[1] ) # update the two lines of the actualPredictValueGraph. self.actualLine.set_xdata(self.timestamp) self.actualLine.set_ydata(self.actualValue) self.predictLine.set_xdata(self.timestamp) self.predictLine.set_ydata(self.predictValue) # update the line of the anomalyScoreGraph. self.anomalyScoreLine.set_xdata(self.timestamp) self.anomalyScoreLine.set_ydata(self.anomalyScore) # update the scatter. if anomalyScore >= 0.8: self.anomalyScatterX.append(timestamp) self.anomalyScatterY.append(actualValue) self.actualPredictValueGraph.scatter( self.anomalyScatterX, self.anomalyScatterY, s=50, color="black" ) # update the highlight of the anomalyScoreGraph. if anomalyScore >= 0.8: self.highlightList.append(timestamp) self.highlightListTurnOn = True else: self.highlightListTurnOn = False if len(self.highlightList) != 0 and self.highlightListTurnOn is False: self.anomalyScoreGraph.axvspan( self.highlightList[0] - datetime.timedelta(minutes=10), self.highlightList[-1] + datetime.timedelta(minutes=10), color="r", edgecolor=None, alpha=0.2 ) self.highlightList = [] self.highlightListTurnOn = True # update the anomaly value table. if anomalyScore >= 0.8: # remove the table and then replot it self.anomalyValueTable.remove() self.anomalyValueTable = fig.add_axes([0.8, 0.1, 0.2, 0.8], frameon=False) self.anomalyValueTableColumnsName = ["timestamp", "actual value", "expect value", "anomaly score"] self.anomalyValueTable.text(0.05, 0.99, "Anomaly Value Table", size=12) self.anomalyValueTable.set_xticks([]) self.anomalyValueTable.set_yticks([]) self.tableValue.append([ timestamp.strftime("%Y-%m-%d %H:%M:%S"), actualValue, predictValue, anomalyScore ]) if len(self.tableValue) >= 40: self.tableValue.pop(0) self.anomalyValueTable.table(cellText=self.tableValue, colWidths=[0.35] * 4, colLabels=self.anomalyValueTableColumnsName, loc=1, cellLoc="center" ) # plot pause 0.0001 second and then plot the next one. plt.pause(0.0001) plt.draw() def close(self): plt.ioff() plt.show()
下面是ion版本的調用:
graph = stream_detection_plot.streamDetectionPlot() graph.initPlot() for i in xrange(len(timestamp)): graph.anomalyDetectionPlot(timestamp[i],value_a[i],predict_a[i],anomalyScore_a[i]) graph.close()
具體為實例化類,初始化圖形,傳入數據作圖,關掉。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持億速云。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。