连接数据库
PS C:\Users\dongx> python -V
Python 3.8.3
PS C:\Users\dongx> pip install --upgrade psycopg2
Collecting psycopg2
Using cached psycopg2-2.8.5-cp38-cp38-win_amd64.whl (1.1 MB)
Installing collected packages: psycopg2
Successfully installed psycopg2-2.8.5
[postgresql]
host = 192.168.56.104
port = 5432
database = hrdb
user = tony
password = tony
# 导入 psycopg2 模块和 Error 对象
import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser
def read_db_config(filename='dbconfig.ini', section='postgresql'):
""" 读取数据库配置文件,返回一个字典对象
"""
# 创建解析器,读取配置文件
parser = ConfigParser()
parser.read(filename)
# 获取 postgresql 部分的配置
db = {}
if parser.has_section(section):
items = parser.items(section)
for item in items:
db[item[0]] = item[1]
else:
raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))
return db
db_config = read_db_config()
connection = None
try:
# 使用 psycopg2.connect 方法连接 PostgreSQL 数据库
connection = psycopg2.connect(**db_config)
# 创建一个游标
cur = connection.cursor()
# 获取 PostgreSQL 版本号
cur.execute('SELECT version()')
db_version = cur.fetchone()
# 输出 PostgreSQL 版本
print("连接成功,PostgreSQL 服务器版本:", db_version)`在这里插入代码片`
# 关闭游标
cur.close()
except (Exception, DatabaseError) as e:
print("连接 PostgreSQL 失败:", e)
finally:
# 释放数据库连接
if connection is not None:
connection.close()
print("PostgreSQL 数据库连接已关闭。")
首先,我们导入了 psycopg2 驱动和解析配置文件的 configparser 模块;
然后,创建一个读取配置文件的 read_db_config 函数;
接下来调用 psycopg2.connect 函数创建一个新的数据库连接;
然后通过连接对象的 cursor 函数创建一个新的游标,并且执行查询语句返回数据库的版本;
在此之后,调用游标对象的 fetchone() 方法获取返回结果并打印信息;
最后,调用 close() 方法关闭游标资源和数据库连接对象。
连接成功,PostgreSQL 服务器版本:('PostgreSQL 12.3 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 4.8.5 20150623 (Red Hat 4.8.5-39), 64-bit',)
PostgreSQL 数据库连接已关闭。
创建和删除表
# 导入 psycopg2 模块和 Error 对象
import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser
def read_db_config(filename='dbconfig.ini', section='postgresql'):
""" 读取数据库配置文件,返回一个字典对象
"""
# 创建解析器,读取配置文件
parser = ConfigParser()
parser.read(filename)
# 获取 postgresql 部分的配置
db = {}
if parser.has_section(section):
items = parser.items(section)
for item in items:
db[item[0]] = item[1]
else:
raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))
return db
db_config = read_db_config()
connection = None
try:
# 使用 psycopg2.connect 方法连接 PostgreSQL 数据库
connection = psycopg2.connect(**db_config)
# 创建一个游标
cur = connection.cursor()
# 定义 SQL 语句
sql = """ create table users (
id serial primary key,
name character varying(10) not null unique,
created_at timestamp not null
) """
# 执行 SQL 命令
cur.execute(sql)
# 关闭游标
cur.close()
# 提交事务
connection.commit()
print("操作成功!")
except (Exception, DatabaseError) as e:
print("操作失败:", e)
finally:
# 释放数据库连接
if connection is not None:
connection.close()
print("PostgreSQL 数据库连接已关闭。")
操作成功!
PostgreSQL 数据库连接已关闭。
操作失败:relation "users" already exists
插入数据
# 导入 psycopg2 模块和 Error 对象
import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser
def read_db_config(filename='dbconfig.ini', section='postgresql'):
""" 读取数据库配置文件,返回一个字典对象
"""
# 创建解析器,读取配置文件
parser = ConfigParser()
parser.read(filename)
# 获取 postgresql 部分的配置
db = {}
if parser.has_section(section):
items = parser.items(section)
for item in items:
db[item[0]] = item[1]
else:
raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))
return db
db_config = read_db_config()
connection = None
try:
# 使用 psycopg2.connect 方法连接 PostgreSQL 数据库
connection = psycopg2.connect(**db_config)
# 创建一个游标
cur = connection.cursor()
# 定义 SQL 语句
sql = """ insert into users(name, created_at)
values (%s, %s) RETURNING id
"""
# 执行 SQL 命令
cur.execute(sql, ('tony', '2020-06-08 11:00:00'))
# 获取 id
id = cur.fetchone()[0]
# 提交事务
connection.commit()
print("操作成功!用户 id:", id)
# 关闭游标
cur.close()
except (Exception, DatabaseError) as e:
print("操作失败:", e)
finally:
# 释放数据库连接
if connection is not None:
connection.close()
print("PostgreSQL 数据库连接已关闭。")
操作成功!用户 id:1
PostgreSQL 数据库连接已关闭。
查询数据
# 导入 psycopg2 模块和 Error 对象
import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser
def read_db_config(filename='dbconfig.ini', section='postgresql'):
""" 读取数据库配置文件,返回一个字典对象
"""
# 创建解析器,读取配置文件
parser = ConfigParser()
parser.read(filename)
# 获取 postgresql 部分的配置
db = {}
if parser.has_section(section):
items = parser.items(section)
for item in items:
db[item[0]] = item[1]
else:
raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))
return db
db_config = read_db_config()
connection = None
try:
# 使用 psycopg2.connect 方法连接 PostgreSQL 数据库
connection = psycopg2.connect(**db_config)
# 创建一个游标
cur = connection.cursor()
# 定义 SQL 语句
sql = """ select id, name, created_at
from users
"""
# 执行 SQL 命令
cur.execute(sql)
print("用户数量:", cur.rowcount)
# 获取结果
user = cur.fetchone()
while user is not None:
print(user)
user = cur.fetchone()
# 关闭游标
cur.close()
except (Exception, DatabaseError) as e:
print("操作失败:", e)
finally:
# 释放数据库连接
if connection is not None:
connection.close()
用户数量:1
(1, 'tony', datetime.datetime(2020, 6, 8, 11, 0))
修改数据
# 导入 psycopg2 模块和 Error 对象
import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser
def read_db_config(filename='dbconfig.ini', section='postgresql'):
""" 读取数据库配置文件,返回一个字典对象
"""
# 创建解析器,读取配置文件
parser = ConfigParser()
parser.read(filename)
# 获取 postgresql 部分的配置
db = {}
if parser.has_section(section):
items = parser.items(section)
for item in items:
db[item[0]] = item[1]
else:
raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))
return db
db_config = read_db_config()
connection = None
try:
# 使用 psycopg2.connect 方法连接 PostgreSQL 数据库
connection = psycopg2.connect(**db_config)
# 创建一个游标
cur = connection.cursor()
# 定义 SQL 语句
sql = """ update users
set name = %s
where id = %s
"""
# 执行 SQL 命令
cur.execute(sql, ('tom', 1))
# 获取 id
rows = cur.rowcount
# 提交事务
connection.commit()
print("操作成功!更新行数:", rows)
# 再次查询数据
sql = """ select id, name, created_at
from users where id = 1
"""
cur.execute(sql)
user = cur.fetchone()
print(user)
# 关闭游标
cur.close()
except (Exception, DatabaseError) as e:
print("操作失败:", e)
finally:
# 释放数据库连接
if connection is not None:
connection.close()
操作成功!更新行数:1
(1, 'tom', datetime.datetime(2020, 6, 8, 11, 0))
删除数据
# 导入 psycopg2 模块和 Error 对象
import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser
def read_db_config(filename='dbconfig.ini', section='postgresql'):
""" 读取数据库配置文件,返回一个字典对象
"""
# 创建解析器,读取配置文件
parser = ConfigParser()
parser.read(filename)
# 获取 postgresql 部分的配置
db = {}
if parser.has_section(section):
items = parser.items(section)
for item in items:
db[item[0]] = item[1]
else:
raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))
return db
db_config = read_db_config()
connection = None
try:
# 使用 psycopg2.connect 方法连接 PostgreSQL 数据库
connection = psycopg2.connect(**db_config)
# 创建一个游标
cur = connection.cursor()
# 定义 SQL 语句
sql = """ delete from users where id = %s
"""
# 执行 SQL 命令
cur.execute(sql, (1,))
rows = cur.rowcount
# 提交事务
connection.commit()
print("操作成功!删除行数:", rows)
# 关闭游标
cur.close()
except (Exception, DatabaseError) as e:
print("操作失败:", e)
finally:
# 释放数据库连接
if connection is not None:
connection.close()
操作成功!删除行数:1
管理事务
# 导入 psycopg2 模块和 Error 对象
import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser
def read_db_config(filename='dbconfig.ini', section='postgresql'):
""" 读取数据库配置文件,返回一个字典对象
"""
# 创建解析器,读取配置文件
parser = ConfigParser()
parser.read(filename)
# 获取 postgresql 部分的配置
db = {}
if parser.has_section(section):
items = parser.items(section)
for item in items:
db[item[0]] = item[1]
else:
raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))
return db
db_config = read_db_config()
connection = None
try:
# 使用 psycopg2.connect 方法连接 PostgreSQL 数据库
connection = psycopg2.connect(**db_config)
# 打印和设置自动提交
print('默认 autocommit:', connection.autocommit)
connection.autocommit = True
print('新的 autocommit:', connection.autocommit)
# 创建一个游标
cur = connection.cursor()
# 定义 SQL 语句
sql = """ insert into users(name, created_at)
values (%s, %s) RETURNING id
"""
# 执行 SQL 命令
cur.execute(sql, ('tony', '2020-06-08 11:00:00'))
# 获取 id
id = cur.fetchone()[0]
print("操作成功!用户 id:", id)
# 关闭游标
cur.close()
except (Exception, DatabaseError) as e:
print("操作失败:", e)
finally:
# 释放数据库连接
if connection is not None:
connection.close()
print("PostgreSQL 数据库连接已关闭。")
默认 autocommit:False
新的 autocommit:True
操作成功!用户 id:2
PostgreSQL 数据库连接已关闭。
# 导入 psycopg2 模块和 Error 对象
import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser
def read_db_config(filename='dbconfig.ini', section='postgresql'):
""" 读取数据库配置文件,返回一个字典对象
"""
# 创建解析器,读取配置文件
parser = ConfigParser()
parser.read(filename)
# 获取 postgresql 部分的配置
db = {}
if parser.has_section(section):
items = parser.items(section)
for item in items:
db[item[0]] = item[1]
else:
raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))
return db
db_config = read_db_config()
connection = None
try:
# 使用 with 语句管理事务
with psycopg2.connect(**db_config) as connection:
# 创建一个游标
with connection.cursor() as cur:
# 插入数据
sql = """ insert into users(name, created_at)
values (%s, %s)
"""
cur.execute(sql, ('Jason', '2020-06-08 15:30:00'))
# 更新数据
sql = """ update users
set created_at = %s
where name = %s
"""
cur.execute(sql, ('2020-06-08 16:00:00', 'tony'))
sql = """ select id, name, created_at
from users
"""
# 查询数据
cur.execute(sql)
# 获取结果
user = cur.fetchone()
while user is not None:
print(user)
user = cur.fetchone()
except (Exception, DatabaseError) as e:
print("操作失败:", e)
finally:
# 释放数据库连接
if connection is not None:
connection.close()
(3, 'Jason', datetime.datetime(2020, 6, 8, 15, 30))
(2, 'tony', datetime.datetime(2020, 6, 8, 16, 0))
调用存储函数
CREATE OR REPLACE FUNCTION get_user_count()
returns int
AS $$
DECLARE
ln_count int;
BEGIN
select count(*) into ln_count
from users;
return ln_count;
END; $$
LANGUAGE plpgsql;
# 导入 psycopg2 模块和 Error 对象
import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser
def read_db_config(filename='dbconfig.ini', section='postgresql'):
""" 读取数据库配置文件,返回一个字典对象
"""
# 创建解析器,读取配置文件
parser = ConfigParser()
parser.read(filename)
# 获取 postgresql 部分的配置
db = {}
if parser.has_section(section):
items = parser.items(section)
for item in items:
db[item[0]] = item[1]
else:
raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))
return db
db_config = read_db_config()
connection = None
try:
# 使用 with 语句管理事务
with psycopg2.connect(**db_config) as connection:
# 创建一个游标
with connection.cursor() as cur:
# 调用存储函数
cur.callproc('get_user_count')
row = cur.fetchone()[0]
print('用户总数:', row)
except (Exception, DatabaseError) as e:
print("操作失败:", e)
finally:
# 释放数据库连接
if connection is not None:
connection.close()
cur.execute('select * from get_user_count()')
用户总数:2
