python etl oracle导出 2
2016-01-08 16:57
218 查看
python etl oracle导出 2
经过上篇的工作,目前环境已经可以正常使用了。下面开始代码编写的过程思路如下:
获取数据库中的数据
处理每行数据,生成指定的文件格式
写入到指定输出文件中
下面根据以上思路进行开发
获取数据库中的数据
获取数据这块使用的是sqlalchemy库。因为是纯底层的数据导出工作。所以没有必要使用orm。而是使用原生sql的方式来完成的。步骤很简单后面可以参考代码。但是需要先解决一个问题。就是表的字段太多了。一个一个写好累呀。。。于是先解决这个问题。获取数据库表中的字段。
这块就是纯属不想手动去抄。写了个小程序。首先用sqlplus获取表的说明并保存为文件db_files.txt。desc table_name
生成的文件中间有很多空格。于是解决空格。。。把空格解决了。生成了一个列表中。列表中包括一系列字典。字典有两个字段name,type。type没有用。可能以后用的着先留着。代码如下:
# 这块是去掉中间空格的。 def strip_mid(line): pre = "" cur = "" str = "" for char in line: cur = char if (cur == pre) and (cur == " "): continue else: str += cur pre = cur return str def load_table_info(path): with open(path) as f: fields_list = [] for line in f: str = strip_mid(line) fields = str.split(' ') fields_size = len(fields) name = "" # 这块是比较讨厌的。有的字段说明中有not null。所以要单独解决一下。 if fields_size == 2: name,type = fields elif fields_size == 4: name,null,null,type = fields fields_dict = {} fields_dict['name'] = name fields_dict['type'] = type fields_list.append(fields_dict) return fields_list
好了。有了上面的代码我们就有了字段的信息了。下面就可以进行实际处理了。也分享一下代码。之后放到github上去。
# -*- coding: utf-8 -*- import logging import sys from sqlalchemy import create_engine from parse_db_fields import load_table_info # python2.7默认是ascii。需要设置一下。 reload(sys) sys.setdefaultencoding('utf-8') logger = logging.getLogger(__name__) class DBUtils(object): # 这个是连接串。隐去了。 conn_str = 'oracle+cx_oracle://。。。' def __init__(sefl): pass def test(self): logger.info('test') logger.info('connect db with:%s' % self.conn_str) self.engine = create_engine(self.conn_str, echo=True) self.conn = self.engine.connect() fields_list = load_table_info('db_files.txt') field_names = [ field['name'] for field in fields_list ] # 表名也隐去了。 sql = "select " + ",".join(field_names) + " from tableAAA" logger.info("%s,the sql will execute" % sql) result = self.conn.execute(sql) with open('dst.dat', 'w') as f: for row in result: values = [ " " if value is None else str(value) for value in row] f.write("|+|".join(values)) f.write('\n') self.conn.close() if __name__ == '__main__': logging.basicConfig(level=logging.INFO) logger.info('beging test db test model') db_utils = DBUtils() db_utils.test()
好了。大功告成。但是速度不是很快呀。2000w的数据量。跑了2个小时左右。下周优化吧。
相关文章推荐
- Python动态类型的学习---引用的理解
- Python3写爬虫(四)多线程实现数据爬取
- 垃圾邮件过滤器 python简单实现
- 下载并遍历 names.txt 文件,输出长度最长的回文人名。
- install and upgrade scrapy
- Scrapy的架构介绍
- Centos6 编译安装Python
- 使用Python生成Excel格式的图片
- 让Python文件也可以当bat文件运行
- [Python]推算数独
- Python中zip()函数用法举例
- Python中map()函数浅析
- Python将excel导入到mysql中
- Python在CAM软件Genesis2000中的应用
- 使用Shiboken为C++和Qt库创建Python绑定
- FREEBASIC 编译可被python调用的dll函数示例
- Python 七步捉虫法