亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

大數據開發中Flink-CEP怎么用

發布時間:2021-11-23 14:37:57 來源:億速云 閱讀:171 作者:小新 欄目:大數據

這篇文章主要為大家展示了“大數據開發中Flink-CEP怎么用”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“大數據開發中Flink-CEP怎么用”這篇文章吧。

總結就是:輸入-規則-輸出

就是單事件的自關聯,其實匹配的也是時間序列的

大數據開發中Flink-CEP怎么用

定義基礎

(1)定義 復合事件處理(Complex Event Processing,CEP)是一種基于動態環境中事件流的分析技術,事件在這 里通常是有意義的狀態變化,通過分析事件間的關系,利用過濾、關聯、聚合等技術,根據事件間的時序關系和聚合 關系制定檢測規則,持續地從事件流中查詢出符合要求的事件序列,最終分析得到更復雜的復合事件

(2)特征 CEP的特征如下: 目標:從有序的簡單事件流中發現一些高階特征; 輸入:一個或多個簡單事件構成的事件流; 處 理:識別簡單事件之間的內在聯系,多個符合一定規則的簡單事件構成復雜事件; 輸出:滿足規則的復雜事件

(3)功能

CEP用于分析低延遲、頻繁產生的不同來源的事件流。CEP可以幫助在復雜的、不相關的時間流中找出有 意義的模式和復雜的關系,以接近實時或準實時的獲得通知或組織一些行為。 CEP支持在流上進行模式匹配,根據模 式的條件不同,分為連續的條件或不連續的條件;模式的條件允許有時間的限制,當條件范圍內沒有達到滿足的條件 時,會導致模式匹配超時。 看起來很簡單,但是它有很多不同的功能: ① 輸入的流數據,盡快產生結果; ② 在2個 事件流上,基于時間進行聚合類的計算; ③ 提供實時/準實時的警告和通知; ④ 在多樣的數據源中產生關聯分析模 式; ⑤ 高吞吐、低延遲的處理 市場上有多種CEP的解決方案,例如Spark、Samza、Beam等,但他們都沒有提供專 門的庫支持。然而,Flink提供了專門的CEP庫。

(4)主要組件 Flink為CEP提供了專門的Flink CEP library

它包含如下組件:Event Stream、Pattern定義、Pattern檢測和生成Alert。 首先,開發人員要在DataStream流上定義出模 式條件,之后Flink CEP引擎進行模式檢測,必要時生成警告。

CEP里面的模式API

(1)個體模式(Individual Patterns) 組成復雜規則的每一個單

獨的模式定義,就是個體模式。

start.times(3).where(_.behavior.startsWith(‘fav’))

(2)組合模式(Combining Patterns,也叫模式序列) 很多個體模式組合起來,就形成了整個的模式序列。 模式序列

必須以一個初始模式開始:

val start = Pattern.begin(‘start’)

(3)模式組(Group of Pattern) 將一個模式序列作為條件嵌套在個體模式里,成為一組模式

個體模式

個體模式包括單例模式和循環模式。單例模式只接收一個事件,而循環模式可以接收多個事件,

(1)量詞 可以在一個個體模式后追加量詞,也就是指定循環次數。

// 匹配出現4次
start.time(4)
// 匹配出現0次或4次
start.time(4).optional
// 匹配出現2、3或4次
start.time(2,4)
// 匹配出現2、3或4次,并且盡可能多地重復匹配
start.time(2,4).greedy
// 匹配出現1次或多次
start.oneOrMore
// 匹配出現0、2或多次,并且盡可能多地重復匹配
start.timesOrMore(2).optional.greedy

(2)條件 每個模式都需要指定觸發條件,作為模式是否接受事件進入的判斷依據。CEP中的個體模式主要通過調 用.where()、.or()和.until()來指定條件。按不同的調用方式,可以分成以下幾類: ① 簡單條件 通過.where()方法對事 件中的字段進行判斷篩選,決定是否接收該事件

start.where(event=>event.getName.startsWith(“foo”))

② 組合條件 將簡單的條件進行合并;or()方法表示或邏輯相連,where的直接組合就相當于與and。 Pattern.where(event => …/some condition/).or(event => /or condition/) ③ 終止條件 如果使用了oneOrMore或者oneOrMore.optional,建議使用.until()作為終止條件,以便清理狀態。 ④ 迭代條件 能夠對模式之前所有接收的事件進行處理;調用.where((value,ctx) => {…}),可以調用 ctx.getEventForPattern(“name”)

模式序列

大數據開發中Flink-CEP怎么用

(1)嚴格近鄰

所有事件按照嚴格的順序出現,中間沒有任何不匹配的事件,由.next()指定。例如對于模式“a next b”,事件序列“a,c,b1,b2”沒有匹配。 (2)寬松近鄰 允許中間出現不匹配的事件,由.followedBy()指定。例如對于模 式“a followedBy b”,事件序列“a,c,b1,b2”匹配為{a,b1}。 (3)非確定性寬松近鄰 進一步放寬條件,之前已經匹配過 的事件也可以再次使用,由.followedByAny()指定。例如對于模式“a followedByAny b”,事件序列“a,c,b1,b2”匹配為 {ab1},{a,b2}。 除了以上模式序列外,還可以定義“不希望出現某種近鄰關系”: .notNext():不想讓某個事件嚴格緊 鄰前一個事件發生。 .notFollowedBy():不想讓某個事件在兩個事件之間發生。 需要注意:

  • 所有模式序列必須以.begin()開始;

  • 模式序列不能以.notFollowedBy()結束;

  • “not”類型的模式不能被optional所修飾;

  • 可以為模式指定時間約束,用來要求在多長時間內匹配有效。 next.within(Time.seconds(10))

模式的檢測

定要查找的模式序列后,就可以將其應用于輸入流以檢測潛在匹配。調用CEP.pattern(),給定輸入流和模式,就能 得到一個PatternStream。

val input:DataStream[Event] = …
val pattern:Pattern[Event,_] = …
val patternStream:PatternStream[Event]=CEP.pattern(input,pattern)

匹配事件的提取

創建PatternStream之后,就可以應用select或者flatSelect方法,從檢測到的事件序列中提取事件了。 select()方法 需要輸入一個select function作為參數,每個成功匹配的事件序列都會調用它。 select()以一個 Map[String,Iterable[IN]]來接收匹配到的事件序列,其中key就是每個模式的名稱,而value就是所有接收到的事件的 Iterable類型。

def selectFn(pattern : Map[String,Iterable[IN]]):OUT={
  val startEvent = pattern.get(“start”).get.next
  val endEvent = pattern.get(“end”).get.next
  OUT(startEvent, endEvent)
}

flatSelect通過實現PatternFlatSelectFunction實現與select相似的功能。唯一的區別就是flatSelect方法可以返回多條 記錄,它通過一個Collector[OUT]類型的參數來將要輸出的數據傳遞到下游

超時事件的提取

當一個模式通過within關鍵字定義了檢測窗口時間時,部分事件序列可能因為超過窗口長度而被丟棄;為了能夠處理

這些超時的部分匹配,select和flatSelect API調用允許指定超時處理程序。

Flink CEP 開發流程:

  1. DataSource 中的數據轉換為 DataStream;

  2. 定義 Pattern,并將 DataStream 和 Pattern 組合轉換為 PatternStream;

  3. PatternStream 經過 select、process 等算子轉換為 DataStraem;

  4. 再次轉換的 DataStream 經過處理后,sink 到目標庫。

select方法:

SingleOutputStreamOperator<PayEvent> result = patternStream.select(orderTimeoutOutput, new

  PatternTimeoutFunction<PayEvent, PayEvent>() {
  
  @Override
  
  public PayEvent timeout(Map<String, List<PayEvent>> map, long l) throws Exception {
  
  return map.get("begin").get(0);
  
  }

}, new PatternSelectFunction<PayEvent, PayEvent>() {

@Override

public PayEvent select(Map<String, List<PayEvent>> map) throws Exception {

return map.get("pay").get(0);

}

});

對檢測到的模式序列應用選擇函數。對于每個模式序列,調用提供的{@link PatternSelectFunction}。模式選擇函數

只能產生一個結果元素。

對超時的部分模式序列應用超時函數。對于每個部分模式序列,調用提供的{@link PatternTimeoutFunction}。模式

超時函數只能產生一個結果元素。

您可以在使用相同的{@link OutputTag}進行select操作的{@link SingleOutputStreamOperator}上獲得由{@link

SingleOutputStreamOperator}生成的{@link SingleOutputStreamOperator}生成的超時數據流。

@param timedOutPartialMatchesTag 標識端輸出超時模式的@link OutputTag}

@param patternTimeoutFunction 為超時的每個部分模式序列調用的模式超時函數。

@param patternSelectFunction 為每個檢測到的模式序列調用的模式選擇函數。

@param 產生的超時元素的類型

@param 結果元素的類型

return {@link DataStream},其中包含產生的元素和在邊輸出中產生的超時元素。

DataStream<PayEvent> sideOutput = result.getSideOutput(orderTimeoutOutput);

獲取{@link DataStream},該{@link DataStream}包含由操作發出到指定{@link OutputTag}的邊輸出的元素

Flink CEP 開發流程

  1. DataSource 中的數據轉換為 DataStream;watermark、keyby

  2. 定義 Pattern,并將 DataStream 和 Pattern 組合轉換為 PatternStream;

  3. PatternStream 經過 sele ct、process 等算子轉換為 DataStream;

  4. 再次轉換的 DataStream 經過處理后,sink 到目標庫

CEP實現的主要原理

FlinkCEP在運行時會將用戶的邏輯轉化成這樣的一個NFA Graph (nfa對象) 所以有限狀態機的工作過程,就是從開始狀態,根據不同的輸入,自動進行狀態轉換的過程

大數據開發中Flink-CEP怎么用

上圖中的狀態機的功能,是檢測二進制數是否含有偶數個 0。從圖上可以看出,輸入只有 1 和 0 兩種。從 S1 狀態開 始,只有輸入 0 才會轉換到 S2 狀態,同樣 S2 狀態下只有輸入 0 才會轉換到 S1。所以,二進制數輸入完畢,如果滿 足最終狀態,也就是最后停在 S1 狀態,那么輸入的二進制數就含有偶數個 0 大數據開發

以上是“大數據開發中Flink-CEP怎么用”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

尼玛县| 民勤县| 都江堰市| 扎兰屯市| 广元市| 宁化县| 读书| 新密市| 专栏| 吴江市| 潼关县| 佛冈县| 凤城市| 衢州市| 长顺县| 海原县| 丰台区| 克什克腾旗| 胶南市| 通海县| 平果县| 江口县| 高青县| 瑞安市| 台州市| 尉犁县| 墨竹工卡县| 博客| 平塘县| 平南县| 南澳县| 双鸭山市| 扎鲁特旗| 延川县| 武义县| 东兰县| 天长市| 德钦县| 斗六市| 明溪县| 西林县|