您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關 如何使用Function Compute對表格存儲中數據做簡單清洗,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
函數計算(Function Compute) 是一個事件驅動的服務,通過函數計算,用戶無需管理服務器等運行情況,只需編寫代碼并上傳。函數計算準備計算資源,并以彈性伸縮的方式運行用戶代碼,而用戶只需根據實際代碼運行所消耗的資源進行付費。
Table Store Stream是用于獲取Table Store表中增量數據的一個數據通道,通過創建Table Store觸發器,能夠實現Table Store Stream和函數計算的自動對接,讓計算函數中自定義的程序邏輯自動處理Table Store表中發生的數據修改。
表格存儲高并發的寫入性能以及低廉的存儲成本非常適合物聯網、日志、監控數據的存儲,我們可以將數據寫入到表格存儲中,同時在函數計算中對新增的數據做簡單的清洗、轉換、聚合計算等操作,并將清洗之后的數據寫回到表格存儲的結果表中,并對原始明細數據及結果數據提供實時訪問。
下面,我們使用函數計算對表格存儲中的數據做簡單的清洗,并寫入到結果表中。
我們假設寫入的為日志數據,包括三個基礎字段:
字段名稱 | 類型 | 含義 |
---|---|---|
id | 整型 | 日志id |
level | 整型 | 日志的等級,越大表明等級越高 |
message | 字符串 | 日志的內容 |
我們需要將 level>1 的日志寫入到另外一張數據表中,用作專門的查詢。
在表格存儲的控制臺創建表格存儲實例(__本次以 華東2 distribute-test 為例__),并創建源表(__source_data__)及結果表(__result__),主鍵為均 __id (整型)__,由于表格存儲是 schemafree 結構,無需預先定義其他屬性列字段。
觸發器功能需要先開啟數據表的Stream功能,才能在函數計算中處理寫入表格存儲中的增量數據。
Stream記錄過期時長 為通過 StreamAPI 能夠讀取到的增量數據的最長時間。
由于觸發器只能綁定現有的函數,故先到函數計算的控制臺上在同region創建服務及函數。
在函數計算的控制臺上創建服務及處理函數,我們繼續使用華東2節點。
1.在華東2節點創建服務。
2.創建函數依次選擇:空白函數——不創建觸發器。
函數名稱為:etl_test,選擇 python2.7 環境,在線編輯代碼
函數入口為:etl_test.handler
代碼稍后編輯,點擊下一步。
3.進行服務授權
由于函數計算需要將運行中的日志寫入到日志服務中,同時,需要對表格存儲的表進行讀寫,故需要對函數計算進行授權,為方便起見,我們先添加 AliyunOTSFullAccess 與 __AliyunLogFullAccess __權限,實際生產中,建議根據權限最小原則來添加權限。
4.點擊授權完成,并創建函數。
5.修改函數代碼。
創建好函數之后,點擊對應的函數
—代碼執行
,編輯代碼并保存,其中,INSTANCE_NAME(表格存儲的實例名稱)、REGION(使用的區域)需要根據情況進行修改:
使用示例代碼如下:
#!/usr/bin/env python # -*- coding: utf-8 -*- import cbor import json import tablestore as ots INSTANCE_NAME = 'distribute-test' REGION = 'cn-shanghai' ENDPOINT = 'http://%s.%s.ots-internal.aliyuncs.com'%(INSTANCE_NAME, REGION) RESULT_TABLENAME = 'result' def _utf8(input): return str(bytearray(input, "utf-8")) def get_attrbute_value(record, column): attrs = record[u'Columns'] for x in attrs: if x[u'ColumnName'] == column: return x['Value'] def get_pk_value(record, column): attrs = record[u'PrimaryKey'] for x in attrs: if x['ColumnName'] == column: return x['Value'] #由于已經授權了AliyunOTSFullAccess權限,此處獲取的credentials具有訪問表格存儲的權限 def get_ots_client(context): creds = context.credentials client = ots.OTSClient(ENDPOINT, creds.accessKeyId, creds.accessKeySecret, INSTANCE_NAME, sts_token = creds.securityToken) return client def save_to_ots(client, record): id = int(get_pk_value(record, 'id')) level = int(get_attrbute_value(record, 'level')) msg = get_attrbute_value(record, 'message') pk = [(_utf8('id'), id),] attr = [(_utf8('level'), level), (_utf8('message'), _utf8(msg)),] row = ots.Row(pk, attr) client.put_row(RESULT_TABLENAME, row) def handler(event, context): records = cbor.loads(event) #records = json.loads(event) client = get_ots_client(context) for record in records['Records']: level = int(get_attrbute_value(record, 'level')) if level > 1: save_to_ots(client, record) else: print "Level <= 1, ignore."
對表格存儲 Stream 數據的格式詳情請參考Stream 數據處理
1.回到表格存儲的實例管理頁面,點擊表 source_data 后的 使用觸發器 按鈕,進入觸發器綁定界面,點擊使用已有函數計算
, 選擇剛創建的服務及函數,勾選 表格存儲發送事件通知的權限
, 進行確定。
2.綁定成功之后,能夠看到如下的信息:
1.向 source_data 表中寫入數據。
2.在 result 表中查詢清洗后的數據
點擊 result 表的數據管理頁面,會查詢到剛寫入到 source_data 中的數據。 當然,向 soure_data 寫入level <=1的數據將不會同步到 result 表中
看完上述內容,你們對 如何使用Function Compute對表格存儲中數據做簡單清洗有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。