![]() |
Camphor Networks Platform API
|
Public Member Functions | |
def | camphor_ssh (cls, user, ip, cmd, quote="'", retry=0, wait=3, timeout=0, ignore=False, debug=False, use_echo=False, command='sudo bash', key='') |
def | camphor_scp (cls, user, ip, src, dst='', retry=0, wait=3, timeout=0, ignore=False, debug=False, key='', direction='to') |
def | drop_cache (cls, value=3) |
def | initialize_logging (cls, filename, filemode="a", level=logging.DEBUG) |
def | normalized_email (cls, email) |
str | ns (cls, str username=environ.get('CAMPHOR_USERNAME', 'administrator'), str project=environ.get('CAMPHOR_PROJECT', 'default')) |
dict | yamlStringToDict (cls, str yaml_str) |
dict | yamlFileToDict (cls, any yaml_file) |
dict | cleanDict (cls, dict data) |
def | is_ipv4_prefix_valid (cls, address, strict=True) |
tuple[str, int] | cli (cls, str cmd, str expect=None, str username=environ.get('CAMPHOR_TEST_USERNAME', 'administrator'), str project=environ.get('CAMPHOR_TEST_PROJECT', 'default'), str controller=environ.get('CAMPHOR_TEST_CONTROLLER', None), int port=int(environ.get('CAMPHOR_TEST_CONTROLLER_PORT', 22)), int retry=int(environ.get('CAMPHOR_TEST_RETRY', 0)), int wait=int(environ.get('CAMPHOR_TEST_WAIT', 3)), int timeout=int(environ.get('CAMPHOR_TEST_TIMEOUT', 0)), bool debug=environ.get('CAMPHOR_TEST_DEBUG', '').upper()=='TRUE', bool verbose=environ.get('CAMPHOR_TEST_VERBOSE', '').upper()=='TRUE', bool ignore=environ.get('CAMPHOR_TEST_IGNORE', '').upper()=='TRUE') |
Run a Camphor CLI Command. More... | |
tuple[str, int] | commit (cls, bool commit=True, bool clear=False, bool synchronize=environ.get('CAMPHOR_TEST_SYNCHRONIZE', False), str username=environ.get('CAMPHOR_TEST_USERNAME', 'administrator'), str project=environ.get('CAMPHOR_TEST_PROJECT', 'default'), str controller=environ.get('CAMPHOR_TEST_CONTROLLER', None), int port=int(environ.get('CAMPHOR_TEST_CONTROLLER_PORT', 22)), int retry=int(environ.get('CAMPHOR_TEST_RETRY', 0)), int wait=int(environ.get('CAMPHOR_TEST_WAIT', 3)), int timeout=int(environ.get('CAMPHOR_TEST_TIMEOUT', 0)), bool debug=environ.get('CAMPHOR_TEST_DEBUG', '').upper()=='TRUE', bool verbose=environ.get('CAMPHOR_TEST_VERBOSE', '').upper()=='TRUE', bool ignore=environ.get('CAMPHOR_TEST_IGNORE', '').upper()=='TRUE') |
Commit Camphor Configuration. More... | |
str | getDefaultDevicePassword (cls, str os=environ.get('CAMPHOR_DEVICE_DEFAULT_OS', 'FRR')) |
Get default password to login to a device. More... | |
list[dict] | getCamphorItems (cls, str type, str name='', bool conf=False, bool detail=False, bool json=False, bool yaml=False, bool debug=False, bool verbose=False, str username='administrator', str project='default', str controller=None, int port=None, int retry=0, int wait=3, int timeout=0, bool ignore=False) |
bool | ping (cls, str from_device_name, str from_device, str to_device, DeviceOS from_device_os=None, bool append_ns=True, int count=3, str jump_host=F"sshpass -p '{environ.get('CAMPHOR_PASSWORD', 'rootroot')}' "+F"ssh -l root {environ.get('CAMPHOR_CONTROLLER')}", str username='admin', str project='default', str password=environ.get('CAMPHOR_PASSWORD', 'rootroot'), int retry=60 *2, int wait=15, int timeout=30, bool ignore=True) |
Execute ping command on a device against a particular destination. More... | |
def | mergeDicts (cls, dest, src) |
str | appendToCommand (cls, str cmd, str key, any value, bool quote=False, bool multiLine=False) |
Append key value to a command string. More... | |
tuple[str, int] | uploadImage (cls, str file, str md5sum='', str camphor_kubernetes_master='localhost', bool ignore=False, int retry=6, int wait=10, str username='admin') |
Upload a device image onto the camphor networks platform. More... | |
def | getCamphorController (cls, str controller=None, int port=None, str username='administrator', str project='default') |
def | render_jinja_template (cls, file, base64, destdir='/var/log/camphor') |
def | printYaml (cls, str data, bool debug=False) |
def | is_ipv4_address_in_subnet (cls, address, subnet) |
def | is_ipv4_address_valid (cls, address) |
def | delete_file (cls, f) |
def | fread (cls, f, base64=False) |
def | camphor_skip_test (cls, skip_file='/tmp/.camphor_pytest_skip_until') |
def | mac (cls, separator=':') |
def | fwrite (cls, str f, str data, int mode=None, bool append=False) |
def | delete_lines (cls, file_name) |
def | grep (cls, file_name, pattern) |
def | initializeNbProjectData (cls) |
def | get_file_from_git (cls, project, file_name) |
def | merge_dicts (object cls, dict d1, dict d2) |
def | flatten_dict (cls, dikt, key) |
def | readFromFile (object cls, str fl) |
def | dict_get (cls, dikt, key, provider="", default="", secret=None, base64=True) |
def | encrypt (cls, data, secret="%s/.ssh/id_rsa" % environ['HOME'], base64=True) |
def | decrypt (cls, data, secret="%s/.ssh/id_rsa" % environ['HOME'], base64=True) |
Static Public Attributes | |
dictionary | CAMPHOR_CLUSTER_DATA = {} |
dictionary | CAMPHOR_PROJECT_DATA = {} |
dictionary | CAMPHOR_DATA = {} |
Definition at line 222 of file camphor_util.py.
def camphor_ssh | ( | cls, | |
user, | |||
ip, | |||
cmd, | |||
quote = "'" , |
|||
retry = 0 , |
|||
wait = 3 , |
|||
timeout = 0 , |
|||
ignore = False , |
|||
debug = False , |
|||
use_echo = False , |
|||
command = 'sudo bash' , |
|||
key = '' |
|||
) |
Definition at line 228 of file camphor_util.py.
def camphor_scp | ( | cls, | |
user, | |||
ip, | |||
src, | |||
dst = '' , |
|||
retry = 0 , |
|||
wait = 3 , |
|||
timeout = 0 , |
|||
ignore = False , |
|||
debug = False , |
|||
key = '' , |
|||
direction = 'to' |
|||
) |
Definition at line 241 of file camphor_util.py.
def drop_cache | ( | cls, | |
value = 3 |
|||
) |
Definition at line 257 of file camphor_util.py.
def initialize_logging | ( | cls, | |
filename, | |||
filemode = "a" , |
|||
level = logging.DEBUG |
|||
) |
Definition at line 262 of file camphor_util.py.
def normalized_email | ( | cls, | |
email | |||
) |
Definition at line 269 of file camphor_util.py.
str ns | ( | cls, | |
str | username = environ.get('CAMPHOR_USERNAME', 'administrator') , |
||
str | project = environ.get('CAMPHOR_PROJECT', 'default') |
||
) |
Definition at line 273 of file camphor_util.py.
Referenced by Util.ping().
dict yamlStringToDict | ( | cls, | |
str | yaml_str | ||
) |
Definition at line 278 of file camphor_util.py.
References Util.cleanDict().
dict yamlFileToDict | ( | cls, | |
any | yaml_file | ||
) |
Definition at line 283 of file camphor_util.py.
References Util.cleanDict().
dict cleanDict | ( | cls, | |
dict | data | ||
) |
Definition at line 288 of file camphor_util.py.
Referenced by Util.yamlFileToDict(), and Util.yamlStringToDict().
def is_ipv4_prefix_valid | ( | cls, | |
address, | |||
strict = True |
|||
) |
Definition at line 292 of file camphor_util.py.
tuple[str, int] cli | ( | cls, | |
str | cmd, | ||
str | expect = None , |
||
str | username = environ.get('CAMPHOR_TEST_USERNAME', 'administrator') , |
||
str | project = environ.get('CAMPHOR_TEST_PROJECT', 'default') , |
||
str | controller = environ.get('CAMPHOR_TEST_CONTROLLER', None) , |
||
int | port = int(environ.get('CAMPHOR_TEST_CONTROLLER_PORT', 22)) , |
||
int | retry = int(environ.get('CAMPHOR_TEST_RETRY', 0)) , |
||
int | wait = int(environ.get('CAMPHOR_TEST_WAIT', 3)) , |
||
int | timeout = int(environ.get('CAMPHOR_TEST_TIMEOUT', 0)) , |
||
bool | debug = environ.get('CAMPHOR_TEST_DEBUG', '').upper() == 'TRUE' , |
||
bool | verbose = environ.get('CAMPHOR_TEST_VERBOSE', '').upper() == 'TRUE' , |
||
bool | ignore = environ.get('CAMPHOR_TEST_IGNORE', '').upper() == 'TRUE' |
||
) |
Run a Camphor CLI Command.
Execute a Camphor CLI command over SSH on the Camphor controller.
[in] | cmd | Command string to execute |
[in] | username | Username of the camphor controller |
[in] | project | Project name of the camphor controller |
[in] | controller | DNS name of the camphor controller, typically used from outside the platform |
[in] | port | Port number of the camphor controller, typically used from outside the platform |
[in] | retry | Number of times to retry the command before bailing out |
[in] | wait | Interval in seconds to wait between failed retry attempts |
[in] | timeout | Interval in seconds before timing out |
[in] | expect | String pattern to expect in the output |
[in] | ignore | Ignore failed command execution |
[in] | debug | Print debug information |
[in] | verbose | Print verbose information |
Definition at line 320 of file camphor_util.py.
References camphor_api.camphor_util.CAMPHOR_LOG_DEBUG().
Referenced by Util.commit().
tuple[str, int] commit | ( | cls, | |
bool | commit = True , |
||
bool | clear = False , |
||
bool | synchronize = environ.get('CAMPHOR_TEST_SYNCHRONIZE', False) , |
||
str | username = environ.get('CAMPHOR_TEST_USERNAME', 'administrator') , |
||
str | project = environ.get('CAMPHOR_TEST_PROJECT', 'default') , |
||
str | controller = environ.get('CAMPHOR_TEST_CONTROLLER', None) , |
||
int | port = int(environ.get('CAMPHOR_TEST_CONTROLLER_PORT', 22)) , |
||
int | retry = int(environ.get('CAMPHOR_TEST_RETRY', 0)) , |
||
int | wait = int(environ.get('CAMPHOR_TEST_WAIT', 3)) , |
||
int | timeout = int(environ.get('CAMPHOR_TEST_TIMEOUT', 0)) , |
||
bool | debug = environ.get('CAMPHOR_TEST_DEBUG', '').upper() == 'TRUE' , |
||
bool | verbose = environ.get('CAMPHOR_TEST_VERBOSE', '').upper() == 'TRUE' , |
||
bool | ignore = environ.get('CAMPHOR_TEST_IGNORE', '').upper() == 'TRUE' |
||
) |
Commit Camphor Configuration.
Execute Camphor CLI command "Commit" to commit the configuration
[in] | commit | Commit configuration |
[in] | clear | Clear configuration |
[in] | synchronize | Synchronize the committed configuration with the underlying Kubernetes cluster |
[in] | username | Username of the camphor controller |
[in] | project | Project name of the camphor controller |
[in] | controller | DNS name of the camphor controller, typically used from outside the platform |
[in] | port | Port number of the camphor controller, typically used from outside the platform |
[in] | retry | Number of times to retry the command before bailing out |
[in] | wait | Interval in seconds to wait between failed retry attempts |
[in] | timeout | Interval in seconds before timing out |
[in] | ignore | Ignore failed command execution |
[in] | debug | Print debug information |
[in] | verbose | Print verbose information |
Definition at line 365 of file camphor_util.py.
References camphor_api.camphor_util.CAMPHOR_LOG_ERROR(), and Util.cli().
str getDefaultDevicePassword | ( | cls, | |
str | os = environ.get('CAMPHOR_DEVICE_DEFAULT_OS', 'FRR') |
||
) |
Get default password to login to a device.
[in] | os | Type of the device OS |
Definition at line 408 of file camphor_util.py.
Referenced by Util.ping().
list[dict] getCamphorItems | ( | cls, | |
str | type, | ||
str | name = '' , |
||
bool | conf = False , |
||
bool | detail = False , |
||
bool | json = False , |
||
bool | yaml = False , |
||
bool | debug = False , |
||
bool | verbose = False , |
||
str | username = 'administrator' , |
||
str | project = 'default' , |
||
str | controller = None , |
||
int | port = None , |
||
int | retry = 0 , |
||
int | wait = 3 , |
||
int | timeout = 0 , |
||
bool | ignore = False |
||
) |
Definition at line 414 of file camphor_util.py.
bool ping | ( | cls, | |
str | from_device_name, | ||
str | from_device, | ||
str | to_device, | ||
DeviceOS | from_device_os = None , |
||
bool | append_ns = True , |
||
int | count = 3 , |
||
str | jump_host = F"sshpass -p '{environ.get('CAMPHOR_PASSWORD', 'rootroot')}' " +
F"ssh -l root {environ.get('CAMPHOR_CONTROLLER')}" , |
||
str | username = 'admin' , |
||
str | project = 'default' , |
||
str | password = environ.get('CAMPHOR_PASSWORD', 'rootroot') , |
||
int | retry = 60 * 2 , |
||
int | wait = 15 , |
||
int | timeout = 30 , |
||
bool | ignore = True |
||
) |
Execute ping command on a device against a particular destination.
[in] | from_device_name | Name of the device from where ping is initiated |
[in] | from_device_os | Type of the Network Operating System (NOS) of the from_device |
[in] | from_device | Address from where ping has to be initiated (sourced) |
[in] | to_device | Address to where ping has to be targeted (destined) |
[in] | append_ns | Should namespace be appended to the from_device |
[in] | count | Number of ping icmp echo packets to send |
[in] | jump_host | Intermediate jump host before ssh to from_device |
[in] | username | Name of the user to ssh into the device |
[in] | password | Password to access the device via ssh |
[in] | project | Name of the project |
[in] | retry | Number of times to retry the command until it succeeds |
[in] | wait | Interval in seconds to wait between retry attempts |
[in] | timeout | Time after which command should be terminated as failed |
[in] | ignore | Ignore failure instead of raising an exception |
Definition at line 455 of file camphor_util.py.
References Util.getDefaultDevicePassword(), and Util.ns().
def mergeDicts | ( | cls, | |
dest, | |||
src | |||
) |
Definition at line 485 of file camphor_util.py.
str appendToCommand | ( | cls, | |
str | cmd, | ||
str | key, | ||
any | value, | ||
bool | quote = False , |
||
bool | multiLine = False |
||
) |
Append key value to a command string.
[in] | cmd | Prefix command string |
[in] | key | Key name of the next command option |
[in] | value | Value of the command key option |
[in] | quote | Whether the value needs to be quoted |
[in] | multiLine | Whether the value is multiline |
Definition at line 504 of file camphor_util.py.
tuple[str, int] uploadImage | ( | cls, | |
str | file, | ||
str | md5sum = '' , |
||
str | camphor_kubernetes_master = 'localhost' , |
||
bool | ignore = False , |
||
int | retry = 6 , |
||
int | wait = 10 , |
||
str | username = 'admin' |
||
) |
Upload a device image onto the Camphor Networks Platform.
[in] | file | Name of the file |
[in] | md5sum | Md5Sum of the file contents |
[in] | camphor_kubernetes_master | Name or IP of the Camphor Kubernetes Cluster master node |
[in] | ignore | Ignore if upload fails instead of raising an exception |
[in] | username | Name of the user |
[in] | retry | Number of times to retry the command until it succeeds |
[in] | wait | Interval in seconds to wait between retry attempts |
Definition at line 531 of file camphor_util.py.
References camphor_api.camphor_util.CAMPHOR_LOG_WARN().
def getCamphorController | ( | cls, | |
str | controller = None , |
||
int | port = None , |
||
str | username = 'administrator' , |
||
str | project = 'default' |
||
) |
Definition at line 549 of file camphor_util.py.
def render_jinja_template | ( | cls, | |
file, | |||
base64, | |||
destdir = '/var/log/camphor' |
|||
) |
Definition at line 558 of file camphor_util.py.
References camphor_api.camphor_util.CAMPHOR_LOG_ERROR().
def printYaml | ( | cls, | |
str | data, | ||
bool | debug = False |
||
) |
Definition at line 587 of file camphor_util.py.
def is_ipv4_address_in_subnet | ( | cls, | |
address, | |||
subnet | |||
) |
Definition at line 609 of file camphor_util.py.
def is_ipv4_address_valid | ( | cls, | |
address | |||
) |
Definition at line 617 of file camphor_util.py.
def delete_file | ( | cls, | |
f | |||
) |
Definition at line 625 of file camphor_util.py.
def fread | ( | cls, | |
f, | |||
base64 = False |
|||
) |
Definition at line 630 of file camphor_util.py.
def camphor_skip_test | ( | cls, | |
skip_file = '/tmp/.camphor_pytest_skip_until' |
|||
) |
Definition at line 643 of file camphor_util.py.
References camphor_api.camphor_util.CAMPHOR_LOG_INFO().
def mac | ( | cls, | |
separator = ':' |
|||
) |
Definition at line 660 of file camphor_util.py.
def fwrite | ( | cls, | |
str | f, | ||
str | data, | ||
int | mode = None , |
||
bool | append = False |
||
) |
Definition at line 667 of file camphor_util.py.
def delete_lines | ( | cls, | |
file_name | |||
) |
Definition at line 685 of file camphor_util.py.
def grep | ( | cls, | |
file_name, | |||
pattern | |||
) |
Definition at line 698 of file camphor_util.py.
def initializeNbProjectData | ( | cls | ) |
Definition at line 706 of file camphor_util.py.
References camphor_api.camphor_util.CAMPHOR_LOCAL_DIR().
def get_file_from_git | ( | cls, | |
project, | |||
file_name | |||
) |
Definition at line 725 of file camphor_util.py.
def merge_dicts | ( | object | cls, |
dict | d1, | ||
dict | d2 | ||
) |
Definition at line 734 of file camphor_util.py.
def flatten_dict | ( | cls, | |
dikt, | |||
key | |||
) |
Definition at line 738 of file camphor_util.py.
def readFromFile | ( | object | cls, |
str | fl | ||
) |
Definition at line 746 of file camphor_util.py.
def dict_get | ( | cls, | |
dikt, | |||
key, | |||
provider = "" , |
|||
default = "" , |
|||
secret = None , |
|||
base64 = True |
|||
) |
Definition at line 753 of file camphor_util.py.
def encrypt | ( | cls, | |
data, | |||
secret = "%s/.ssh/id_rsa" % environ['HOME'] , |
|||
base64 = True |
|||
) |
Definition at line 778 of file camphor_util.py.
def decrypt | ( | cls, | |
data, | |||
secret = "%s/.ssh/id_rsa" % environ['HOME'] , |
|||
base64 = True |
|||
) |
Definition at line 790 of file camphor_util.py.
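dictionary CAMPHOR_CLUSTER_DATA = {} |
static |
Definition at line 223 of file camphor_util.py.
dictionary CAMPHOR_PROJECT_DATA = {} |
static |
Definition at line 224 of file camphor_util.py.
dictionary CAMPHOR_DATA = {} |
static |
Definition at line 225 of file camphor_util.py.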