您的位置:首页 > 其它

源系统表结构比对跟踪并进行邮件发送

2016-04-01 07:05 288 查看
本文中的源系统均为 MySQL 数据库。每日将各源库的表结构信息拉取到目标 MySQL 库中;在目标库中通过存储过程比对相邻两日的表结构快照,找出新增的表和新增的列,再由 Python 脚本将结果通过邮件发送给相关人员。

1、数据拉取、装载、MySQL存储过程运算及邮件发送调度的shell脚本

路径:/home/hs/opt/dw-etl/tabsrc_strurecord/srctab_stru_pull.sh

srctab_stru_pull.sh

#!/bin/bash
# srctab_stru_pull.sh -- step 1: pull source table/column metadata.
#
# Writes one pipe-delimited line per (table, column) of the listed source
# schemas into db99src_<yesterday>.dat. The field order must match the column
# order of sor.src_tab_stru_info, because the later LOAD DATA maps fields by
# position.
#
# NOTE(security): the password is passed on the command line and is therefore
# visible in the process list; consider a --defaults-extra-file option file.
export yesterday=`date -d last-day +%Y-%m-%d`

out_file="/home/hs/opt/dw-etl/tabsrc_strurecord/db99src_$yesterday.dat"

# Start from an empty file: every pull section below appends to it.
rm -f "$out_file"

#db99huanx src tab stru data pull ...
# CR, LF and '|' are stripped from every free-text field so a value can never
# break the one-line-per-row, pipe-delimited layout. COLUMN_TYPE is included
# because enum()/set() definitions may contain arbitrary characters.
# -N suppresses the column-name header line.
# Abort on failure so that a broken pull is never loaded as a real snapshot.
/usr/local/bin/mysql -hsrcipaddress.mysql.rds.aliyuncs.com -udbreader -piloveyou -N -e"select
CONCAT(
'rdsb0o1vpjxpecbe9uq0.mysql.rds.aliyuncs.com' ,'|',
ifnull(replace(replace(replace(a1.TABLE_SCHEMA,char(13),''),char(10),''),'|',''),'') ,'|',
ifnull(replace(replace(replace(a1.TABLE_NAME,char(13),''),char(10),''),'|',''),'') ,'|',
ifnull(replace(replace(replace(a1.TABLE_COMMENT,char(13),''),char(10),''),'|',''),'') ,'|',
ifnull(a1.TABLE_TYPE,'') ,'|',
ifnull(a1.\`ENGINE\`,'') ,'|',
ifnull(a1.CREATE_TIME,'') ,'|',
ifnull(a1.UPDATE_TIME,'') ,'|',
ifnull(a1.TABLE_COLLATION,'') ,'|',
ifnull(replace(replace(replace(a2.COLUMN_NAME,char(13),''),char(10),''),'|',''),'') ,'|',
ifnull(replace(replace(replace(a2.COLUMN_COMMENT,char(13),''),char(10),''),'|',''),'') ,'|',
ifnull(a2.ORDINAL_POSITION,'') ,'|',
ifnull(replace(replace(replace(a2.COLUMN_TYPE,char(13),''),char(10),''),'|',''),'') ,'|',
${yesterday//-/} ,'|',
NOW()) src_tab_stru_info
from information_schema.\`TABLES\` a1
left join information_schema.\`COLUMNS\` a2 on a1.TABLE_SCHEMA=a2.TABLE_SCHEMA and a1.TABLE_NAME=a2.TABLE_NAME
where a1.TABLE_SCHEMA in('db99huanx','db99yinping');
" >> "$out_file" || { echo "srctab_stru_pull: db99huanx structure pull failed" >&2; exit 1; }

#db99finance src tab stru data pull ...
......(同上一段)

# Step 2: reload yesterday's snapshot into sor.src_tab_stru_info.
# The delete and the load run in one transaction (the table is InnoDB), so a
# failed load rolls back and does not leave the day without any rows. The
# mysql client stops at the first failing statement and exits non-zero.
/usr/local/mysql/bin/mysql -hipaddress.mysql.rds.aliyuncs.com -udatauser -piloveyou -e "use sor;
start transaction;
delete from src_tab_stru_info where data_date=${yesterday//-/};
load data local infile '/home/hs/opt/dw-etl/tabsrc_strurecord/db99src_$yesterday.dat' into table src_tab_stru_info fields terminated by '|' enclosed by '' lines terminated by '\n' ignore 0 lines;
commit;" || { echo "srctab_stru_pull: loading src_tab_stru_info failed" >&2; exit 1; }

# Step 3: compute the day-over-day differences. The argument (99) is a
# placeholder: the procedure currently derives its date from CURDATE().
/usr/local/mysql/bin/mysql -hipaddress.mysql.rds.aliyuncs.com -udatauser -piloveyou -e "use dm;call p_srctab_change_info(99);" || { echo "srctab_stru_pull: p_srctab_change_info failed" >&2; exit 1; }

# Step 4: mail the result. Only reached when the load and the diff succeeded.
python /home/hs/opt/dw-etl/tabsrc_strurecord/srctab_change_sendmail.py


2、目标mysql上的存储过程及相关表结构

2.1、src_tab_stru_info(sor库)


-- Daily snapshot of source-system table/column metadata (sor schema).
-- One row per (source host, schema, table, column) and snapshot day.
-- The column order must stay in step with the pipe-delimited extract file:
-- the loader uses LOAD DATA without a column list, so fields map by position.
CREATE TABLE `src_tab_stru_info` (
    -- identification of the source table
    `IP_ADDRESS`              VARCHAR(43) NOT NULL DEFAULT '',
    `TABLE_SCHEMA`            VARCHAR(64) NOT NULL DEFAULT '',
    `TABLE_NAME`              VARCHAR(64) NOT NULL DEFAULT '',
    -- table-level attributes copied from information_schema.TABLES
    `TABLE_COMMENT`           TEXT        NOT NULL,
    `TABLE_TYPE`              VARCHAR(64) NOT NULL DEFAULT '',
    `DB_ENGINE`               VARCHAR(64) NOT NULL DEFAULT '',
    `CREATE_TIME`             VARCHAR(19) NOT NULL DEFAULT '',
    `UPDATE_TIME`             VARCHAR(19) NOT NULL DEFAULT '',
    `TABLE_COLLATION`         VARCHAR(32) NOT NULL DEFAULT '',
    -- column-level attributes copied from information_schema.COLUMNS
    `COLUMN_NAME`             VARCHAR(64) NOT NULL DEFAULT '',
    `COLUMN_COMMENT`          TEXT        NOT NULL,
    `COLUMN_ORDINAL_POSITION` VARCHAR(21) NOT NULL DEFAULT '',
    `COLUMN_TYPE`             LONGTEXT    NOT NULL,
    -- snapshot day as an integer in yyyymmdd form, and the extraction time
    `DATA_DATE`               INT(8)      NOT NULL DEFAULT '0',
    `ETL_TIME`                DATETIME    NOT NULL DEFAULT '0000-00-00 00:00:00',
    KEY `idx_src_tab` (
        `IP_ADDRESS`,
        `TABLE_SCHEMA`,
        `TABLE_NAME`,
        `COLUMN_NAME`,
        `DATA_DATE`
    ) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT=' 源系统表结构信息';


2.2、srctab_change_info(dm库)

-- Result table of the day-over-day structure comparison (dm schema).
-- ADD_TYPE is written by p_srctab_change_info as 'column', 'table' or
-- 'nochange'; the mail script reads only the 'column' and 'table' rows.
CREATE TABLE `srctab_change_info` (
    `CHANGE_DATE`  DATE         NOT NULL DEFAULT '0000-00-00',
    `ADD_TYPE`     VARCHAR(80)  NOT NULL DEFAULT '',
    -- the object that was added
    `IP_ADDRESS`   VARCHAR(100) NOT NULL DEFAULT '',
    `TABLE_SCHEMA` VARCHAR(100) NOT NULL DEFAULT '',
    `TABLE_NAME`   VARCHAR(100) NOT NULL DEFAULT '',
    `COLUMN_NAME`  VARCHAR(100) NOT NULL DEFAULT '',
    `COLUMN_TYPE`  LONGTEXT     NOT NULL,
    -- time at which the comparison wrote the row
    `ETL_TIME`     DATETIME     NOT NULL DEFAULT '0000-00-00 00:00:00'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


2.3、p_srctab_change_info(dm库)

CREATE DEFINER=`datauser`@`%` PROCEDURE `p_srctab_change_info`(IN `p_etl_date` datetime)
BEGIN
    -- Records the tables and columns that exist in yesterday's snapshot of
    -- sor.src_tab_stru_info but not in the snapshot of the day before.
    --
    -- p_etl_date is accepted for interface compatibility but is not used:
    -- the comparison date is always "yesterday" relative to CURDATE().
    --
    -- Rows written to srctab_change_info for that date:
    --   'column'   one row per column missing from the previous snapshot
    --              (this includes every column of a newly added table)
    --   'table'    one row per table missing from the previous snapshot
    --   'nochange' one marker row, written on every run
    -- Dropped objects and changed column types are not reported.
    -- Re-running for the same day replaces that day's rows.

    DECLARE v_etl_date DATE;   -- day being reported (yesterday)
    DECLARE v_cur_key  INT;    -- v_etl_date as a yyyymmdd integer
    DECLARE v_prev_key INT;    -- the day before, as a yyyymmdd integer

    SET v_etl_date = DATE_SUB(CURDATE(), INTERVAL 1 DAY);
    -- DATA_DATE is an INT in yyyymmdd form. The keys are built explicitly so
    -- the match does not depend on implicit date-to-number conversion.
    SET v_cur_key  = CAST(DATE_FORMAT(v_etl_date, '%Y%m%d') AS UNSIGNED);
    SET v_prev_key = CAST(DATE_FORMAT(DATE_SUB(v_etl_date, INTERVAL 1 DAY), '%Y%m%d') AS UNSIGNED);

    DELETE FROM srctab_change_info
    WHERE CHANGE_DATE = v_etl_date;

    -- The target columns are NOT NULL, so "not applicable" values are written
    -- as '' rather than NULL (a NULL here is rejected under strict SQL mode).
    INSERT INTO srctab_change_info
        (CHANGE_DATE, ADD_TYPE, IP_ADDRESS, TABLE_SCHEMA, TABLE_NAME,
         COLUMN_NAME, COLUMN_TYPE, ETL_TIME)
    -- columns present in the current snapshot only
    SELECT
        v_etl_date,
        'column',
        cur.IP_ADDRESS,
        cur.TABLE_SCHEMA,
        cur.TABLE_NAME,
        cur.COLUMN_NAME,
        cur.COLUMN_TYPE,
        NOW()
    FROM sor.src_tab_stru_info AS cur
    WHERE cur.DATA_DATE = v_cur_key
      AND NOT EXISTS (
            SELECT 1
            FROM sor.src_tab_stru_info AS prev
            WHERE prev.IP_ADDRESS   = cur.IP_ADDRESS
              AND prev.TABLE_SCHEMA = cur.TABLE_SCHEMA
              AND prev.TABLE_NAME   = cur.TABLE_NAME
              AND prev.COLUMN_NAME  = cur.COLUMN_NAME
              AND prev.DATA_DATE    = v_prev_key
          )
    UNION ALL
    -- tables present in the current snapshot only
    SELECT
        v_etl_date,
        'table',
        cur_tab.IP_ADDRESS,
        cur_tab.TABLE_SCHEMA,
        cur_tab.TABLE_NAME,
        '',
        '',
        NOW()
    FROM (
        SELECT DISTINCT IP_ADDRESS, TABLE_SCHEMA, TABLE_NAME
        FROM sor.src_tab_stru_info
        WHERE DATA_DATE = v_cur_key
    ) AS cur_tab
    WHERE NOT EXISTS (
            SELECT 1
            FROM sor.src_tab_stru_info AS prev
            WHERE prev.IP_ADDRESS   = cur_tab.IP_ADDRESS
              AND prev.TABLE_SCHEMA = cur_tab.TABLE_SCHEMA
              AND prev.TABLE_NAME   = cur_tab.TABLE_NAME
              AND prev.DATA_DATE    = v_prev_key
          )
    UNION ALL
    -- marker row showing that the comparison ran for this date
    SELECT
        v_etl_date,
        'nochange',
        '',
        '',
        '',
        '',
        '',
        NOW()
    ;

END


3、获取信息发送邮件的python脚本

路径:/home/hs/opt/dw-etl/tabsrc_strurecord/srctab_change_sendmail.py

srctab_change_sendmail.py

# -*- coding=utf-8 -*-
import smtplib
import MySQLdb
import json
import warnings
import datetime
from email.header import Header
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

warnings.filterwarnings("ignore")

# Connection settings for the target MySQL instance that holds the dm schema.
db_config = dict(
    host='rdsipaddress.mysql.rds.aliyuncs.com',
    user='datauser',
    passwd='iloveyou',
    port=3306,
    db='dm',
)

def getDB():
    """Open a connection to the MySQL server described by ``db_config``.

    The connection is switched to autocommit, the session character set is
    set to utf8 and the configured database is selected.

    Returns:
        (conn, curr): the open connection and a cursor on it, or
        (None, None) when MySQLdb reports an error (the error is printed).
    """
    try:
        conn = MySQLdb.connect(host=db_config['host'],
                               user=db_config['user'],
                               passwd=db_config['passwd'],
                               port=db_config['port'])
        conn.autocommit(True)
        curr = conn.cursor()
        curr.execute("SET NAMES utf8")
        curr.execute("USE %s" % db_config['db'])

        return conn, curr
    except MySQLdb.Error as e:
        # Not every MySQLdb error carries an (errno, message) pair; fall back
        # to the plain text so the handler itself cannot raise IndexError.
        if len(e.args) >= 2:
            print("Mysql Error %d: %s" % (e.args[0], e.args[1]))
        else:
            print("Mysql Error: %s" % (e,))
        return None, None

# Script body: read yesterday's structure changes from dm.srctab_change_info
# and, when there are any, mail them as an HTML table with the raw snapshot
# file attached.
from email.mime.application import MIMEApplication

conn, curr = getDB()
if conn is None:
    # getDB() has already printed the MySQL error; nothing can be reported.
    raise SystemExit(1)

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)

print(today)
print(yesterday)

# One query is enough: the row count tells whether anything changed.
# The date is bound as a parameter instead of being formatted into the SQL.
try:
    curr.execute(
        "select CHANGE_DATE,ADD_TYPE,IP_ADDRESS,TABLE_SCHEMA,TABLE_NAME,COLUMN_NAME,COLUMN_TYPE "
        "from srctab_change_info where add_type in('column','table') and CHANGE_DATE=%s",
        (yesterday.isoformat(),))
    items = curr.fetchall()
finally:
    # Release the connection on every path, including "no change" and errors.
    curr.close()
    conn.close()

chg_num0 = len(items)
print(chg_num0)

if chg_num0 >= 1:
    # Header row, then one table row per change record.
    html_data = "<tr style='font-weight:bold;'><td>变动日期</td><td>变动类型</td><td>IP地址</td><td>模式</td><td>表名</td><td>列名</td><td>列类型</td></tr>"
    for item in items:
        html_data += "<tr>" + "".join("<td>%s</td>" % (col,) for col in item) + "</tr>"

    sender = "nisj@99hong.com"
    rcpt = ["nisj@99hong.com","chenhx@99hong.com","yanc@99hong.com","zhangmm@99hong.com"]

    # 'mixed' is the container type for a body plus an attachment.
    msg = MIMEMultipart('mixed')
    msg['Subject'] = Header("源系统表结构变化信息日汇报","utf-8")  # encoded header for non-ASCII text
    msg['From'] = sender
    msg['To'] = ', '.join(rcpt)

    html = "Hi All:<br>今日源系统表结构有变化,请根据具体情况进行数据仓库各层的相应变动及更改!<br><br><table border='1' style='background-color:#22B8DD'>"+html_data+"</table>"
    msg.attach(MIMEText(html, 'html', 'utf-8'))

    # Attach the snapshot file written by srctab_stru_pull.sh, which names it
    # db99src_<date>.dat.
    att_name = "db99src_%s.dat" % (yesterday)
    att_path = "/home/hs/opt/dw-etl/tabsrc_strurecord/" + att_name
    with open(att_path, 'rb') as att_file:
        # MIMEApplication defaults to application/octet-stream, base64-encoded.
        att = MIMEApplication(att_file.read())
    att.add_header('Content-Disposition', 'attachment', filename=att_name)
    msg.attach(att)

    # NOTE(security): the SMTP credentials are hard-coded and the session is
    # not upgraded to TLS; consider SMTP_SSL/starttls and an external secret.
    try:
        s = smtplib.SMTP('smtp.exmail.qq.com')
        s.login('nisj@99hong.com','mte$p%iiu2feizd%')
        s.sendmail(sender,rcpt,msg.as_string())
        s.quit()
    except Exception as e:
        # Best effort: a mail failure is reported but does not fail the batch.
        print(e)
else:
    print("%s,no src tab stru change!" % (yesterday))


4、定时每日进行数据的处理及邮件的发送

# Daily at 03:09. dw_batch.sh declares #!/bin/bash and uses bash-only
# ${var//-/} expansion, so it is run with bash explicitly rather than sh.
9 3 * * * /bin/bash /home/hs/opt/dw-etl/dw_batch.sh

cat /home/hs/opt/dw-etl/dw_batch.sh
#!/bin/bash
# Date variables exported for the batch steps (all in YYYY-MM-DD form).
today=$(date +%Y-%m-%d)
yesterday=$(date -d last-day +%Y-%m-%d)
beforeytd=$(date -d "2 days ago" +%Y-%m-%d)
export today yesterday beforeytd
# Manual override for re-running a specific day (disabled):
#echo -n "please enter a day for runing :"
#read yesterday

# Same day as $yesterday with the dashes removed (YYYYMMDD).
ytd=${yesterday//-/}
export ytd

......

#pull tab stru data from src and load in rds-sor
# srctab_stru_pull.sh declares #!/bin/bash and uses bash-only ${var//-/}
# expansion, so it is started with bash rather than sh. The log text is left
# unchanged so existing greps over dw_batch.log keep matching.
date +"%Y-%m-%d %H:%M:%S" >>/home/hs/opt/dw-etl/dw_batch.log
echo "$yesterday,[sh /home/hs/opt/dw-etl/tabsrc_strurecord/srctab_stru_pull.sh] exec start ... " >>/home/hs/opt/dw-etl/dw_batch.log
bash /home/hs/opt/dw-etl/tabsrc_strurecord/srctab_stru_pull.sh
date +"%Y-%m-%d %H:%M:%S" >>/home/hs/opt/dw-etl/dw_batch.log
echo "$yesterday,[sh /home/hs/opt/dw-etl/tabsrc_strurecord/srctab_stru_pull.sh] exec finished !" >>/home/hs/opt/dw-etl/dw_batch.log
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: