通过DataX同步数据仓库数据

DataX介绍

DataX的Github介绍如下:

DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各种异构数据源之间高效的数据同步功能。

DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

问题

 数仓开发时需要从异构数据源获取数据作为ODS层(原始数据层),同时还需要在数仓间进行ETL(数据抽取、转换、加载)。通过DataX来实现数据同步。

实现

方案1 shell实现

1.写通过DataX将数据同步到ODS层的配置文件(reader、writter)

2.通过shell脚本传入传入target_dir(数据库表名,判断是否是需要全量同步的表,若是,则进行全量同步)和datax_config(即步骤1的配置文件)

3.将数仓间ETL的SQL脚本也封装成shell脚本

4.通过dolphinscheduler调度步骤2和步骤3的shell脚本实现数据同步到数仓ODS层并在数仓间ETL

缺点:系统复杂度高,不易维护

优点:更加灵活,在部分自定义函数功能支持不是很友好的框架(Hive)里也能很好地发挥作用

方案2 python实现

1.写通过DataX将数据同步到ODS层的配置文件(reader、writter)

2.将异构数据源配置信息写成文件,然后在python脚本中调用配置信息,拼接成完整的DataX同步命令。

3.将数仓间ETL的SQL脚本封装成自定义函数,再在步骤2的python脚本中连接数仓执行自定义函数

4.在服务器上通过contrab设置定时任务实现自动执行

缺点:Hive的自定义函数支持不是特别友好,部分功能可能没法实现。通用性差

优点:系统复杂度低,DataX和python脚本都不用一直在线,定时调度任务开始执行时才会启动python脚本,降低了服务器负担

补充:DataX的增量同步

数仓的增量同步一般都是用消息队列+数据库监听框架+数据同步框架(kafka+Maxwell+Flume)。 但是在离线数仓实时性要求不高的场景下也可以用DataX来实现增量同步。 实现原理:配置文件中reader部分支持column和querySQL,可以在querySQL中加入过滤条件来获取较新的数据。

比如如果需要获取最新的每日交易数据,就可以加一个时间为最新日期或者传入日期参数的过滤条件。

如果表中没有时间这种自然增长的字段也可以使用单调递增的id之类的。总之就是获取最新的一批数据就可以。

然后为了保证数据一致性,还需要在wrritter的preSQL中删除符合reader的querySQL的数据。

补充:crontab命令

参考资料:https://cloud.tencent.com/developer/article/2359335

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
(1)语  法:
crontab [-u <用户名称>][配置文件] 或 crontab { -l | -r | -e }
-u   #<用户名称> 是指设定指定<用户名称>的定时任务,这个前提是你必须要有其权限(比如说是 root)才能够指定他人的时程表。如果不使用 -u user 的话,就是表示设定自己的定时任务。
-l  #列出该用户的定时任务设置。
-r  #删除该用户的定时任务设置。
-e  #编辑该用户的定时任务设置。

(2)命令时间格式 :
*     *    *   *   *  command
分   时   日   月  周   命令
第1列表示分钟1~59 每分钟用*或者 */1表示
第2列表示小时1~23(0表示0点)
第3列表示日期1~31
第4列表示月份1~12
第5列标识号星期0~6(0表示星期天)
第6列要运行的命令

(3)一些Crontab定时任务例子:
30 21 * * * /usr/local/etc/rc.d/lighttpd restart  #每晚的21:30 重启apache
45 4 1,10,22 * * /usr/local/etc/rc.d/lighttpd restart  #每月1、10、22日的4 : 45重启apache
10 1 * * 6,0 /usr/local/etc/rc.d/lighttpd restart  #每周六、周日的1 : 10重启apache
0,30 18-23 * * * /usr/local/etc/rc.d/lighttpd restart  #每天18 : 00至23 : 00之间每隔30分钟重启apache
0 23 * * 6 /usr/local/etc/rc.d/lighttpd restart  #每星期六的11 : 00 pm重启apache
* 23-7/1 * * * /usr/local/etc/rc.d/lighttpd restart  #晚上11点到早上7点之间,每隔一小时重启apache
* */1 * * * /usr/local/etc/rc.d/lighttpd restart  #每一小时重启apache
0 11 4 * mon-wed /usr/local/etc/rc.d/lighttpd restart  #每月的4号与每周一到周三的11点重启apache
0 4 1 jan * /usr/local/etc/rc.d/lighttpd restart  #一月一号的4点重启apache

*/30 * * * * /usr/sbin/ntpdate cn.pool.ntp.org  #每半小时同步一下时间
0 */2 * * * /sbin/service httpd restart  #每两个小时重启一次apache 
50 7 * * * /sbin/service sshd start  #每天7:50开启ssh服务 
50 22 * * * /sbin/service sshd stop  #每天22:50关闭ssh服务 
0 0 1,15 * * fsck /home  #每月1号和15号检查/home 磁盘 
1 * * * * /home/bruce/backup  #每小时的第一分执行 /home/bruce/backup这个文件 
00 03 * * 1-5 find /home "*.xxx" -mtime +4 -exec rm {} \;  #每周一至周五3点钟,在目录/home中,查找文件名为*.xxx的文件,并删除4天前的文件。
30 6 */10 * * ls  #每月的1、11、21、31日是的6:30执行一次ls命令
页面浏览量Loading
网站总访客数:Loading
网站总访问量:Loading
使用 Hugo 构建
主题 StackJimmy 设计