美文网首页
3. 数据操作

3. 数据操作

作者: SingleDiego | 来源:发表于2022-03-16 16:58 被阅读0次

官方文档:






本章数据库模型

本节中的操作将在以下数据库模型里进行。

from sqlalchemy import create_engine, MetaData
from sqlalchemy import Table, Column, Integer, String, ForeignKey

engine = create_engine('sqlite:///memory.db', echo=True, future=True)
metadata_obj = MetaData()

user_table = Table(
    "user_account",
    metadata_obj,
    Column('id', Integer, primary_key=True),
    Column('name', String(30)),
    Column('fullname', String)
)

address_table = Table(
    "address",
    metadata_obj,
    Column('id', Integer, primary_key=True),
    Column('user_id', ForeignKey('user_account.id'), nullable=False),
    Column('email_address', String, nullable=False)
)

同时我们会使用 ORM 来定义数据库模型。

from sqlalchemy import create_engine, MetaData
from sqlalchemy import Table, Column, Integer, String, ForeignKey
from sqlalchemy.orm import declarative_base, relationship

engine = create_engine('sqlite:///memory.db', echo=True, future=True)
Base = declarative_base()

class User(Base):
    __tablename__ = 'user_account'

    id = Column(Integer, primary_key=True)
    name = Column(String(30))
    fullname = Column(String)

    addresses = relationship("Address", back_populates="user")

    def __repr__(self):
        return f"User(id={self.id!r}, name={self.name!r}, fullname={self.fullname!r})"

class Address(Base):
    __tablename__ = 'address'

    id = Column(Integer, primary_key=True)
    email_address = Column(String, nullable=False)
    user_id = Column(Integer, ForeignKey('user_account.id'))

    user = relationship("User", back_populates="addresses")

    def __repr__(self):
        return f"Address(id={self.id!r}, email_address={self.email_address!r})"






insert() 表达式

一个 insert() 例子,stmt 变量列明了要插入数据的表格和要插入的数据。

from sqlalchemy import insert

stmt = insert(user_table).values(name='spongebob', fullname="Spongebob Squarepants")

with engine.connect() as conn:
    result = conn.execute(stmt)
    conn.commit()

上面的 stmt 变量是一个 Insert 实例。我们把它字符串化已更好地理解。

>>> print(stmt)
INSERT INTO user_account (name, fullname) VALUES (:name, :fullname)

字符串化是通过一个数据库对象编译形创建的,我们使用 ClauseElement.compile() 获取这个对象;要想查看绑定的参数,可读取 params 变量。

>>> compiled = stmt.compile()
>>> compiled.params
{'name': 'spongebob', 'fullname': 'Spongebob Squarepants'}

执行上述代码后,会在指定的数据表插入一行数据,改行数据会被默认加上主键 id,其值默认为 1。我们可以使用 CursorResult.inserted_primary_key 来查看。

>>> result.inserted_primary_key
(1,)
插入多行
with engine.connect() as conn:
    result = conn.execute(
        insert(user_table),
        [
            {"name": "sandy", "fullname": "Sandy Cheeks"},
            {"name": "patrick", "fullname": "Patrick Star"}
        ]
    )
    conn.commit()
使用标量子查询集(scalar subquery)插入

现在我们实现一个复杂点的操作,有下面一组数据:

 [
    {"username": 'spongebob', "email_address": "spongebob@sqlalchemy.org"},
    {"username": 'sandy', "email_address": "sandy@sqlalchemy.org"},
    {"username": 'sandy', "email_address": "sandy@squirrelpower.org"},
]

我们需要在 user_table 数据表根据 username 找到对应的行,再把对应的 idemail_address 写入到 address_table 表的 user_idemail_address 列中。

from sqlalchemy import create_engine, MetaData
from sqlalchemy import Table, Column, Integer, String, ForeignKey
from sqlalchemy import insert, select, bindparam

engine = create_engine('sqlite:///memory.db', echo=True, future=True)
metadata_obj = MetaData()

user_table = Table(
    "user_account",
    metadata_obj,
    Column('id', Integer, primary_key=True),
    Column('name', String(30)),
    Column('fullname', String)
)

address_table = Table(
    "address",
    metadata_obj,
    Column('id', Integer, primary_key=True),
    Column('user_id', ForeignKey('user_account.id'), nullable=False),
    Column('email_address', String, nullable=False)
)

scalar_subq = (
    select(user_table.c.id).
    where(user_table.c.name==bindparam('username')).
    scalar_subquery()
)

with engine.connect() as conn:
    result = conn.execute(
        insert(address_table).values(user_id=scalar_subq),
        [
            {"username": 'spongebob', "email_address": "spongebob@sqlalchemy.org"},
            {"username": 'sandy', "email_address": "sandy@sqlalchemy.org"},
            {"username": 'sandy', "email_address": "sandy@squirrelpower.org"},
        ]
    )
    conn.commit()






select() 表达式

我们使用 select() 表达式进行 SQL 的查询操作;我们把语句字符串化,让它更好理解。

from sqlalchemy import select

stmt = select(user_table).where(user_table.c.name == 'spongebob')
print(stmt)

# 输出结果:
SELECT user_account.id, user_account.name, user_account.fullname
FROM user_account
WHERE user_account.name = :name_1

现在我们执行这个查询语句。

from sqlalchemy import select

stmt = select(user_table).where(user_table.c.name == 'spongebob')

with engine.connect() as conn:
    for row in conn.execute(stmt):
        print(row)

# 输出结果:
FROM user_account
WHERE user_account.name = ?
2022-03-22 11:02:50,869 INFO sqlalchemy.engine.Engine [generated in 0.00171s] ('spongebob',)
(1, 'spongebob', 'Spongebob Squarepants')
2022-03-22 11:02:50,902 INFO sqlalchemy.engine.Engine ROLLBACK

当我们使用 ORM 方法定义数据库对象的时候,我们更适合的方式是使用 Session.execute() 来执行语句。

from sqlalchemy.orm import Session
from sqlalchemy import select

stmt = select(User).where(User.name == 'spongebob')
with Session(engine) as session:
    for row in session.execute(stmt):
        print(row)

# 输出结果
FROM user_account
WHERE user_account.name = ?
INFO sqlalchemy.engine.Engine [generated in 0.00102s] ('spongebob',)
(User(id=1, name='spongebob', fullname='Spongebob Squarepants'),)
INFO sqlalchemy.engine.Engine ROLLBACK

两种方法有什么区别呢,我们把它字符串化打印出来。

>>> print(select(user_table))
SELECT user_account.id, user_account.name, user_account.fullname
FROM user_account
>>> print(select(User))
SELECT user_account.id, user_account.name, user_account.fullname
FROM user_account

可见二者是等价的。如果要访问表的某个列对象,我们怎么做呢?

>>> print(select(user_table.c.id))
SELECT user_account.id
FROM user_account
>>> print(select(User.id))
SELECT user_account.id
FROM user_account

可见前一种方法要使用 Table.c 方法来访问,后一种方法可以直接访问属性名。

以对象方式显示查询结果

当使用 ORM 方式时,我们希望数据库中的一行能被封装为一个 Python 对象。

from sqlalchemy.orm import Session
from sqlalchemy import select

with Session(engine) as session:
    row = session.execute(select(User)).first()
    print(row)

# 输出结果:
(User(id=1, name='spongebob', fullname='Spongebob Squarepants'),)

上例中,我们选择了 user_account 的第一行,并根据改行的数据封装成一个 User 对象;注意,row 现在是一个元组。

>>> row[0]
User(id=1, name='spongebob', fullname='Spongebob Squarepants')

另一个方法是使用 session.scalars 方法来执行语句。

with Session(engine) as session:
    user = session.scalars(select(User)).first()
    print(user)

也可以直接获取行的某个数值而不返回一个 User 对象。

with Session(engine) as session:
    row = session.execute(select(User.name, User.fullname)).first()
    print(row)

# 输出结果:
('spongebob', 'Spongebob Squarepants')
多重查询条件
with Session(engine) as session:
    row = session.execute(
        select(User.name, Address).
        where(User.id==Address.user_id).
        order_by(Address.id)
    ).all()

    print(row)

# 输出结果:
[('spongebob', Address(id=1, email_address='spongebob@sqlalchemy.org')),
('sandy', Address(id=2, email_address='sandy@sqlalchemy.org')),
('sandy', Address(id=3, email_address='sandy@squirrelpower.org'))]

上例中的查询相当于:

SELECT user_account.name, address.id, address.email_address, address.user_id
FROM user_account, address
WHERE user_account.id = address.user_id ORDER BY address.id
label 方法

ColumnElement.label() 方法可以为查询结果设定一个“别名”,然后在其他地方调用。

with Session(engine) as session:
    stmt = (
        select(
            ("Username: " + user_table.c.name).label("username"),
        ).order_by(user_table.c.name)
    )
    
    for row in session.execute(stmt):
        print(f"{row.username}")

# 输出结果:
Username: patrick
Username: sandy
Username: spongebob
text 方法和 literal_column 方法

text() 方法可以在 select() 表达式中增加一个自定义的字符串作为列对象。

from sqlalchemy import select, text

stmt = (
    select(
        text("'some phrase'"), user_table.c.name
    ).order_by(user_table.c.name)
)


with engine.connect() as conn:
    print(conn.execute(stmt).all())

# 输出结果:
[('some phrase', 'patrick'), ('some phrase', 'sandy'), ('some phrase', 'spongebob')]

如果想把这种自定义的字符串和 label() 方法结合一起使用,可以用 literal_column() 方法实施。

from sqlalchemy import literal_column

stmt = (
    select(
        literal_column("'some phrase'").label("p"), user_table.c.name
    ).order_by(user_table.c.name)
)

with engine.connect() as conn:
    for row in conn.execute(stmt):
        print(f"{row.p}, {row.name}")

# 输出结果:
some phrase, patrick
some phrase, sandy
some phrase, spongebob






where 字句

where 子句用于使用查询条件,大多数 Python 运算符,如 ==!=<>=等都支持。

>>> print(user_table.c.name == 'squidward')
user_account.name = :name_1

>>> print(address_table.c.user_id > 10)
address.user_id > :user_id_1

select 方法和 where 字句结合使用。

>>> print(select(user_table).where(user_table.c.name == 'squidward'))
SELECT user_account.id, user_account.name, user_account.fullname
FROM user_account
WHERE user_account.name = :name_1

多个 where 子句可以并列使用,产生 AND 的效果。

print(
    select(address_table.c.email_address).
    where(user_table.c.name == 'squidward').
    where(address_table.c.user_id == user_table.c.id)
)

# 输出结果:
SELECT address.email_address
FROM address, user_account
WHERE user_account.name = :name_1 AND address.user_id = user_account.id

另一种写法:

print(
    select(address_table.c.email_address).
    where(
        user_table.c.name == 'squidward',
        address_table.c.user_id == user_table.c.id
    )
)

ANDOR 表达式可以使用 sqlalchemy 的 and_()or_() 方法来实现。

from sqlalchemy import and_, or_

print(
    select(Address.email_address).
    where(
        and_(
            or_(User.name == 'squidward', User.name == 'sandy'),
            Address.user_id == User.id
        )
    )
)

# 输出结果:
SELECT address.email_address
FROM address, user_account
WHERE (user_account.name = :name_1 OR user_account.name = :name_2) AND address.user_id = user_account.id

当我们使用 ORM 定义数据库时,还有一个简单的查询方法,就是使用 Select.filter_by() 方法。

print(
    select(User).filter_by(name='spongebob', fullname='Spongebob Squarepants')
)

# 输出结果:
SELECT user_account.id, user_account.name, user_account.fullname
FROM user_account
WHERE user_account.name = :name_1 AND user_account.fullname = :fullname_1






join 子句

要在 sqlalchemy 实现 JOIN 操作(JOIN 的具体含义见:SQL - JOIN
),一种方式是使用 Select.join_from()

print(
    select(user_table.c.name, address_table.c.email_address).
    join_from(user_table, address_table)
)

# 输出结果:
SELECT user_account.name, address.email_address
FROM user_account JOIN address ON user_account.id = address.user_id

另一种方式是使用 Select.join() 方法。

print(
    select(user_table.c.name, address_table.c.email_address).
    join(address_table)
)

# 输出结果:
SELECT user_account.name, address.email_address
FROM user_account JOIN address ON user_account.id = address.user_id

使用 join() 时候也可以把左表和右表的关系都写清楚。

print(
    select(user_table.c.name, address_table.c.email_address).
    select_from(user_table).join(address_table)
)

那么使用 join() 的时候, sqlalchemy 怎么知道我们连接两表的条件是 user_account.id = address.user_id 的呢?这是因为我们的数据库模型设定了外键:ForeignKey('user_account.id')

如果没有在数据库模型中设定外键,可以给 join() 方法额外添加一个参数来起到 ON 子句的作用。

print(
    select(address_table.c.email_address).
    select_from(user_table).
    join(address_table, user_table.c.id == address_table.c.user_id)
)
LEFT OUTER JOIN 与 FULL OUTER JOIN

当使用 join_from()join() 方法时候默认为 INNER JOIN,如果使用 LEFT OUTER JOINFULL OUTER JOIN 时,可以带上 Select.join.isouterSelect.join.full 参数。

print(
    select(user_table).join(address_table, isouter=True)
)

# 输出结果:
SELECT user_account.id, user_account.name, user_account.fullname
FROM user_account LEFT OUTER JOIN address ON user_account.id = address.user_id
print(
    select(user_table).join(address_table, full=True)
)

# 输出结果:
SELECT user_account.id, user_account.name, user_account.fullname
FROM user_account FULL OUTER JOIN address ON user_account.id = address.user_id

还有一个 Select.outerjoin() 方法可以实现 LEFT OUTER JOIN

print(
    select(user_table).outerjoin(address_table)
)

# 输出结果:
SELECT user_account.id, user_account.name, user_account.fullname
FROM user_account LEFT OUTER JOIN address ON user_account.id = address.user_id

注意:SQL 还有一个 RIGHT OUTER JOIN 的子句,但 sqlalchemy 并不直接使用它,要达到该效果可以把左右表调转位置后再使用 LEFT OUTER JOIN






ORDER BY, GROUP BY, HAVING 子句

ORDER BY

使用 Select.order_by() 方法来实现 ORDER BY 子句的功能。

print(select(user_table).order_by(user_table.c.name))

# 输出结果:
SELECT user_account.id, user_account.name, user_account.fullname
FROM user_account ORDER BY user_account.name

要使用升序或降序来排序可以使用 ColumnElement.asc()ColumnElement.desc()

print(select(user_table).order_by(user_table.c.fullname.desc()))

# 输出结果:
SELECT user_account.id, user_account.name, user_account.fullname
FROM user_account ORDER BY user_account.fullname DESC
使用 GROUP BY, HAVING 的聚合函数

在 SQL 中,聚合函数(aggregate functions)允许将多行的列表达式聚合在一起,产生一个单一的结果。比如:计算数量、计算平均数、最大值、最小值等。

>>> from sqlalchemy import func
>>> count_fn = func.count(user_table.c.id)
>>> print(count_fn)
count(user_account.id)

使用聚合函数时,GROUP BY 子句定义了分组的依据,HAVING 子句的使用方式与 WHERE 子句类似,用于定义过滤的条件。

SQLAlchemy 使用 Select.group_by()Select.having() 方法来实现 GROUP BY 子句和 HAVING 子句。

下面来看一个例子:

from sqlalchemy import select, func

with engine.connect() as conn:
    result = conn.execute(
        select(User.name, func.count(Address.id).label("count")).
        join(Address).
        group_by(User.name).
        having(func.count(Address.id) > 1)
    )
    print(result.all())

# 查询结果:
[('sandy', 2)]

这等价于这样的 SQL 语句。

SELECT user_account.name, count(address.id) AS count
FROM user_account JOIN address 
ON user_account.id = address.user_id 
GROUP BY user_account.name
HAVING count(address.id) > 1
对聚合结果排序

使用聚合函数后会产生一个或多个新的列,我们可以对这些新产生的列进行排序操作,使用 label 方法给它们加上自定义的名字会更方便。

stmt = select(
    Address.user_id,
    func.count(Address.id).label('num_addresses')).\
    group_by("user_id").order_by("user_id", desc("num_addresses")
)
print(stmt)

# 输出结果:
SELECT address.user_id, count(address.id) AS num_addresses
FROM address GROUP BY address.user_id ORDER BY address.user_id, num_addresses DESC






使用别名(alias)

有时候我们需要对同一个表的同一列作多次引用,这时候我们就需要使用 SQL 的 AS 子句对该列定一个“别名”。sqlalchemy 中我们使用 FromClause.alias() 方法来实现。

user_alias_1 = user_table.alias()
user_alias_2 = user_table.alias()

print(
    select(user_alias_1.c.name, user_alias_2.c.name).
    join_from(user_alias_1, user_alias_2, user_alias_1.c.id > user_alias_2.c.id)
)

# 输出结果:
SELECT user_account_1.name, user_account_2.name AS name_1
FROM user_account AS user_account_1 
JOIN user_account AS user_account_2 ON user_account_1.id > user_account_2.id
ORM 下的别名

使用 ORM 时我们用 aliased() 来定义别名。

from sqlalchemy.orm import aliased

address_alias_1 = aliased(Address)
address_alias_2 = aliased(Address)

print(
    select(User).
    join_from(User, address_alias_1).
    where(address_alias_1.email_address == 'patrick@aol.com').
    join_from(User, address_alias_2).
    where(address_alias_2.email_address == 'patrick@gmail.com')
)

# 输出结果:
SELECT user_account.id, user_account.name, user_account.fullname
FROM user_account 
JOIN address AS address_1 ON user_account.id = address_1.user_id 
JOIN address AS address_2 ON user_account.id = address_2.user_id
WHERE address_1.email_address = :email_address_1 AND address_2.email_address = :email_address_2

相关文章

网友评论

      本文标题:3. 数据操作

      本文链接:https://www.haomeiwen.com/subject/cvacdrtx.html