
A collection of SQLAlchemy usage notes

2017-02-18 09:55

1  Aggregation

Count how many rows share each distinct value of a given column.

Example:

The table (named test) contains the following columns:

Column    Meaning
id        auto-increment (int)
name      name (string)
type      type (string)

 

Count how many rows in table test share each value of name:

SQL statement:

select name, count(1) from test where type = 'xxx' group by name;

 

SQLAlchemy statement:

result = session.query(models.Test.name, func.count('*').label("count")).filter_by(type='xxx').group_by(models.Test.name).all()

 

In Python, result is a list of tuples, for example [('name1', 10), ('name2', 11)].
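The count-per-name query can be tried end to end with a minimal sketch like the one below. It assumes an in-memory SQLite database and a stand-alone Test model that mirrors the three columns described above; the sample rows and the "count" label are made up for illustration and are not taken from the original project.

```python
from sqlalchemy import Column, Integer, String, create_engine, func
from sqlalchemy.orm import sessionmaker

try:  # SQLAlchemy 1.4 and later
    from sqlalchemy.orm import declarative_base
except ImportError:  # older releases
    from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class Test(Base):
    """Assumed stand-in for models.Test with the columns id, name, type."""
    __tablename__ = "test"
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(64))
    type = Column(String(64))


# In-memory SQLite database, used only for this illustration.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

# Made-up sample rows: three of type 'xxx', one of another type.
session.add_all([
    Test(name="name1", type="xxx"),
    Test(name="name1", type="xxx"),
    Test(name="name2", type="xxx"),
    Test(name="name2", type="other"),
])
session.commit()

# Equivalent of:
#   select name, count(*) from test where type = 'xxx' group by name;
rows = (
    session.query(Test.name, func.count("*").label("count"))
    .filter_by(type="xxx")
    .group_by(Test.name)
    .order_by(Test.name)
    .all()
)

# Convert the result rows to plain tuples for easy inspection.
counts = [(name, count) for name, count in rows]
print(counts)
```

Without the group_by call the aggregate would collapse all matching rows into a single row, so the grouping column must appear in both the select list and group_by.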

 

2  group by

Data model:


class Comsumption(ComSumBase):
    """Represents a Charge."""

    __tablename__ = 'comsumption'
    __table_args__ = (
        # schema.UniqueConstraint('uuid', name='uniq_bay0uuid'),
        table_args()
    )
    id = Column(Integer, primary_key=True)
    resource_type = Column(String(36))
    cost = Column(Numeric(20, 8), default=0.00)
    start_time = Column(DateTime)
    end_time = Column(DateTime)


Required imports:

from sqlalchemy.sql import func

from oslo_utils import timeutils

import datetime


SQLAlchemy query functions:

def sanitize_timestamp(timestamp):
    """Return a naive utc datetime object."""
    if not timestamp:
        return timestamp
    if not isinstance(timestamp, datetime.datetime):
        timestamp = timeutils.parse_isotime(timestamp)
    return timeutils.normalize_time(timestamp)

def format_filter(domain_id=None, project_id=None, user_id=None,
                  start_time=None, end_time=None):
    """Build a dict containing only the filter values that were supplied."""
    # Named "filters" to avoid shadowing the built-in filter().
    filters = {}
    if domain_id:
        filters["domain_id"] = domain_id
    if project_id:
        filters["project_id"] = project_id
    if user_id:
        filters["user_id"] = user_id
    if start_time:
        filters["start_time"] = start_time
    if end_time:
        filters["end_time"] = end_time

    return filters

def test_group_by(self, domain_id=None, project_id=None, user_id=None,
                  start_time=None, end_time=None):
    """Return the summed cost per resource_type, optionally within a time window."""
    session = get_session()
    # The helpers above are defined without self, so they are called directly.
    filter_dict = format_filter(domain_id=domain_id, project_id=project_id,
                                user_id=user_id, start_time=start_time,
                                end_time=end_time)

    query = session.query(models.Comsumption.resource_type,
                          func.sum(models.Comsumption.cost))

    # The time bounds are range conditions, so they are removed from
    # filter_dict and applied with filter() instead of filter_by().
    if "end_time" in filter_dict:
        end_time = sanitize_timestamp(filter_dict.pop("end_time"))
        query = query.filter(models.Comsumption.start_time <= end_time)
    if "start_time" in filter_dict:
        start_time = sanitize_timestamp(filter_dict.pop("start_time"))
        query = query.filter(models.Comsumption.start_time >= start_time)

    # The remaining entries are plain equality filters.
    query = query.filter_by(**filter_dict).group_by(models.Comsumption.resource_type)

    result = query.all()
    LOG.debug("QUERY: resource_type_cost = %s", result)
    return result


The final value of result:

[('test-001', Decimal('315.00000000')), ('test-002', Decimal('5.40000000'))]
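
The grouped sum with an optional time window can be reproduced in isolation with a sketch along the following lines. It assumes an in-memory SQLite database and a simplified Comsumption model without the project's ComSumBase and table_args(); the function name cost_by_resource_type and all sample rows are hypothetical. SQLite has no native decimal type, so SQLAlchemy may warn about converting Numeric values through floating point.

```python
import datetime
from decimal import Decimal

from sqlalchemy import (Column, DateTime, Integer, Numeric, String,
                        create_engine, func)
from sqlalchemy.orm import sessionmaker

try:  # SQLAlchemy 1.4 and later
    from sqlalchemy.orm import declarative_base
except ImportError:  # older releases
    from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class Comsumption(Base):
    """Simplified stand-in for the model above (base class and table args omitted)."""
    __tablename__ = "comsumption"
    id = Column(Integer, primary_key=True)
    resource_type = Column(String(36))
    cost = Column(Numeric(20, 8), default=0)
    start_time = Column(DateTime)
    end_time = Column(DateTime)


def cost_by_resource_type(session, start_time=None, end_time=None):
    """Hypothetical helper: sum cost per resource_type within a time window."""
    query = session.query(Comsumption.resource_type, func.sum(Comsumption.cost))
    # Range conditions need filter(); filter_by() only expresses equality.
    if end_time is not None:
        query = query.filter(Comsumption.start_time <= end_time)
    if start_time is not None:
        query = query.filter(Comsumption.start_time >= start_time)
    query = (query.group_by(Comsumption.resource_type)
                  .order_by(Comsumption.resource_type))
    return [(resource_type, total) for resource_type, total in query.all()]


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()


def day(month, d):
    return datetime.datetime(2017, month, d)


# Made-up sample rows; the last one starts outside the queried window.
session.add_all([
    Comsumption(resource_type="test-001", cost=Decimal("300"), start_time=day(2, 1)),
    Comsumption(resource_type="test-001", cost=Decimal("15"), start_time=day(2, 10)),
    Comsumption(resource_type="test-002", cost=Decimal("2.25"), start_time=day(2, 5)),
    Comsumption(resource_type="test-002", cost=Decimal("3.25"), start_time=day(2, 15)),
    Comsumption(resource_type="test-002", cost=Decimal("100"), start_time=day(3, 1)),
])
session.commit()

totals = cost_by_resource_type(session, start_time=day(2, 1), end_time=day(2, 28))
print(totals)
```

Passing the session in as an argument, rather than calling get_session() inside the function, is a choice made here only to keep the sketch testable without the project's database layer.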