"""ProxySQL classes"""
import json
from contextlib import contextmanager
import pymysql
from pymysql.cursors import DictCursor
from proxysql_tools import LOG, execute
from proxysql_tools.proxysql.exceptions import ProxySQLBackendNotFound, \
ProxySQLUserNotFound
from proxysql_tools.proxysql.proxysqlbackend import ProxySQLMySQLBackend
from proxysql_tools.proxysql.proxysqlbackendset import ProxySQLMySQLBackendSet
PROXYSQL_CONNECT_TIMEOUT = 20
# noinspection LongLine
[docs]class ProxySQLMySQLUser(object): # pylint: disable=too-many-instance-attributes,too-few-public-methods
"""ProxySQLMySQLUser describes record in ProxySQL table ``mysql_users``.
.. code-block:: mysql
CREATE TABLE mysql_users (
username VARCHAR NOT NULL,
password VARCHAR,
active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1,
use_ssl INT CHECK (use_ssl IN (0,1)) NOT NULL DEFAULT 0,
default_hostgroup INT NOT NULL DEFAULT 0,
default_schema VARCHAR,
schema_locked INT CHECK (schema_locked IN (0,1)) NOT NULL DEFAULT 0,
transaction_persistent INT CHECK (transaction_persistent IN (0,1))
NOT NULL DEFAULT 0,
fast_forward INT CHECK (fast_forward IN (0,1)) NOT NULL DEFAULT 0,
backend INT CHECK (backend IN (0,1)) NOT NULL DEFAULT 1,
frontend INT CHECK (frontend IN (0,1)) NOT NULL DEFAULT 1,
max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 10000,
PRIMARY KEY (username, backend),
UNIQUE (username, frontend))
:param username: MySQL username to connect to ProxySQL or Galera node.
:param password: MySQL password.
:param active: Users with active = 0 will be tracked in the database,
but will be never loaded in the in-memory data structures.
:type active: bool
:param use_ssl: Use SSL to connect to MySQL or not
:type use_ssl: bool
:param default_hostgroup: If there is no matching rule for the queries sent by
the users, the traffic it generates is sent to the specified hostgroup_.
:param default_schema: The schema to which the connection should change
by default.
:param schema_locked: not supported yet.
:type schema_locked: bool
:param transaction_persistent: if this is set for the user with which the MySQL
client is connecting to ProxySQL (thus a "frontend" user - see below),
transactions started within a hostgroup will remain within that hostgroup
regardless of any other rules.
:type transaction_persistent: bool
:param fast_forward: If set, it bypasses the query processing layer (rewriting,
caching) and passes the query directly to the backend server.
:type fast_forward: bool
:param frontend: If True, this (username, password) pair is used for
authenticating to the ProxySQL instance.
:type frontend: bool
:param backend: If True, this (username, password) pair is used for
authenticating to the mysqld servers against any hostgroup.
:param max_connections: Maximum number of connection this user
can create to MySQL node.
.. _hostgroup: http://bit.ly/2rGnT5i
"""
def __init__(self, username='root', password=None, active=True, # pylint: disable=too-many-arguments
use_ssl=False,
default_hostgroup=0, default_schema='information_schema',
schema_locked=False, transaction_persistent=False,
fast_forward=False, backend=True, frontend=True,
max_connections=10000):
self.username = username
self.password = password
self.active = bool(int(active))
self.use_ssl = bool(int(use_ssl))
self.default_hostgroup = int(default_hostgroup)
self.default_schema = default_schema
self.schema_locked = bool(int(schema_locked))
self.transaction_persistent = bool(int(transaction_persistent))
self.fast_forward = bool(int(fast_forward))
self.backend = bool(int(backend))
self.frontend = bool(int(frontend))
self.max_connections = int(max_connections)
def __eq__(self, other):
return all(
(
self.username == other.username,
self.password == other.password,
self.active == other.active,
self.default_hostgroup == other.default_hostgroup,
self.default_schema == other.default_schema,
self.schema_locked == other.schema_locked,
self.transaction_persistent ==
other.transaction_persistent,
self.fast_forward == other.fast_forward,
self.backend == other.backend,
self.frontend == other.frontend,
self.max_connections == other.max_connections
)
)
def __ne__(self, other):
return not self.__eq__(other)
[docs]class ProxySQL(object):
"""
ProxySQL describes a single ProxySQL instance.
:param host: ProxySQL hostname.
:param port: Port on which ProxySQL listens to admin connections.
:param user: ProxySQL admin user.
:param password: Password for ProxySQL admin.
:param socket: Socket to connect to ProxySQL admin interface.
"""
# noinspection LongLine
def __init__(self, host='localhost', port=3306, user='root', # pylint: disable=too-many-arguments
password=None, socket=None):
self.host = host
self.port = int(port)
self.user = user
self.password = password
self.socket = socket
[docs] def ping(self):
"""Check health of ProxySQL.
:return: True if ProxySQL healthy and False otherwise.
:rtype: bool"""
try:
result = self.execute('SELECT 1 AS result')
return result[0]['result'] == '1'
except pymysql.err.OperationalError:
LOG.debug('ProxySQL %s:%d is dead', self.host, self.port)
return False
[docs] def execute(self, query, *args):
"""Execute query in ProxySQL.
:param query: Query to execute.
:type query: str
:return: Query result or None if the query is not supposed
to return result
:rtype: dict
"""
with self._connect() as conn:
return execute(conn, query, *args)
[docs] def reload_users(self):
"""
Loads MySQL users from the in-memory database
to the runtime data structures.
"""
self.execute('LOAD MYSQL USERS TO RUNTIME')
[docs] def reload_servers(self):
"""
Loads MySQL servers from the in-memory database
to the runtime data structures.
"""
self.execute('LOAD MYSQL SERVERS TO RUNTIME')
[docs] def reload_variables(self):
"""
Loads MySQL variables from the in-memory database
to the runtime data structures.
"""
self.execute('LOAD MYSQL VARIABLES TO RUNTIME')
[docs] def save_users(self):
"""
Persists the MySQL users from the runtime data structures
to the in-memory database.
"""
self.execute('SAVE MYSQL USERS TO DISK')
[docs] def save_servers(self):
"""
Persists the MySQL servers from the runtime data structures
to the in-memory database.
"""
self.execute('SAVE MYSQL SERVERS TO DISK')
[docs] def save_variables(self):
"""
Persists the MySQL variables from the in-memory database
to the on-disk database.
"""
self.execute('SAVE MYSQL VARIABLES TO DISK')
[docs] def reload_runtime(self):
"""Reload the ProxySQL runtime configuration."""
self.reload_servers()
self.reload_users()
self.reload_variables()
[docs] def save_runtime(self):
"""Saves ProxySQL configuration to disk."""
self.save_users()
self.save_servers()
self.save_variables()
[docs] def get_users(self):
"""
Get mysql users
:return: List of users or empty list
:rtype: list(ProxySQLMySQLUser)
"""
query = "SELECT * FROM mysql_users"
result = self.execute(query)
users = []
for row in result:
kwargs = {
'username': row['username'],
'password': row['password'],
'active': row['active'],
'use_ssl': row['use_ssl'],
'default_hostgroup': row['default_hostgroup'],
'default_schema': row['default_schema'],
'schema_locked': row['schema_locked'],
'transaction_persistent': row['transaction_persistent'],
'fast_forward': row['fast_forward'],
'backend': row['backend'],
'frontend': row['frontend'],
'max_connections': row['max_connections']
}
user = ProxySQLMySQLUser(**kwargs)
users.append(user)
return users
[docs] def get_user(self, username):
"""
Get user by username
:param username: Username
:return: User information
:rtype: ProxySQLMySQLUser
:raise: ProxySQLUserNotFound
"""
result = self.execute("SELECT * FROM mysql_users "
"WHERE username = '{username}'"
.format(username=username))
if not result:
raise ProxySQLUserNotFound
else:
row = result[0]
kwargs = {
'username': row['username'],
'password': row['password'],
'active': row['active'],
'use_ssl': row['use_ssl'],
'default_hostgroup': row['default_hostgroup'],
'default_schema': row['default_schema'],
'schema_locked': row['schema_locked'],
'transaction_persistent': row['transaction_persistent'],
'fast_forward': row['fast_forward'],
'backend': row['backend'],
'frontend': row['frontend'],
'max_connections': row['max_connections']
}
user = ProxySQLMySQLUser(**kwargs)
return user
[docs] def add_user(self, user):
"""
Add MySQL user
:param user: user for add
:type user: ProxySQLMySQLUser
"""
kwargs = {
'username': user.username,
'password': user.password,
'active': int(user.active),
'use_ssl': int(user.use_ssl),
'default_hostgroup': int(user.default_hostgroup),
'default_schema': user.default_schema,
'schema_locked': int(user.schema_locked),
'transaction_persistent': int(user.transaction_persistent),
'fast_forward': int(user.fast_forward),
'backend': int(user.backend),
'frontend': int(user.frontend),
'max_connections': user.max_connections
}
query = "REPLACE INTO mysql_users(" \
"`username`, `password`, `active`, " \
"`use_ssl`, `default_hostgroup`, `default_schema`, " \
"`schema_locked`, `transaction_persistent`, `fast_forward`, " \
"`backend`, `frontend`, `max_connections`) " \
"VALUES(" \
"'{username}', '{password}', {active}, " \
"{use_ssl}, {default_hostgroup}, '{default_schema}', " \
"{schema_locked}, {transaction_persistent}, {fast_forward}, " \
"{backend}, {frontend}, {max_connections})" \
"".format(**kwargs)
self.execute(query)
self.reload_users()
self.save_users()
[docs] def delete_user(self, username):
"""
Delete MySQL user
:param username: username of user
:type username: str
"""
self.execute("DELETE FROM mysql_users WHERE username='{username}'"
.format(username=username))
self.reload_users()
self.save_users()
[docs] def register_backend(self, backend):
"""Register Galera node in ProxySQL
:param backend: Galera node.
:type backend: ProxySQLMySQLBackend
"""
comment = self._get_comment(backend)
kwargs = {
'hostgroup_id': int(backend.hostgroup_id),
'hostname': pymysql.escape_string(backend.hostname),
'port': int(backend.port),
'status': backend.status,
'weight': int(backend.weight),
'compression': int(backend.compression),
'max_connections': int(backend.max_connections),
'max_replication_lag': int(backend.max_replication_lag),
'use_ssl': int(backend.use_ssl),
'max_latency_ms': int(backend.max_latency_ms),
'comment': comment
}
query = "REPLACE INTO mysql_servers(" \
"`hostgroup_id`, `hostname`, `port`, " \
"`status`, `weight`, `compression`, " \
"`max_connections`, `max_replication_lag`, `use_ssl`, " \
"`max_latency_ms`, `comment`) " \
"VALUES(" \
"{hostgroup_id}, '{hostname}', {port}, " \
"'{status}', {weight}, {compression}, " \
"{max_connections}, {max_replication_lag}, {use_ssl}, " \
"{max_latency_ms}, '{comment}')" \
"".format(**kwargs)
self.execute(query)
self.reload_servers()
[docs] def update_backend(self, backend):
"""
Updates backend in ProxySQL table mysql_servers.
Currently synonym of register_backend().
"""
self.register_backend(backend)
[docs] def deregister_backend(self, backend):
"""
Deregister a Galera node from ProxySQL
:param backend: Galera node.
:type backend: ProxySQLMySQLBackend
"""
query = "DELETE FROM mysql_servers " \
"WHERE" \
" hostgroup_id={hostgroup_id}" \
" AND hostname='{hostname}'" \
" AND port={port}" \
"".format(hostgroup_id=int(backend.hostgroup_id),
hostname=pymysql.escape_string(backend.hostname),
port=int(backend.port))
self.execute(query)
self.reload_runtime()
[docs] def find_backends(self, hostgroup_id=None, status=None):
"""
Find backends from mysql_servers. If hostgroup_id or status is given
it will filter out backends based on that criteria.
:param hostgroup_id: writer hostgroup_id
:type hostgroup_id: int
:param status: Look only for backends in this status
:type status: BackendStatus
:return: Writer MySQL backend or None if doesn't exist
:rtype: ProxySQLMySQLBackendSet
:raise: ProxySQLBackendNotFound
"""
query = 'SELECT ' \
'`hostgroup_id`, `hostname`, `port`, ' \
'`status`, `weight`, `compression`, ' \
'`max_connections`, `max_replication_lag`, `use_ssl`, ' \
'`max_latency_ms`, `comment` ' \
'FROM `mysql_servers` ' \
'WHERE 1=1'
if hostgroup_id:
query += ' AND hostgroup_id = %d' % hostgroup_id
if status:
query += " AND status = '%s'" % status
result = self.execute(query)
backends = ProxySQLMySQLBackendSet()
for row in result:
kwargs = {
'hostgroup_id': row['hostgroup_id'],
'port': row['port'],
'status': row['status'],
'weight': row['weight'],
'compression': row['compression'],
'max_connections': row['max_connections'],
'max_replication_lag': row['max_replication_lag'],
'use_ssl': row['use_ssl'],
'max_latency_ms': row['max_latency_ms'],
'comment': row['comment']
}
backend = ProxySQLMySQLBackend(row['hostname'], **kwargs)
backends.add(backend)
if backends:
return backends
else:
raise ProxySQLBackendNotFound('Can not find any backends')
[docs] def backend_registered(self, backend):
"""
Check if backend is registered.
:param backend: ProxySQLMySQLBackend instance
:return: True if registered, False otherwise
:rtype: bool
"""
query = "SELECT `hostgroup_id`, `hostname`, `port` " \
"FROM `mysql_servers` " \
"WHERE hostgroup_id = %d " \
"AND `hostname` = '%s' " \
"AND `port` = %s" % \
(
int(backend.hostgroup_id),
pymysql.escape_string(backend.hostname),
int(backend.port)
)
result = self.execute(query)
return result != ()
@staticmethod
def _get_comment(backend):
"""Generate comment in mysql_servers for ProxySQL"""
status = {
'role': backend.role,
'admin_status': backend.admin_status
}
return json.dumps(status, sort_keys=True)
@contextmanager
def _connect(self):
"""Connect to ProxySQL admin interface."""
connect_args = {
'user': self.user,
'passwd': self.password,
'connect_timeout': PROXYSQL_CONNECT_TIMEOUT,
'cursorclass': DictCursor
}
if self.socket is not None:
connect_args['unix_socket'] = self.socket
else:
connect_args['host'] = self.host
connect_args['port'] = self.port
conn = pymysql.connect(**connect_args)
yield conn
conn.close()