亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python中aiohttp怎么使用

發布時間:2023-05-11 10:11:47 來源:億速云 閱讀:277 作者:zzz 欄目:編程語言

本篇內容介紹了“Python中aiohttp怎么使用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

1.定義

aiohttp 是一個基于 asyncio 的異步 HTTP 網絡模塊,它既提供了服務端,又提供了客戶端

2.基本使用

import aiohttp
import asyncio


async def fetch(session, url):
    # 聲明一個支持異步的上下文管理器
    async with session.get(url) as response:
        # response.text()是coroutine對象 需要加await
        return await response.text(), response.status


async def main():
    # 聲明一個支持異步的上下文管理器
    async with aiohttp.ClientSession() as session:
        html, status = await fetch(session, 'https://cuiqingcai.com')
        print(f'html: {html[:100]}...')
        print(f'status: {status}')


if __name__ == '__main__':
    #  Python 3.7 及以后,不需要顯式聲明事件循環,可以使用 asyncio.run(main())來代替最后的啟動操作
    asyncio.get_event_loop().run_until_complete(main())

3.請求類型

session.post('http://httpbin.org/post', data=b'data')
session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')

4.相應字段

print('status:', response.status) # 狀態碼
print('headers:', response.headers)# 響應頭
print('body:', await response.text())# 響應體
print('bytes:', await response.read())# 響應體二進制內容
print('json:', await response.json())# 響應體json數據

5.超時設置

import aiohttp
import asyncio
async def main():
   #設置 1 秒的超時 
   timeout = aiohttp.ClientTimeout(total=1)
   async with aiohttp.ClientSession(timeout=timeout) as session:
       async with session.get('https://httpbin.org/get') as response:
           print('status:', response.status)
if __name__ == '__main__':
   asyncio.get_event_loop().run_until_complete(main())

6.并發限制

import asyncio
import aiohttp
# 聲明最大并發量為5
CONCURRENCY = 5
semaphore = asyncio.Semaphore(CONCURRENCY)
URL = 'https://www.baidu.com'

session = None
async def scrape_api():
   async with semaphore:
       print('scraping', URL)
       async with session.get(URL) as response:
           await asyncio.sleep(1)
           return await response.text()
    
async def main():
   global session
   session = aiohttp.ClientSession()
   scrape_index_tasks = [asyncio.ensure_future(scrape_api()) for _ in range(10000)]
   await asyncio.gather(*scrape_index_tasks)
if __name__ == '__main__':
   asyncio.get_event_loop().run_until_complete(main())

7.實際應用

import asyncio
import aiohttp
import logging
import json
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s: %(message)s')
INDEX_URL = 'https://dynamic5.scrape.center/api/book/?limit=18&offset={offset}'
DETAIL_URL = 'https://dynamic5.scrape.center/api/book/{id}'
PAGE_SIZE = 18
PAGE_NUMBER = 100
CONCURRENCY = 5

semaphore = asyncio.Semaphore(CONCURRENCY)
session = None

async def scrape_api(url):
   async with semaphore:
       try:
           logging.info('scraping %s', url)
           async with session.get(url) as response:
               return await response.json()
       except aiohttp.ClientError:
           logging.error('error occurred while scraping %s', url, exc_info=True)

async def scrape_index(page):
   url = INDEX_URL.format(offset=PAGE_SIZE * (page - 1))
   return await scrape_api(url)

async def main():
   global session
   session = aiohttp.ClientSession()
   scrape_index_tasks = [asyncio.ensure_future(scrape_index(page)) for page in range(1, PAGE_NUMBER + 1)]
   results = await asyncio.gather(*scrape_index_tasks)
   logging.info('results %s', json.dumps(results, ensure_ascii=False, indent=2))
   

if __name__ == '__main__':
   asyncio.get_event_loop().run_until_complete(main())

“Python中aiohttp怎么使用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

临邑县| 天等县| 鹤壁市| 太谷县| 谢通门县| 盐津县| 波密县| 蒲江县| 尼玛县| 年辖:市辖区| 白山市| 台中县| 平南县| 固镇县| 墨玉县| 黄冈市| 黑龙江省| 壤塘县| 文成县| 米易县| 阳城县| 临湘市| 邛崃市| 民乐县| 读书| 忻州市| 仙居县| 辉县市| 上饶市| 龙口市| 长汀县| 万年县| 孙吴县| 阳高县| 綦江县| 万安县| 阜平县| 独山县| 浙江省| 房山区| 芒康县|