亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

python3多進程和協程處理MySQL數據講義

發布時間:2020-04-24 17:39:08 來源:億速云 閱讀:308 作者:三月 欄目:MySQL數據庫

下文內容主要給大家帶來python3多進程和協程處理MySQL數據講義,這里所講到的知識,與書籍略有不同,都是億速云專業技術人員在與用戶接觸過程中,總結出來的,具有一定的經驗分享價值,希望給廣大讀者帶來幫助。

python3的多進程 + 協程處理MySQL的數據,主要邏輯是拉取MySQL的數據,然后使用flashtext匹配關鍵字,在存回MySQL,代碼如下(async_mysql.py):

python3多進程和協程處理MySQL數據講義

import time
import asyncio
import random
from concurrent.futures import ProcessPoolExecutor as Pool

import aiomysql
from flashtext import KeywordProcessor
import click

class AttrDict(dict):
    """可以用"."獲取屬性,沒有該屬性時返回None的字典"""
    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            return None

    def __setattr__(self, name, value):
        self[name] = value

class AttrDictCursor(aiomysql.DictCursor):
    """繼承aiomysql的字典cursor"""
    dict_type = AttrDict

class MultiProcessMysql(object):
    """用多進程和協程處理MySQL數據"""

    def __init__(self, workers=2, pool=10, start=0, end=2000):
        """第一段的參數需要跟隨需求變動"""
        self.host = "192.168.0.34"
        self.port = 3306
        self.user = "root"
        self.password = "root"
        self.db = "mydb"
        self.origin_table = "judgment_main_etl"  # main
        self.dest_table = "laws_finance1"
        self.s_sql = f"select uuid, court_idea, judge_result, reason, plt_claim, dft_rep, crs_exm from {self.origin_table} where %s<=id and id<%s;"
        self.i_sql = f"insert into {self.dest_table} (uuid, title, reason, keyword) values (%s, %s, %s, %s)"

        self.pool = pool    # 協程數和MySQL連接數
        self.aionum = self.pool
        self.step = 2000  # 一次性從MySQL拉取的行數
        self.workers = workers  # 進程數
        self.start = start  # MySQL開始的行數
        self.end = end  # MySQL結束的行數

        self.keyword = ['非法經營支付業務', '網絡洗錢', '資金池', '支付牌照', '清潔算', '網絡支付', '網上支付', '移動支付', '聚合支付', '保本保息', '擔保交易', '供應鏈金融', '網貸', '網絡借貸', '網絡投資', '虛假標的', '自融', '資金池', '關聯交易', '龐氏騙局', '網絡金融理財', '線上投資理財', '互聯網私募', '互聯網股權', '非法集資', '合同欺詐', '眾籌投資', '股權轉讓', '互聯網債權轉讓', '資本自融', '投資騙局', '洗錢', '非法集資', '網絡傳銷', '虛擬幣泡沫', '網絡互助金融', '金融欺詐', '網上銀行', '信用卡盜刷', '網絡釣魚', '信用卡信息竊取', '網上洗錢', '洗錢詐騙', '數字簽名更改', '支付命令竊取', '金融詐騙', '引誘投資', '隱瞞項目信息', '風險披露', '夸大收益', '詐騙保險金', '非法經營保險業務', '侵占客戶資金', '征信報告竊取', '金融詐騙', '破壞金融管理']
        self.kp = KeywordProcessor()    # flashtext是一個文本匹配包,在關鍵詞數量大時速度遠大于re
        self.kp.add_keywords_from_list(self.keyword)

    async def createMysqlPool(self, loop):
        """每個進程要有獨立的pool,所以不綁定self"""
        pool = await aiomysql.create_pool(
            loop=loop, host=self.host, port=self.port, user=self.user,
            password=self.password, db=self.db, maxsize=self.pool,
            charset='utf8', cursorclass=AttrDictCursor
        )
        return pool

    def cutRange(self, start, end, times):
        """將數據區間分段"""
        partition = (end - start) // times
        ranges = []
        tmp_end = start
        while tmp_end < end:
            tmp_end += partition
            # 剩下的不足以再分
            if (end - tmp_end) < partition:
                tmp_end = end
            ranges.append((start, tmp_end))
            start = tmp_end
        return ranges

    async def findKeyword(self, db, start, end):
        """從MySQL數據中匹配出關鍵字"""
        # 隨機休息一定時間,防止數據同時到達,同時處理, 應該是一部分等待,一部分處理
        await asyncio.sleep(random.random() * self.workers * 2)
        print("coroutine start")
        async with db.acquire() as conn:
            async with conn.cursor() as cur:
                while start < end:
                    tmp_end = start + self.step
                    if tmp_end > end:
                        tmp_end = end
                    print("aio start: %s, end: %s" % (start, tmp_end))
                    # <=id 和 id<
                    await cur.execute(self.s_sql, (start, tmp_end))
                    datas = await cur.fetchall()
                    uuids = []
                    for data in datas:
                        if data:
                            for key in list(data.keys()):
                                if not data[key]:
                                    data.pop(key)
                            keyword = self.kp.extract_keywords(
                                " ".join(data.values()))
                            if keyword:
                                keyword = ' '.join(set(keyword))   # 對關鍵字去重
                                # print(keyword)
                                uuids.append(
                                    (data.uuid, data.title, data.reason, keyword))
                    await cur.executemany(self.i_sql, uuids)
                    await conn.commit()
                    start = tmp_end

    def singleProcess(self, start, end):
        """單個進程的任務"""
        loop = asyncio.get_event_loop()
        # 為每個進程創建一個pool
        db = loop.run_until_complete(asyncio.ensure_future(
            self.createMysqlPool(loop)))

        tasks = []
        ranges = self.cutRange(start, end, self.aionum)
        print(ranges)
        for start, end in ranges:
            tasks.append(self.findKeyword(db, start, end))
        loop.run_until_complete(asyncio.gather(*tasks))

    def run(self):
        """多進程跑"""
        tasks = []
        ranges = self.cutRange(self.start, self.end, self.workers)
        start_time = time.time()
        with Pool(max_workers=self.workers) as executor:
            for start, end in ranges:
                print("processor start: %s, end: %s" % (start, end))
                tasks.append(executor.submit(self.singleProcess, start, end))
            for task in tasks:
                task.result()
        print("total time: %s" % (time.time() - start_time))

@click.command(help="運行")
@click.option("-w", "--workers", default=2, help="進程數")
@click.option('-p', "--pool", default=10, help="協程數")
@click.option('-s', '--start', default=0, help='MySQL開始的id')
@click.option('-e', "--end", default=2640000, help="MySQL結束的id")
def main(workers, pool, start, end):
    mp = MultiProcessMysql(workers=workers, pool=pool, start=start, end=end)
    if workers * pool > 100:
        if not click.confirm('MySQL連接數超過100(%s),確認嗎?' % (workers * pool)):
            return
    mp.run()

if __name__ == "__main__":
    main()

運行如下:
$ python3 async_mysql.py -w 2     # 可以指定其他參數,也可使用默認值

對于以上關于python3多進程和協程處理MySQL數據講義,如果大家還有更多需要了解的可以持續關注我們億速云的行業推新,如需獲取專業解答,可在官網聯系售前售后的,希望該文章可給大家帶來一定的知識更新。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

新巴尔虎左旗| 建湖县| 德钦县| 宁化县| 赣榆县| 光山县| 南昌县| 北辰区| 乐昌市| 商都县| 大埔县| 额济纳旗| 隆昌县| 阿拉善右旗| 东丽区| 谷城县| 于田县| 河曲县| 沅江市| 德阳市| 从江县| 惠水县| 米林县| 淳化县| 邛崃市| 乐山市| 蓝山县| 永春县| 沙田区| 南岸区| 湖北省| 本溪市| 铁岭县| 泌阳县| 祁连县| 苏尼特右旗| 渝中区| 栖霞市| 巩留县| 台江县| 石家庄市|