Source code for proxysql_tools.proxysql.proxysql

"""ProxySQL classes"""
import json
from contextlib import contextmanager

import pymysql
from pymysql.cursors import DictCursor

from proxysql_tools import LOG, execute
from proxysql_tools.proxysql.exceptions import ProxySQLBackendNotFound, \
    ProxySQLUserNotFound
from proxysql_tools.proxysql.proxysqlbackend import ProxySQLMySQLBackend
from proxysql_tools.proxysql.proxysqlbackendset import ProxySQLMySQLBackendSet

PROXYSQL_CONNECT_TIMEOUT = 20


# noinspection LongLine
[docs]class ProxySQLMySQLUser(object): # pylint: disable=too-many-instance-attributes,too-few-public-methods """ProxySQLMySQLUser describes record in ProxySQL table ``mysql_users``. .. code-block:: mysql CREATE TABLE mysql_users ( username VARCHAR NOT NULL, password VARCHAR, active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1, use_ssl INT CHECK (use_ssl IN (0,1)) NOT NULL DEFAULT 0, default_hostgroup INT NOT NULL DEFAULT 0, default_schema VARCHAR, schema_locked INT CHECK (schema_locked IN (0,1)) NOT NULL DEFAULT 0, transaction_persistent INT CHECK (transaction_persistent IN (0,1)) NOT NULL DEFAULT 0, fast_forward INT CHECK (fast_forward IN (0,1)) NOT NULL DEFAULT 0, backend INT CHECK (backend IN (0,1)) NOT NULL DEFAULT 1, frontend INT CHECK (frontend IN (0,1)) NOT NULL DEFAULT 1, max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 10000, PRIMARY KEY (username, backend), UNIQUE (username, frontend)) :param username: MySQL username to connect to ProxySQL or Galera node. :param password: MySQL password. :param active: Users with active = 0 will be tracked in the database, but will be never loaded in the in-memory data structures. :type active: bool :param use_ssl: Use SSL to connect to MySQL or not :type use_ssl: bool :param default_hostgroup: If there is no matching rule for the queries sent by the users, the traffic it generates is sent to the specified hostgroup_. :param default_schema: The schema to which the connection should change by default. :param schema_locked: not supported yet. :type schema_locked: bool :param transaction_persistent: if this is set for the user with which the MySQL client is connecting to ProxySQL (thus a "frontend" user - see below), transactions started within a hostgroup will remain within that hostgroup regardless of any other rules. :type transaction_persistent: bool :param fast_forward: If set, it bypasses the query processing layer (rewriting, caching) and passes the query directly to the backend server. :type fast_forward: bool :param frontend: If True, this (username, password) pair is used for authenticating to the ProxySQL instance. :type frontend: bool :param backend: If True, this (username, password) pair is used for authenticating to the mysqld servers against any hostgroup. :param max_connections: Maximum number of connection this user can create to MySQL node. .. _hostgroup: http://bit.ly/2rGnT5i """ def __init__(self, username='root', password=None, active=True, # pylint: disable=too-many-arguments use_ssl=False, default_hostgroup=0, default_schema='information_schema', schema_locked=False, transaction_persistent=False, fast_forward=False, backend=True, frontend=True, max_connections=10000): self.username = username self.password = password self.active = bool(int(active)) self.use_ssl = bool(int(use_ssl)) self.default_hostgroup = int(default_hostgroup) self.default_schema = default_schema self.schema_locked = bool(int(schema_locked)) self.transaction_persistent = bool(int(transaction_persistent)) self.fast_forward = bool(int(fast_forward)) self.backend = bool(int(backend)) self.frontend = bool(int(frontend)) self.max_connections = int(max_connections) def __eq__(self, other): return all( ( self.username == other.username, self.password == other.password, self.active == other.active, self.default_hostgroup == other.default_hostgroup, self.default_schema == other.default_schema, self.schema_locked == other.schema_locked, self.transaction_persistent == other.transaction_persistent, self.fast_forward == other.fast_forward, self.backend == other.backend, self.frontend == other.frontend, self.max_connections == other.max_connections ) ) def __ne__(self, other): return not self.__eq__(other)
[docs]class ProxySQL(object): """ ProxySQL describes a single ProxySQL instance. :param host: ProxySQL hostname. :param port: Port on which ProxySQL listens to admin connections. :param user: ProxySQL admin user. :param password: Password for ProxySQL admin. :param socket: Socket to connect to ProxySQL admin interface. """ # noinspection LongLine def __init__(self, host='localhost', port=3306, user='root', # pylint: disable=too-many-arguments password=None, socket=None): self.host = host self.port = int(port) self.user = user self.password = password self.socket = socket
[docs] def ping(self): """Check health of ProxySQL. :return: True if ProxySQL healthy and False otherwise. :rtype: bool""" try: result = self.execute('SELECT 1 AS result') return result[0]['result'] == '1' except pymysql.err.OperationalError: LOG.debug('ProxySQL %s:%d is dead', self.host, self.port) return False
[docs] def execute(self, query, *args): """Execute query in ProxySQL. :param query: Query to execute. :type query: str :return: Query result or None if the query is not supposed to return result :rtype: dict """ with self._connect() as conn: return execute(conn, query, *args)
[docs] def reload_users(self): """ Loads MySQL users from the in-memory database to the runtime data structures. """ self.execute('LOAD MYSQL USERS TO RUNTIME')
[docs] def reload_servers(self): """ Loads MySQL servers from the in-memory database to the runtime data structures. """ self.execute('LOAD MYSQL SERVERS TO RUNTIME')
[docs] def reload_variables(self): """ Loads MySQL variables from the in-memory database to the runtime data structures. """ self.execute('LOAD MYSQL VARIABLES TO RUNTIME')
[docs] def save_users(self): """ Persists the MySQL users from the runtime data structures to the in-memory database. """ self.execute('SAVE MYSQL USERS TO DISK')
[docs] def save_servers(self): """ Persists the MySQL servers from the runtime data structures to the in-memory database. """ self.execute('SAVE MYSQL SERVERS TO DISK')
[docs] def save_variables(self): """ Persists the MySQL variables from the in-memory database to the on-disk database. """ self.execute('SAVE MYSQL VARIABLES TO DISK')
[docs] def reload_runtime(self): """Reload the ProxySQL runtime configuration.""" self.reload_servers() self.reload_users() self.reload_variables()
[docs] def save_runtime(self): """Saves ProxySQL configuration to disk.""" self.save_users() self.save_servers() self.save_variables()
[docs] def get_users(self): """ Get mysql users :return: List of users or empty list :rtype: list(ProxySQLMySQLUser) """ query = "SELECT * FROM mysql_users" result = self.execute(query) users = [] for row in result: kwargs = { 'username': row['username'], 'password': row['password'], 'active': row['active'], 'use_ssl': row['use_ssl'], 'default_hostgroup': row['default_hostgroup'], 'default_schema': row['default_schema'], 'schema_locked': row['schema_locked'], 'transaction_persistent': row['transaction_persistent'], 'fast_forward': row['fast_forward'], 'backend': row['backend'], 'frontend': row['frontend'], 'max_connections': row['max_connections'] } user = ProxySQLMySQLUser(**kwargs) users.append(user) return users
[docs] def get_user(self, username): """ Get user by username :param username: Username :return: User information :rtype: ProxySQLMySQLUser :raise: ProxySQLUserNotFound """ result = self.execute("SELECT * FROM mysql_users " "WHERE username = '{username}'" .format(username=username)) if not result: raise ProxySQLUserNotFound else: row = result[0] kwargs = { 'username': row['username'], 'password': row['password'], 'active': row['active'], 'use_ssl': row['use_ssl'], 'default_hostgroup': row['default_hostgroup'], 'default_schema': row['default_schema'], 'schema_locked': row['schema_locked'], 'transaction_persistent': row['transaction_persistent'], 'fast_forward': row['fast_forward'], 'backend': row['backend'], 'frontend': row['frontend'], 'max_connections': row['max_connections'] } user = ProxySQLMySQLUser(**kwargs) return user
[docs] def add_user(self, user): """ Add MySQL user :param user: user for add :type user: ProxySQLMySQLUser """ kwargs = { 'username': user.username, 'password': user.password, 'active': int(user.active), 'use_ssl': int(user.use_ssl), 'default_hostgroup': int(user.default_hostgroup), 'default_schema': user.default_schema, 'schema_locked': int(user.schema_locked), 'transaction_persistent': int(user.transaction_persistent), 'fast_forward': int(user.fast_forward), 'backend': int(user.backend), 'frontend': int(user.frontend), 'max_connections': user.max_connections } query = "REPLACE INTO mysql_users(" \ "`username`, `password`, `active`, " \ "`use_ssl`, `default_hostgroup`, `default_schema`, " \ "`schema_locked`, `transaction_persistent`, `fast_forward`, " \ "`backend`, `frontend`, `max_connections`) " \ "VALUES(" \ "'{username}', '{password}', {active}, " \ "{use_ssl}, {default_hostgroup}, '{default_schema}', " \ "{schema_locked}, {transaction_persistent}, {fast_forward}, " \ "{backend}, {frontend}, {max_connections})" \ "".format(**kwargs) self.execute(query) self.reload_users() self.save_users()
[docs] def delete_user(self, username): """ Delete MySQL user :param username: username of user :type username: str """ self.execute("DELETE FROM mysql_users WHERE username='{username}'" .format(username=username)) self.reload_users() self.save_users()
[docs] def register_backend(self, backend): """Register Galera node in ProxySQL :param backend: Galera node. :type backend: ProxySQLMySQLBackend """ comment = self._get_comment(backend) kwargs = { 'hostgroup_id': int(backend.hostgroup_id), 'hostname': pymysql.escape_string(backend.hostname), 'port': int(backend.port), 'status': backend.status, 'weight': int(backend.weight), 'compression': int(backend.compression), 'max_connections': int(backend.max_connections), 'max_replication_lag': int(backend.max_replication_lag), 'use_ssl': int(backend.use_ssl), 'max_latency_ms': int(backend.max_latency_ms), 'comment': comment } query = "REPLACE INTO mysql_servers(" \ "`hostgroup_id`, `hostname`, `port`, " \ "`status`, `weight`, `compression`, " \ "`max_connections`, `max_replication_lag`, `use_ssl`, " \ "`max_latency_ms`, `comment`) " \ "VALUES(" \ "{hostgroup_id}, '{hostname}', {port}, " \ "'{status}', {weight}, {compression}, " \ "{max_connections}, {max_replication_lag}, {use_ssl}, " \ "{max_latency_ms}, '{comment}')" \ "".format(**kwargs) self.execute(query) self.reload_servers()
[docs] def update_backend(self, backend): """ Updates backend in ProxySQL table mysql_servers. Currently synonym of register_backend(). """ self.register_backend(backend)
[docs] def deregister_backend(self, backend): """ Deregister a Galera node from ProxySQL :param backend: Galera node. :type backend: ProxySQLMySQLBackend """ query = "DELETE FROM mysql_servers " \ "WHERE" \ " hostgroup_id={hostgroup_id}" \ " AND hostname='{hostname}'" \ " AND port={port}" \ "".format(hostgroup_id=int(backend.hostgroup_id), hostname=pymysql.escape_string(backend.hostname), port=int(backend.port)) self.execute(query) self.reload_runtime()
[docs] def find_backends(self, hostgroup_id=None, status=None): """ Find backends from mysql_servers. If hostgroup_id or status is given it will filter out backends based on that criteria. :param hostgroup_id: writer hostgroup_id :type hostgroup_id: int :param status: Look only for backends in this status :type status: BackendStatus :return: Writer MySQL backend or None if doesn't exist :rtype: ProxySQLMySQLBackendSet :raise: ProxySQLBackendNotFound """ query = 'SELECT ' \ '`hostgroup_id`, `hostname`, `port`, ' \ '`status`, `weight`, `compression`, ' \ '`max_connections`, `max_replication_lag`, `use_ssl`, ' \ '`max_latency_ms`, `comment` ' \ 'FROM `mysql_servers` ' \ 'WHERE 1=1' if hostgroup_id: query += ' AND hostgroup_id = %d' % hostgroup_id if status: query += " AND status = '%s'" % status result = self.execute(query) backends = ProxySQLMySQLBackendSet() for row in result: kwargs = { 'hostgroup_id': row['hostgroup_id'], 'port': row['port'], 'status': row['status'], 'weight': row['weight'], 'compression': row['compression'], 'max_connections': row['max_connections'], 'max_replication_lag': row['max_replication_lag'], 'use_ssl': row['use_ssl'], 'max_latency_ms': row['max_latency_ms'], 'comment': row['comment'] } backend = ProxySQLMySQLBackend(row['hostname'], **kwargs) backends.add(backend) if backends: return backends else: raise ProxySQLBackendNotFound('Can not find any backends')
[docs] def backend_registered(self, backend): """ Check if backend is registered. :param backend: ProxySQLMySQLBackend instance :return: True if registered, False otherwise :rtype: bool """ query = "SELECT `hostgroup_id`, `hostname`, `port` " \ "FROM `mysql_servers` " \ "WHERE hostgroup_id = %d " \ "AND `hostname` = '%s' " \ "AND `port` = %s" % \ ( int(backend.hostgroup_id), pymysql.escape_string(backend.hostname), int(backend.port) ) result = self.execute(query) return result != ()
@staticmethod def _get_comment(backend): """Generate comment in mysql_servers for ProxySQL""" status = { 'role': backend.role, 'admin_status': backend.admin_status } return json.dumps(status, sort_keys=True) @contextmanager def _connect(self): """Connect to ProxySQL admin interface.""" connect_args = { 'user': self.user, 'passwd': self.password, 'connect_timeout': PROXYSQL_CONNECT_TIMEOUT, 'cursorclass': DictCursor } if self.socket is not None: connect_args['unix_socket'] = self.socket else: connect_args['host'] = self.host connect_args['port'] = self.port conn = pymysql.connect(**connect_args) yield conn conn.close()