【Python模块】——pymysql

news/2025/2/27 14:07:21

pymysql是python操作mysql的标准库,可以通过pip install快速导入pymysql包操作数据库

使用pymysql操作mysql

简单demo

python">import pymysql
connect = pymysql.connect(
    host="localhost",
    port=3306,
    user="root",
    password="root",
    database="my_database",
    # charset="utf8mb4"
)
cursor = connect.cursor()

# 查询语句1
sql = "select * from user where name = %(name)s"
ret = cursor.execute(sql, {"name": "ls"})
# 查询语句2
sql = "select * from user where name = %s"
ret = cursor.execute(sql, "ls")
print(ret)

result = cursor.fetchall()
print("result", result)

cursor.close()
connect.close()

自定义SqlHelper

python">import pymysql

class MySQLClient(object):
    def __init__(self, **kwargs):
        self.conn = pymysql.connect(
            **kwargs
        )
        self.cursor = self.conn.cursor()


    def query(self, sql, *args):
        try:
            rowcount = self.cursor.execute(sql, *args)
            return rowcount
        except Exception as e:
            raise e


    def update(self, sql, *args):
        self.cursor.execute(sql, *args)
        self.conn.commit()


    def insert(self, sql, *args):
        self.cursor.execute(sql, *args)
        self.conn.commit()


    def fetch_one(self, sql, *args):
        self.query(sql, *args)
        result = self.cursor.fetchone()
        return result


    def fetch_all(self, sql, *args):
        self.query(sql, *args)
        result = self.cursor.fetchone()
        return result

    def close(self):
        self.cursor.close()
        self.conn.close()


config = {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "password": "root",
    "database": "my_database",
}

mysql_client = MySQLClient(**config)
sql = "select * from user where name=%s"
ret = mysql_client.fetch_one(sql, "ls")
print(ret)

# mysql_client.close()

借助DButils创建数据库连接池

DButils模块可以通过创建数据库连接池,提升数据库操作性能;
实现思路:

  1. 定义SqlHelper类
  2. 通过__init__方法定义pool=PoolDB(**kwargs),_local=threading.local()
  3. 定义__enter__获取connection与cursor和__exit__关闭connection与cursor,可支持with 上下文操作
  4. 为了保证每次获取的connection与cursor不会将之前的覆盖掉,引入threading.local进行保存;self._local = {thread_id: {“stack”: [(connection, cursor)]}}
python">#!/usr/bin/env python  
# -*- coding:utf-8 -*-  
import pymysql
from dbutils.pooled_db import PooledDB
from threading import local

class SqlHelper(object):
    def __init__(self):
        self.pool = PooledDB(
            creator=pymysql,  # 使用链接数据库的模块
            maxconnections=5,  # 连接池允许的最大连接数,0和None表示不限制连接数
            mincached=1,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
            # maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
            maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
            setsession=[],  # 开始会话前执行的命令列表
            host='localhost',
            port=3306,
            user='root',
            password='root',
            database='my_database',
            charset='utf8'
        )
        self._local = local()

    def open(self):
        connection = self.pool.connection()
        cursor = connection.cursor()
        return  connection, cursor

    def close(self, cursor, conn):
        cursor.close()
        conn.close()

    def __enter__(self):
        conn, cursor = self.open()
        rv =  getattr(self._local, "stack", None)
        if not rv:
            self._local.stack = [(conn, cursor)]
        else:
            self._local.stack.append((conn, cursor))
        return cursor

    def __exit__(self, exc_type, exc_val, exc_tb):
        rv = getattr(self._local, "stack", None)
        if not rv:
            # del self._local.stack
            return
        elif len(rv) == 1:
            conn, cursor = rv[-1]
            # del self._local.stack
            return
        else:
            conn, cursor = rv.pop()
        cursor.close()
        conn.close()


    def fetchone(self, sql, *args):
        conn, cursor = self.open(self)
        try:
            rowcount = cursor.execute(sql, *args)
            ret = cursor.fetchone()
            return ret
        except Exception as e:
            raise


    def fetchall(self, sql, *args):
        conn, cursor = self.open(self)
        try:
            rowcount = cursor.execute(sql, *args)
            ret = cursor.fetchall()
            return ret
        except Exception as e:
            raise

db = SqlHelper()

sql = "select * from user"
with db as c1:
    ret = c1.execute(sql)
    print(ret)
    with db as c2:
        ret = c2.execute(sql)
        print(ret)

使用DButils的另一种写法

使用这种写法,每次都实例化SqlHelper,保证每次获取的connection和cursor不被覆盖

python">#!/usr/bin/env python  
# -*- coding:utf-8 -*-  
""" 
1. 定义全局变量POOL=pooledDB(**kwargs)
2. 每次用到db就实例化一次
"""
import pymysql
from dbutils.pooled_db import PooledDB
from threading import local

pool = PooledDB(
            creator=pymysql,  # 使用链接数据库的模块
            maxconnections=0,  # 连接池允许的最大连接数,0和None表示不限制连接数
            mincached=1,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
            # maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
            blocking=False,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
            maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
            setsession=[],  # 开始会话前执行的命令列表
            host='localhost',
            port=3306,
            user='root',
            password='root',
            database='my_database',
            charset='utf8'
        )

class SqlHelper(object):
    def __init__(self):
        self.conn = None
        self.cursor = None

    def open(self):
        self.connection = pool.connection()
        self.cursor = self.connection.cursor()
        return  self.connection, self.cursor

    def close(self):
        self.cursor.close()
        self.conn.close()

    def __enter__(self):
        self.conn, self.cursor = self.open()
        return self.cursor

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()



db = SqlHelper()

sql = "select * from user"
with db as c1:
    ret = c1.execute(sql)
    print("c1.cursor: ", db.cursor)
    print(ret)
    with db as c2:
        ret = c2.execute(sql)
        print("c2.cursor: ", db.cursor)  # 一个实例对象是可以多次调用enter方法的,但db.cursor发生了改变,即上一次的连接丢了
        print(ret)

        print(type(c1), type(c2))

        print(c1 is c2) # false


    print("c1.cursor: ", db.cursor) # c2.cursor将c1.cursor覆盖了


http://www.niftyadmin.cn/n/5868691.html

相关文章

ui设计公司兰亭妙微分享:科研单位UI界面设计

科研单位的UI界面设计是一项至关重要的任务,它不仅关乎科研工作的效率,还直接影响到科研人员的用户体验。以下是对科研单位UI界面设计的详细分析: 一、设计目标 科研单位的UI界面设计旨在提升科研工作的效率与便捷性,同时确保科…

2024最新版鸿蒙纯血原生应用开发教程文档丨学习ArkTS语言-基本语法

ArkTS是HarmonyOS的主要应用开发语言,在TypeScript基础上进行了扩展,保留了其基本风格,并通过增强静态检查和分析来提高程序的稳定性和性能。本教程将帮助开发者掌握ArkTS的核心功能、语法及最佳实践,以便高效地构建高性能移动应用…

1分钟用DeepSeek编写一个PDF转Word软件

一、引言 如今,在线工具的普及让PDF转Word成为了一个常见需求,常见的pdf转word工具有收费的wps,免费的有pdfgear,见下文: PDFgear:一款免费的PDF编辑、格式转化软件-CSDN博客 还有网上在线的免费pdf转word工具smallp…

B站pwn教程笔记-3

栈知识、部分保护措施 GDB显示的栈地址有时候并不是可靠的地址,gdb也是用特殊的进程映像来拿地址的。且gdb默认关闭栈地址随机化。但是,偏移量是没有错误的。目前还没学到咋解决 第一个栈帧是main函数栈帧,之前的一些系统函数什么的没有栈帧…

【Android】用 chrome://inspect/#devices 调试H5页面

通常做Android开发的过程中,不可避免的需要遇到去与H5交互,甚至有时候需要去调试H5的信息。 这里分享一下Android工程里如何调试H5页面信息: 直接在浏览器地址栏输入 : chrome://inspect/#devices 直接连接手机usb,打开开发者模式…

猿大师播放器:交通水利、公安消防Web端Vue网页播放20路RTSP H.265 1080P监控视频流

随着互联网技术的飞速发展,视频监控已成为各行各业不可或缺的一部分。无论是交通物流、公安消防,还是水利农业、园区校园,视频监控都扮演着至关重要的角色。然而,传统的视频监控解决方案往往依赖于特定的客户端软件,这…

ssh配置 远程控制 远程协作 github本地配置

0.设备版本 windows11 ubuntu24.0.4 1.1 在 Linux 上启用 SSH 服务 首先,确保 Linux 计算机上安装并启用了 SSH 服务。 安装和启动 OpenSSH 服务(如果未安装) # 在终端安装 OpenSSH 服务(如果尚未安装) sudo apt …

qt QTreeWidget`总结

1. 基本概念 作用:用于显示树形结构的数据(如文件目录、层级菜单)。核心组件: QTreeWidget:树形控件容器。QTreeWidgetItem:树形控件中的节点(可包含子节点)。 2. 基本用法 创建树…