您好,登錄后才能下訂單哦!
這篇文章主要講解了“怎么用python+Element實現主機Host操作”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“怎么用python+Element實現主機Host操作”吧!
<%inherit file="/base.html"/> <div id="app"> <div class="col-md-10 col-md-offset-1" > <div class="container-fluid mb0 " > <div class="row"> <div class="col-md-4"> <form class="form-horizontal"> <div class="form-group clearfix "> <label class="col-sm-4 control-label bk-lh40 pt0">選擇業務:</label> <div class="col-sm-8"> <el-select v-model="searchBiz" placeholder="請選擇業務名稱" @change="changeBiz"> <el-option v-for="item in bkBizData" :label="item.bk_biz_name" :value="item.bk_biz_id"></el-option> </el-select> </div> </div> </form> </div> <div class="col-md-6"> <form class="form-horizontal"> <div class="form-group clearfix "> <label class="col-sm-4 control-label bk-lh40 pt0">請輸入主機IP:</label> <div class="col-sm-8"> <el-input v-model="searchHostIp" placeholder="請輸入主機IP" ></el-input> </div> </div> </form> </div> <div class="col-md-2"> <form class="form-horizontal"> <div class="form-group clearfix "> <div class="col-sm-8"> <el-button type="primary" @click="getSearch">搜索</el-button> </div> </div> </form> </div> </div> </div> <el-table :data="hostData" border > <el-table-column prop="ip" label="內網IP"></el-table-column> <el-table-column prop="bk_os_name" label="系統名"></el-table-column> <el-table-column prop="host_name" label="主機名"></el-table-column> <el-table-column prop="cloud_name" label="云區域"></el-table-column> <el-table-column prop="mem_use" label="Mem(%)"></el-table-column> <el-table-column prop="disk_use" label="Disk(%)"></el-table-column> <el-table-column prop="cpu_use" label="CPU(%)"></el-table-column> <el-table-column label="操作" width="300%"> <template slot-scope="scope"> <el-button :type='scope.row.is_monitored ? "primary":"success"' size="small" @click="editHost(scope.row)">{{ scope.row.is_monitored ? "移除監控":"加入監控" }}</el-button> <el-button type="warning" size="small" @click="getPerformData(scope.row)">查看性能</el-button> <el-button type="danger" size="small" @click="getStatus(scope.row)">查看狀態</el-button> </template> </el-table-column> </el-table> <!-- 設置面板End --> </div> </div>
<script type="text/javascript"> new Vue({ el: '#app', data: { bkBizData: [], hostData: [], searchBiz:'', searchHostIp:'' }, mounted() { // 頁面加載就獲取所有模板 this.init(); this.getSearch(); }, methods: { init() { axios.get(site_url + "get_biz_list/").then(res => { if (res.data.result){ this.bkBizData = res.data.data; }else{ this.$message.error('獲取業務失敗'); } },'json'); this.getSearch(); }, getSearch() { axios.get(site_url + "host_view/?search_biz_id=" + this.searchBiz + "&query_str=" + this.searchHostIp).then(res => { if (res.data.result){ this.hostData = res.data.data; }else{ this.$message.error('獲取模板失敗'); } },'json'); }, changeBiz(){ this.getSearch(); }, editHost(row) { const tmpText = row.is_monitored ? "主機移除監控隊列, 是否繼續?" : "主機加入監控隊列, 是否繼續?"; this.$confirm(tmpText, '提示', { confirmButtonText: '確定', cancelButtonText: '取消', type: 'warning' }).then(() => { axios.post(site_url + "host_view/", {"host_id": row.host_id,"is_monitored": row.is_monitored}).then(res => { if (res.data.result) { if(row.is_monitored){ this.$message.success('主機移除監控隊列成功'); } else { this.$message.warning('主機加入監控隊列成功'); } this.getSearch(); } else { this.$message.error('更新主機監控狀態失敗'); } }, 'json'); }).catch(() => { this.$message({type: 'info', message: '已取消刪除'}); }); }, getPerformData(row) { this.$confirm("是否查詢系統資源使用情況?", '提示', { confirmButtonText: '確定', cancelButtonText: '取消', type: 'warning' }).then(() => { const params = { "host_id": row.host_id, "bk_biz_id": row.bk_biz_id, "ip": row.ip, "cloud_id": row.cloud_id, }; axios.post(site_url + "get_perform_data/", params).then(res => { if (res.data.result) { row.mem_use = res.data.data.mem; row.cpu_use = res.data.data.cpu; row.disk_use = res.data.data.disk; } else { this.$message.error('查詢主機系統資源使用情況失敗'); } }, 'json'); }).catch(() => { this.$message({type: 'info', message: '已取消查詢'}); }); }, getStatus(row) { location.href = site_url + 'status/' } } }) </script>
urls.py
文件內容
from django.conf.urls import patterns from home_application.temp import views as temp_view from home_application.job import views as job_view from home_application.host import views as host_view from home_application.exam import views as exam_view urlpatterns = patterns( 'home_application.views', (r'^$', job_view.job), (r'^dev-guide/$', 'dev_guide'), (r'^contactus/$', 'contactus'), (r'^api/test/$', 'test'), (r'^temp/$', 'temp'), (r'^host/$', host_view.host), (r'^status/$', host_view.status), (r'^host_view/$', host_view.HostView.as_view()), (r'^get_all_hosts/$', host_view.get_all_hosts), (r'^get_perform_data/$', host_view.get_perform_data), ... )
host\views.py
文件內容
import json from django.views.generic import View from django.views.decorators.csrf import csrf_exempt from django.utils.decorators import method_decorator from django.http import JsonResponse from common.mymako import render_mako_context from home_application.models import Host, LoadData from home_application.utils.job_api import FastJobApi perform_script = """ #!/bin/bash MEMORY=$(free -m | awk 'NR==2{printf "%.2f%%",$3*100/$2}') DISK=$(df -h| awk '$NF=="/"{printf "%s",$5}') CPU=$(top -bn1 | grep load | awk '{printf "%.2f%%",$(NF-2)}') DATE=$(date "+%Y-%m-%d %H:%M:%S") echo -e "$DATE|$MEMORY|$DISK|$CPU" """ def host(request): return render_mako_context(request, '/home_application/host.html') def status(request): return render_mako_context(request, "/home_application/status.html") class CsrfExemptView(View): @method_decorator(csrf_exempt) def dispatch(self, request, *args, **kwargs): return super(CsrfExemptView, self).dispatch(request, *args, **kwargs) def get_all_hosts(request): from home_application.utils.cc_by_request import cc_search_host res_data = cc_search_host().get("info") for host in res_data: bk_host_innerip = host.get("host", {}).get("bk_host_innerip") bk_host_id = host.get("host", {}).get("bk_host_id") bk_host_name = host.get("host", {}).get("bk_host_name") bk_os_name = host.get("host", {}).get("bk_os_name") bk_biz_id = host.get("biz", [])[0].get("bk_biz_id") bk_biz_name = host.get("biz", [])[0].get("bk_biz_name") cloud_name = host.get("host", {}).get("bk_cloud_id")[0].get("bk_inst_name") cloud_id = host.get("host", {}).get("bk_cloud_id")[0].get("bk_inst_id") host_obj = { "ip": bk_host_innerip, "bk_biz_id": bk_biz_id, "os_name": bk_os_name, "host_name": bk_host_name, "bk_biz_name": bk_biz_name, "cloud_id": cloud_id, "cloud_name": cloud_name, } Host.objects.update_or_create(host_id=bk_host_id, defaults=host_obj) return JsonResponse({"result": res_data}) @csrf_exempt def get_perform_data(request): data = json.loads(request.body) host_id = data.get("host_id") try: obj = LoadData.objects.filter(host_id=host_id).last() if obj: res_data = { "cpu": obj.cpu, "mem": obj.mem, "disk": obj.disk, } return JsonResponse({"result": True, "data": res_data}) bk_biz_id = int(data.get("bk_biz_id")) ip = data.get("ip") cloud_id = data.get("cloud_id") # res_data = execute_script_and_return(request,ip_list, bk_biz_id, perform_script) obj = FastJobApi(bk_biz_id, cloud_id, ip, perform_script) job_res = obj.execute_script_and_return() if job_res: log_content = job_res[0].get("log_content") res_data = { "result": True, "cpu": log_content.split("|")[3], "mem": log_content.split("|")[1], "disk": log_content.split("|")[2], } return JsonResponse({"result": True, "data": res_data}) return JsonResponse({"result": False}) except Exception: return JsonResponse({"result": False}) class HostView(CsrfExemptView): def get(self, request, *args, **kwargs): try: host_query = Host.objects.all() except Exception: return JsonResponse({"result": False}) search_biz_id = request.GET.get("search_biz_id") query_str = request.GET.get("query_str") if search_biz_id: host_query = host_query.filter(bk_biz_id=search_biz_id) if query_str: host_query = host_query.filter(ip__in=query_str.split(",")) host_query = host_query[:30] if host_query.count() > 10 else host_query res_data = [i.to_dict() for i in host_query] return JsonResponse({"result": True, "data": res_data}) def post(self, request, *args, **kwargs): try: data = json.loads(request.body) host_id = data.get("host_id") is_monitored = data.get("is_monitored") if is_monitored: Host.objects.filter(host_id=host_id).update(is_monitored=False) else: Host.objects.filter(host_id=host_id).update(is_monitored=True) return JsonResponse({"result": True}) except Exception: return JsonResponse({"result": False})
models.py
文件內容
from django.db import models from home_application.utils.parse_time import parse_datetime_to_timestr class Host(models.Model): host_id = models.IntegerField(u"主機ID", primary_key=True, unique=True) ip = models.CharField(u"IP地址", max_length=32, blank=True, null=True) bk_biz_id = models.CharField(u"業務ID", max_length=16, blank=True, null=True) bk_biz_name = models.CharField(u"業務名稱", max_length=512, blank=True, null=True) os_name = models.CharField(u"系統名", max_length=128, blank=True, null=True) host_name = models.CharField(u"主機名", max_length=128, blank=True, null=True) cloud_id = models.IntegerField(u"云區域ID", blank=True, null=True) cloud_name = models.CharField(u"云區域名稱", max_length=32, blank=True, null=True) is_monitored = models.BooleanField(u"是否已監控", default=False) def to_dict(self): return { "host_id": self.host_id, "ip": self.ip, "pk": self.pk, "bk_biz_id": self.bk_biz_id, "bk_biz_name": self.bk_biz_name, "os_name": self.os_name, "host_name": self.host_name, "cloud_name": self.cloud_name, "cloud_id": self.cloud_id, "is_monitored": self.is_monitored, "mem_use": "-", "cpu_use": "-", "disk_use": "-", } def get_load_data(self): load_query = LoadData.objects.filter(host_id=self.pk).order_by("create_time") return [i.to_dict() for i in load_query] class LoadData(models.Model): host_id = models.IntegerField(u"主機ID", default=0) cpu = models.IntegerField(u"CPU使用率", default=0) mem = models.IntegerField(u"內存使用率", default=0) disk = models.IntegerField(u"硬盤使用率", default=0) create_time = models.DateTimeField(u"創建時間", auto_now_add=True) def to_dict(self): return { "host_id": self.host_id, "cpu": self.cpu, "mem": self.mem, "disk": self.disk, "create_time": parse_datetime_to_timestr(self.create_time) }
FastJobApi
部分代碼
import base64 import time from conf.default import APP_ID, APP_TOKEN from blueking.component.shortcuts import get_client_by_user, get_client_by_request count = 0 class FastJobApi(object): def __init__(self, bk_biz_id, bk_cloud_id, ip_list, script_content): self.client = get_client_by_user('admin') self.username = "admin" self.biz_id = bk_biz_id self.script_content = script_content self.ip_list = [{"bk_cloud_id": bk_cloud_id, "ip": i} for i in ip_list.split(",")] def _fast_execute_script(self, execute_account="root", param_content='', script_timeout=1000): """ 快速執行腳本 :param execute_account: 執行腳本的賬戶名 :param param_content: 執行腳本的參數 :param script_timeout: 腳本執行超時時間 :return: job_instance_id """ kwargs = { "bk_app_code": APP_ID, "bk_app_secret": APP_TOKEN, "bk_biz_id": self.biz_id, "bk_username": self.username, "script_content": base64.b64encode(self.script_content), "ip_list": self.ip_list, "script_type": 1, "account": execute_account, "script_param": base64.b64encode(param_content), "script_timeout": script_timeout } result = self.client.job.fast_execute_script(kwargs) if result["result"]: return result.get("data").get("job_instance_id") return False def _get_job_instance_status(self, task_id): """ 獲取腳本執行狀態 :param task_id: 執行腳本的 job_instance_id :return: """ global count count += 1 # 查詢執行狀態 resp = self.client.job.get_job_instance_status( bk_username=self.username, bk_biz_id=self.biz_id, job_instance_id=task_id ) if resp.get('data').get('is_finished'): count = 0 return True elif not resp.get('data').get('is_finished') and count <= 5: time.sleep(2) return self._get_job_instance_status(task_id) else: count = 0 return False def _get_job_instance_log(self, task_id): """ 查詢作業日志 :param task_id: 執行腳本的 job_instance_id :return: """ if self._get_job_instance_status(task_id): resp = self.client.job.get_job_instance_log( job_instance_id=task_id, bk_biz_id=self.biz_id, bk_username='admin' ) return resp['data'][0]['step_results'][0]['ip_logs'] def execute_script_and_return(self): """ 執行腳本并獲取腳本執行結果 :return: 腳本執行結果 """ job_instance_id = self._fast_execute_script() if job_instance_id: ip_logs = self._get_job_instance_log(job_instance_id) return ip_logs
實現效果
感謝各位的閱讀,以上就是“怎么用python+Element實現主機Host操作”的內容了,經過本文的學習后,相信大家對怎么用python+Element實現主機Host操作這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。