亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

python異步實現定時任務和周期任務的方法

發布時間:2020-10-25 02:50:25 來源:腳本之家 閱讀:238 作者:狡猾的皮球 欄目:開發技術

一. 如何調用

def f1(arg1, arg2):
  print('f1', arg1, arg2)
 
 
def f2(arg1):
  print('f2', arg1)
 
 
def f3():
  print('f3')
 
 
def f4():
  print('周期任務', int(time.time()))
 
 
timer = TaskTimer()
# 把任務加入任務隊列
timer.join_task(f1, [1, 2], timing=15.5) # 每天15:30執行
timer.join_task(f2, [3], timing=14) # 每天14:00執行
timer.join_task(f3, [], timing=15) # 每天15:00執行
timer.join_task(f4, [], interval=10) # 每10秒執行1次
# 開始執行(此時才會創建線程)
timer.start()

f1~f4是我們需要定時執行的函數。

首先創建TaskTimer對象(TaskTimer的代碼在下面)。調用join_task函數,把需要執行的函數加入到任務隊列。最后調用start,任務就開始執行了。

join_task參數:

fun:需要執行的函數

arg:fun的參數,如果沒有就傳一個空列表

interval:如果有此參數,說明任務是周期任務,單位為秒(注意interval最少5秒)

timing:如果有此參數,說明任務是定時任務,單位為時

注意:interval和timing只能選填1個

二. 源碼

import datetime
import time
from threading import Thread
from time import sleep
 
 
class TaskTimer:
  __instance = None
 
  def __new__(cls, *args, **kwargs):
    """
    單例模式
    """
    if not cls.__instance:
      cls.__instance = object.__new__(cls)
    return cls.__instance
 
  def __init__(self):
    if not hasattr(self, 'task_queue'):
      setattr(self, 'task_queue', [])
 
    if not hasattr(self, 'is_running'):
      setattr(self, 'is_running', False)
 
  def write_log(self, level, msg):
    cur_time = datetime.datetime.now()
    with open('./task.log', mode='a+', encoding='utf8') as file:
      s = "[" + str(cur_time) + "][" + level + "]  " + msg
      print(s)
      file.write(s + "\n")
 
  def work(self):
 
    """
    處理任務隊列
    """
    while True:
      for task in self.task_queue:
        if task['interval']:
          self.cycle_task(task)
        elif task['timing']:
          self.timing_task(task)
 
      sleep(5)
 
  def cycle_task(self, task):
    """
    周期任務
    """
    if task['next_sec'] <= int(time.time()):
      try:
        task['fun'](*task['arg'])
        self.write_log("正常", "周期任務:" + task['fun'].__name__ + " 已執行")
      except Exception as e:
        self.write_log("異常", "周期任務:" + task['fun'].__name__ + " 函數內部異常:" + str(e))
      finally:
        task['next_sec'] = int(time.time()) + task['interval']
 
  def timing_task(self, task):
    """
    定時任務
    """
    # 今天已過秒數
    today_sec = self.get_today_until_now()
 
    # 到了第二天,就重置任務狀態
    if task['today'] != self.get_today():
      task['today'] = self.get_today()
      task['today_done'] = False
 
    # 第一次執行
    if task['first_work']:
      if today_sec >= task['task_sec']:
        task['today_done'] = True
        task['first_work'] = False
      else:
        task['first_work'] = False
 
    # 今天還沒有執行
    if not task['today_done']:
      if today_sec >= task['task_sec']: # 到點了,開始執行任務
        try:
          task['fun'](*task['arg'])
          self.write_log("正常", "定時任務:" + task['fun'].__name__ + " 已執行")
        except Exception as e:
          self.write_log("異常", "定時任務:" + task['fun'].__name__ + " 函數內部異常:" + str(e))
        finally:
          task['today_done'] = True
          if task['first_work']:
            task['first_work'] = False
 
  def get_today_until_now(self):
    """
    獲取今天凌晨到現在的秒數
    """
    i = datetime.datetime.now()
    return i.hour * 3600 + i.minute * 60 + i.second
 
  def get_today(self):
    """
    獲取今天的日期
    """
    i = datetime.datetime.now()
    return i.day
 
  def join_task(self, fun, arg, interval=None, timing=None):
    """
    interval和timing只能存在1個
    :param fun: 你要調用的任務
    :param arg: fun的參數
    :param interval: 周期任務,單位秒
    :param timing: 定時任務,取值:[0,24)
    """
    # 參數校驗
    if (interval != None and timing != None) or (interval == None and timing == None):
      raise Exception('interval和timing只能選填1個')
 
    if timing and not 0 <= timing < 24:
      raise Exception('timing的取值范圍為[0,24)')
 
    if interval and interval < 5:
      raise Exception('interval最少為5')
 
    # 封裝一個task
    task = {
      'fun': fun,
      'arg': arg,
      'interval': interval,
      'timing': timing,
    }
    # 封裝周期或定時任務相應的參數
    if timing:
      task['task_sec'] = timing * 3600
      task['today_done'] = False
      task['first_work'] = True
      task['today'] = self.get_today()
    elif interval:
      task['next_sec'] = int(time.time()) + interval
 
    # 把task加入任務隊列
    self.task_queue.append(task)
 
    self.write_log("正常", "新增任務:" + fun.__name__)
 
  def start(self):
    """
    開始執行任務
    返回線程標識符
    """
    if not self.is_running:
      thread = Thread(target=self.work)
 
      thread.start()
 
      self.is_running = True
 
      self.write_log("正常", "TaskTimer已開始運行!")
 
      return thread.ident
 
    self.write_log("警告", "TaskTimer已運行,請勿重復啟動!")

以上這篇python異步實現定時任務和周期任務的方法就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持億速云。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

镇沅| 嘉禾县| 五常市| 海盐县| 临安市| 怀来县| 体育| 咸丰县| 四平市| 瑞丽市| 南康市| 江都市| 浪卡子县| 周宁县| 德昌县| 闵行区| 莱阳市| 罗江县| 平谷区| 新建县| 婺源县| 韶山市| 金沙县| 大悟县| 商城县| 泰顺县| 淮南市| 沿河| 永德县| 田阳县| 安阳县| 榆树市| 五家渠市| 阳朔县| 于都县| 兴城市| 米脂县| 甘肃省| 琼中| 南昌县| 治多县|