0%

SanicDB 使用文档

1. sanicdb简介

SanicDB是为python的异步web框架sanic方便操作mysql(mariadb)而开发的工具,是对aiomysql.Pool的轻量级封装。Sanic是异步IO的web框架,用异步IO的方式读取mysql也才能更加发挥它的效率。

备注:sanicdb也可以用于其他任何需要异步IO操作mysql的地方,不仅仅限于sanic中

github:https://github.com/veelion/sanicdb

安装

pip install sanicdb

2. 初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
class SanicDB:
"""A lightweight wrapper around aiomysql.Pool for easy to use
"""
def __init__(self, host, database, user, password,
loop=None, sanic=None,
minsize=3, maxsize=5,
return_dict=True,
pool_recycle=7*3600,
autocommit=True,
charset = "utf8mb4", **kwargs):
'''
kwargs: all parameters that aiomysql.connect() accept.
'''
  • 前4个参数是对应mysql数据库的。

  • loop是应用程序中的事件循环(event_loop),默认为None,则程序会自动调用asyncio.get_event_loop()创建。此外,sanic和loop两者只需要提供一个即可,当同时存在时,了解数据库其实用的是sanic中的loop。可以在初始化中看到

1
2
3
4
async def init_pool(self):
if self.sanic:
self.db_args['loop'] = self.sanic.loop
self.pool = await aiomysql.create_pool(**self.db_args)
  • minsize: 最小连接数量
  • maxsize:最大连接数量
  • return_dict : 是返回的数据一条记录为一个dict,key是MySQL表的字段名,value是字段的值
  • pool_recycle :连接池重连的时间间隔,MySQL默认的连接闲置时间是8小时(我们服务器目前已改为24小时)
  • autocommit:是否自动提交

3. 函数定义

sanicdb提供了6个功能函数。

  • query

    功能如名,查询数据库。

    1
    2
    async def query(self, query, *parameters, **kwparameters):
    """Returns a row list for the given query and parameters."""
  • get

    查询数据库,只返回一条数据。

    1
    2
    3
    async def get(self, query, *parameters, **kwparameters):
    """Returns the (singular) row returned by the given query.
    """
  • execute

    执行操作,返回受影响的行ID

    1
    2
    async def execute(self, query, *parameters, **kwparameters):
    """Executes the given query, returning the lastrowid from the query."""
  • table_has

    检查一个table中是否含有某字段为某值的记录

    1
    async def table_has(self, table_name, field, value):
  • table_insert

    插入数据库。

    1
    2
    async def table_insert(self, table_name, item, ignore_duplicated=True):
    '''item is a dict : key is mysql table field'''
  • table_update

    更新数据库

    1
    2
    3
    async def table_update(self, table_name, updates,
    field_where, value_where):
    '''updates is a dict of {field_update:value_update}'''

4. 示例

  1. 普通的异步mysql例子,参照官网给出的test

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65

    import asyncio
    from sanicdb import SanicDB


    async def test(loop):
    db = SanicDB('localhost', 'testdb', 'root', 'the_password',
    minsize=3, maxsize=5,
    connect_timeout=5,
    loop=loop)
    sql = 'Drop table test'
    await db.execute(sql)

    sql = """CREATE TABLE `test` (
    `id` int(8) NOT NULL AUTO_INCREMENT,
    `name` varchar(16) NOT NULL,
    `content` varchar(255) NOT NULL,
    PRIMARY KEY (`id`),
    UNIQUE KEY `name` (`name`)
    ) ENGINE=MyISAM ;"""
    await db.execute(sql)

    sql = 'select * from test where name = %s'
    data = await db.query(sql, 'abc')
    print('query():', data)

    sql += ' limit 1'
    d = await db.get(sql, 'abc')
    print('get():', d)

    sql = 'delete from test where name=%s'
    lastrowid = await db.execute(sql, 'xyz')
    print('execute(delete...):', lastrowid)
    sql = 'insert into test set name=%s, content=%s'
    lastrowid = await db.execute(sql, 'xyz', '456')
    print('execute(insert...):', lastrowid)

    ret = await db.table_has('test', 'name', 'abc')
    print('has(): ', ret)

    ret = await db.table_update('test', {'content': 'updated'},
    'name', 'abc')
    print('update():', ret)
    sql = 'select * from test where name = %s'
    data = await db.query(sql, 'abc')
    print('query():', data)

    item = {
    'name': 'abc',
    'content': '123'
    }
    i = 0
    while 1:
    i += 1
    if i % 2 == 0:
    lastid = await db.table_insert('test', item, ignore_duplicated=False)
    else:
    lastid = await db.table_insert('test', item)
    print('insert():', lastid)
    await asyncio.sleep(1)


    if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(test(loop))
  2. sanic web中异步插叙mysql

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    # author: sunshine
    # datetime: 2020/10/29 10:12
    from sanic import Sanic
    from sanic_cors import CORS
    from sanic.response import json
    from sanicdb import SanicDB

    app = Sanic(__name__)
    # 支持跨域
    CORS(app)

    db = SanicDB(
    host='172.27.0.6',
    database='privacy',
    user='root',
    password='PdiCond$402875432',
    sanic=app,
    maxsize=18
    )

    @app.route('/simple', methods=['POST'])
    async def simple_query(request):
    """
    对数据库的简单查询测试
    :param request:
    :return:
    """
    sql_str = 'select * from employee'
    num = 0
    data = await app.db.query(sql_str)
    num += len(data)
    return json({"查询条数": num})

    if __name__ == '__main__':
    app.run('0.0.0.0', port=5067)