As p开发者_开发百科er the SQLAlchemy, select statements are treated as iterables in for loops. The effect is that a select statement that would return a massive amount of rows does not use excessive memory.
I am finding that the following statement on a MySQL table:
for row in my_connections.execute(MyTable.__table__.select()):
yield row
Does not seem to follow this, as I overflow available memory and begin thrashing before the first row is yielded. What am I doing wrong?
The basic MySQLdb
cursor fetches the entire query result at once from the server.
This can consume a lot of memory and time.
Use MySQLdb.cursors.SSCursor when you want to make a huge query and
pull results from the server one at a time.
Therefore, try passing connect_args={'cursorclass': MySQLdb.cursors.SSCursor}
when creating the engine
:
from sqlalchemy import create_engine, MetaData
import MySQLdb.cursors
engine = create_engine('mysql://root:zenoss@localhost/e2', connect_args={'cursorclass': MySQLdb.cursors.SSCursor})
meta = MetaData(engine, reflect=True)
conn = engine.connect()
rs = s.execution_options(stream_results=True).execute()
See http://www.sqlalchemy.org/trac/ticket/1089
Note that using SSCursor locks the table until the fetch is complete. This affects other cursors using the same connection: Two cursors from the same connection can not read from the table concurrently.
However, cursors from different connections can read from the same table concurrently.
Here is some code demonstrating the problem:
import MySQLdb
import MySQLdb.cursors as cursors
import threading
import logging
import config
logger = logging.getLogger(__name__)
query = 'SELECT * FROM huge_table LIMIT 200'
def oursql_conn():
import oursql
conn = oursql.connect(
host=config.HOST, user=config.USER, passwd=config.PASS,
db=config.MYDB)
return conn
def mysqldb_conn():
conn = MySQLdb.connect(
host=config.HOST, user=config.USER,
passwd=config.PASS, db=config.MYDB,
cursorclass=cursors.SSCursor)
return conn
def two_cursors_one_conn():
"""Two SSCursors can not use one connection concurrently"""
def worker(conn):
cursor = conn.cursor()
cursor.execute(query)
for row in cursor:
logger.info(row)
conn = mysqldb_conn()
threads = [threading.Thread(target=worker, args=(conn, ))
for n in range(2)]
for t in threads:
t.daemon = True
t.start()
# Second thread may hang or raise OperationalError:
# File "/usr/lib/pymodules/python2.7/MySQLdb/cursors.py", line 289, in _fetch_row
# return self._result.fetch_row(size, self._fetch_type)
# OperationalError: (2013, 'Lost connection to MySQL server during query')
for t in threads:
t.join()
def two_cursors_two_conn():
"""Two SSCursors from independent connections can use the same table concurrently"""
def worker():
conn = mysqldb_conn()
cursor = conn.cursor()
cursor.execute(query)
for row in cursor:
logger.info(row)
threads = [threading.Thread(target=worker) for n in range(2)]
for t in threads:
t.daemon = True
t.start()
for t in threads:
t.join()
logging.basicConfig(level=logging.DEBUG,
format='[%(asctime)s %(threadName)s] %(message)s',
datefmt='%H:%M:%S')
two_cursors_one_conn()
two_cursors_two_conn()
Note that oursql is an alternative set of MySQL bindings for Python. oursql cursors are true server-side cursors which fetch rows lazily by default. With oursql
installed, if you change
conn = mysqldb_conn()
to
conn = oursql_conn()
then two_cursors_one_conn()
runs without hanging or raising an exception.
精彩评论