您好,登錄后才能下訂單哦!
下文主要給大家帶來設計、實現分布式爬蟲系統的實踐解析,希望這些文字能夠帶給大家實際用處,這也是我編輯這篇文章的主要目的。好了,廢話不多說,大家直接看下文吧。
在不用爬蟲框架的情況,經過多方學習,嘗試實現了一個分布式爬蟲系統,并且可以將數據保存到不同地方,類似MySQL、HBase等。
基于面向接口的編碼思想來開發,因此這個系統具有一定的擴展性,有興趣的朋友直接看一下代碼,就能理解其設計思想,雖然代碼目前來說很多地方還是比較緊耦合,但只要花些時間和精力,很多都是可抽取出來并且可配置化的。
因為時間的關系,我只寫了京東和蘇寧易購兩個網站的爬蟲,但是完全可以實現不同網站爬蟲的隨機調度,基于其代碼結構,再寫國美、天貓等的商品爬取,難度不大,但是估計需要花很多時間和精力。因為在解析網頁的數據時,實際上需要花很多時間,比如我在爬取蘇寧易購商品的價格時,價格是異步獲取的,并且其api是一長串的數字組合,我花了幾個小時的時間才發現其規律,當然也承認,我的經驗不足。
這個系統的設計,除了基本的數據爬取以外,更關注以下幾個方面的問題:
下面會針對這個系統來做一個整體的基本介紹,其實我在代碼中都有非常詳細的注釋,有興趣的朋友可以參考一下代碼,最后我會給出一些我爬蟲時的數據分析。
另外需要注意的是,這個爬蟲系統是基于Java實現的,但是語言本身仍然不是最重要的,有興趣的朋友可以嘗試用Python實現。
整體系統架構如下:
所以從上面的架構可以看出,整個系統主要分為三個部分:
爬蟲系統就是用來爬取數據的,因為系統設計為分布式,因此,爬蟲程序本身可以運行在不同的云服務器節點上。
url調度系統核心在于url倉庫,所謂的url倉庫其實就是用Redis保存了需要爬取的url列表,并且在我們的url調度器中根據一定的策略來消費其中的url,從這個角度考慮,url倉庫其實也是一個url隊列。
監控報警系統主要是對爬蟲節點進行監控,雖然并行執行的爬蟲節點中的某一個掛掉了對整體數據爬取本身沒有影響(只是降低了爬蟲的速度),但是我們還是希望知道能夠主動接收到節點掛掉的通知,而不是被動地發現。
下面將會針對以上三個方面并結合部分代碼片段來對整個系統的設計思路做一些基本的介紹,對系統完整實現有濃厚興趣的朋友可以直接參考源代碼。
(說明:zookeeper監控屬于監控報警系統,url調度器屬于URL調度系統)
爬蟲系統是一個獨立運行的進程,我們把我們的爬蟲系統打包成jar包,然后分發到不同的節點上執行,這樣并行爬取數據可以提高爬蟲的效率。
加入隨機IP代理主要是為了反反爬蟲,因此如果有一個IP代理庫,并且可以在構建http客戶端時可以隨機地使用不同的代理,那么對我們進行反反爬蟲則會有很大的幫助。
在系統中使用IP代理庫,需要先在文本文件中添加可用的代理地址信息:
# IPProxyRepository.txt
58.60.255.104:8118
219.135.164.245:3128
27.44.171.27:9999
219.135.164.245:3128
58.60.255.104:8118
58.252.6.165:9000
......
需要注意的是,上面的代理IP是我在西刺代理上拿到的一些代理IP,不一定可用,建議是自己花錢購買一批代理IP,這樣可以節省很多時間和精力去尋找代理IP。
然后在構建http客戶端的工具類中,當第一次使用工具類時,會把這些代理IP加載進內存中,加載到Java的一個HashMap:
// IP地址代理庫Map
private static Map<String, Integer> IPProxyRepository = new HashMap<>();
private static String[] keysArray = null; // keysArray是為了方便生成隨機的代理對象
/**
* 初次使用時使用靜態代碼塊將IP代理庫加載進set中
*/
static {
InputStream in = HttpUtil.class.getClassLoader().getResourceAsStream("IPProxyRepository.txt"); // 加載包含代理IP的文本
// 構建緩沖流對象
InputStreamReader isr = new InputStreamReader(in);
BufferedReader bfr = new BufferedReader(isr);
String line = null;
try {
// 循環讀每一行,添加進map中
while ((line = bfr.readLine()) != null) {
String[] split = line.split(":"); // 以:作為分隔符,即文本中的數據格式應為192.168.1.1:4893
String host = split[0];
int port = Integer.valueOf(split[1]);
IPProxyRepository.put(host, port);
}
Set<String> keys = IPProxyRepository.keySet();
keysArray = keys.toArray(new String[keys.size()]); // keysArray是為了方便生成隨機的代理對象
} catch (IOException e) {
e.printStackTrace();
}
}
之后,在每次構建http客戶端時,都會先到map中看是否有代理IP,有則使用,沒有則不使用代理:
CloseableHttpClient httpClient = null;
HttpHost proxy = null;
if (IPProxyRepository.size() > 0) { // 如果ip代理地址庫不為空,則設置代理
proxy = getRandomProxy();
httpClient = HttpClients.custom().setProxy(proxy).build(); // 創建httpclient對象
} else {
httpClient = HttpClients.custom().build(); // 創建httpclient對象
}
HttpGet request = new HttpGet(url); // 構建htttp get請求
......
隨機代理對象則通過下面的方法生成:
/**
* 隨機返回一個代理對象
*
* @return
*/
public static HttpHost getRandomProxy() {
// 隨機獲取host:port,并構建代理對象
Random random = new Random();
String host = keysArray[random.nextInt(keysArray.length)];
int port = IPProxyRepository.get(host);
HttpHost proxy = new HttpHost(host, port); // 設置http代理
return proxy;
}
這樣,通過上面的設計,基本就實現了隨機IP代理器的功能,當然,其中還有很多可以完善的地方,比如,當使用這個IP代理而請求失敗時,是否可以把這一情況記錄下來,當超過一定次數時,再將其從代理庫中刪除,同時生成日志供開發人員或運維人員參考,這是完全可以實現的,不過我就不做這一步功能了。
網頁下載器就是用來下載網頁中的數據,主要基于下面的接口開發:
/**
* 網頁數據下載
*/
public interface IDownload {
/**
* 下載給定url的網頁數據
* @param url
* @return
*/
public Page download(String url);
}
基于此,在系統中只實現了一個http get的下載器,但是也可以完成我們所需要的功能了:
/**
* 數據下載實現類
*/
public class HttpGetDownloadImpl implements IDownload {
@Override
public Page download(String url) {
Page page = new Page();
String content = HttpUtil.getHttpContent(url); // 獲取網頁數據
page.setUrl(url);
page.setContent(content);
return page;
}
}
網頁解析器就是把下載的網頁中我們感興趣的數據解析出來,并保存到某個對象中,供數據存儲器進一步處理以保存到不同的持久化倉庫中,其基于下面的接口進行開發:
/**
* 網頁數據解析
*/
public interface IParser {
public void parser(Page page);
}
網頁解析器在整個系統的開發中也算是比較重頭戲的一個組件,功能不復雜,主要是代碼比較多,針對不同的商城不同的商品,對應的解析器可能就不一樣了,因此需要針對特別的商城的商品進行開發,因為很顯然,京東用的網頁模板跟蘇寧易購的肯定不一樣,天貓用的跟京東用的也肯定不一樣,所以這個完全是看自己的需要來進行開發了,只是說,在解析器開發的過程當中會發現有部分重復代碼,這時就可以把這些代碼抽象出來開發一個工具類了。
目前在系統中爬取的是京東和蘇寧易購的手機商品數據,因此與就寫了這兩個實現類:
/**
* 解析京東商品的實現類
*/
public class JDHtmlParserImpl implements IParser {
......
}
/**
* 蘇寧易購網頁解析
*/
public class SNHtmlParserImpl implements IParser {
......
}
數據存儲器主要是將網頁解析器解析出來的數據對象保存到不同的,而對于本次爬取的手機商品,數據對象是下面一個Page對象:
/**
* 網頁對象,主要包含網頁內容和商品數據
*/
public class Page {
private String content; // 網頁內容
private String id; // 商品Id
private String source; // 商品來源
private String brand; // 商品品牌
private String title; // 商品標題
private float price; // 商品價格
private int commentCount; // 商品評論數
private String url; // 商品地址
private String imgUrl; // 商品圖片地址
private String params; // 商品規格參數
private List<String> urls = new ArrayList<>(); // 解析列表頁面時用來保存解析的商品url的容器
}
對應的,在MySQL中,表數據結構如下:
-- ----------------------------
-- Table structure for phone
-- ----------------------------
DROP TABLE IF EXISTS `phone`;
CREATE TABLE `phone` (
`id` varchar(30) CHARACTER SET armscii8 NOT NULL COMMENT '商品id',
`source` varchar(30) NOT NULL COMMENT '商品來源,如jd suning gome等',
`brand` varchar(30) DEFAULT NULL COMMENT '手機品牌',
`title` varchar(255) DEFAULT NULL COMMENT '商品頁面的手機標題',
`price` float(10,2) DEFAULT NULL COMMENT '手機價格',
`comment_count` varchar(30) DEFAULT NULL COMMENT '手機評論',
`url` varchar(500) DEFAULT NULL COMMENT '手機詳細信息地址',
`img_url` varchar(500) DEFAULT NULL COMMENT '圖片地址',
`params` text COMMENT '手機參數,json格式存儲',
PRIMARY KEY (`id`,`source`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
而在HBase中的表結構則為如下:
## cf1 存儲 id source price comment brand url
## cf2 存儲 title params imgUrl
create 'phone', 'cf1', 'cf2'
## 在HBase shell中查看創建的表
hbase(main):135:0> desc 'phone'
Table phone is ENABLED
phone
COLUMN FAMILIES DESCRIPTION
{NAME => 'cf1', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK
_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE =>
'65536', REPLICATION_SCOPE => '0'}
{NAME => 'cf2', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK
_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE =>
'65536', REPLICATION_SCOPE => '0'}
2 row(s) in 0.0350 seconds
即在HBase中建立了兩個列族,分別為cf1、cf2,其中cf1用來保存id source price comment brand url字段信息,cf2用來保存title params imgUrl字段信息。
不同的數據存儲用的是不同的實現類,但是其都是基于下面同一個接口開發的:
/**
* 商品數據的存儲
*/
public interface IStore {
public void store(Page page);
}
然后基于此開發了MySQL的存儲實現類、HBase的存儲實現類還有控制臺的輸出實現類,如MySQL的存儲實現類,其實就是簡單的數據插入語句:
/**
* 使用dbc數據庫連接池將數據寫入mysql表中
*/
public class MySQLStoreImpl implements IStore {
private QueryRunner queryRunner = new QueryRunner(DBCPUtil.getDataSource());
@Override
public void store(Page page) {
String sql = "insert into phone(id, source, brand, title, price, comment_count, url, img_url, params) values(?, ?, ?, ?, ?, ?, ?, ?, ?)";
try {
queryRunner.update(sql, page.getId(),
page.getSource(),
page.getBrand(),
page.getTitle(),
page.getPrice(),
page.getCommentCount(),
page.getUrl(),
page.getImgUrl(),
page.getParams());
} catch (SQLException e) {
e.printStackTrace();
}
}
}
而HBase的存儲實現類,則是HBase Java API的常用插入語句代碼:
......
// cf1:price
Put pricePut = new Put(rowKey);
// 必須要做是否為null判斷,否則會有空指針異常
pricePut.addColumn(cf1, "price".getBytes(), page.getPrice() != null ? String.valueOf(page.getPrice()).getBytes() : "".getBytes());
puts.add(pricePut);
// cf1:comment
Put commentPut = new Put(rowKey);
commentPut.addColumn(cf1, "comment".getBytes(), page.getCommentCount() != null ? String.valueOf(page.getCommentCount()).getBytes() : "".getBytes());
puts.add(commentPut);
// cf1:brand
Put brandPut = new Put(rowKey);
brandPut.addColumn(cf1, "brand".getBytes(), page.getBrand() != null ? page.getBrand().getBytes() : "".getBytes());
puts.add(brandPut);
......
當然,至于要將數據存儲在哪個地方,在初始化爬蟲程序時,是可以手動選擇的:
// 3.注入存儲器
iSpider.setStore(new HBaseStoreImpl());
目前還沒有把代碼寫成可以同時存儲在多個地方,按照目前代碼的架構,要實現這一點也比較簡單,修改一下相應代碼就好了。實際上,是可以先把數據保存到MySQL中,然后通過Sqoop導入到HBase中,詳細操作可以參考我寫的Sqoop文章。
仍然需要注意的是,如果確定需要將數據保存到HBase中,請保證你有可用的集群環境,并且需要將如下配置文檔添加到classpath下:
core-site.xml
hbase-site.xml
hdfs-site.xml
對大數據感興趣的同學可以折騰一下這一點,如果之前沒有接觸過的,直接使用MySQL存儲就好了,只需要在初始化爬蟲程序時注入MySQL存儲器即可:
// 3.注入存儲器
iSpider.setStore(new MySQLStoreImpl());
URL調度系統是實現整個爬蟲系統分布式的橋梁與關鍵,正是通過URL調度系統的使用,才使得整個爬蟲系統可以較為高效(Redis作為存儲)隨機地獲取url,并實現整個系統的分布式。
通過架構圖可以看出,所謂的URL倉庫不過是Redis倉庫,即在我們的系統中使用Redis來保存url地址列表,正是這樣,才能保證我們的程序實現分布式,只要保存了url是唯一的,這樣不管我們的爬蟲程序有多少個,最終保存下來的數據都是只有唯一一份的,而不會重復,是通過這樣來實現分布式的。
同時url倉庫中的url地址在獲取時的策略是通過隊列的方式來實現的,待會通過URL調度器的實現即可知道。
另外,在我們的url倉庫中,主要保存了下面的數據:
Redis的數據類型為list。
種子URL是持久化存儲的,一定時間后,由URL定時器通過種子URL獲取URL,并將其注入到我們的爬蟲程序需要使用的高優先級URL隊列中,這樣就可以保存我們的爬蟲程序可以源源不斷地爬取數據而不需要中止程序的執行。
Redis的數據類型為set。
什么是高優先級URL隊列?其實它就是用來保存列表url的。
那么什么是列表url呢?
說白了就是一個列表中含有多個商品,以京東為列,我們打開一個手機列表為例:該地址中包含的不是一個具體商品的url,而是包含了多個我們需要爬取的數據(手機商品)的列表,通過對每個高級url的解析,我們可以獲取到非常多的具體商品url,而具體的商品url,就是低優先url,其會保存到低優先級URL隊列中。
那么以這個系統為例,保存的數據類似如下:
jd.com.higher
--https://list.jd.com/list.html?cat=9987,653,655&page=1
...
suning.com.higher
--https://list.suning.com/0-20006-0.html
...
Redis的數據類型為set。
低優先級URL其實就是具體某個商品的URL,如下面一個手機商品:
通過下載該url的數據,并對其進行解析,就能夠獲取到我們想要的數據。
那么以這個系統為例,保存的數據類似如下:
jd.com.lower
--https://item.jd.com/23545806622.html
...
suning.com.lower
--https://product.suning.com/0000000000/690128156.html
...
所謂url調度器,其實說白了就是url倉庫java代碼的調度策略,不過因為其核心在于調度,所以將其放到URL調度器中來進行說明,目前其調度基于以下接口開發:
/**
* url 倉庫
* 主要功能:
* 向倉庫中添加url(高優先級的列表,低優先級的商品url)
* 從倉庫中獲取url(優先獲取高優先級的url,如果沒有,再獲取低優先級的url)
*
*/
public interface IRepository {
/**
* 獲取url的方法
* 從倉庫中獲取url(優先獲取高優先級的url,如果沒有,再獲取低優先級的url)
* @return
*/
public String poll();
/**
* 向高優先級列表中添加商品列表url
* @param highUrl
*/
public void offerHigher(String highUrl);
/**
* 向低優先級列表中添加商品url
* @param lowUrl
*/
public void offerLower(String lowUrl);
}
其基于Redis作為URL倉庫的實現如下:
/**
* 基于Redis的全網爬蟲,隨機獲取爬蟲url:
*
* Redis中用來保存url的數據結構如下:
* 1.需要爬取的域名集合(存儲數據類型為set,這個需要先在Redis中添加)
* key
* spider.website.domains
* value(set)
* jd.com suning.com gome.com
* key由常量對象SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY 獲得
* 2.各個域名所對應的高低優先url隊列(存儲數據類型為list,這個由爬蟲程序解析種子url后動態添加)
* key
* jd.com.higher
* jd.com.lower
* suning.com.higher
* suning.com.lower
* gome.com.higher
* gome.come.lower
* value(list)
* 相對應需要解析的url列表
* key由隨機的域名 + 常量 SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX或者SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX獲得
* 3.種子url列表
* key
* spider.seed.urls
* value(list)
* 需要爬取的數據的種子url
* key由常量SpiderConstants.SPIDER_SEED_URLS_KEY獲得
*
* 種子url列表中的url會由url調度器定時向高低優先url隊列中
*/
public class RandomRedisRepositoryImpl implements IRepository {
/**
* 構造方法
*/
public RandomRedisRepositoryImpl() {
init();
}
/**
* 初始化方法,初始化時,先將redis中存在的高低優先級url隊列全部刪除
* 否則上一次url隊列中的url沒有消耗完時,再停止啟動跑下一次,就會導致url倉庫中有重復的url
*/
public void init() {
Jedis jedis = JedisUtil.getJedis();
Set<String> domains = jedis.smembers(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY);
String higherUrlKey;
String lowerUrlKey;
for(String domain : domains) {
higherUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX;
lowerUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX;
jedis.del(higherUrlKey, lowerUrlKey);
}
JedisUtil.returnJedis(jedis);
}
/**
* 從隊列中獲取url,目前的策略是:
* 1.先從高優先級url隊列中獲取
* 2.再從低優先級url隊列中獲取
* 對應我們的實際場景,應該是先解析完列表url再解析商品url
* 但是需要注意的是,在分布式多線程的環境下,肯定是不能完全保證的,因為在某個時刻高優先級url隊列中
* 的url消耗完了,但實際上程序還在解析下一個高優先級url,此時,其它線程去獲取高優先級隊列url肯定獲取不到
* 這時就會去獲取低優先級隊列中的url,在實際考慮分析時,這點尤其需要注意
* @return
*/
@Override
public String poll() {
// 從set中隨機獲取一個頂級域名
Jedis jedis = JedisUtil.getJedis();
String randomDomain = jedis.srandmember(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY); // jd.com
String key = randomDomain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX; // jd.com.higher
String url = jedis.lpop(key);
if(url == null) { // 如果為null,則從低優先級中獲取
key = randomDomain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX; // jd.com.lower
url = jedis.lpop(key);
}
JedisUtil.returnJedis(jedis);
return url;
}
/**
* 向高優先級url隊列中添加url
* @param highUrl
*/
@Override
public void offerHigher(String highUrl) {
offerUrl(highUrl, SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX);
}
/**
* 向低優先url隊列中添加url
* @param lowUrl
*/
@Override
public void offerLower(String lowUrl) {
offerUrl(lowUrl, SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX);
}
/**
* 添加url的通用方法,通過offerHigher和offerLower抽象而來
* @param url 需要添加的url
* @param urlTypeSuffix url類型后綴.higher或.lower
*/
public void offerUrl(String url, String urlTypeSuffix) {
Jedis jedis = JedisUtil.getJedis();
String domain = SpiderUtil.getTopDomain(url); // 獲取url對應的頂級域名,如jd.com
String key = domain + urlTypeSuffix; // 拼接url隊列的key,如jd.com.higher
jedis.lpush(key, url); // 向url隊列中添加url
JedisUtil.returnJedis(jedis);
}
}
通過代碼分析也是可以知道,其核心就在如何調度url倉庫(Redis)中的url。
一段時間后,高優先級URL隊列和低優先URL隊列中的url都會被消費完,為了讓程序可以繼續爬取數據,同時減少人為的干預,可以預先在Redis中插入種子url,之后定時讓URL定時器從種子url中取出url定存放到高優先級URL隊列中,以此達到程序定時不間斷爬取數據的目的。
url消費完畢后,是否需要循環不斷爬取數據根據個人業務需求而不同,因此這一步不是必需的,只是也提供了這樣的操作。因為事實上,我們需要爬取的數據也是每隔一段時間就會更新的,如果希望我們爬取的數據也跟著定時更新,那么這時定時器就有非常重要的作用了。不過需要注意的是,一旦決定需要循環重復爬取數據,則在設計存儲器實現時需要考慮重復數據的問題,即重復數據應該是更新操作,目前在我設計的存儲器不包括這個功能,有興趣的朋友可以自己實現,只需要在插入數據前判斷數據庫中是否存在該數據即可。
另外需要注意的一點是,URL定時器是一個獨立的進程,需要單獨啟動。
定時器基于Quartz實現,下面是其job的代碼:
/**
* 每天定時從url倉庫中獲取種子url,添加進高優先級列表
*/
public class UrlJob implements Job {
// log4j日志記錄
private Logger logger = LoggerFactory.getLogger(UrlJob.class);
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
/**
* 1.從指定url種子倉庫獲取種子url
* 2.將種子url添加進高優先級列表
*/
Jedis jedis = JedisUtil.getJedis();
Set<String> seedUrls = jedis.smembers(SpiderConstants.SPIDER_SEED_URLS_KEY); // spider.seed.urls Redis數據類型為set,防止重復添加種子url
for(String seedUrl : seedUrls) {
String domain = SpiderUtil.getTopDomain(seedUrl); // 種子url的頂級域名
jedis.sadd(domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX, seedUrl);
logger.info("獲取種子:{}", seedUrl);
}
JedisUtil.returnJedis(jedis);
// System.out.println("Scheduler Job Test...");
}
}
調度器的實現如下:
/**
* url定時調度器,定時向url對應倉庫中存放種子url
*
* 業務規定:每天凌晨1點10分向倉庫中存放種子url
*/
public class UrlJobScheduler {
public UrlJobScheduler() {
init();
}
/**
* 初始化調度器
*/
public void init() {
try {
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
// 如果沒有以下start方法的執行,則是不會開啟任務的調度
scheduler.start();
String name = "URL_SCHEDULER_JOB";
String group = "URL_SCHEDULER_JOB_GROUP";
JobDetail jobDetail = new JobDetail(name, group, UrlJob.class);
String cronExpression = "0 10 1 * * ?";
Trigger trigger = new CronTrigger(name, group, cronExpression);
// 調度任務
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
UrlJobScheduler urlJobScheduler = new UrlJobScheduler();
urlJobScheduler.start();
}
/**
* 定時調度任務
* 因為我們每天要定時從指定的倉庫中獲取種子url,并存放到高優先級的url列表中
* 所以是一個不間斷的程序,所以不能停止
*/
private void start() {
while (true) {
}
}
}
監控報警系統的加入主要是為了讓使用者可以主動發現節點宕機,而不是被動地發現,因為實際中爬蟲程序可能是持續不斷運行的,并且我們會在多個節點上部署我們的爬蟲程序,因此很有必要對節點進行監控,并且在節點出現問題時可以及時發現并修正,需要注意的是,監控報警系統是一個獨立的進程,需要單獨啟動。
首先需要先在zookeeper中創建一個/ispider
節點:
[zk: localhost:2181(CONNECTED) 1] create /ispider ispider
Created /ispider
監控報警系統的開發主要依賴于zookeeper實現,監控程序對zookeeper下面的這個節點目錄進行監聽:
[zk: localhost:2181(CONNECTED) 0] ls /ispider
[]
爬蟲程序啟動時會在該節點目錄下注冊一個臨時節點目錄:
[zk: localhost:2181(CONNECTED) 0] ls /ispider
[192.168.43.166]
當節點出現宕機時,該臨時節點目錄就會被zookeeper刪除
[zk: localhost:2181(CONNECTED) 0] ls /ispider
[]
同時因為我們監聽了節點目錄/ispider
,所以當zookeeper刪除其下的節點目錄時(或增加一個節點目錄),zookeeper會給我們的監控程序發送通知,即我們的監控程序會得到回調,這樣便可以在回調程序中執行報警的系統動作,從而完成監控報警的功能。
可以使用zookeeper原生的Java API,我在另外寫的一個RPC框架(底層基于Netty實現遠程通信)中就是使用原生的API,不過顯然代碼會復雜很多,并且本身需要對zookeeper有更多的學習和了解,這樣用起來才會容易一些。
所以為了降低開發的難度,這里使用第三方封裝的API,即curator,來進行zookeeper客戶端程序的開發。
在啟動爬蟲系統時,我們的程序都會啟動一個zookeeper客戶端來向zookeeper來注冊自身的節點信息,主要是ip地址,并在/ispider
節點目錄以創建一個以該爬蟲程序所在的節點IP地址命名的節點,如/ispider/192.168.43.116
,實現的代碼如下:
/**
* 注冊zk
*/
private void registerZK() {
String zkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181";
int baseSleepTimeMs = 1000;
int maxRetries = 3;
RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
CuratorFramework curator = CuratorFrameworkFactory.newClient(zkStr, retryPolicy);
curator.start();
String ip = null;
try {
// 向zk的具體目錄注冊 寫節點 創建節點
ip = InetAddress.getLocalHost().getHostAddress();
curator.create().withMode(CreateMode.EPHEMERAL).forPath("/ispider/" + ip, ip.getBytes());
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
應該注意到的是,我們創建的節點為臨時節點,要想實現監控報警功能,必須要為臨時節點。
首先需要先監聽zookeeper中的一個節點目錄,在我們的系統中,設計是監聽/ispider
這個節點目錄:
public SpiderMonitorTask() {
String zkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181";
int baseSleepTimeMs = 1000;
int maxRetries = 3;
RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
curator = CuratorFrameworkFactory.newClient(zkStr, retryPolicy);
curator.start();
try {
previousNodes = curator.getChildren().usingWatcher(this).forPath("/ispider");
} catch (Exception e) {
e.printStackTrace();
}
}
在上面注冊了zookeeper中的watcher
,也就是接收通知的回調程序,在該程序中,執行我們報警的邏輯:
/**
* 這個方法,當監控的zk對應的目錄一旦有變動,就會被調用
* 得到當前最新的節點狀態,將最新的節點狀態和初始或者上一次的節點狀態作比較,那我們就知道了是由誰引起的節點變化
* @param event
*/
@Override
public void process(WatchedEvent event) {
try {
List<String> currentNodes = curator.getChildren().usingWatcher(this).forPath("/ispider");
// HashSet<String> previousNodesSet = new HashSet<>(previousNodes);
if(currentNodes.size() > previousNodes.size()) { // 最新的節點服務,超過之前的節點服務個數,有新的節點增加進來
for(String node : currentNodes) {
if(!previousNodes.contains(node)) {
// 當前節點就是新增節點
logger.info("----有新的爬蟲節點{}新增進來", node);
}
}
} else if(currentNodes.size() < previousNodes.size()) { // 有節點掛了 發送告警郵件或者短信
for(String node : previousNodes) {
if(!currentNodes.contains(node)) {
// 當前節點掛掉了 得需要發郵件
logger.info("----有爬蟲節點{}掛掉了", node);
MailUtil.sendMail("有爬蟲節點掛掉了,請人工查看爬蟲節點的情況,節點信息為:", node);
}
}
} // 掛掉和新增的數目一模一樣,上面是不包括這種情況的,有興趣的朋友可以直接實現包括這種特殊情況的監控
previousNodes = currentNodes; // 更新上一次的節點列表,成為最新的節點列表
} catch (Exception e) {
e.printStackTrace();
}
// 在原生的API需要再做一次監控,因為每一次監控只會生效一次,所以當上面發現變化后,需要再監聽一次,這樣下一次才能監聽到
// 但是在使用curator的API時則不需要這樣做
}
當然,判斷節點是否掛掉,上面的邏輯還是存在一定的問題的,按照上面的邏輯,假如某一時刻新增節點和刪除節點事件同時發生,那么其就不能判斷出來,所以如果需要更精準的話,可以將上面的程序代碼修改一下。
使用模板代碼就可以了,不過需要注意的是,在使用時,發件人的信息請使用自己的郵箱。
下面是爬蟲節點掛掉時接收到的郵件:
實際上,如果購買了短信服務,那么通過短信API也可以向我們的手機發送短信。
因為前面在介紹這個系統的時候也提到了,我只寫了京東和蘇寧易購的網頁解析器,所以接下來也就是爬取其全網的手機商品數據。
需要確保Redis、Zookeeper服務可用,另外如果需要使用HBase來存儲數據,需要確保Hadoop集群中的HBase可用,并且相關配置文件已經加入到爬蟲程序的classpath中。
還有一點需要注意的是,URL定時器和監控報警系統是作為單獨的進程來運行的,并且也是可選的。
進行了兩次爬取,分別嘗試將數據保存到MySQL和HBase中,給出如下數據情況。
mysql> select count(*) from phone;
+----------+
| count(*) |
+----------+
| 12052 |
+----------+
1 row in set
mysql> select count(*) from phone where source='jd.com';
+----------+
| count(*) |
+----------+
| 9578 |
+----------+
1 row in set
mysql> select count(*) from phone where source='suning
.com';
+----------+
| count(*) |
+----------+
| 2474 |
+----------+
1 row in set
在可視化工具中查看數據情況:
hbase(main):225:0* count 'phone'
Current count: 1000, row: 11155386088_jd.com
Current count: 2000, row: 136191393_suning.com
Current count: 3000, row: 16893837301_jd.com
Current count: 4000, row: 19036619855_jd.com
Current count: 5000, row: 1983786945_jd.com
Current count: 6000, row: 1997392141_jd.com
Current count: 7000, row: 21798495372_jd.com
Current count: 8000, row: 24154264902_jd.com
Current count: 9000, row: 25687565618_jd.com
Current count: 10000, row: 26458674797_jd.com
Current count: 11000, row: 617169906_suning.com
Current count: 12000, row: 769705049_suning.com
12348 row(s) in 1.5720 seconds
=> 12348
在HDFS中查看數據情況:
京東手機的列表大概有160多頁,每個列表有60個商品數據,所以總量在9600左右,我們的數據基本是符合的,后面通過日志分析其實可以知道,一般丟失的數據為連接超時導致的,所以在選取爬蟲的環境時,更建議在網絡環境好的主機上進行,同時如果可以有IP代理地址庫就更好了,另外對于連接超時的情況,其實是可以進一步在我們的程序中加以控制,一旦出現爬取數據失敗的url,可以將其加入到重試url隊列中,目前這一點功能我是沒有做,有興趣的同學可以試一下。
再來看看蘇寧的,其有100頁左右的手機列表,每頁也是60個商品數據,所以總量在6000左右。但可以看到,我們的數據卻只有3000這樣的數量級(缺少的依然是頻繁爬取造成的連接失敗問題),這是為什么呢?
這是因為,打開蘇寧的某個列表頁面后,其是先加載30個商品,當鼠標向下滑動時,才會通過另外的API去加載其它的30個商品數據,每一個列表頁面都是如此,所以,實際上,我們是缺少了一半的商品數據沒有爬取。知道這個原因之后,實現也不難,但是因為時間關系,我就沒有做了,有興趣的朋友折騰一下吧。
在我們的爬蟲系統中,每個關鍵的地方,如網頁下載、數據解析等都是有打logger的,所以通過日志,可以大概分析出相關的時間參數。
2018-04-01 21:26:03 [pool-1-thread-1] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下載網頁:https://list.jd.com/list.html?cat=9987,653,655&page=1,消耗時長:590 ms,代理信息:null:null
2018-04-01 21:26:03 [pool-1-thread-1] [cn.xpleaf.spider.core.parser.Impl.JDHtmlParserImpl] [INFO] - 解析列表頁面:https://list.jd.com/list.html?cat=9987,653,655&page=1, 消耗時長:46ms
2018-04-01 21:26:03 [pool-1-thread-3] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO] - 解析列表頁面:https://list.suning.com/0-20006-0.html, 消耗時長:49ms
2018-04-01 21:26:04 [pool-1-thread-5] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下載網頁:https://item.jd.com/6737464.html,消耗時長:219 ms,代理信息:null:null
2018-04-01 21:26:04 [pool-1-thread-2] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下載網頁:https://list.jd.com/list.html?cat=9987,653,655&page=2&sort=sort_rank_asc&trans=1&JL=6_0_0,消耗時長:276 ms,代理信息:null:null
2018-04-01 21:26:04 [pool-1-thread-4] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下載網頁:https://list.suning.com/0-20006-99.html,消耗時長:300 ms,代理信息:null:null
2018-04-01 21:26:04 [pool-1-thread-4] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO] - 解析列表頁面:https://list.suning.com/0-20006-99.html, 消耗時長:4ms
......
2018-04-01 21:27:49 [pool-1-thread-3] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下載網頁:https://club.jd.com/comment/productCommentSummaries.action?referenceIds=23934388891,消耗時長:176 ms,代理信息:null:null
2018-04-01 21:27:49 [pool-1-thread-3] [cn.xpleaf.spider.core.parser.Impl.JDHtmlParserImpl] [INFO] - 解析商品頁面:https://item.jd.com/23934388891.html, 消耗時長:413ms
2018-04-01 21:27:49 [pool-1-thread-2] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下載網頁:https://review.suning.com/ajax/review_satisfy/general-00000000010017793337-0070079092-----satisfy.htm,消耗時長:308 ms,代理信息:null:null
2018-04-01 21:27:49 [pool-1-thread-2] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO] - 解析商品頁面:https://product.suning.com/0070079092/10017793337.html, 消耗時長:588ms
......
平均下來,下載一個商品網頁數據的時間在200~500毫秒不等,當然這個還需要取決于當時的網絡情況。
另外,如果想要真正計算爬取一個商品的數據,可以通過日志下面的數據來計算:
在我的主機上(CPU:E5 10核心,內存:32GB,分別開啟1個虛擬機和3個虛擬機),情況如下:
節點數 | 每節點線程數 | 商品數量 | 時間 |
---|---|---|---|
1 | 5 | 京東+蘇寧易購近13000個商品數據 | 141分鐘 |
3 | 5 | 京東+蘇寧易購近13000個商品數據 | 65分鐘 |
可以看到,當使用3個節點時,時間并不會相應地縮小為原來的1/3,這是因為此時影響爬蟲性能的問題主要是網絡問題,節點數量多,線程數量大,網絡請求也多,但是帶寬一定,并且在沒有使用代理的情況,請求頻繁,連接失敗的情況也會增多,對時間也有一定的影響,如果使用隨機代理庫,情況將會好很多。
但可以肯定的是,在橫向擴展增加爬蟲節點之后,確實可以大大縮小我們的爬蟲時間,這也是分布式爬蟲系統的好處。
在整個爬蟲系統的設計中,主要使用下面的策略來達到反反爬蟲的目的:
對于以上關于設計、實現分布式爬蟲系統的實踐解析,大家是不是覺得非常有幫助。如果需要了解更多內容,請繼續關注我們的行業資訊,相信你會喜歡上這些內容的。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。