亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

python對于requests的封裝方法詳解

發布時間:2020-10-06 18:31:19 來源:腳本之家 閱讀:202 作者:可昌 欄目:開發技術

由于requests是http類接口的核心,因此封裝前考慮問題比較多:

1. 對多種接口類型的支持;

2. 連接異常時能夠重連;

3. 并發處理的選擇;

4. 使用方便,容易維護;

當前并未全部實現,后期會不斷完善。重點提一下并發處理的選擇:python的并發處理機制由于存在GIL的原因,實現起來并不是很理想,綜合考慮多進程、多線程、協程,在不考慮大并發性能測試的前提下使用了多線程-線程池的形式實現。使用的是

concurrent.futures模塊。當前僅方便支持webservice接口。

# -*- coding:utf-8 -*-
 
import requests
from concurrent.futures import ThreadPoolExecutor
from Tools.Config import Config # 配置文件讀取
from Tools.Log import Log # 日志管理
from Tools.tools import decoLOG # 日志裝飾
 
'''
  功能:   Requests類
  使用方法: 
  作者:   郭可昌
  作成時間: 20180224
  更新內容:
  更新時間:
'''
class Requests(object):
  def __init__(self):
    self.session = requests.session()
    self.header = {}
    # URL默認來源于配置文件,方便不同測試環境的切換,也可以動態設定
    self.URL = Config().getURL()
    # 默認60s,可以動態設定
    self.timeout = 60
    #http連接異常的場合,重新連接的次數,默認為3,可以動態設定
    self.iRetryNum = 3
 
    self.errorMsg = ""
    # 內容 = {用例編號:響應數據}
    self.responses = {}
    # 內容 = {用例編號:異常信息}
    self.resErr={}
 
 
  # 原始post使用保留
  # bodyData: request's data
  @decoLOG
  def post(self, bodyData):
    response = None
    self.errorMsg = ""
 
    try:
      response = self.session.post(self.URL, data=bodyData.encode('utf-8'), headers=self.header, timeout=self.timeout)
      response.raise_for_status()
    except Exception as e:
      self.errorMsg = str(e)
      Log().logger.error("HTTP請求異常,異常信息:%s" % self.errorMsg)
    return response
 
 
  # 復數請求并發處理,采用線程池的形式,用例數>線程池的容量:線程池的容量為并發數,否則,用例數為并發數
  # dicDatas: {用例編號:用例數據}
  @decoLOG
  def req_all(self, dicDatas, iThreadNum=5):
 
    if len(dict(dicDatas)) < 1:
      Log().logger.error("沒有測試對象,請確認后再嘗試。。。")
      return self.responses.clear()
 
    # 請求用例集合轉換(用例編號,用例數據)
    seed = [i for i in dicDatas.items()]
    self.responses.clear()
 
    # 線程池并發執行,iThreadNum為并發數
    with ThreadPoolExecutor(iThreadNum) as executor:
      executor.map(self.req_single,seed)
 
    # 返回所有請求的響應信息({用例編號:響應數據}),http連接異常:對應None
    return self.responses
 
  # 用于單用例提交,http連接失敗可以重新連接,最大重新連接數可以動態設定
  def req_single(self, listData, reqType="post", iLoop=1):
    response = None
    # 如果達到最大重連次數,連接后提交結束
    if iLoop == self.iRetryNum:
      if reqType == "post":
        try:
          response = requests.post(self.URL, data=listData[1].encode('utf-8'), headers=self.header,
                       timeout=self.timeout)
          response.raise_for_status()
        except Exception as e:
          # 異常信息保存只在最大連接次數時進行,未達到最大連接次數,異常信息為空
          self.resErr[listData[0]] = str(e)
          Log().logger.error("HTTP請求異常,異常信息:%s【%d】" % (str(e), iLoop))
 
        self.responses[listData[0]] = response
      else:
        # for future: other request method expand
        pass
    # 未達到最大連接數,如果出現異常,則重新連接嘗試
    else:
      if reqType == "post":
        try:
          response = requests.post(self.URL, data=listData[1].encode('utf-8'), headers=self.header,
                       timeout=self.timeout)
          response.raise_for_status()
        except Exception as e:
          Log().logger.error("HTTP請求異常,異常信息:%s【%d】" % (str(e), iLoop))
          # 重連次數遞增
          iLoop += 1
          # 進行重新連接
          self.req_single(listData, reqType, iLoop)
          # 當前連接終止
          return None
        self.responses[listData[0]] = response
      else:
        # for future: other request method expand
        pass
 
  # 設定SoapAction, 快捷完成webservice接口header設定
  def setSoapAction(self, soapAction):
    self.header["SOAPAction"] = soapAction
    self.header["Content-Type"] = "text/xml;charset=UTF-8"
    self.header["Connection"] = "Keep-Alive"
    self.header["User-Agent"] = "InterfaceAutoTest-run"
 

以上這篇python對于requests的封裝方法詳解就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持億速云。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

穆棱市| 山阴县| 苏州市| 湾仔区| 清徐县| 那曲县| 云和县| 昆山市| 南部县| 永吉县| 容城县| 上杭县| 德令哈市| 彭山县| 关岭| 佳木斯市| 定襄县| 玛沁县| 霍城县| 海门市| 万载县| 北碚区| 普安县| 兴宁市| 常山县| 汪清县| 文成县| 逊克县| 武川县| 江北区| 赤水市| 静乐县| 来宾市| 南京市| 海晏县| 庐江县| 盐源县| 通渭县| 呼和浩特市| 东至县| 阜宁县|