#!/usr/bin/python3
import argparse
import json
import logging
import time
from datetime import datetime
from http import HTTPStatus
from typing import Callable, Dict
import requests
def retry(fn, **kwargs):
    """Call ``fn(**kwargs)`` and repeat the call while it answers 503.

    After the initial call, up to 10 further attempts are made for as long as
    the response status is ``HTTPStatus.SERVICE_UNAVAILABLE``. Attempt ``k``
    is preceded by a sleep of ``0.1 * k`` seconds.

    Args:
        fn: Callable invoked with ``**kwargs``; its result must expose a
            ``status_code`` attribute.
        **kwargs: Forwarded unchanged to ``fn`` on every call. ``url`` is read
            for the log message whenever a retry happens.

    Returns:
        The response of the last call that was made.
    """
    max_attempts = 10
    response = fn(**kwargs)
    attempt = 0
    while (
        attempt < max_attempts
        and response.status_code == HTTPStatus.SERVICE_UNAVAILABLE
    ):
        attempt += 1
        # Linearly growing pause before each new attempt.
        time.sleep(0.1 * attempt)
        logging.info(f"Trying to execute request to {kwargs['url']} attempt {attempt}")
        response = fn(**kwargs)
    return response
def make_json_error(code: int, text: str):
    """Build the dictionary used in this file to report a failed request.

    Args:
        code: Value stored under ``"error_code"``.
        text: Value stored under ``"error"``.

    Returns:
        A new dict with the keys ``"error_code"`` and ``"error"``.
    """
    error_payload = dict(error_code=code, error=text)
    return error_payload
def execute_request(request: Callable, path, output=False, token=None, **kwargs):
    """Perform one HTTP call through ``retry`` and normalise the outcome.

    Args:
        request: Callable performing the HTTP call (the callers in this file
            pass ``requests.get`` / ``requests.post`` / ``requests.delete``).
        path: Full URL of the request; passed to ``request`` as ``url``.
        output: When true, the response body is parsed as JSON and returned.
        token: Optional session token. When set it is sent both as the
            ``ses6`` cookie and as the ``x-xsrf-token`` header.
        **kwargs: Extra keyword arguments forwarded to ``request``.

    Returns:
        * an error dict from ``make_json_error`` when the status is neither
          200 nor 201, or when ``output`` is true and the body is not valid
          JSON;
        * the decoded JSON document when ``output`` is true;
        * ``None`` otherwise.
    """
    cookies = {}
    headers = {}
    if token:
        cookies["ses6"] = token
        headers["x-xsrf-token"] = token

    logging.info(f"Executing request to {path}")
    # NOTE(review): no timeout is applied unless the caller passes one in kwargs.
    response = retry(request, url=path, cookies=cookies, headers=headers, **kwargs)
    # A body that is not valid UTF-8 must not abort the call with
    # UnicodeDecodeError; undecodable bytes are replaced instead.
    content = response.content.decode("utf-8", errors="replace")

    if response.status_code not in (HTTPStatus.OK, HTTPStatus.CREATED):
        logging.error(f"Return code {response.status_code}: {content}")
        # Callers detect failures with ``result.get("error")``; an empty body
        # would produce a falsy error text and be mistaken for success.
        error_text = content if content.strip() else f"HTTP {response.status_code}"
        return make_json_error(response.status_code, error_text)

    if not output:
        return None

    try:
        return json.loads(content)
    except json.JSONDecodeError as e:
        logging.error(f"Error while parsing response: {content}")
        return make_json_error(HTTPStatus.INTERNAL_SERVER_ERROR.value, e.msg)
class API:
    """Small authenticated client for the platform REST API.

    Intended to be used as a context manager: entering requests a session
    token with the stored e-mail/password, leaving deletes that token again.
    """

    def __init__(self, url, email, password):
        """Store the base URL and credentials; no request is made here."""
        self.url = url
        self.email = email
        self.password = password
        # Session token; stays None until __enter__ has authenticated.
        self.token = None

    def get(self, path, params=None):
        """GET ``url + path`` and return the parsed JSON body or an error dict."""
        return execute_request(
            requests.get, f"{self.url}{path}", output=True, token=self.token, params=params
        )

    def post(self, path, data=None):
        """POST ``data`` as JSON to ``url + path``.

        ``data`` defaults to an empty JSON object. ``None`` is used as the
        default so that no mutable dict is shared between calls.
        """
        payload = {} if data is None else data
        return execute_request(
            requests.post, f"{self.url}{path}", output=True, token=self.token, json=payload
        )

    def delete(self, path):
        """DELETE ``url + path``; returns ``None`` or an error dict."""
        return execute_request(requests.delete, f"{self.url}{path}", token=self.token)

    def __auth(self):
        """Request a session token and store it in ``self.token``.

        Raises:
            RuntimeError: if the response is not a dict, reports an error, or
                does not contain a token.
        """
        result = self.post(
            "/auth/v4/public/token", {"email": self.email, "password": self.password}
        )
        if not isinstance(result, dict):
            raise RuntimeError(f"Unexpected authentication response: {result!r}")
        if result.get("error"):
            raise RuntimeError(result["error"])
        token = result.get("token")
        if not token:
            # Covers error dicts with an empty message as well as success
            # bodies without a token; both used to surface as a bare KeyError.
            raise RuntimeError(
                f"Authentication failed: no token in response "
                f"(error_code={result.get('error_code')})"
            )
        self.token = token

    def __enter__(self):
        """Authenticate and return this client."""
        self.__auth()
        return self

    def __exit__(self, e_type, e_value, e_traceback):
        """Delete the session token, if any; exceptions are not suppressed."""
        if self.token:
            try:
                self.delete(f"/auth/v4/token/{self.token}")
            finally:
                # The token has been revoked (or revocation was attempted);
                # do not keep sending it on later requests.
                self.token = None
def parse_args() -> argparse.Namespace:
    """Parse the command line of the bandwidth check.

    Required options: ``--url``, ``--email``, ``--password`` (strings) and
    ``--vmid`` (int). Optional integer options: ``--threshold-gib``
    (default 1024) and ``--limit-mbitps`` (default 10).

    Returns:
        The namespace produced by ``ArgumentParser.parse_args()``.
    """
    parser = argparse.ArgumentParser(description="Check VM bandwidth")

    # (flag, type, metavar, help) for every mandatory option.
    required_options = (
        ("--url", str, "url", 'Platform url in format "https://<your_domain>"'),
        ("--email", str, "email", "Platform administrator email"),
        ("--password", str, "password", "Platform administrator password"),
        ("--vmid", int, "vmid", "Id of the VM"),
    )
    for flag, value_type, metavar, help_text in required_options:
        parser.add_argument(
            flag, type=value_type, metavar=metavar, help=help_text, required=True
        )

    # (flag, metavar, help, default) for the integer options with defaults.
    defaulted_options = (
        ("--threshold-gib", "threshold_gib", "VM bandwidth threshold, in GiB", 1024),
        (
            "--limit-mbitps",
            "limit_mbitps",
            "VM bandwidth limit after reaching threshold, in Mbit/s",
            10,
        ),
    )
    for flag, metavar, help_text, default_value in defaulted_options:
        parser.add_argument(
            flag, type=int, metavar=metavar, help=help_text, default=default_value
        )

    return parser.parse_args()
def _is_error_payload(payload):
    """Return True when ``payload`` is an error dict as built by this script."""
    return isinstance(payload, dict) and (
        "error_code" in payload or bool(payload.get("error"))
    )


def _sum_traffic_bytes(api, vmid, target, start, end):
    """Fetch one metrics target for a VM and add up its datapoint values.

    Only the first series of the response is considered, and within it only
    the first element of each datapoint (presumably the value of a
    ``[value, timestamp]`` pair; confirm against the API). Falsy values such
    as ``None`` or ``0`` are skipped.

    Raises:
        RuntimeError: if the API reports an error or returns an unexpected
            JSON object instead of a list of series.
    """
    response = api.get(f"/vm/v3/host/{vmid}/metrics", params={
        "target": target,
        "from": start,
        "until": end,
        "output": "single",
        "interval": "1month",
    })
    logging.debug(response)

    if _is_error_payload(response):
        raise RuntimeError(
            response.get("error")
            or f"Request failed with code {response.get('error_code')}"
        )
    if not response:
        return 0
    if isinstance(response, dict):
        # A non-empty JSON object cannot be indexed with [0].
        raise RuntimeError(f"Unexpected metrics response for {target}: {response!r}")

    datapoints = response[0].get("datapoints", [])
    return sum(point[0] for point in datapoints if point[0])


def main():
    """Check a VM's traffic since the first day of the month and throttle it.

    Sums the ``net_rx_summary`` and ``net_tx_summary`` metrics from the first
    day of the current month until now. When the total reaches
    ``--threshold-gib`` GiB, both ``net_in_mbitps`` and ``net_out_mbitps`` of
    the VM are set to ``--limit-mbitps``.

    Raises:
        RuntimeError: if authentication or a metrics request fails.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    args = parse_args()
    # Echo the arguments, but never the administrator password.
    print(argparse.Namespace(**{**vars(args), "password": "***"}))

    time_format = '%H:%M_%Y%m%d'
    # Take a single timestamp so both ends of the range refer to the same month.
    current = datetime.now()
    now = current.strftime(time_format)
    first_day = current.replace(day=1, hour=0, minute=0).strftime(time_format)
    logging.info(f"Checking bandwidth from {first_day} to {now} for VM {args.vmid}")

    with API(args.url, args.email, args.password) as api:
        summary_bytes = sum(
            _sum_traffic_bytes(api, args.vmid, target, first_day, now)
            for target in ("net_rx_summary", "net_tx_summary")
        )
        if summary_bytes / 2**30 >= args.threshold_gib:
            logging.info(
                f"Threshold {args.threshold_gib} GiB reached, limiting speed to {args.limit_mbitps}"
            )
            result = api.post(f"/vm/v3/host/{args.vmid}/resource", data={
                "net_in_mbitps": args.limit_mbitps,
                "net_out_mbitps": args.limit_mbitps,
            })
            if _is_error_payload(result):
                # Make a failed limit change visible instead of logging it
                # at info level as if it had succeeded.
                logging.error(f"Failed to apply bandwidth limit: {result}")
            else:
                logging.info(result)
# Run the bandwidth check only when this file is executed as a script.
if __name__ == "__main__":
    main()