您的位置:首页 > 数据库

sqlalchemy 笔记

2016-03-27 23:18 661 查看

初始化数据库连接

# Database URL in the form dialect+driver://user:password@host/database,
# followed by a charset query option.
DB_CONNECT_STRING = 'mysql+mysqldb://root:123123@localhost/sqlalchemy?charset=utf8'
# echo=True is an engine option; per the SQLAlchemy create_engine docs it
# turns on logging of the SQL statements the engine emits.
engine = create_engine(DB_CONNECT_STRING, echo=True)


数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名


创建DBSession

# Session factory bound to the engine.
DB_Session = sessionmaker(bind=engine)
# A single session instance; the examples in these notes all use it.
session = DB_Session()


执行 SQL

# Run raw SQL strings through the session.
session.execute('create database abc')
print session.execute('show databases').fetchall()
# Switch the connection to the `mysql` database; the `user` table selected
# from below is therefore the one in that database.
session.execute('use mysql')
# All rows
print session.execute('select * from user').fetchall(), "fetchall"
# First row
print session.execute('select * from user').first(), "first"
# First row
print session.execute('select * from user').fetchone(), "fetchone"
# Two rows
print session.execute('select * from user').fetchmany(2), "fetchmany"


Model

# declarative_base() creates the BaseModel class; every subclass of it is
# automatically associated with a table.
BaseModel = declarative_base()


class User(BaseModel):
    """ORM model mapped to the ``user`` table."""

    # Table name
    __tablename__ = 'user'

    # Table structure
    id = Column(Integer, primary_key=True)
    name = Column(String(20), nullable=False, unique=True)
    # The Python attribute is `test`; name="a1" sets the database column name.
    test = Column(String(20), name="a1", default="a1")
    email_address = Column(String(32), nullable=False, default="email")

    def __repr__(self):
        """Return a debug string listing id, name, test and email_address."""
        return '<User id=%s,name=%s,test=%s,email_address=%s >' % (
            self.id, self.name, self.test, self.email_address)


primary_key 主键,自增

unique 唯一 UNIQUE (name)

name 数据库字段名字

nullable 是否可以为空 False NOT NULL

default 默认值

表属性

class User(BaseModel):
    # Fragment: only the table-level options are shown here. The
    # __tablename__ and column definitions of the model are omitted.
    __table_args__ = {
        'mysql_engine': 'InnoDB',
        'mysql_charset': 'utf8',
    }


建表 or 删除表

# Create tables: creates the tables of every BaseModel subclass.
# User.metadata is inherited from BaseModel, so
# User.metadata.create_all(engine) is an equivalent spelling; only one of the
# two calls is needed (joining them with `or` would run both, because
# create_all() returns None).
BaseModel.metadata.create_all(engine)
# Drop tables: same remark, User.metadata.drop_all(engine) is equivalent.
BaseModel.metadata.drop_all(engine)


使用

# u1 sets both name and email_address.
u1 = User(name="aa", email_address="aa_email")
# u2 sets only name; email_address is left to the column's declared default.
u2 = User(name="bb")
session.add(u1)
session.add(u2)
session.commit()  # commit the session

query = session.query(User)
for user in query:
print user
print query.all()  # 查询所有
print query.offset(1).limit(2).all()  # LIMIT 1, 2
print query.first()  # 第一条,不存在返回None
# print query.one() #之获取一条,不存在,或有多行记录时会抛出异常
print query.filter(User.id == 2).all()  # WHERE "user".id = 2
print query.filter(User.id >= 1).all()  # WHERE "user".id >= 1
print query.filter(User.id == User.name).all()  # WHERE user.id = user.name
print query.filter(User.id >= 1).filter(User.id <= 11).all()  # WHERE "user".id >= 1 AND user.id <= 11
print query.filter(User.id >= 1, User.id <= 11).all()  # 同上
print query.filter(or_(User.id >= 1, User.id <= 11),
User.name == "111").all()  # (user.id >= 1 OR user.id <=11) AND user.name = 111
print query.filter(User.id.in_((1, 2))).all()  # user.id IN (1, 2)
print query.filter(User.name.like("a")).all()  # user.name LIKE "a"
print query.filter(User.name.isnot(None)).all()  # WHERE user.name IS NOT NULL
print query.filter(User.name != None).all()  # WHERE user.name IS NOT NULL
print query.filter(not_(User.id != 1), not_(User.name.like("a"))).all()  # WHERE user.id = 1 AND user.name NOT LIKE a
print query.filter("id is null").all()  # 支持字符串
print query.get(2)  # 以主键获取,WHERE "user".id = 2
print query.order_by("name")  # ORDER BY "user".name
print query.order_by(User.name)  # 同上
print query.order_by(User.name.desc())  # ORDER BY "user".name desc
# print query.order_by("name desc")# 同上
print query.order_by(User.name.desc(), User.id.desc())  # ORDER BY name desc, id desc
print query.count()
print "#" * 30
#修改
# query.filter(User.id == 1).update({User.name: 'c'})
print "#" * 30

# Ordering applied when the query is built.
query2 = session.query(User).order_by(User.name.desc(), User.id.desc())
print query2.all()
print "#" * 30

# Querying a column instead of the model returns tuples
query3 = session.query(User.name)
print query3.all()
print query3.filter(User.id == 1).scalar()  # if a row exists, returns the first element of the first row

print session.query('id', "name").select_from(User).filter(
User.id == 1).scalar()  # SELECT id, name FROM user WHERE user.id = 1
print "#" * 30

print session.query(func.count('1')).select_from(User).scalar()
# Prints the query object (no .all()/.scalar() is called on it).
print session.query(User).filter(User.name == func.now())  # WHERE "user".name = now()
print session.query(func.count(User.id)).select_from(User).scalar()
print session.query(func.sum(User.id)).select_from(User).scalar()

print session.query(func.now()).scalar()  # any function name may follow func, as long as the database supports it
print session.query(User.id, func.md5(User.name)).filter(User.id == 1).all()

u = session.query(User).get(1)
print u
u.name = "1111111111"
session.flush()  # write to the database, but do not commit
print session.query(User).get(1)
session.rollback()  # roll back
print session.query(User).get(1)
# The name is assigned again after the rollback; the session.commit() at the
# end of this snippet comes after this assignment.
u.name = "1111111111"
print query.all()
# HIGH_PRIORITY: prioritized operation; LOW_PRIORITY: deferred operation; INSERT DELAYED: delayed insert
session.query(User.name).prefix_with('HIGH_PRIORITY').all()  # SELECT HIGH_PRIORITY user.name AS user_name FROM user
print query.all()

# Delete examples (left commented out):
# session.delete(session.query(User).get(2))
# session.delete(session.query(User).first())
# session.query(User).filter(User.id >= 1).delete()
session.commit()


外键

class User(BaseModel):
    """ORM model mapped to the ``user`` table, with a link to its addresses."""

    # Table name
    __tablename__ = 'user'

    # Table structure
    id = Column(Integer, primary_key=True)
    name = Column(String(20), nullable=False, unique=True)
    # The Python attribute is `test`; name="a1" sets the database column name.
    test = Column(String(20), name="a1", default="a1")
    email_address = Column(String(32), nullable=False, default="email")
    # Address objects of this user, ordered by Address.id; backref="user"
    # names the reverse attribute on the Address side.
    addresses = relationship("Address", order_by="Address.id", backref="user")

    def __repr__(self):
        """Return a debug string listing id, name, test and email_address."""
        return '<User id=%s,name=%s,test=%s,email_address=%s >' % (
            self.id, self.name, self.test, self.email_address)

class Address(BaseModel):
    """ORM model mapped to the ``address`` table."""

    __tablename__ = 'address'

    id = Column(Integer, primary_key=True)
    address = Column(String(20), nullable=False, unique=True)
    # Foreign key to user.id, declared with ON DELETE / ON UPDATE CASCADE.
    user_id = Column(Integer, ForeignKey('user.id', ondelete='CASCADE', onupdate='CASCADE'))

    def __repr__(self):
        """Return a debug string listing id and address."""
        # Two placeholders for two values: a format string with more
        # placeholders than values raises TypeError when repr() is called.
        return '<Address id=%s,address=%s>' % (self.id, self.address)


user_id = Column(Integer, ForeignKey('user.id', ondelete='CASCADE', onupdate='CASCADE')):之所以指定 ondelete / onupdate,是因为删除 user 表的数据,可能会导致 address 的外键不指向一个真实存在的记录。在默认情况下,MySQL 会拒绝这种操作,也就是 RESTRICT。InnoDB 还允许指定 ON DELETE 为 CASCADE 和 SET NULL,前者会删除 address 中无效的记录,后者会将这些记录的外键设为 NULL。

relationship 标记 User 的 addresses 连接到 Address 表,例如:session.query(User).filter(User.name == "bb").one().addresses

backref 用于反向查询,例如:session.query(Address).filter(Address.address == "a1").one().user

u1 = User(name="aa", email_address="aa_email")
u2 = User(name="bb")

a1 = Address(address="a1")
a2 = Address(address="a2")
a3 = Address(address="a3")

# a1 and a2 are attached to u2 through the relationship; they are never
# passed to session.add() themselves (presumably they are saved along with
# u2 via the relationship's cascade; confirm against the SQLAlchemy docs).
u2.addresses = [a1, a2]
session.add(u1)
session.add(u2)
# a3 is added on its own, with no user assigned.
session.add(a3)
session.flush()

session.commit()
# Forward direction: the addresses of the user named "bb".
print session.query(User).filter(User.name == "bb").one().addresses
# Reverse direction through the backref: the user of address "a1".
print session.query(Address).filter(Address.address=="a1").one().user

print session.query(User).all()
print session.query(Address).all()


多对多

# Association table for the many-to-many example: it is registered on
# BaseModel.metadata and holds one foreign key to user.id and one to keyword.id.
user_keywords = Table('user_keywords',
BaseModel.metadata,
Column('user_id', Integer, ForeignKey('user.id')),
Column('keyword_id', Integer, ForeignKey('keyword.id'))
)

class User(BaseModel):
    """ORM model mapped to the ``user`` table, linked many-to-many to Keyword."""

    # Table name
    __tablename__ = 'user'

    # Table structure
    id = Column(Integer, primary_key=True)
    name = Column(String(20), nullable=False, unique=True)
    # The Python attribute is `test`; name="a1" sets the database column name.
    test = Column(String(20), name="a1", default="a1")
    email_address = Column(String(32), nullable=False, default="email")
    # Many-to-many through the user_keywords table; backref="user" names the
    # reverse attribute on Keyword. Spelled `relationship`, the same name the
    # one-to-many example in these notes uses (`relation` is its older alias).
    keywords = relationship("Keyword", secondary=user_keywords, backref="user")

    def __repr__(self):
        """Return a debug string listing id, name, test and email_address."""
        return '<User id=%s,name=%s,test=%s,email_address=%s >' % (
            self.id, self.name, self.test, self.email_address)

class Keyword(BaseModel):
    """ORM model mapped to the ``keyword`` table."""

    __tablename__ = 'keyword'

    id = Column(Integer, primary_key=True)
    keyword = Column(String(20), nullable=False, unique=True)
    # No explicit `user` relationship is declared here: User.keywords is
    # defined with backref="user", which already adds a `user` attribute to
    # Keyword. Declaring the same name again (with its own backref="user")
    # makes mapper configuration fail with a name conflict.

    def __repr__(self):
        """Return a debug string listing id and keyword."""
        return '<Keyword id=%s,keyword=%s>' % (self.id, self.keyword)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  sqlalchemy python