This article walks through how to implement real-time security detection based on Flink, using a concrete login-failure alerting scenario.
Consider an internal system such as the corporate mail system. Employee access logs are stored in Kafka. The requirement: for a given user account and source IP, if the account fails to log in to the mail system 20 times in a row within any 3-minute period and the next login attempt succeeds, the event must be detected promptly and pushed to a designated security contact on WeChat Work. The Kafka records contain a keyword that marks whether an access record is a login event, as well as the access time (the event time). Once a matching account is detected, a WeChat Work alert is pushed immediately and the account's access records for that period are written to downstream Elasticsearch. The environment used:
Flink-1.14.4
Java8
ElasticSearch-7.3.2
Kafka-2.12_2.8.1
The IPs and accounts below are test data only. A sample login message from Kafka looks like this:
{
  "user": "wangxm",
  "client_ip": "110.68.6.182",
  "source": "login",
  "loginname": "wangxm@test.com",
  "IP": "110.8.148.58",
  "timestamp": "17:58:12",
  "@timestamp": "2022-04-20T09:58:13.647Z",
  "ip": "110.7.231.25",
  "clienttype": "POP3",
  "result": "success",
  "@version": "1"
}
The scenario above can be implemented either with FlinkCEP or with Flink's sliding windows. After writing and debugging a FlinkCEP version, I found it did not satisfy the requirement, so I switched to a sliding-window implementation.
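For reference, the CEP route that was tried roughly corresponds to a pattern like the one below. This is only a sketch of the idea, not the code from the abandoned attempt; the MailMsg fields follow the bean defined later, and it requires the flink-cep dependency.

// Imports needed for this fragment:
//   org.apache.flink.cep.CEP, org.apache.flink.cep.PatternStream,
//   org.apache.flink.cep.pattern.Pattern,
//   org.apache.flink.cep.pattern.conditions.SimpleCondition,
//   org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical FlinkCEP pattern: 20 consecutive "fail" logins followed by one "success",
// all within 3 minutes for the same key (user@client_ip).
Pattern<MailMsg, MailMsg> loginPattern = Pattern
        .<MailMsg>begin("fails")
        .where(new SimpleCondition<MailMsg>() {
            @Override
            public boolean filter(MailMsg msg) {
                return "fail".equals(msg.getResult());
            }
        })
        .times(20).consecutive()
        .next("success")
        .where(new SimpleCondition<MailMsg>() {
            @Override
            public boolean filter(MailMsg msg) {
                return "success".equals(msg.getResult());
            }
        })
        .within(Time.minutes(3));

// Applied to the keyed stream; matches would then be turned into alert events.
PatternStream<MailMsg> patternStream = CEP.pattern(loginKeyedDs, loginPattern);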
The main entry class creates the Flink environment, sets the basic parameters, and creates the Kafka source. After consuming the messages it maps and filters them, assigns watermarks, keys the stream, and applies a sliding window; inside the window the events are evaluated, and the matching events are collected and written to Elasticsearch.
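The FlinkEnv and KafkaSourceBuilder helpers it relies on are not listed in the article. A minimal sketch of what they are expected to do, with placeholder broker address and checkpoint interval (assumptions, not values from the article), could look like this:

package com.data.dev.flink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Hypothetical sketch of the FlinkEnv / KafkaSourceBuilder helpers used by the main class.
 * Broker address and checkpoint interval are placeholders.
 */
public class FlinkEnvSketch {

    /** Roughly what FlinkEnv.getFlinkEnv() does: create the environment and enable checkpointing. */
    public static StreamExecutionEnvironment getFlinkEnv() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // placeholder: checkpoint every 60 s
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        return env;
    }

    /** Roughly what KafkaSourceBuilder.getKafkaSource(topic, groupId) builds (Flink 1.14 KafkaSource API). */
    public static KafkaSource<String> getKafkaSource(String topic, String groupId) {
        return KafkaSource.<String>builder()
                .setBootstrapServers("kafka-broker:9092") // placeholder address
                .setTopics(topic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }

    /** Roughly what FlinkEnv.envExec(env, jobName) does: submit the job. */
    public static void envExec(StreamExecutionEnvironment env, String jobName) throws Exception {
        env.execute(jobName);
    }
}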
The map, filter, keyBy, and window operators are each implemented in their own classes, which are listed one by one below.
package com.data.dev.flink.mailTopic.main;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import com.data.dev.elasticsearch.ElasticSearchInfo;
import com.data.dev.elasticsearch.SinkToEs;
import com.data.dev.flink.FlinkEnv;
import com.data.dev.flink.mailTopic.OperationForLoginFailCheck.*;
import com.data.dev.kafka.KafkaSourceBuilder;
import com.data.dev.key.ConfigurationKey;
import com.data.dev.utils.TimeUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.time.Duration;

/**
 * Flink job: detect 20 consecutive login failures within 3 minutes followed by a successful login.
 * Implemented with a sliding window.
 * @author wangxiaomin 2022-06-01
 */
@Slf4j
public class MailMsg extends BaseBean {

    /** Flink job name */
    public static final String JobName = "Alert collection platform - login success after consecutive failures";

    /** Kafka source name */
    public static final String KafkaSourceName = "Kafka Source for AlarmPlatform About Mail Topic";

    public MailMsg() {
        log.info("Initializing the sliding-window alert job");
    }

    /**
     * Run the detection logic and push alerts.
     */
    public static void execute() {
        // ① Create the Flink execution environment and set checkpointing and other required parameters
        StreamExecutionEnvironment env = FlinkEnv.getFlinkEnv();
        KafkaSource<String> kafkaSource = KafkaSourceBuilder.getKafkaSource(
                ConfigurationKey.KAFKA_MAIL_TOPIC_NAME, ConfigurationKey.KAFKA_MAIL_CONSUMER_GROUP_ID);
        DataStreamSource<String> kafkaMailMsg = env.fromSource(
                kafkaSource,
                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(10)),
                KafkaSourceName);

        // ② Map the raw messages to beans and keep only login events
        SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginMapDs =
                kafkaMailMsg.map(new MsgToBeanMapper()).name("Map operator");
        SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginFilterDs =
                loginMapDs.filter(new MailMsgForLoginFilter()).name("Filter operator");

        // ③ Assign the watermark strategy (event time taken from the message timestamp)
        WatermarkStrategy<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> watermarkStrategy =
                WatermarkStrategy.<com.data.dev.common.javabean.kafkaMailTopic.MailMsg>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                        .withTimestampAssigner((mailMsg, timestamp) ->
                                TimeUtils.switchUTCToBeijingTimestamp(mailMsg.getTimestamp_datetime()));
        SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginWmDs =
                loginFilterDs.assignTimestampsAndWatermarks(watermarkStrategy.withIdleness(Duration.ofMinutes(3)))
                        .name("Assign watermarks");

        // ④ Key the stream by user + client IP
        KeyedStream<com.data.dev.common.javabean.kafkaMailTopic.MailMsg, String> loginKeyedDs =
                loginWmDs.keyBy(new LoginKeySelector());

        // ⑤ Apply a sliding event-time window: 3-minute window, sliding every 90 seconds
        WindowedStream<com.data.dev.common.javabean.kafkaMailTopic.MailMsg, String, TimeWindow> loginWindowDs =
                loginKeyedDs.window(SlidingEventTimeWindows.of(Time.seconds(180L), Time.seconds(90L)));

        // ⑥ Evaluate the detection logic inside each window
        SingleOutputStreamOperator<MailMsgAlarm> loginWindowsDealDs =
                loginWindowDs.process(new WindowProcessFuncImpl()).name("Window processing logic");

        // ⑦ Convert the result into a plain DataStream<String>
        SingleOutputStreamOperator<String> resultDs =
                loginWindowsDealDs.map(new AlarmMsgToStringMapper()).name("Convert window result to standard format");

        // ⑧ Write the final result to Elasticsearch
        resultDs.addSink(SinkToEs.getEsSinkBuilder(
                ElasticSearchInfo.ES_LOGIN_FAIL_INDEX_NAME, ElasticSearchInfo.ES_INDEX_TYPE_DEFAULT).build());

        // ⑨ Submit the job to the Flink cluster
        FlinkEnv.envExec(env, JobName);
    }
}
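The MsgToBeanMapper used in step ② is not included in the original listing. A minimal sketch, assuming it simply parses the raw Kafka JSON (see the sample message above) into the MailMsg bean with fastjson, might look like this; the exact field mapping (for example loginname → loginName and @timestamp → timestamp_datetime) is an assumption based on the bean definition shown later.

package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.alibaba.fastjson.JSONObject;
import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;

/**
 * Hypothetical sketch: parses the raw Kafka JSON message into a MailMsg bean.
 * The real MsgToBeanMapper is not shown in the article; field names follow the sample message.
 */
@Slf4j
public class MsgToBeanMapper extends BaseBean implements MapFunction<String, MailMsg> {
    @Override
    public MailMsg map(String value) {
        JSONObject json = JSONObject.parseObject(value);
        MailMsg msg = new MailMsg();
        msg.setUser(json.getString("user"));
        msg.setClient_ip(json.getString("client_ip"));
        msg.setSource(json.getString("source"));
        msg.setLoginName(json.getString("loginname"));
        msg.setMailSenderSourceIp(json.getString("IP"));
        msg.setTimestamp_time(json.getString("timestamp"));
        msg.setTimestamp_datetime(json.getString("@timestamp"));
        msg.setIp(json.getString("ip"));
        msg.setClientType(json.getString("clienttype"));
        msg.setResult(json.getString("result"));
        msg.setVersion(json.getString("@version"));
        return msg;
    }
}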
package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.alibaba.fastjson.JSON;
import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;

/**
 * Serializes the alert event into the JSON string pushed to Elasticsearch.
 * @author wangxiaoming-ghq 2022-06-01
 */
@Slf4j
public class AlarmMsgToStringMapper extends BaseBean implements MapFunction<MailMsgAlarm, String> {
    @Override
    public String map(MailMsgAlarm mailMsgAlarm) throws Exception {
        return JSON.toJSONString(mailMsgAlarm);
    }
}
package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FilterFunction;

/**
 * ② Consumes messages from the mail topic and keeps only "login" events.
 * @author wangxiaoming-ghq 2022-06-01
 */
@Slf4j
public class MailMsgForLoginFilter extends BaseBean implements FilterFunction<MailMsg> {
    @Override
    public boolean filter(MailMsg mailMsg) {
        if ("login".equals(mailMsg.getSource())) {
            log.info("Raw login event passed the filter: [" + mailMsg + "]");
        }
        return "login".equals(mailMsg.getSource());
    }
}
package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.functions.KeySelector;

/**
 * Key selector (originally written for the CEP version): keys the stream by user@client_ip.
 */
@Slf4j
public class LoginKeySelector extends BaseBean implements KeySelector<MailMsg, String> {
    @Override
    public String getKey(MailMsg mailMsg) {
        return mailMsg.getUser() + "@" + mailMsg.getClient_ip();
    }
}
The core idea is to keep a list (loginEventList) of the login events seen within each window. Every failed login is appended to the list. If a successful login arrives while the list holds fewer than 20 failures, loginEventList is cleared and detection continues with the next events. If a successful login arrives after 20 or more consecutive failures in the window, loginEventList is cleared and the successful login event is pushed as an alert.
package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import com.data.dev.utils.HttpUtils;
import com.data.dev.utils.IPUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 * Detection logic evaluated inside each sliding window.
 * @author wangxiaoming-ghq 2022-06-01
 */
@Slf4j
public class WindowProcessFuncImpl extends ProcessWindowFunction<MailMsg, MailMsgAlarm, String, TimeWindow> implements Serializable {

    @Override
    public void process(String key,
                        ProcessWindowFunction<MailMsg, MailMsgAlarm, String, TimeWindow>.Context context,
                        Iterable<MailMsg> iterable,
                        Collector<MailMsgAlarm> collector) {
        List<MailMsg> loginEventList = new ArrayList<>();
        MailMsgAlarm mailMsgAlarm;
        for (MailMsg mailMsg : iterable) {
            log.info("Login event collected in window: [" + mailMsg + "]");
            if (mailMsg.getResult().equals("fail")) {
                // Collect failed login events into loginEventList
                log.info("Collecting failed login event into loginEventList");
                loginEventList.add(mailMsg);
            } else if (mailMsg.getResult().equals("success") && loginEventList.size() < 20) {
                // A successful login arrived, but fewer than 20 failures were seen: reset and keep watching
                log.info("Successful login detected, but only [" + loginEventList.size()
                        + "] failures so far (fewer than 20); clearing loginEventList and waiting for the next check");
                loginEventList.clear();
            } else if (mailMsg.getResult().equals("success") && loginEventList.size() >= 20) {
                mailMsgAlarm = getMailMsgAlarm(loginEventList, mailMsg);
                log.info("Successful login detected after [" + mailMsgAlarm.getFailTimes()
                        + "] consecutive failures in this window");
                // 20 or more consecutive failures followed by a success: clear the list and push an alert
                loginEventList.clear();
                doAlarmPush(mailMsgAlarm);
                collector.collect(mailMsgAlarm); // emit the successful login event downstream
            } else {
                log.info(mailMsg.getUser() + " has currently failed to log in [" + loginEventList.size() + "] consecutive times");
            }
        }
    }

    /**
     * 2022-06-17 15:03:06
     * @param eventList    events collected in the current window
     * @param eventCurrent the current successful login event
     * @return mailMsgAlarm the alert message body
     */
    public static MailMsgAlarm getMailMsgAlarm(List<MailMsg> eventList, MailMsg eventCurrent) {
        String alarmKey = eventCurrent.getUser() + "@" + eventCurrent.getClient_ip();
        String loginFailStartTime = eventList.get(0).getTimestamp_datetime();
        String loginSuccessTime = eventCurrent.getTimestamp_datetime();
        int loginFailTimes = eventList.size();

        MailMsgAlarm mailMsgAlarm = new MailMsgAlarm();
        mailMsgAlarm.setMailMsg(eventCurrent);
        mailMsgAlarm.setAlarmKey(alarmKey);
        mailMsgAlarm.setStartTime(loginFailStartTime);
        mailMsgAlarm.setEndTime(loginSuccessTime);
        mailMsgAlarm.setFailTimes(loginFailTimes);
        return mailMsgAlarm;
    }

    /**
     * 2022-06-17 14:47:53
     * @param mailMsgAlarm the alert event to push
     */
    public void doAlarmPush(MailMsgAlarm mailMsgAlarm) {
        String userKey = mailMsgAlarm.getAlarmKey();
        String clientIp = mailMsgAlarm.mailMsg.getClient_ip();
        boolean isWhiteListIp = IPUtils.isWhiteListIp(clientIp);
        if (isWhiteListIp) {
            // Whitelisted IPs do not trigger alerts
            log.info("Login user [" + userKey + "] comes from a whitelisted IP");
        } else {
            // Look up the IP owner and push the alert to WeChat Work
            String user = HttpUtils.getUserByClientIp(clientIp);
            HttpUtils.pushAlarmMsgToWechatWork(user, mailMsgAlarm.toString());
        }
    }
}
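HttpUtils and IPUtils are internal utilities that the article does not show. For reference only, pushing a text message through a WeChat Work group-bot webhook generally looks like the sketch below; the webhook key, class name, and error handling are assumptions, not the article's actual implementation.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical sketch of the WeChat Work push behind HttpUtils.pushAlarmMsgToWechatWork().
 * Assumes a standard WeChat Work group-bot webhook; the key below is a placeholder.
 */
public class WechatWorkPushSketch {
    private static final String WEBHOOK_URL =
            "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR-BOT-KEY"; // placeholder

    public static void pushAlarmMsgToWechatWork(String user, String alarmContent) throws Exception {
        // WeChat Work group bots accept a simple JSON text message
        String payload = "{\"msgtype\":\"text\",\"text\":{\"content\":\""
                + "Alert for " + user + ": " + alarmContent.replace("\"", "'") + "\"}}";

        HttpURLConnection conn = (HttpURLConnection) new URL(WEBHOOK_URL).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json; charset=utf-8");
        try (OutputStream os = conn.getOutputStream()) {
            os.write(payload.getBytes(StandardCharsets.UTF_8));
        }
        int status = conn.getResponseCode(); // 200 expected on success
        conn.disconnect();
        if (status != 200) {
            throw new IllegalStateException("WeChat Work push failed with HTTP " + status);
        }
    }
}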
package com.data.dev.elasticsearch;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.key.ConfigurationKey;
import com.data.dev.key.ElasticSearchKey;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
 * 2022-06-17 15:15:06
 * @author wangxiaoming-ghq
 * Shared helper for writing Flink streaming results to Elasticsearch.
 */
@Slf4j
public class SinkToEs extends BaseBean {
    public static final long serialVersionUID = 2L;

    private static final HashMap<String, String> ES_PROPS_MAP = ConfigurationKey.getApplicationProps();
    private static final String HOST = ES_PROPS_MAP.get(ConfigurationKey.ES_HOST);
    private static final String PASSWORD = ES_PROPS_MAP.get(ConfigurationKey.ES_PASSWORD);
    private static final String USERNAME = ES_PROPS_MAP.get(ConfigurationKey.ES_USERNAME);
    private static final String PORT = ES_PROPS_MAP.get(ConfigurationKey.ES_PORT);

    /**
     * 2022-06-17 15:17:55
     * Collect the Elasticsearch connection settings.
     * @return esInfoMap the connection settings
     */
    public static HashMap<String, String> getElasticSearchInfo() {
        log.info("Elasticsearch connection info: [HOST=" + HOST + ", PORT=" + PORT
                + ", USERNAME=" + USERNAME + ", PASSWORD=********]");
        HashMap<String, String> esInfoMap = new HashMap<>();
        esInfoMap.put(ElasticSearchKey.HOST, HOST);
        esInfoMap.put(ElasticSearchKey.PASSWORD, PASSWORD);
        esInfoMap.put(ElasticSearchKey.USERNAME, USERNAME);
        esInfoMap.put(ElasticSearchKey.PORT, PORT);
        return esInfoMap;
    }

    /**
     * @param esIndexName target index name
     * @param esType      target index type
     * @return ElasticsearchSink.Builder<String> the sink builder
     */
    public static ElasticsearchSink.Builder<String> getEsSinkBuilder(String esIndexName, String esType) {
        HashMap<String, String> esInfoMap = getElasticSearchInfo();
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost(
                String.valueOf(esInfoMap.get(ElasticSearchKey.HOST)),
                Integer.parseInt(esInfoMap.get(ElasticSearchKey.PORT)),
                "http"));

        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new ElasticsearchSinkFunction<String>() {
                    public IndexRequest createIndexRequest(String element) {
                        // The original version built the request from an empty map and dropped the element,
                        // so only empty documents reached ES. Since the element is already a JSON string
                        // (see AlarmMsgToStringMapper), index it directly as the document source.
                        return Requests.indexRequest()
                                .index(esIndexName)
                                .type(esType)
                                .source(element, XContentType.JSON);
                    }

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                }
        );

        // REST client configuration with username/password authentication
        RestClientFactory restClientFactory = restClientBuilder -> {
            CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(
                    AuthScope.ANY,
                    new UsernamePasswordCredentials(
                            String.valueOf(esInfoMap.get(ElasticSearchKey.USERNAME)),
                            String.valueOf(esInfoMap.get(ElasticSearchKey.PASSWORD))
                    )
            );
            restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
                httpAsyncClientBuilder.disableAuthCaching();
                return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            });
        };
        esSinkBuilder.setRestClientFactory(restClientFactory);
        return esSinkBuilder;
    }
}
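One detail worth noting with this sink: the Elasticsearch connector buffers documents and flushes them in bulk, so during low-volume testing alerts may show up in the index with a delay. If immediate visibility is needed, the builder can be told to flush after every record before calling build():

// Optional: flush each alert to Elasticsearch immediately instead of waiting for a bulk batch.
esSinkBuilder.setBulkFlushMaxActions(1);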
package com.data.dev.common.javabean.kafkaMailTopic;

import com.data.dev.common.javabean.BaseBean;
import lombok.Data;

import java.util.Objects;

/**
 * @author wangxiaoming-ghq 2022-05-15
 * Alert event produced by the detection logic.
 */
@Data
public class MailMsgAlarm extends BaseBean {

    /** The successful login event that triggered the alert */
    public MailMsg mailMsg;

    /** Alert key: username@client_ip */
    public String alarmKey;

    /** Event time of the first failed login */
    public String startTime;

    /** Event time of the successful login that followed the consecutive failures */
    public String endTime;

    /** Number of consecutive failed logins */
    public int failTimes;

    @Override
    public String toString() {
        return "{" +
                " 'mailMsg_login_success':'" + mailMsg + "'" +
                ", 'alarmKey':'" + alarmKey + "'" +
                ", 'start_login_time_in3min':'" + startTime + "'" +
                ", 'end_login_time_in3min':'" + endTime + "'" +
                ", 'login_fail_times':'" + failTimes + "'" +
                "}";
    }

    public MailMsgAlarm() {
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MailMsgAlarm)) return false;
        MailMsgAlarm that = (MailMsgAlarm) o;
        return getFailTimes() == that.getFailTimes()
                && getMailMsg().equals(that.getMailMsg())
                && getAlarmKey().equals(that.getAlarmKey())
                && getStartTime().equals(that.getStartTime())
                && getEndTime().equals(that.getEndTime());
    }

    @Override
    public int hashCode() {
        return Objects.hash(getMailMsg(), getAlarmKey(), getStartTime(), getEndTime(), getFailTimes());
    }
}
package com.data.dev.common.javabean.kafkaMailTopic;

import com.data.dev.common.javabean.BaseBean;
import lombok.Data;

import java.util.Objects;

/**
 * Raw login message, e.g.:
 * {
 *   "user": "wangxm",
 *   "client_ip": "110.68.6.182",
 *   "source": "login",
 *   "loginname": "wangxm@test.com",
 *   "IP": "110.8.148.58",
 *   "timestamp": "17:58:12",
 *   "@timestamp": "2022-04-20T09:58:13.647Z",
 *   "ip": "110.7.231.25",
 *   "clienttype": "POP3",
 *   "result": "success",
 *   "@version": "1"
 * }
 *
 * user        login user
 * client_ip   source IP
 * source      event type
 * loginname   login user's mail address
 * ip          target front-end IP
 * timestamp   send time
 * @timestamp  send date-time
 * IP          source IP of the mail log sender
 * clienttype  client login type
 * result      login result
 */
@Data
public class MailMsg extends BaseBean {
    public String user;
    public String client_ip;
    public String source;
    public String loginName;
    public String mailSenderSourceIp;
    public String timestamp_time;
    public String timestamp_datetime;
    public String ip;
    public String clientType;
    public String result;
    public String version;

    public MailMsg() {
    }

    public MailMsg(String user, String client_ip, String source, String loginName, String mailSenderSourceIp,
                   String timestamp_time, String timestamp_datetime, String ip, String clientType,
                   String result, String version) {
        this.user = user;
        this.client_ip = client_ip;
        this.source = source;
        this.loginName = loginName;
        this.mailSenderSourceIp = mailSenderSourceIp;
        this.timestamp_time = timestamp_time;
        this.timestamp_datetime = timestamp_datetime;
        this.ip = ip;
        this.clientType = clientType;
        this.result = result;
        this.version = version;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MailMsg)) return false;
        MailMsg mailMsg = (MailMsg) o;
        return getUser().equals(mailMsg.getUser())
                && getClient_ip().equals(mailMsg.getClient_ip())
                && getSource().equals(mailMsg.getSource())
                && getLoginName().equals(mailMsg.getLoginName())
                && getMailSenderSourceIp().equals(mailMsg.getMailSenderSourceIp())
                && getTimestamp_time().equals(mailMsg.getTimestamp_time())
                && getTimestamp_datetime().equals(mailMsg.getTimestamp_datetime())
                && getIp().equals(mailMsg.getIp())
                && getClientType().equals(mailMsg.getClientType())
                && getResult().equals(mailMsg.getResult())
                && getVersion().equals(mailMsg.getVersion());
    }

    @Override
    public int hashCode() {
        return Objects.hash(getUser(), getClient_ip(), getSource(), getLoginName(), getMailSenderSourceIp(),
                getTimestamp_time(), getTimestamp_datetime(), getIp(), getClientType(), getResult(), getVersion());
    }

    @Override
    public String toString() {
        return "{" +
                " 'user':'" + user + "'" +
                ", 'client_ip':'" + client_ip + "'" +
                ", 'source':'" + source + "'" +
                ", 'loginName':'" + loginName + "'" +
                ", 'IP':'" + mailSenderSourceIp + "'" +
                ", 'timestamp':'" + timestamp_time + "'" +
                ", '@timestamp':'" + timestamp_datetime + "'" +
                ", 'ip':'" + ip + "'" + // the original concatenation dropped the ip value here
                ", 'clientType':'" + clientType + "'" +
                ", 'result':'" + result + "'" +
                ", 'version':'" + version + "'" +
                "}";
    }
}