您好,登錄后才能下訂單哦!
概述
在進行網站爬取數據的時候,會發現很多網站都進行了反爬蟲的處理,如JS加密,Ajax加密,反Debug等方法,通過請求獲取數據和頁面展示的內容完全不同,這時候就用到Selenium技術,來模擬瀏覽器的操作,然后獲取數據。本文以一個簡單的小例子,簡述Python搭配Tkinter和Selenium進行瀏覽器的模擬操作,僅供學習分享使用,如有不足之處,還請指正。
什么是Selenium?
Selenium是一個用于Web應用程序測試的工具,Selenium測試直接運行在瀏覽器中,就像真正的用戶在操作一樣。支持的瀏覽器包括IE(7, 8, 9, 10, 11),Mozilla Firefox,Safari,Google Chrome,Opera等。Selenium支持多種操作系統,如Windows、Linux、IOS等,如果需要支持Android,則需要特殊的selenium,本文主要以IE11瀏覽器為例。
安裝Selenium
通過pip install selenium 進行安裝即可,如果速度慢,則可以使用國內的鏡像進行安裝。
涉及知識點
程序雖小,除了需要掌握的Html ,JavaScript,CSS等基礎知識外,本例涉及的Python相關知識點還是蠻多的,具體如下:
Selenium進行元素定位,主要有ID,Name,ClassName,Css Selector,Partial LinkText,LinkText,XPath,TagName等8種方式。
Selenium獲取單一元素(如:find_element_by_xpath)和獲取元素數組(如:find_elements_by_xpath)兩種方式。
Selenium元素定位后,可以給元素進行賦值和取值,或者進行相應的事件操作(如:click)。
為了防止前臺頁面卡主,本文用到了線程進行后臺操作,如果要定義一個新的線程,只需要定義一個類并繼承threading.Thread,然后重寫run方法即可。
在使用線程的過程中,為了保證線程的同步,本例用到了線程鎖,如:threading.Lock()。
本例將Selenium執行的過程信息,保存到對列中,并通過線程輸出到頁面顯示。queue默認先進先出方式。
對列通過put進行壓棧,通過get進行出棧。通過qsize()用于獲取當前對列元素個數。
為了保存Selenium執行過程中的日志,本例用到了日志模塊,為Pyhton自帶的模塊,不需要額外安裝。
Python的日志共六種級別,分別是:NOTSET,DEBUG,INFO,WARN,ERROR,FATAL,CRITICAL。
示例效果圖
本例主要針對某一配置好的商品ID進行輪詢,監控是否有貨,有貨則加入購物車,無貨則繼續輪詢,如下圖所示:
核心代碼
本例最核心的代碼,就是利用Selenium進行網站的模擬操作,如下所示:
class Smoking: """定義Smoking類""" # 瀏覽器驅動 __driver: webdriver = None # 配置幫助類 __cfg_info: dict = {} # 日志幫助類 __log_helper: LogHelper = None # 主程序目錄 __work_path: str = '' # 是否正在運行 __running: bool = False # 無貨 __no_stock = 'Currently Out of Stock' # 線程等待秒數 __wait_sec = 2 def __init__(self, work_path, cfg_info, log_helper: LogHelper): """初始化""" self.__cfg_info = cfg_info self.__log_helper = log_helper self.__work_path = work_path self.__wait_sec = int(cfg_info['wait_sec']) # 如果小于2,則等于2 self.__wait_sec = (2 if self.__wait_sec < 2 else self.__wait_sec) def checkIsExistsById(self, id): """通過ID判斷是否存在""" try: i = 0 while self.__running and i < 3: if len(self.__driver.find_elements_by_id(id)) > 0: break else: time.sleep(self.__wait_sec) i = i + 1 return len(self.__driver.find_elements_by_id(id)) > 0 except BaseException as e: return False def checkIsExistsByName(self, name): """通過名稱判斷是否存在""" try: i = 0 while self.__running and i < 3: if len(self.__driver.find_elements_by_name(name)) > 0: break else: time.sleep(self.__wait_sec) i = i + 1 return len(self.__driver.find_elements_by_name(name)) > 0 except BaseException as e: return False def checkIsExistsByPath(self, path): """通過xpath判斷是否存在""" try: i = 0 while self.__running and i < 3: if len(self.__driver.find_elements_by_xpath(path)) > 0: break else: time.sleep(self.__wait_sec) i = i + 1 return len(self.__driver.find_elements_by_xpath(path)) > 0 except BaseException as e: return False def checkIsExistsByClass(self, cls): """通過class名稱判斷是否存在""" try: i = 0 while self.__running and i < 3: if len(self.__driver.find_elements_by_class_name(cls)) > 0: break else: time.sleep(self.__wait_sec) i = i + 1 return len(self.__driver.find_elements_by_class_name(cls)) > 0 except BaseException as e: return False def checkIsExistsByLinkText(self, link_text): """判斷LinkText是否存在""" try: i = 0 while self.__running and i < 3: if len(self.__driver.find_elements_by_link_text(link_text)) > 0: break else: time.sleep(self.__wait_sec) i = i + 1 return len(self.__driver.find_elements_by_link_text(link_text)) > 0 except BaseException as e: return False def checkIsExistsByPartialLinkText(self, link_text): """判斷包含LinkText是否存在""" try: i = 0 while self.__running and i < 3: if len(self.__driver.find_elements_by_partial_link_text(link_text)) > 0: break else: time.sleep(self.__wait_sec) i = i + 1 return len(self.__driver.find_elements_by_partial_link_text(link_text)) > 0 except BaseException as e: return False # def waiting(self, *locator): # """等待完成""" # # self.__driver.switch_to.window(self.__driver.window_handles[1]) # Wait(self.__driver, 60).until(EC.visibility_of_element_located(locator)) def login(self, username, password): """登錄""" # 5. 點擊鏈接跳轉到登錄頁面 self.__driver.find_element_by_link_text('賬戶登錄').click() # 6. 輸入賬號密碼 # 判斷是否加載完成 # self.waiting((By.ID, "email")) if self.checkIsExistsById('email'): self.__driver.find_element_by_id('email').send_keys(username) self.__driver.find_element_by_id('password').send_keys(password) # 7. 點擊登錄按鈕 self.__driver.find_element_by_id('sign-in').click() def working(self, item_id): """工作狀態""" while self.__running: try: # 正常獲取信息 if self.checkIsExistsById('string'): self.__driver.find_element_by_id('string').clear() self.__driver.find_element_by_id('string').send_keys(item_id) self.__driver.find_element_by_id('string').send_keys(Keys.ENTER) # 判斷是否查詢到商品 xpath = "http://div[@class='specialty-header search']/div[@class='specialty-description']/div[" \ "@class='gt-450']/span[2] " if self.checkIsExistsByPath(xpath): count = int(self.__driver.find_element_by_xpath(xpath).text) if count < 1: time.sleep(self.__wait_sec) self.__log_helper.put('沒有查詢到item id =' + item_id + '對應的信息') continue else: time.sleep(self.__wait_sec) self.__log_helper.put('沒有查詢到item id2 =' + item_id + '對應的信息') continue # 判斷當前庫存是否有貨 xpath2 = "http://div[@class='product-list']/div[@class='product']/div[@class='price-and-detail']/div[" \ "@class='price']/span[@class='noStock'] " if self.checkIsExistsByPath(xpath2): txt = self.__driver.find_element_by_xpath(xpath2).text if txt == self.__no_stock: # 當前無貨 time.sleep(self.__wait_sec) self.__log_helper.put('查詢一次' + item_id + ',無貨') continue # 鏈接path2 xpath3 = "http://div[@class='product-list']/div[@class='product']/div[@class='imgDiv']/a" # 判斷是否加載完畢 # self.waiting((By.CLASS_NAME, "imgDiv")) if self.checkIsExistsByPath(xpath3): self.__driver.find_element_by_xpath(xpath3).click() time.sleep(self.__wait_sec) # 加入購物車 if self.checkIsExistsByClass('add-to-cart'): self.__driver.find_element_by_class_name('add-to-cart').click() self.__log_helper.put('加入購物車成功,商品item-id:' + item_id) break else: self.__log_helper.put('未找到加入購物車按鈕') else: self.__log_helper.put('沒有查詢到,可能是商品編碼不對,或者已下架') except BaseException as e: self.__log_helper.put(e) def startRun(self): """運行起來""" try: self.__running = True url: str = self.__cfg_info['url'] username = self.__cfg_info['username'] password = self.__cfg_info['password'] item_id = self.__cfg_info['item_id'] if url is None or len(url) == 0 or username is None or len(username) == 0 or password is None or len( password) == 0 or item_id is None or len(item_id) == 0: self.__log_helper.put('配置信息不全,請檢查config.cfg文件是否為空,然后再重啟') return if self.__driver is None: options = webdriver.IeOptions() options.add_argument('encoding=UTF-8') options.add_argument('Accept= text / css, * / *') options.add_argument('Accept - Language= zh - Hans - CN, zh - Hans;q = 0.5') options.add_argument('Accept - Encoding= gzip, deflate') options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko') # 2. 定義瀏覽器驅動對象 self.__driver = webdriver.Ie(executable_path=self.__work_path + r'\IEDriverServer.exe', options=options) self.run(url, username, password, item_id) except BaseException as e: self.__log_helper.put('運行過程中出錯,請重新打開再試') def run(self, url, username, password, item_id): """運行起來""" # 3. 訪問網站 self.__driver.get(url) # 4. 最大化窗口 self.__driver.maximize_window() if self.checkIsExistsByLinkText('賬戶登錄'): # 判斷是否登錄:未登錄 self.login(username, password) if self.checkIsExistsByPartialLinkText('歡迎回來'): # 判斷是否登錄:已登錄 self.__log_helper.put('登錄成功,下一步開始工作了') self.working(item_id) else: self.__log_helper.put('登錄失敗,請設置賬號密碼') def stop(self): """停止""" try: self.__running = False # 如果驅動不為空,則關閉 self.close_browser_nicely(self.__driver) if self.__driver is not None: self.__driver.quit() # 關閉后切要為None,否則啟動報錯 self.__driver = None except BaseException as e: print('Stop Failure') finally: self.__driver = None def close_browser_nicely(self, browser): try: browser.execute_script("window.onunload=null; window.onbeforeunload=null") except Exception as err: print("Fail to execute_script:'window.onunload=null; window.onbeforeunload=null'") socket.setdefaulttimeout(10) try: browser.quit() print("Close browser and firefox by calling quit()") except Exception as err: print("Fail to quit from browser, error-type:%s, reason:%s" % (type(err), str(err))) socket.setdefaulttimeout(30)
其他輔助類
日志類(LogHelper),代碼如下:
class LogHelper: """日志幫助類""" __queue: queue.Queue = None # 隊列 __logging: logging.Logger = None # 日志 __running: bool = False # 是否記錄日志 def __init__(self, log_path): """初始化類""" self.__queue = queue.Queue(1000) self.init_log(log_path) def put(self, value): """添加數據""" # 記錄日志 self.__logging.info(value) # 添加到隊列 if self.__queue.qsize() < self.__queue.maxsize: self.__queue.put(value) def get(self): """獲取數據""" if self.__queue.qsize() > 0: try: return self.__queue.get(block=False) except BaseException as e: return None else: return None def init_log(self, log_path): """初始化日志""" self.__logging = logging.getLogger() self.__logging.setLevel(logging.INFO) # 日志 rq = time.strftime('%Y%m%d%H%M', time.localtime(time.time())) log_name = log_path + rq + '.log' logfile = log_name # if not os.path.exists(logfile): # # 創建空文件 # open(logfile, mode='r') fh = logging.FileHandler(logfile, mode='a', encoding='UTF-8') fh.setLevel(logging.DEBUG) # 輸出到file的log等級的開關 # 第三步,定義handler的輸出格式 formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s") fh.setFormatter(formatter) # 第四步,將logger添加到handler里面 self.__logging.addHandler(fh) def get_running(self): # 獲取當前記錄日志的狀態 return self.__running def set_running(self, v: bool): # 設置當前記錄日志的狀態 self.__running = v
配置類(ConfigHelper)
class ConfigHelper: """初始化數據類""" __config_dir = None __dic_cfg = {} def __init__(self, config_dir): """初始化""" self.__config_dir = config_dir def ReadConfigInfo(self): """得到配置項""" parser = ConfigParser() parser.read(self.__config_dir + r"\config.cfg") section = parser.sections()[0] items = parser.items(section) self.__dic_cfg.clear() for item in items: self.__dic_cfg.__setitem__(item[0], item[1]) def getConfigInfo(self): """獲取配置信息""" if len(self.__dic_cfg) == 0: self.ReadConfigInfo() return self.__dic_cfg
線程類(MyThread)
class MyThread(threading.Thread): """后臺監控線程""" def __init__(self, tid, name, smoking: Smoking, log_helper: LogHelper): """線程初始化""" threading.Thread.__init__(self) self.threadID = tid self.name = name self.smoking = smoking self.log_helper = log_helper def run(self): print("開啟線程: " + self.name) self.log_helper.put("開啟線程: " + self.name) # 獲取鎖,用于線程同步 # lock = threading.Lock() # lock.acquire() self.smoking.startRun() # 釋放鎖,開啟下一個線程 # lock.release() print("結束線程: " + self.name) self.log_helper.put("結束線程: " + self.name)
備注
俠客行 [唐:李白]趙客縵胡纓,吳鉤霜雪明。銀鞍照白馬,颯沓如流星。
十步殺一人,千里不留行。事了拂衣去,深藏身與名。
閑過信陵飲,脫劍膝前橫。將炙啖朱亥,持觴勸侯嬴。
三杯吐然諾,五岳倒為輕。眼花耳熱后,意氣素霓生。
救趙揮金槌,邯鄲先震驚。千秋二壯士,烜赫大梁城。
縱死俠骨香,不慚世上英。誰能書閣下,白首太玄經。
到此這篇關于Python使用Selenium模擬瀏覽器自動操作的文章就介紹到這了,更多相關Python模擬瀏覽器自動操作內容請搜索億速云以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持億速云!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。