您的位置:首页 > 数据库 > Oracle

python etl oracle导出 2

2016-01-08 16:57 218 查看

python etl oracle导出 2

经过上篇的工作,目前环境已经可以正常使用了。下面开始代码编写的过程

思路如下:

获取数据库中的数据

处理每行数据,生成指定的文件格式

写入到指定输出文件中

下面根据以上思路进行开发

获取数据库中的数据

获取数据这块使用的是sqlalchemy库。因为是纯底层的数据导出工作。所以没有必要使用orm。而是使用原生sql的方式来完成的。步骤很简单后面可以参考代码。但是需要先解决一个问题。就是表的字段太多了。一个一个写好累呀。。。于是先解决这个问题。

获取数据库表中的字段。

这块就是纯属不想手动去抄。写了个小程序。首先用sqlplus获取表的说明并保存为文件db_files.txt。

desc table_name


生成的文件中间有很多空格。于是解决空格。。。把空格解决了。生成了一个列表中。列表中包括一系列字典。字典有两个字段name,type。type没有用。可能以后用的着先留着。代码如下:

# 这块是去掉中间空格的。
def strip_mid(line):
pre = ""
cur = ""
str = ""
for char in line:
cur = char
if (cur == pre) and (cur == " "):
continue
else:
str += cur
pre = cur
return str

def load_table_info(path):
with open(path) as f:
fields_list = []
for line in f:
str = strip_mid(line)
fields = str.split(' ')
fields_size = len(fields)
name = ""
# 这块是比较讨厌的。有的字段说明中有not null。所以要单独解决一下。
if fields_size == 2:
name,type = fields
elif fields_size == 4:
name,null,null,type = fields
fields_dict = {}
fields_dict['name'] = name
fields_dict['type'] = type
fields_list.append(fields_dict)
return fields_list


好了。有了上面的代码我们就有了字段的信息了。下面就可以进行实际处理了。也分享一下代码。之后放到github上去。

# -*- coding: utf-8 -*-
import logging
import sys

from sqlalchemy import create_engine

from parse_db_fields import load_table_info

# python2.7默认是ascii。需要设置一下。
reload(sys)
sys.setdefaultencoding('utf-8')
logger = logging.getLogger(__name__)

class DBUtils(object):
# 这个是连接串。隐去了。
conn_str = 'oracle+cx_oracle://。。。'

def __init__(sefl):
pass

def test(self):
logger.info('test')
logger.info('connect db with:%s' % self.conn_str)
self.engine = create_engine(self.conn_str, echo=True)
self.conn   = self.engine.connect()

fields_list = load_table_info('db_files.txt')
field_names = [ field['name'] for field in fields_list ]
# 表名也隐去了。
sql = "select " + ",".join(field_names) + " from tableAAA"
logger.info("%s,the sql will execute" % sql)

result = self.conn.execute(sql)

with open('dst.dat', 'w') as f:
for row in result:
values = [ " " if value is None else str(value) for value in row]
f.write("|+|".join(values))
f.write('\n')
self.conn.close()

if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
logger.info('beging test db test model')
db_utils = DBUtils()
db_utils.test()


好了。大功告成。但是速度不是很快呀。2000w的数据量。跑了2个小时左右。下周优化吧。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  python