您好,登錄后才能下訂單哦!
小編給大家分享一下運維系列之FastAPI接口服務怎么用,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!
第一部分:FastAPI接口部分
from fastapi import FastAPI, Path
from com.fy.fastapi.monitor.shell.fabric.FabricLinux import FabricLinux
app = FastAPI()
#執行shell命令;
#調用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95
@app.get("/cmd/{cmd}")
def runCommand(cmd, host, userName, password):
fabric = FabricLinux(host , userName , password)
result = fabric.runCommand(cmd)
fabric.closeClient()
return {"res": result }
#上傳文件;
#調用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95
@app.get("/upload/{srcFile}")
def upload(srcFile, targetDir, host, userName, password):
fabric = FabricLinux(host , userName , password)
result = fabric.upload(srcFile, targetDir)
fabric.closeClient()
return {"res": result }
#下載文件;
#調用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95
@app.get("/download/{srcFile}")
def download(srcFile, targetDir, host, userName, password):
#fabricu.download("/home/Crawler/WeChatSouGouMain.tar.gz", "D:\\txt\\WeChatSouGouMain.tar.gz")
fabric = FabricLinux(host , userName , password)
result = fabric.download(srcFile, targetDir)
fabric.closeClient()
return {"res": result }
#刪除文件
#調用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95
@app.get("/delete/{targetPath}")
def delete(targetPath:"必須是全路徑", host, userName, password):
fabric = FabricLinux(host , userName , password)
result = fabric.runCommand("rm -rf " + targetPath)
fabric.closeClient()
return {"res": result }
#解壓.tar.gz文件
#調用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95
@app.get("/delete/{targetPath}")
def decomTarGz(srcPath, tarPath, host, userName, password):
fabric = FabricLinux(host , userName , password)
result = fabric.decomTarGz(srcPath, tarPath)
fabric.closeClient()
return {"res": result }
#根據PID關閉相關的服務;
#調用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95
@app.get("/kill/{pid}")
def stop(pid: int=Path(..., gt=0, le=32767), host, userName, password):
''' gt: 大于; le: 小于等于 '''
fabric = FabricLinux(host , userName , password)
result = fabric.runCommand("kill -9 " + pid)
fabric.closeClient()
return {"res": result }
#根據Python命令符,以及啟動文件路徑,啟動相應的服務;
#調用示例:http://127.0.0.1:8000/start/python?startFile=%E6%B5%8B%E8%AF%95
@app.get("/start/{pycmd}")
def start(pycmd , startFile , host, userName, password):
fabric = FabricLinux(host , userName , password)
result = fabric.runCommand("nohup " + pycmd + " " + startFile + " &")
fabric.closeClient()
return {"res": result }
第二部分:fabric接口部分,主要用來執行命令行
from fabric import Connection
import traceback, os
class FabricLinux:
def __init__(self, host:"服務器IP", userName:"用戶名", password:"密碼"):
self.host = host
self.userName = userName
self.password = password
print(self.userName + "@" + self.host, {"password": self.password})
self.initClient()#初始化鏈接
#初始化ssh鏈接對象;
def initClient(self):
#如果服務器配置了SSH免密碼登錄,就不需要 connect_kwargs 來指定密碼了。
self.con = Connection(self.userName + "@" + self.host, connect_kwargs={"password": self.password})
#關閉fabric操作對象;
def closeClient(self):
self.con.close()
#執行shell命令;
def runCommand(self, sshCommand:"Linux命令行語句"):
#top命令尚未測試通過;
#如果命令行中包含路徑,最好使用絕對路徑;
try:
#語法:run('遠程命令')
result = self.con.run(sshCommand, hide=True)
if result.return_code == 0:# 返回碼,0表示正確執行,1表示錯誤
return True, result.stdout
return result.failed, result.stdout
except:
exp = traceback.format_exc()
if "mkdir" in exp and 'File exists' in exp:
print("目錄【", sshCommand, "】已存在")
else:
print(exp)
return False, exp
#在特定目錄下,執行shell命令;
def runDir(self, sshCommand:"Linux命令行語句", dir):
if not os.path.isdir(dir):
return "目標路徑錯誤,必須是目錄"
with self.cd(dir):#with表示,with塊的代碼是在dir目錄下執行;
return self.runCommand(sshCommand)
#解壓.tar.gz文件;
def decomTarGz(self, srcPath, tarPath):
if os.path.isdir(srcPath):
return "帶解壓的路徑,必須是文件類型"
if not os.path.isdir(tarPath):
return "目標路徑錯誤,必須是目錄"
return fabricu.runCommand("tar -zxvf " + srcPath + " -C " + tarPath)
#切換到某個目錄下(上下文不連貫)
def cd(self, dir):
#語法:cd('遠程目錄')
self.con.cd(dir)
#上傳本地文件到遠程主機
def upload(self, src:"待上傳的文件全路徑。路徑中最好不要有空格等", target:"保存到服務器上的目錄"):
if not os.path.isdir(target):
return "待上傳的文件全路徑"
elif " " in src:
return "路徑中不允許有空格、(、)等特殊字符"
#語法:put('本地文件', '遠程目錄')
else:return self.con.put(src, target)
#從遠程主機下載文件到本地
def download(self , src:"遠程文件全路徑", target:"本地文件路徑(必須包含文件名稱的全路徑)"):
#語法:get('遠程文件', '本地文件路徑')
#示例:fabricu.download("/home/Crawler/WeChatSouGouMain.tar.gz", "D:\\txt\\WeChatSouGouMain.tar.gz")
if not os.path.isdir(target):
return self.con.get(src, target)
else:return "目標路徑錯誤,必須是包含文件名稱的全路徑"
看完了這篇文章,相信你對“運維系列之FastAPI接口服務怎么用”有了一定的了解,如果想了解更多相關知識,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。