美文网首页
sqlalchemy

sqlalchemy

作者: 樊海鹏 | 来源:发表于2017-06-10 10:45 被阅读0次

https://www.zybuluo.com/zwenqiang/note/111570
https://www.zouyesheng.com/sqlalchemy.html
http://www.jb51.net/article/49789.htm
https://blog.csdn.net/abcd1f2/article/details/51395561

db.session.query(BIUser.user_id).filter(BIUser.username == user_index).first()
  
如果结果是[(1,2,3),(4,5,6),(7,8,9)]  
取多行的第一行,即(1,2,3),相当于all()[0]
但是请注意即使是一个元素,也会得到(1,)


 db.session.query(BIUser.user_id).filter(BIUser.username == user_index).scalar()

如果结果是[(1,2,3),(4,5,6),(7,8,9)]  会报错。
如果结果是[(1,2,3)] ,结果就是1。


 id, gold_balance, reg_time = db.session.query(BIUser.id, BIUser.gold_balance, BIUser.reg_time).filter(
        BIUser.user_id == user_id).first()

取出的时间类型,可以直接这样转换
 reg_time= reg_time.strftime('%Y%m%d'),

# coding: utf-8  
  
from sqlalchemy import *   
import tushare as ts  
import pandas as pd  
from sqlalchemy.orm import sessionmaker,mapper  
from datetime import *  
  
engine = create_engine('mysql+pymysql://root:123456@127.0.0.1/mystock?charset=utf8')  
     
#%%  1  hand-written SQL 方法  
result = engine.execute('select * from stock_basics where pe < %s', 2)  
  
# sqlalchemy推荐使用text()函数封装一下sql字符串,不同数据库, 可以使用统一的sql参数传递写法. 参数须以:号引出.  
result = engine.execute(text('select * from stock_basics where pe < :pe'), {'pe': 2})  
  
# 遍历result时, 得到的每一个行都是RowProxy对象, 获取字段的方法非常灵活, 下标和字段名甚至属性都行.  
# rowproxy[0] == rowproxy['id'] == rowproxy.id  
ans = result.fetchall() # 获取所有数据  
ans1 = pd.DataFrame(ans) # 将数据转成 DataFrame格式  
  
#  事务处理  
conn = engine.connect()  
conn.begin()  
try:  
    dosomething(connection)  
    conn.commit()  
except:     
    conn.rollback()    
conn.close()    
  
    
    #%%  SQL-expressions in Python 方法  
meta = MetaData(bind=engine, reflect=True)  
table = meta.tables['stock_basics']  
result2 = list(engine.execute(table.select(table.c.pe < 2)))   # pe为stock_basics的一个列名  
  
#%% ORM 方法   表中要有主键  
engine.echo = True  # We want to see the SQL we're creating  
metadata = MetaData(engine)  
  
# The stock_basics table already exists, so no need to redefine it. Just  
# load it from the database using the "autoload" feature.  
users = Table('stock_basics', metadata, autoload=True)  
  
def run(stmt):  
    rs = stmt.execute()  
    for row in rs:  
        print(row)  
  
# Most WHERE clauses can be constructed via normal comparisons  
s = users.select(users.c.code == '000001')  
run(s)  
s = users.select(users.c.pe < 1)  # pe为stock_basics的一个列名  
rs = s.execute().fetchall()  
ans2 = pd.DataFrame(rs)    #将结果转换成 DataFrame格式  
  
# Python keywords like "and", "or", and "not" can't be overloaded, so  
# SQLAlchemy uses functions instead  
s = users.select(and_(users.c.age < 40, users.c.name != 'Mary'))  
s = users.select(or_(users.c.age < 40, users.c.name != 'Mary'))  
s = users.select(not_(users.c.name == 'Susan'))  
  
# Or you could use &, | and ~ -- but watch out for priority!  
s = users.select((users.c.age < 40) & (users.c.name != 'Mary'))  #最好添加(),注意优先级  
s = users.select((users.c.age < 40) | (users.c.name != 'Mary'))  
s = users.select(~(users.c.name == 'Susan'))  
  
# There's other functions too, such as "like", "startswith", "endswith"  
s = users.select(users.c.name.startswith('M'))  
s = users.select(users.c.name.like('%a%'))  
s = users.select(users.c.name.endswith('n'))  
  
# The "in" and "between" operations are also available  
s = users.select(users.c.age.between(30,39))  
# Extra underscore after "in" to avoid conflict with Python keyword  
s = users.select(users.c.name.in_('Mary', 'Susan'))  
  
# If you want to call an SQL function, use "func"  
s = users.select(func.substr(users.c.name, 2, 1) == 'a')  
  
# You don't have to call select() on a table; it's got a bare form  
s = select([users], users.c.name != 'Carl')  
s = select([users.c.name, users.c.age], users.c.name != 'Carl')  
   
# This can be handy for things like count()  
s = select([func.count(users.c.user_id)])  
# Here's how to do count(*)  
s = select([func.count("*")], from_obj=[users])  
#%%多表联查  
#    现在存在两个表  
users = Table('users', metadata,  
    Column('user_id', Integer, primary_key=True),  
    Column('name', String(40)),  
    Column('age', Integer),)  
users.create()  
#    emails = Table('emails', metadata,  
#        Column('email_id', Integer, primary_key=True),  
#        Column('address', String),  
#        Column('user_id', Integer, ForeignKey('users.user_id')),)  
s = select([users, emails], emails.c.user_id == users.c.user_id)  
# 查询部分列  
s = select([users.c.name, emails.c.address], emails.c.user_id == users.c.user_id)  
#基于外键的李娜和查询  
s = join(users, emails).select()  
#使用 outerjoin 查询所有用户,不论是否有邮箱  
s = outerjoin(users, emails).select()  
  
#%% 将数据库中的对象映射到对象中  
users = Table('users', metadata, autoload=True)  
# These are the empty classes that will become our data classes  
class User(object):  
    pass  
  
usermapper = mapper(User, users)  
session = DBSession()  
#  查询 -----------------  
query = session.query(User)  
print(query) # 显示SQL 语句  
print(query.statement) # 同上  
for user in query: # 遍历时查询  
    print(user.name)  
print(query.all()) # 返回的是一个类似列表的对象  
print(query.first().name) # 记录不存在时,first() 会返回 None  
# print(query.one().name) # 不存在,或有多行记录时会抛出异常  
print(query.filter(User.id == 2).first().name)  
print(query.get(2).name) # 以主键获取,等效于上句  
print(query.filter('id = 2').first().name) # 支持字符串  
query2 = session.query(User.name)  
print(query2.all()) # 每行是个元组  
print(query2.limit(1).all()) # 最多返回 1 条记录  
print(query2.offset(1).all()) # 从第 2 条记录开始返回  
print(query2.order_by(User.name).all())  
print(query2.order_by('name').all())  
print(query2.order_by(User.name.desc()).all())  
print(query2.order_by('name desc').all())  
print(session.query(User.id).order_by(User.name.desc(), User.id).all())  
print(query2.filter(User.id == 1).scalar()) # 如果有记录,返回第一条记录的第一个元素  
print(session.query('id').select_from(User).filter('id = 1').scalar())  
print(query2.filter(User.id > 1, User.name != 'a').scalar()) # and  
query3 = query2.filter(User.id > 1) # 多次拼接的 filter 也是 and  
query3 = query3.filter(User.name != 'a')  
print(query3.scalar())  
print(query2.filter(or_(User.id == 1, User.id == 2)).all()) # or  
print(query2.filter(User.id.in_((1, 2))).all()) # in  
query4 = session.query(User.id)  
print(query4.filter(User.name == None).scalar())  
print(query4.filter('name is null').scalar())  
print(query4.filter(not_(User.name == None)).all()) # not  
print(query4.filter(User.name != None).all())  
print(query4.count())  
print(session.query(func.count('*')).select_from(User).scalar())  
print(session.query(func.count('1')).select_from(User).scalar())  
print(session.query(func.count(User.id)).scalar())  
print(session.query(func.count('*')).filter(User.id > 0).scalar()) # filter() 中包含 User,因此不需要指定表  
print(session.query(func.count('*')).filter(User.name == 'a').limit(1).scalar() == 1) # 可以用 limit() 限制 count() 的返回数  
print(session.query(func.sum(User.id)).scalar())  
print(session.query(func.now()).scalar()) # func 后可以跟任意函数名,只要该数据库支持  
print(session.query(func.current_timestamp()).scalar())  
print(session.query(func.md5(User.name)).filter(User.id == 1).scalar())  
# 修删------  
query.filter(User.id == 1).update({User.name: 'c'})  
user = query.get(1)  
print(user.name)  
user.name = 'd'  
session.flush() # 写数据库,但并不提交  
print(query.get(1).name)  
session.delete(user)  
session.flush()  
session.rollback()  # 回滚  
query.filter(User.id == 1).delete()  
session.commit()  #提交,保存到数据库  
print query.get(1)  
session.close()  # 关闭session

## 处理查询结果:


 list(db.session.query(PromotionPushHistory.id).filter(PromotionPushHistory.push_id==push_id).all()) 
= result.fetchall()
= list(result_proxy = db.engine.execute(text(cron_sql))
    

[row['xxx'] for row in result_proxy]

[row[0] for row in result_proxy]

[row.xxx for row in result _proxy]

如果是单列,避免总是处理[(0,),(0,)(1,)]

label_names = db.session.query(AdminUserQuery.label_name).filter_by(is_custom_label=True)
 label_names  = list(zip(*label_names))[0]


class WPTUserAvatar(db.Model):
    
     __bind_key__ = 'orig_wpt'  # 绑定到的数据库
    __tablename__ = 'avatar_user_assign'

    avatar_id = db.Column('avatar_id', db.Integer, primary_key=True)
    user_id = db.Column('user_id', db.BIGINT, primary_key=True)
    user_name = db.Column('role_name', db.String)
    lz_account = db.Column('lz_account', db.String)
    is_current_avatar = db.column('is_current_avatar', db.Boolean)

https://www.zouyesheng.com/sqlalchemy.html#toc10

ORM 的添加和更新



query = db.session.query(AdminUserQuery).filter_by(sql_key=sql_key).first()

没有添加

if query is None:
        query = AdminUserQuery(
            target_db=database,
            sql=formatted_sql,
            sql_key=sql_key,
            status=ADMIN_USER_QUERY_STATUSES.PENDING.value,
            admin_user_id=current_user.id
        )

        db.session.add(query)
        db.session.commit()

否则更新

    else:
        query.status = ADMIN_USER_QUERY_STATUSES.PENDING.value
        query.rows = None
        query.error_message = None
        query.run_time = None

# 更新用flush

        db.session.flush()
        db.session.commit()

    query_id = query.id

关联查询join


label_names = db.session.query
(AdminUserQuery.id, AdminUser.name, AdminUserQuery.label_name,AdminUserQuery.display).
join(AdminUser,  AdminUserQuery.admin_user_id == AdminUser.id).
filter(AdminUserQuery.is_custom_label==True).all()

取列名

        column_names = BIUser.__table__.columns.keys()

相关文章

网友评论

      本文标题:sqlalchemy

      本文链接:https://www.haomeiwen.com/subject/asmqqxtx.html