源系统表结构比对跟踪并进行邮件发送
2016-04-01 07:05
288 查看
本文中的源系统均为MySQL数据库。每日将各源库的表结构信息拉取到目标MySQL数据库中;在目标库中通过存储过程比对相邻两日(昨日与前日)表结构之间的差异,再由python脚本将差异信息通过邮件发送给相关人员。
1、数据拉取、装载、MySQL存储过程运算及邮件发送调度的shell脚本
路径:/home/hs/opt/dw-etl/tabsrc_strurecord/srctab_stru_pull.sh
srctab_stru_pull.sh
2、目标mysql上的存储过程及相关表结构
2.1、src_tab_stru_info(sor库)
2.2、srctab_change_info(dm库)
2.3、p_srctab_change_info(dm库)
3、获取信息发送邮件的python脚本
路径:/home/hs/opt/dw-etl/tabsrc_strurecord/srctab_change_sendmail.py
srctab_change_sendmail.py
4、定时每日进行数据的处理及邮件的发送
9 3 * * * sh /home/hs/opt/dw-etl/dw_batch.sh
cat /home/hs/opt/dw-etl/dw_batch.sh
1、数据拉取、装载、MySQL存储过程运算及邮件发送调度的shell脚本
路径:/home/hs/opt/dw-etl/tabsrc_strurecord/srctab_stru_pull.sh
srctab_stru_pull.sh
#!/bin/bash
# srctab_stru_pull.sh
# Pulls yesterday's table/column metadata from the source MySQL instances into
# a '|'-delimited file, reloads it into sor.src_tab_stru_info on the target
# instance, runs dm.p_srctab_change_info and finally triggers the mail script.
#
# NOTE(review): the MySQL passwords are passed on the command line, where they
# are visible in the process list; consider a --defaults-extra-file instead.

# Abort on the first failing step: without this a failed pull would still
# delete yesterday's snapshot and load an empty/partial file.
set -e

export yesterday="$(date -d last-day +%Y-%m-%d)"
# yyyymmdd form of $yesterday. Built with tr rather than ${yesterday//-/}
# because the batch driver starts this script with `sh`, and that expansion
# is bash-only.
ytd="$(echo "$yesterday" | tr -d '-')"

work_dir=/home/hs/opt/dw-etl/tabsrc_strurecord
dat_file="$work_dir/db99src_$yesterday.dat"

# Start from an empty file: every source below appends to it.
rm -f "$dat_file"

#db99huanx src tab stru data pull ...
# One output line per (table, column). CR, LF and '|' are stripped from the
# free-text fields so they cannot break the '|'-delimited load format.
/usr/local/bin/mysql -hsrcipaddress.mysql.rds.aliyuncs.com -udbreader -piloveyou -N -e"select CONCAT(
'rdsb0o1vpjxpecbe9uq0.mysql.rds.aliyuncs.com'
,'|', ifnull(replace(replace(replace(a1.TABLE_SCHEMA,char(13),''),char(10),''),'|',''),'')
,'|', ifnull(replace(replace(replace(a1.TABLE_NAME,char(13),''),char(10),''),'|',''),'')
,'|', ifnull(replace(replace(replace(a1.TABLE_COMMENT,char(13),''),char(10),''),'|',''),'')
,'|', ifnull(a1.TABLE_TYPE,'')
,'|', ifnull(a1.\`ENGINE\`,'')
,'|', ifnull(a1.CREATE_TIME,'')
,'|', ifnull(a1.UPDATE_TIME,'')
,'|', ifnull(a1.TABLE_COLLATION,'')
,'|', ifnull(replace(replace(replace(a2.COLUMN_NAME,char(13),''),char(10),''),'|',''),'')
,'|', ifnull(replace(replace(replace(a2.COLUMN_COMMENT,char(13),''),char(10),''),'|',''),'')
,'|', ifnull(a2.ORDINAL_POSITION,'')
,'|', ifnull(a2.COLUMN_TYPE,'')
,'|', $ytd
,'|', NOW()) src_tab_stru_info
from information_schema.\`TABLES\` a1
left join information_schema.\`COLUMNS\` a2
on a1.TABLE_SCHEMA=a2.TABLE_SCHEMA and a1.TABLE_NAME=a2.TABLE_NAME
where a1.TABLE_SCHEMA in('db99huanx','db99yinping');
" >> "$dat_file"

#db99finance src tab stru data pull ...
# ......(same as the previous section; elided in the original listing)

# Replace yesterday's snapshot in sor.src_tab_stru_info with the pulled file.
/usr/local/mysql/bin/mysql -hipaddress.mysql.rds.aliyuncs.com -udatauser -piloveyou -e "use sor;delete from src_tab_stru_info where data_date=$ytd;load data local infile '$dat_file' into table src_tab_stru_info fields terminated by '|' enclosed by '' lines terminated by '\n' ignore 0 lines;"

# Compute the day-over-day differences, then mail them.
/usr/local/mysql/bin/mysql -hipaddress.mysql.rds.aliyuncs.com -udatauser -piloveyou -e "use dm;call p_srctab_change_info(99);"

python "$work_dir/srctab_change_sendmail.py"
2、目标mysql上的存储过程及相关表结构
2.1、src_tab_stru_info(sor库)
-- Snapshot of source-system table structure: the pull script loads one row
-- per (table, column) per DATA_DATE.
CREATE TABLE `src_tab_stru_info` (
    -- Where the table lives.
    `IP_ADDRESS`              varchar(43) NOT NULL DEFAULT '',
    `TABLE_SCHEMA`            varchar(64) NOT NULL DEFAULT '',
    `TABLE_NAME`              varchar(64) NOT NULL DEFAULT '',
    -- Table-level attributes.
    `TABLE_COMMENT`           text        NOT NULL,
    `TABLE_TYPE`              varchar(64) NOT NULL DEFAULT '',
    `DB_ENGINE`               varchar(64) NOT NULL DEFAULT '',
    `CREATE_TIME`             varchar(19) NOT NULL DEFAULT '',
    `UPDATE_TIME`             varchar(19) NOT NULL DEFAULT '',
    `TABLE_COLLATION`         varchar(32) NOT NULL DEFAULT '',
    -- Column-level attributes.
    `COLUMN_NAME`             varchar(64) NOT NULL DEFAULT '',
    `COLUMN_COMMENT`          text        NOT NULL,
    `COLUMN_ORDINAL_POSITION` varchar(21) NOT NULL DEFAULT '',
    `COLUMN_TYPE`             longtext    NOT NULL,
    -- Snapshot day as a yyyymmdd integer, and the load timestamp.
    `DATA_DATE`               int(8)      NOT NULL DEFAULT '0',
    `ETL_TIME`                datetime    NOT NULL DEFAULT '0000-00-00 00:00:00',
    KEY `idx_src_tab` (
        `IP_ADDRESS`,
        `TABLE_SCHEMA`,
        `TABLE_NAME`,
        `COLUMN_NAME`,
        `DATA_DATE`
    ) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT=' 源系统表结构信息';
2.2、srctab_change_info(dm库)
-- Day-over-day structure changes written by p_srctab_change_info.
-- ADD_TYPE is set by that procedure to 'column', 'table' or 'nochange'.
CREATE TABLE `srctab_change_info` (
    `CHANGE_DATE`  date         NOT NULL DEFAULT '0000-00-00',
    `ADD_TYPE`     varchar(80)  NOT NULL DEFAULT '',
    `IP_ADDRESS`   varchar(100) NOT NULL DEFAULT '',
    `TABLE_SCHEMA` varchar(100) NOT NULL DEFAULT '',
    `TABLE_NAME`   varchar(100) NOT NULL DEFAULT '',
    `COLUMN_NAME`  varchar(100) NOT NULL DEFAULT '',
    `COLUMN_TYPE`  longtext     NOT NULL,
    `ETL_TIME`     datetime     NOT NULL DEFAULT '0000-00-00 00:00:00'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
2.3、p_srctab_change_info(dm库)
CREATE DEFINER=`datauser`@`%` PROCEDURE `p_srctab_change_info`(IN `p_etl_date` datetime)
BEGIN
    # Rebuilds yesterday's rows of srctab_change_info by comparing yesterday's
    # snapshot in sor.src_tab_stru_info with the snapshot of the day before.
    #
    # Rows written (all with CHANGE_DATE = yesterday):
    #   'column'   - one per column present yesterday but not the day before
    #                (every column of a brand-new table is reported here too)
    #   'table'    - one per table present yesterday but not the day before
    #   'nochange' - a single marker row, always written
    #
    # p_etl_date is accepted for interface compatibility but is NOT used: the
    # processing date is always CURDATE() - 1 day (the batch job calls this
    # procedure with the dummy value 99).
    DECLARE v_change_date DATE;
    DECLARE v_curr_key    INT;
    DECLARE v_prev_key    INT;

    SET v_change_date = DATE_SUB(CURDATE(), INTERVAL 1 DAY);

    # DATA_DATE is an int(8) yyyymmdd key. Build the integer keys explicitly
    # instead of comparing the int column with a date/string expression and
    # relying on MySQL's implicit type conversion.
    SET v_curr_key = CAST(DATE_FORMAT(v_change_date, '%Y%m%d') AS UNSIGNED);
    SET v_prev_key = CAST(DATE_FORMAT(DATE_SUB(v_change_date, INTERVAL 1 DAY), '%Y%m%d') AS UNSIGNED);

    # Make the procedure re-runnable for the same day.
    DELETE FROM srctab_change_info
    WHERE CHANGE_DATE = v_change_date;

    # The target columns are NOT NULL, so '' (not NULL) is written where a
    # value does not apply; this also keeps the insert valid in strict mode.
    INSERT INTO srctab_change_info
        (CHANGE_DATE, ADD_TYPE, IP_ADDRESS, TABLE_SCHEMA, TABLE_NAME,
         COLUMN_NAME, COLUMN_TYPE, ETL_TIME)
    # columns that did not exist in the previous snapshot
    SELECT
        v_change_date,
        'column',
        cur.IP_ADDRESS,
        cur.TABLE_SCHEMA,
        cur.TABLE_NAME,
        cur.COLUMN_NAME,
        cur.COLUMN_TYPE,
        NOW()
    FROM sor.src_tab_stru_info AS cur
    WHERE cur.DATA_DATE = v_curr_key
      AND NOT EXISTS (
            SELECT 1
            FROM sor.src_tab_stru_info AS prev
            WHERE prev.DATA_DATE    = v_prev_key
              AND prev.IP_ADDRESS   = cur.IP_ADDRESS
              AND prev.TABLE_SCHEMA = cur.TABLE_SCHEMA
              AND prev.TABLE_NAME   = cur.TABLE_NAME
              AND prev.COLUMN_NAME  = cur.COLUMN_NAME
          )
    UNION ALL
    # tables that did not exist in the previous snapshot
    SELECT
        v_change_date,
        'table',
        cur_tab.IP_ADDRESS,
        cur_tab.TABLE_SCHEMA,
        cur_tab.TABLE_NAME,
        '',
        '',
        NOW()
    FROM (
        SELECT DISTINCT IP_ADDRESS, TABLE_SCHEMA, TABLE_NAME
        FROM sor.src_tab_stru_info
        WHERE DATA_DATE = v_curr_key
    ) AS cur_tab
    WHERE NOT EXISTS (
            SELECT 1
            FROM sor.src_tab_stru_info AS prev
            WHERE prev.DATA_DATE    = v_prev_key
              AND prev.IP_ADDRESS   = cur_tab.IP_ADDRESS
              AND prev.TABLE_SCHEMA = cur_tab.TABLE_SCHEMA
              AND prev.TABLE_NAME   = cur_tab.TABLE_NAME
          )
    UNION ALL
    # marker row recording that the comparison ran for this day
    SELECT
        v_change_date,
        'nochange',
        '',
        '',
        '',
        '',
        '',
        NOW();
END
3、获取信息发送邮件的python脚本
路径:/home/hs/opt/dw-etl/tabsrc_strurecord/srctab_change_sendmail.py
srctab_change_sendmail.py
# -*- coding=utf-8 -*-
"""Mail yesterday's source-table structure changes.

Reads the 'column' / 'table' rows that p_srctab_change_info wrote into
dm.srctab_change_info for yesterday. If there are any, sends them as an HTML
table, with the raw structure dump produced by srctab_stru_pull.sh attached.
If there are none, only prints a message.

Written for Python 2 with MySQLdb (the syntax used is also valid Python 3).
"""
import smtplib
import MySQLdb
import json
import warnings
import datetime
from email.header import Header
from email.mime.application import MIMEApplication
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

warnings.filterwarnings("ignore")

# NOTE(review): database and SMTP credentials are hard-coded in this script;
# consider moving them to a protected config file or environment variables.
db_config = {
    'host': 'rdsipaddress.mysql.rds.aliyuncs.com',
    'user': 'datauser',
    'passwd': 'iloveyou',
    'port': 3306,
    'db': 'dm'
}

SENDER = "nisj@99hong.com"
RCPT = ["nisj@99hong.com", "chenhx@99hong.com", "yanc@99hong.com", "zhangmm@99hong.com"]
SMTP_HOST = 'smtp.exmail.qq.com'
SMTP_PASSWD = 'mte$p%iiu2feizd%'

# The pull script writes db99src_<yyyy-mm-dd>.dat; the attachment must use the
# same name (the earlier "db59src_" spelling pointed at a file that the pull
# script never creates).
DAT_DIR = "/home/hs/opt/dw-etl/tabsrc_strurecord"
DAT_NAME_FMT = "db99src_%s.dat"

# The date is bound as a query parameter rather than formatted into the text.
CHANGE_ROWS_SQL = (
    "select CHANGE_DATE,ADD_TYPE,IP_ADDRESS,TABLE_SCHEMA,TABLE_NAME,COLUMN_NAME,COLUMN_TYPE "
    "from srctab_change_info where add_type in('column','table') and CHANGE_DATE=%s;"
)

HTML_HEADER_ROW = "<tr style='font-weight:bold;'><td>变动日期</td><td>变动类型</td><td>IP地址</td><td>模式</td><td>表名</td><td>列名</td><td>列类型</td></tr>"
HTML_ROW_FMT = "<tr><td>%s</td><td>%s</td><td>%s</td><td>%s</td><td>%s</td><td>%s</td><td>%s</td></tr>"


def getDB():
    """Open a connection to the target database.

    Returns (connection, cursor), or (None, None) after printing the error
    when the connection cannot be established.
    """
    try:
        conn = MySQLdb.connect(host=db_config['host'], user=db_config['user'],
                               passwd=db_config['passwd'], port=db_config['port'])
        conn.autocommit(True)
        curr = conn.cursor()
        curr.execute("SET NAMES utf8")
        curr.execute("USE %s" % db_config['db'])
        return conn, curr
    except MySQLdb.Error as e:
        print("Mysql Error %d: %s" % (e.args[0], e.args[1]))
        return None, None


def _fetch_changes(change_date):
    """Return the 'column'/'table' change rows recorded for change_date."""
    conn, curr = getDB()
    if conn is None:
        # getDB() has already printed the reason; nothing can be reported.
        raise SystemExit(1)
    try:
        curr.execute(CHANGE_ROWS_SQL, (change_date,))
        return curr.fetchall()
    finally:
        # Close on every path, not only when changes were found.
        curr.close()
        conn.close()


def _build_message(items, change_date):
    """Build the mail: an HTML table of the changes plus the dump attachment."""
    # 'mixed' is the multipart subtype for a body followed by attachments.
    msg = MIMEMultipart('mixed')
    msg['Subject'] = Header("源系统表结构变化信息日汇报", "utf-8")
    msg['From'] = SENDER
    msg['To'] = ', '.join(RCPT)

    html_data = HTML_HEADER_ROW + "".join(HTML_ROW_FMT % tuple(item) for item in items)
    html = "Hi All:<br>今日源系统表结构有变化,请根据具体情况进行数据仓库各层的相应变动及更改!<br><br><table border='1' style='background-color:#22B8DD'>" + html_data + "</table>"
    # Pass the charset at construction so the part is encoded and labelled
    # consistently.
    msg.attach(MIMEText(html, 'html', 'utf-8'))

    att_name = DAT_NAME_FMT % (change_date)
    att_path = "%s/%s" % (DAT_DIR, att_name)
    try:
        with open(att_path, 'rb') as dat_file:
            dat_bytes = dat_file.read()
    except IOError as e:
        # A missing dump should not suppress the change notification itself.
        print("attachment skipped: %s" % (e))
    else:
        # MIMEApplication yields one application/octet-stream part, base64
        # encoded, with a single Content-Type header.
        att = MIMEApplication(dat_bytes)
        att.add_header('Content-Disposition', 'attachment', filename=att_name)
        msg.attach(att)
    return msg


def _send(msg):
    """Send msg through the SMTP server; errors are printed, not raised."""
    try:
        s = smtplib.SMTP(SMTP_HOST)
        try:
            s.login(SENDER, SMTP_PASSWD)
            s.sendmail(SENDER, RCPT, msg.as_string())
            s.quit()
        finally:
            # Release the socket even when login/sendmail failed.
            s.close()
    except Exception as e:
        print(e)


def main():
    today = datetime.date.today()
    yesterday = today - datetime.timedelta(days=1)
    print(today)
    print(yesterday)

    items = _fetch_changes(yesterday)
    chg_num = len(items)
    print(chg_num)

    if chg_num >= 1:
        _send(_build_message(items, yesterday))
    else:
        print("%s,no src tab stru change!" % (yesterday))


if __name__ == '__main__':
    main()
4、定时每日进行数据的处理及邮件的发送
9 3 * * * sh /home/hs/opt/dw-etl/dw_batch.sh
cat /home/hs/opt/dw-etl/dw_batch.sh
#!/bin/bash
# dw_batch.sh (excerpt)
# Daily batch driver started from cron. The part shown here derives the date
# variables used by the batch and runs the source-table structure pull,
# logging a start and a finish line to dw_batch.log.

export today="$(date +%Y-%m-%d)"
export yesterday="$(date -d last-day +%Y-%m-%d)"
export beforeytd="$(date -d "2 days ago" +%Y-%m-%d)"
# Manual override of the processing day (disabled):
#echo -n "please enter a day for runing :"
#read yesterday

# yyyymmdd form of $yesterday. Built with tr rather than ${yesterday//-/}
# because cron starts this script with `sh`, and that expansion is bash-only.
export ytd="$(echo "$yesterday" | tr -d '-')"

# ...... (the other batch steps are elided in the original listing)

#pull tab stru data from src and load in rds-sor
batch_log=/home/hs/opt/dw-etl/dw_batch.log
pull_script=/home/hs/opt/dw-etl/tabsrc_strurecord/srctab_stru_pull.sh

date +"%Y-%m-%d %H:%M:%S" >>"$batch_log"
echo "$yesterday,[sh $pull_script] exec start ... " >>"$batch_log"
sh "$pull_script"
date +"%Y-%m-%d %H:%M:%S" >>"$batch_log"
echo "$yesterday,[sh $pull_script] exec finished !" >>"$batch_log"
相关文章推荐
- CSP考试 2013年12月第1题 出现次数最多的数 C语言实现
- asp网页跳转
- 读MBA经历回顾(下)做法决定结果——北漂18年(49)
- 早上把书本和桌子放在一起,下午12:00就要出发
- 表单中邮箱自动完成的实现
- 107.01背包变式题型:传纸条
- 83. Remove Duplicates from Sorted List
- [Exploratory Data Analysis] Project 1
- OpenStack 架构 - 每天5分钟玩转 OpenStack(15)
- [Exploratory Data Analysis] Week 1
- OpenStack 架构 - 每天5分钟玩转 OpenStack(15)
- Flume在企业大数据仓库架构中位置及功能
- [译]时间自动机:语义,算法和工具 UPPAAL
- subsets
- permutations
- caffe中如何可视化cnn各层的输出
- 思科路由器的VXR的含义
- laravel眼瞎么?
- pmpi简单实例
- Android 正确的闪屏方式。