Source code for proxysql_tools.proxysql.proxysql

"""ProxySQL classes"""
from contextlib import contextmanager

import pymysql
from pymysql.cursors import DictCursor

from proxysql_tools import LOG, execute
from proxysql_tools.proxysql.exceptions import ProxySQLBackendNotFound, ProxySQLUserNotFound

PROXYSQL_CONNECT_TIMEOUT = 20


[docs]class BackendStatus(object): # pylint: disable=too-few-public-methods """Status of ProxySQL backend""" online = 'ONLINE' shunned = 'SHUNNED' offline_soft = 'OFFLINE_SOFT' offline_hard = 'OFFLINE_HARD'
[docs]class ProxySQLMySQLBackend(object): # pylint: disable=too-many-instance-attributes,too-few-public-methods """ProxySQLMySQLBackend describes record in ProxySQL table ``mysql_servers``. .. code-block:: mysql CREATE TABLE mysql_servers ( hostgroup_id INT NOT NULL DEFAULT 0, hostname VARCHAR NOT NULL, port INT NOT NULL DEFAULT 3306, status VARCHAR CHECK (UPPER(status) IN ('ONLINE','SHUNNED','OFFLINE_SOFT', 'OFFLINE_HARD')) NOT NULL DEFAULT 'ONLINE', weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1, compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0, max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000, max_replication_lag INT CHECK (max_replication_lag >= 0 AND max_replication_lag <= 126144000) NOT NULL DEFAULT 0, use_ssl INT CHECK (use_ssl IN(0,1)) NOT NULL DEFAULT 0, max_latency_ms INT UNSIGNED CHECK (max_latency_ms>=0) NOT NULL DEFAULT 0, comment VARCHAR NOT NULL DEFAULT '', PRIMARY KEY (hostgroup_id, hostname, port) ) """ def __init__(self, hostname, hostgroup_id=0, port=3306, # pylint: disable=too-many-arguments status=BackendStatus.online, weight=1, compression=0, max_connections=10000, max_replication_lag=0, use_ssl=False, max_latency_ms=0, comment=None): self.hostname = hostname self.hostgroup_id = int(hostgroup_id) self.port = int(port) self.status = status self.weight = int(weight) self.compression = int(compression) self.max_connections = int(max_connections) self.max_replication_lag = int(max_replication_lag) self.use_ssl = bool(int(use_ssl)) self.max_latency_ms = int(max_latency_ms) self.comment = comment self._connection = None def __eq__(self, other): try: return self.hostgroup_id == other.hostgroup_id and \ self.hostname == other.hostname and \ self.port == other.port except AttributeError: return False def __ne__(self, other): return not self.__eq__(other) def __repr__(self): return "%d__%s__%d" % (self.hostgroup_id, self.hostname, self.port) def __str__(self): return "hostgroup_id={hostgroup_id}, " \ "hostname={hostname}, " \ "port={port}, " \ "status={status}, " \ "weight={weight}, " \ "compression={compression}, " \ "max_connections={max_connections}, " \ "max_replication_lag={max_replication_lag}, " \ "use_ssl={use_ssl}, " \ "max_latency_ms={max_latency_ms}, " \ "comment={comment}".format( hostgroup_id=self.hostgroup_id, hostname=self.hostname, port=self.port, status=self.status, weight=self.weight, compression=self.compression, max_connections=self.max_connections, max_replication_lag=self.max_replication_lag, use_ssl=self.use_ssl, max_latency_ms=self.max_latency_ms, comment=self.comment)
[docs] def connect(self, username, password): """ Make a MySQL connection to the backend. :param username: MySQL user. :param password: MySQL password. """ self._connection = pymysql.connect(host=self.hostname, port=self.port, user=username, passwd=password, cursorclass=DictCursor)
[docs] def execute(self, query, *args): """Execute query in MySQL Backend. :param query: Query to execute. :type query: str :return: Query result or None if the query is not supposed to return result :rtype: dict """ return execute(self._connection, query, *args)
[docs]class ProxySQLMySQLUser(object): # pylint: disable=too-many-instance-attributes,too-few-public-methods """ProxySQLMySQLUser describes record in ProxySQL table ``mysql_users``. .. code-block:: mysql CREATE TABLE mysql_users ( username VARCHAR NOT NULL, password VARCHAR, active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1, use_ssl INT CHECK (use_ssl IN (0,1)) NOT NULL DEFAULT 0, default_hostgroup INT NOT NULL DEFAULT 0, default_schema VARCHAR, schema_locked INT CHECK (schema_locked IN (0,1)) NOT NULL DEFAULT 0, transaction_persistent INT CHECK (transaction_persistent IN (0,1)) NOT NULL DEFAULT 0, fast_forward INT CHECK (fast_forward IN (0,1)) NOT NULL DEFAULT 0, backend INT CHECK (backend IN (0,1)) NOT NULL DEFAULT 1, frontend INT CHECK (frontend IN (0,1)) NOT NULL DEFAULT 1, max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 10000, PRIMARY KEY (username, backend), UNIQUE (username, frontend)) :param username: MySQL username to connect to ProxySQL or Galera node. :param password: MySQL password. :param active: Users with active = 0 will be tracked in the database, but will be never loaded in the in-memory data structures. :type active: bool :param use_ssl: Use SSL to connect to MySQL or not :type use_ssl: bool :param default_hostgroup: If there is no matching rule for the queries sent by the users, the traffic it generates is sent to the specified hostgroup_. :param default_schema: The schema to which the connection should change by default. :param schema_locked: not supported yet. :type schema_locked: bool :param transaction_persistent: if this is set for the user with which the MySQL client is connecting to ProxySQL (thus a "frontend" user - see below), transactions started within a hostgroup will remain within that hostgroup regardless of any other rules. :type transaction_persistent: bool :param fast_forward: If set, it bypasses the query processing layer (rewriting, caching) and passes the query directly to the backend server. :type fast_forward: bool :param frontend: If True, this (username, password) pair is used for authenticating to the ProxySQL instance. :type frontend: bool :param backend: If True, this (username, password) pair is used for authenticating to the mysqld servers against any hostgroup. :param max_connections: Maximum number of connection this user can create to MySQL node. .. _hostgroup: http://bit.ly/2rGnT5i """ def __init__(self, username='root', password=None, active=True, use_ssl=False, # pylint: disable=too-many-arguments default_hostgroup=0, default_schema='information_schema', schema_locked=False, transaction_persistent=False, fast_forward=False, backend=True, frontend=True, max_connections=10000): self.username = username self.password = password self.active = bool(int(active)) self.use_ssl = bool(int(use_ssl)) self.default_hostgroup = int(default_hostgroup) self.default_schema = default_schema self.schema_locked = bool(int(schema_locked)) self.transaction_persistent = bool(int(transaction_persistent)) self.fast_forward = bool(int(fast_forward)) self.backend = bool(int(backend)) self.frontend = bool(int(frontend)) self.max_connections = int(max_connections) def __eq__(self, other): try: return self.username == other.username and \ self.password == other.password and \ self.active == other.active and \ self.default_hostgroup == other.default_hostgroup and \ self.default_schema == other.default_schema and \ self.schema_locked == other.schema_locked and \ self.transaction_persistent == other.transaction_persistent and \ self.fast_forward == other.fast_forward and \ self.backend == other.backend and \ self.frontend == other.frontend and \ self.max_connections == other.max_connections except AttributeError: return False def __ne__(self, other): return not self.__eq__(other)
[docs]class ProxySQL(object): """ ProxySQL describes a single ProxySQL instance. :param host: ProxySQL hostname. :param port: Port on which ProxySQL listens to admin connections. :param user: ProxySQL admin user. :param password: Password for ProxySQL admin. :param socket: Socket to connect to ProxySQL admin interface. """ def __init__(self, host='localhost', port=3306, user='root', # pylint: disable=too-many-arguments password=None, socket=None): self.host = host self.port = int(port) self.user = user self.password = password self.socket = socket
[docs] def ping(self): """Check health of ProxySQL. :return: True if ProxySQL healthy and False otherwise. :rtype: bool""" try: result = self.execute('SELECT 1 AS result') return result[0]['result'] == '1' except pymysql.err.OperationalError: LOG.debug('ProxySQL %s:%d is dead', self.host, self.port) return False
[docs] def execute(self, query, *args): """Execute query in ProxySQL. :param query: Query to execute. :type query: str :return: Query result or None if the query is not supposed to return result :rtype: dict """ with self._connect() as conn: return execute(conn, query, *args)
[docs] def reload_runtime(self): """Reload the ProxySQL runtime configuration.""" self.execute('LOAD MYSQL SERVERS TO RUNTIME') self.execute('LOAD MYSQL USERS TO RUNTIME') self.execute('LOAD MYSQL VARIABLES TO RUNTIME')
[docs] def save_user(self): """Save user to on-disk database""" self.execute('SAVE MYSQL USERS TO DISK')
[docs] def get_users(self): """ Get mysql users :return: List of users or empty list :rtype: list(ProxySQLMySQLUser) """ query = "SELECT * FROM mysql_users;" result = self.execute(query) users = [] for row in result: user = ProxySQLMySQLUser(username=row['username'], password=row['password'], active=row['active'], use_ssl=row['use_ssl'], default_hostgroup=row['default_hostgroup'], default_schema=row['default_schema'], schema_locked=row['schema_locked'], transaction_persistent=row['transaction_persistent'], fast_forward=row['fast_forward'], backend=row['backend'], frontend=row['frontend'], max_connections=row['max_connections']) users.append(user) return users
[docs] def get_user(self, username): """ Get user by username :param username: Username :return: User information :rtype: ProxySQLMySQLUser :raise: ProxySQLUserNotFound """ result = self.execute("SELECT * FROM mysql_users WHERE username = '{username}'" .format(username=username)) if not result: raise ProxySQLUserNotFound else: row = result[0] user = ProxySQLMySQLUser(username=row['username'], password=row['password'], active=row['active'], use_ssl=row['use_ssl'], default_hostgroup=row['default_hostgroup'], default_schema=row['default_schema'], schema_locked=row['schema_locked'], transaction_persistent=row['transaction_persistent'], fast_forward=row['fast_forward'], backend=row['backend'], frontend=row['frontend'], max_connections=row['max_connections']) return user
[docs] def add_user(self, user): """ Add MySQL user :param user: user for add :type user: ProxySQLMySQLUser """ query = "REPLACE INTO mysql_users(`username`, `password`, `active`, " \ "`use_ssl`, `default_hostgroup`, `default_schema`, `schema_locked`, " \ "`transaction_persistent`, `fast_forward`, `backend`, `frontend`, " \ "`max_connections`) " \ "VALUES('{username}', '{password}', {active}, {use_ssl}, " \ "{default_hostgroup}, '{default_schema}', {schema_locked}, " \ "{transaction_persistent}, {fast_forward}, {backend}, {frontend}, " \ "{max_connections})" \ "".format(username=user.username, password=user.password, active=int(user.active), use_ssl=int(user.use_ssl), default_hostgroup=int(user.default_hostgroup), default_schema=user.default_schema, schema_locked=int(user.schema_locked), transaction_persistent=int(user.transaction_persistent), fast_forward=int(user.fast_forward), backend=int(user.backend), frontend=int(user.frontend), max_connections=user.max_connections) self.execute(query) self.reload_runtime() self.save_user()
[docs] def delete_user(self, username): """ Delete MySQL user :param username: username of user :type username: str """ self.execute("DELETE FROM mysql_users WHERE username='{username}'" .format(username=username)) self.reload_runtime() self.save_user()
[docs] def register_backend(self, backend): """Register Galera node in ProxySQL :param backend: Galera node. :type backend: ProxySQLMySQLBackend """ if backend.comment: comment = "'%s'" % pymysql.escape_string(backend.comment) else: comment = 'NULL' query = "REPLACE INTO mysql_servers(`hostgroup_id`," \ " `hostname`, `port`," \ " `status`, `weight`, `compression`, `max_connections`," \ " `max_replication_lag`, `use_ssl`, `max_latency_ms`," \ " `comment`) " \ "VALUES({hostgroup_id}, '{hostname}', {port}," \ " '{status}', {weight}, {compression}, {max_connections}," \ " {max_replication_lag}, {use_ssl}, {max_latency_ms}," \ " {comment})" \ "".format(hostgroup_id=int(backend.hostgroup_id), hostname=pymysql.escape_string(backend.hostname), port=int(backend.port), status=pymysql.escape_string(backend.status), weight=int(backend.weight), compression=int(backend.compression), max_connections=int(backend.max_connections), max_replication_lag=int(backend.max_replication_lag), use_ssl=int(backend.use_ssl), max_latency_ms=int(backend.max_latency_ms), comment=comment) self.execute(query) self.reload_runtime()
[docs] def deregister_backend(self, backend): """ Deregister a Galera node from ProxySQL :param backend: Galera node. :type backend: ProxySQLMySQLBackend """ query = "DELETE FROM mysql_servers WHERE hostgroup_id={hostgroup_id}" \ " AND hostname='{hostname}'" \ " AND port={port}" \ "".format(hostgroup_id=int(backend.hostgroup_id), hostname=pymysql.escape_string(backend.hostname), port=int(backend.port)) self.execute(query) self.reload_runtime()
[docs] def find_backends(self, hostgroup_id, status=None): """ Get writer from mysql_servers :param hostgroup_id: writer hostgroup_id :type hostgroup_id: int :param status: Look only for backends in this status :type status: BackendStatus :return: Writer MySQL backend or None if doesn't exist :rtype: list(ProxySQLMySQLBackend) :raise: ProxySQLBackendNotFound """ result = self.execute('SELECT `hostgroup_id`, `hostname`, ' '`port`, `status`, `weight`, `compression`, ' '`max_connections`, `max_replication_lag`, ' '`use_ssl`, `max_latency_ms`, `comment`' ' FROM `mysql_servers`' ' WHERE hostgroup_id = %s', hostgroup_id) backends = [] for row in result: backend = ProxySQLMySQLBackend(row['hostname'], hostgroup_id=row['hostgroup_id'], port=row['port'], status=row['status'], weight=row['weight'], compression=row['compression'], max_connections= row['max_connections'], max_replication_lag= row['max_replication_lag'], use_ssl=row['use_ssl'], max_latency_ms= row['max_latency_ms'], comment=row['comment']) if status and backend.status != status: continue backends.append(backend) if backends: return backends else: raise ProxySQLBackendNotFound('Can not find any backends')
[docs] def backend_registered(self, backend): """ Check if backend is registered. :param backend: ProxySQLMySQLBackend instance :return: True if registered, False otherwise :rtype: bool """ result = self.execute('SELECT `hostgroup_id`, `hostname`, ' '`port`' ' FROM `mysql_servers`' ' WHERE hostgroup_id = %s ' ' AND `hostname` = %s ' ' AND `port` = %s', ( backend.hostgroup_id, backend.hostname, backend.port )) return result != ()
[docs] def set_status(self, backend, status): """Update status of a backend in ProxySQL""" self.execute('UPDATE `mysql_servers` SET `status` = %s ' ' WHERE hostgroup_id = %s ' ' AND `hostname` = %s ' ' AND `port` = %s', ( status, backend.hostgroup_id, backend.hostname, backend.port )) self.reload_runtime()
@contextmanager def _connect(self): """Connect to ProxySQL admin interface.""" if self.socket is not None: conn = pymysql.connect(unix_socket=self.socket, user=self.user, passwd=self.password, connect_timeout=PROXYSQL_CONNECT_TIMEOUT, cursorclass=DictCursor) else: conn = pymysql.connect(host=self.host, port=self.port, user=self.user, passwd=self.password, connect_timeout=PROXYSQL_CONNECT_TIMEOUT, cursorclass=DictCursor) yield conn conn.close()