您好,登錄后才能下訂單哦!
本篇內容主要講解“Spout的實現過程”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Spout的實現過程”吧!
Spout的實現過程:
· 對文件的改變進行分開的監聽,并監視目錄下有無新日志文件添加。
· 在數據得到了字段的說明后,將其轉換成tuple。
· 聲明Spout和Bolt之間的分組,并決定tuple發送給Bolt的途徑。
Spout的具體編碼在Listing Three中顯示。
Listing Three:Spout中open、nextTuple和delcareOutputFields方法的邏輯。
1. public void open( Map conf, TopologyContext context,SpoutOutputCollector collector )
2. {
3. _collector = collector;
4. try
5. {
6. fileReader = new BufferedReader(new FileReader(new File(file)));
7. }
8. catch (FileNotFoundException e)
9. {
10. System.exit(1);
11. }
12. }
13.
14. public void nextTuple()
15. {
16. protected void ListenFile(File file)
17. {
18. Utils.sleep(2000);
19. RandomAccessFile access = null;
20. String line = null;
21. try
22. {
23. while ((line = access.readLine()) != null)
24. {
25. if (line !=null)
26. {
27. String[] fields=null;
28. if (tupleInfo.getDelimiter().equals("|")) fields = line.split("\\"+tupleInfo.getDelimiter());
29. else
30. fields = line.split (tupleInfo.getDelimiter());
31. if (tupleInfo.getFieldList().size() == fields.length) _collector.emit(new Values(fields));
32. }
33. }
34. }
35. catch (IOException ex){ }
36. }
37. }
38.
39. public void declareOutputFields(OutputFieldsDeclarer declarer)
40. {
41. String[] fieldsArr = new String [tupleInfo.getFieldList().size()];
42. for(int i=0; i<tupleInfo.getFieldList().size(); i++)
43. {
44. fieldsArr = tupleInfo.getFieldList().get(i).getColumnName();
45. }
46. declarer.declare(new Fields(fieldsArr));
47. }
declareOutputFileds()決定了tuple發射的格式,這樣的話Bolt就可以用類似的方法將tuple譯碼。Spout持續對日志文件的數據的變更進行監聽,一旦有添加Spout就會進行讀入并且發送給Bolt進行處理。
到此,相信大家對“Spout的實現過程”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。