如何更好地连接数据库


在 SQLAlchemy 中如何更好地使用 create_engine 方法连接数据库。

如何更好地连接数据库


1. 问题起因

该问题一直没有在我们本地复现

问题的起因还是因为我在做高可用方法的时候,使用 TCP 代理数据库的情况,将连接数据库的请求通过 Nginxstream 模块进行了转发。目的是为了,在发生故障的时候可以及时进行数据库的切换,减少服务中断时间。但是就是因为这个操作,导致我们的系统登录他们内部的用户系统昨晚认证之后,请求数据库有问题。一直都没有解决掉。

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1193, in _execute_context
    context)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/default.py", line 508, in do_execute
    cursor.execute(statement, parameters)
psycopg2.OperationalError: server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.

The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/tornado/web.py", line 1541, in _execute
    result = method(*self.path_args, **self.path_kwargs)
  ......
l/lib/python3.6/dist-packages/sqlalchemy/engine/default.py", line 508, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.
 [SQL: 'SELECT anon_1.user_id AS anon_1_user_id, ......
(Background on this error at: http://sqlalche.me/e/e3q8)

从上述报错中,可以看出是 Web 服务连接数据库的时候,没有正确和数据库建立连接,导致查询数据的时候发生异常。同时,通过查看服务的日志,确认证实了上述的想法。但是,手动连接或者使用工具压测数据库都是正常的,并没见异常。

如何更好地连接数据库


2. 处理经过

查找问题的过程还是真的有点费劲

之前看出日志,发现 Nginx 的日志有出现 timeout 的情况,后来为了保证 stream 的持久连接,加上了如下参数配置。配置之后,这样算是解决了 timeout 的问题,但是发现还是会有上述跳转的问题出现。哎,头发白了…..

stream {
    tcp_nodelay on;

    upstream app_postgres {
        hash $remote_addr consistent;
        server 192.168.10.100:5432;
    }
    server {
        listen 5432 so_keepalive=on;
        proxy_pass app_postgres;
    }

其实上述登录问题是概率性问题,是间隔一段时间之后才会复现一次的。后来发现是,设置的 Nginxstream 超时时间导致的。超时时间已过期或者重启数据库就会出现该问题。后来,决定看看服务连接数据库的代码。如下所示,使用的熟知的 SQLAlchemy,顺便查了查对应的参数,发现解决问题的方法了。

  • PostgreSQL
# default
engine = create_engine('postgresql://scott:tiger@localhost/mydatabase')

# psycopg2
engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/mydatabase')

# pg8000
engine = create_engine('postgresql+pg8000://scott:tiger@localhost/mydatabase')
More notes on connecting to PostgreSQL at PostgreSQL.
  • MySQL
# default
engine = create_engine('mysql://scott:tiger@localhost/foo')

# mysqlclient (a maintained fork of MySQL-Python)
engine = create_engine('mysql+mysqldb://scott:tiger@localhost/foo')

# PyMySQL
engine = create_engine('mysql+pymysql://scott:tiger@localhost/foo')

3. 解决方法

对应连接异常问题,官方给出了以下两种解决方法。

如何更好地连接数据库

SQLAlchemy 是通过 Engine 来驱动,而 Engine 自己维护了一个连接池 Pool 对象和方言 Dialect。其实,方言简单而言就是你可以通过同样的命令参数连接到不同的数据库,比如 PostgreSQL/MySQL 等,方便服务连接使用。其中连接池非常重要,因为每次发送 SQL 语句查询的时候都需要事先建立连接。如果程序启动的时候事先就初始化一批连接放在连接池,每次用完后又放回连接池给其它请求使用,就能大大提高查询和响应效率。

官网文档中介绍,SQLAlchemy 的连接池支持事件接口(event interface),该事件接口允许我们定义钩子函数(hooks)在首次连接数据库时,进行检测和处理。这样,就为我们处理异常连接提供了新的方法。

SQLAlchemy中连接池具有 refresh 单个连接及其整个连接池连接的能力,可以将先前的连接设置为”无效”状态。一个常见的用法就是当数据库服务器重新启动并且所有先前建立的连接不再起作用时,允许连接池正常恢复。有以下两种方法。

from sqlalchemy import create_engine

engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/mydatabase',
    echo=False,         # 为True时可将sql语句打印出来
    pool_size=100,      # 设置连接池的大小,默认为5个,而0表示无限制
    pool_recycle=3600,  # 在设置的指定时间内回收连接
    pool_pre_ping=True  # 连接前发送测试语句
)

  • [1] 悲观的处理方式 - pool_pre_ping

悲观方法是指在每个连接池开始连接之前,先发送 SQL 测试语句,以测试数据库连接仍然可用。如果有错误,监测为断开的状态,连接将被立即回收。通常,使用如 SELECT 1 这种简单的语句,但也可以使用某些特定于 DBAPI 的方法来测试数据库连接是否可用。。这个参数是用在悲观场景下的也就是数据库频繁重启的,乐观场景下就是默认的,第一次会失败,失败后 SQLAlchemy 会重连,后面就好了。

需要注意的是,这个连接之前预处理方法不太适合用于事务或其他 SQL 操作中间丢失的连接情况。如果在进行事务操作的时候,数据库不可用,则会导致事务丢失并且将引发数据库错误。当出现这种情况时,虽然 Connection 对象会检测到 断开连接 的情况并及时回收连接并使其余的连接池无效,但引发异常的单个操作将丢失,并且取决于应用程序放弃该操作,或再次重试整个事务。

在某些罕见的情况下,连接数据库可用但无法响应 ping 对应的测试语句,pre_ping 参数将最多尝试 3 次,如果最后还是无法连接,将发送调用放连接数据库错误。

# 该参数version 1.2版本引入的
from sqlalchemy import create_engine
engine = create_engine("mysql+pymysql://user:pw@host/db", pool_pre_ping=True)
  • [2] 悲观的处理方式 - event.listens_for

在没有 create_engine.pool_pre_ping 函数之前,我们一般可以通过自定义函数的方式达到同样的效果。通过 listens_for 的方法,对创建连接事件进行监听,来做连接的特殊处理。如果,使用像 try... except 来处理 select 语句,如果没问题就关闭。

from sqlalchemy import exc
from sqlalchemy import event
from sqlalchemy import select

some_engine = create_engine(...)

@event.listens_for(some_engine, "engine_connect")
def ping_connection(connection, branch):
    if branch:
        # "branch" refers to a sub-connection of a connection,
        # we don't want to bother pinging on these.
        return

    # turn off "close with result".  This flag is only used with
    # "connectionless" execution, otherwise will be False in any case
    save_should_close_with_result = connection.should_close_with_result
    connection.should_close_with_result = False

    try:
        # run a SELECT 1.   use a core select() so that
        # the SELECT of a scalar value without a table is
        # appropriately formatted for the backend
        connection.scalar(select([1]))
    except exc.DBAPIError as err:
        # catch SQLAlchemy's DBAPIError, which is a wrapper
        # for the DBAPI's exception.  It includes a .connection_invalidated
        # attribute which specifies if this connection is a "disconnect"
        # condition, which is based on inspection of the original exception
        # by the dialect in use.
        if err.connection_invalidated:
            # run the same SELECT again - the connection will re-validate
            # itself and establish a new connection.  The disconnect detection
            # here also causes the whole connection pool to be invalidated
            # so that all stale connections are discarded.
            connection.scalar(select([1]))
        else:
            raise
    finally:
        # restore "close with result"
        connection.should_close_with_result = save_should_close_with_result
  • [3] 乐观的处理方式 - try… except

使用 try... except 处理一个 select 语句,如果连接无效,则返回 Connection was invalidated!。然后开一个新的连接,再去执行 select 语句。这个方式可以写一个装饰器,放在每个查询前面,保证数据库语句可以正常被执行。

from sqlalchemy import create_engine, exc
e = create_engine(...)
c = e.connect()

try:
    # suppose the database has been restarted.
    c.execute("SELECT * FROM table")
    c.close()
except exc.DBAPIError, e:
    # an exception is raised, Connection is invalidated.
    if e.connection_invalidated:
        print("Connection was invalidated!")

# after the invalidate event, a new connection
# starts with a new Pool
c = e.connect()
c.execute("SELECT * FROM table")
  • [4] 乐观的处理方式 - pool_recycle

此参数可以防止连接池使用已超过一定期限的特定连接,超时之后将被立即回收,并且适用于数据库后端(例如 MySQL),该数据库在特定时间段后会自动关闭已失效的连接。pool_recyclePool 本身的功能,与是否正在使用 Engine 无关。

from sqlalchemy import create_engine
e = create_engine("mysql://scott:tiger@localhost/test", pool_recycle=3600)

4. 参考链接

授人玫瑰,手有余香!


文章作者: Escape
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Escape !