博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python使用dbutils的PooledDB连接池,操作数据库
阅读量:6608 次
发布时间:2019-06-24

本文共 4299 字,大约阅读时间需要 14 分钟。

1、使用dbutils的PooledDB连接池,操作数据库。

这样就不需要每次执行sql后都关闭数据库连接,频繁的创建连接,消耗时间

2、如果是使用一个连接一直不关闭,多线程下,插入超长字符串到数据库,运行一段时间后很容易出现OperationalError: (2006, ‘MySQL server has gone away’)这个错误。

使用PooledDB解决。

 

# coding=utf-8"""使用DBUtils数据库连接池中的连接,操作数据库OperationalError: (2006, ‘MySQL server has gone away’)"""import jsonimport pymysqlimport datetimefrom DBUtils.PooledDB import PooledDBimport pymysqlclass MysqlClient(object):    __pool = None;    def __init__(self, mincached=10, maxcached=20, maxshared=10, maxconnections=200, blocking=True,                 maxusage=100, setsession=None, reset=True,                 host='127.0.0.1', port=3306, db='test',                 user='root', passwd='123456', charset='utf8mb4'):        """        :param mincached:连接池中空闲连接的初始数量        :param maxcached:连接池中空闲连接的最大数量        :param maxshared:共享连接的最大数量        :param maxconnections:创建连接池的最大数量        :param blocking:超过最大连接数量时候的表现,为True等待连接数量下降,为false直接报错处理        :param maxusage:单个连接的最大重复使用次数        :param setsession:optional list of SQL commands that may serve to prepare            the session, e.g. ["set datestyle to ...", "set time zone ..."]        :param reset:how connections should be reset when returned to the pool            (False or None to rollback transcations started with begin(),            True to always issue a rollback for safety's sake)        :param host:数据库ip地址        :param port:数据库端口        :param db:库名        :param user:用户名        :param passwd:密码        :param charset:字符编码        """        if not self.__pool:            self.__class__.__pool = PooledDB(pymysql,                                             mincached, maxcached,                                             maxshared, maxconnections, blocking,                                             maxusage, setsession, reset,                                             host=host, port=port, db=db,                                             user=user, passwd=passwd,                                             charset=charset,                                             cursorclass=pymysql.cursors.DictCursor                                             )        self._conn = None        self._cursor = None        self.__get_conn()    def __get_conn(self):        self._conn = self.__pool.connection();        self._cursor = self._conn.cursor();    def close(self):        try:            self._cursor.close()            self._conn.close()        except Exception as e:            print e    def __execute(self, sql, param=()):        count = self._cursor.execute(sql, param)        print count        return count    @staticmethod    def __dict_datetime_obj_to_str(result_dict):        """把字典里面的datatime对象转成字符串,使json转换不出错"""        if result_dict:            result_replace = {k: v.__str__() for k, v in result_dict.items() if isinstance(v, datetime.datetime)}            result_dict.update(result_replace)        return result_dict    def select_one(self, sql, param=()):        """查询单个结果"""        count = self.__execute(sql, param)        result = self._cursor.fetchone()        """:type result:dict"""        result = self.__dict_datetime_obj_to_str(result)        return count, result    def select_many(self, sql, param=()):        """        查询多个结果        :param sql: qsl语句        :param param: sql参数        :return: 结果数量和查询结果集        """        count = self.__execute(sql, param)        result = self._cursor.fetchall()        """:type result:list"""        [self.__dict_datetime_obj_to_str(row_dict) for row_dict in result]        return count, result    def execute(self, sql, param=()):        count = self.__execute(sql, param)        return count    def begin(self):        """开启事务"""        self._conn.autocommit(0)    def end(self, option='commit'):        """结束事务"""        if option == 'commit':            self._conn.autocommit()        else:            self._conn.rollback()if __name__ == "__main__":    mc = MysqlClient()    sql1 = 'SELECT * FROM shiji  WHERE  id = 1'    result1 = mc.select_one(sql1)    print json.dumps(result1[1], ensure_ascii=False)    sql2 = 'SELECT * FROM shiji  WHERE  id IN (%s,%s,%s)'    param = (2, 3, 4)    print json.dumps(mc.select_many(sql2, param)[1], ensure_ascii=False)

 

如果独立使用pymysql数据库,最好是配合DButils库。

上面的用法中是一直使用了mc这个对象,真实意图不是这样的,你可以随意实例化MysqlClient(),用不同的MysqlClient实例操纵数据库。

转载地址:http://esiso.baihongyu.com/

你可能感兴趣的文章
【DM642】ICELL Interface—Cells as Algorithm Containers
查看>>
力扣算法题—085最大矩阵
查看>>
svs 在创建的时候 上传文件夹 bin obj 这些不要提交
查看>>
Tinkphp
查看>>
How to temporally disable IDE tools (load manually)
查看>>
Vue.js学习 Item4 -- 数据双向绑定
查看>>
几种排序方式的java实现(01:插入排序,冒泡排序,选择排序,快速排序)
查看>>
图片存储类型的种类、特点、区别
查看>>
GETTING UP AND RUNNING WITH NODE.JS, EXPRESS, JADE, AND MONGODB
查看>>
MySQLs数据库建外键时自动跑到缩影处,真奇怪
查看>>
static关键字
查看>>
js 合并多个对象 Object.assign
查看>>
Java 反射机制
查看>>
temporary Object and destructor
查看>>
xcode - 移动手势
查看>>
细说浏览器特性检测(1)-jQuery1.4添加部分
查看>>
古中国数学家的计算力真是惊人
查看>>
Java基础-算术运算符(Arithmetic Operators)
查看>>
C#编程(四十七)----------集合接口和类型
查看>>
【转】关于大型网站技术演进的思考(十二)--网站静态化处理—缓存(4)
查看>>