2.3全量表数据同步2.3.1数据通道全量表数据由DataX从MySQL业务数据库直接同步到HDFS具体数据流向如下图所示。2.3.2DataX配置文件我们需要为每张全量表编写一个DataX的json配置文件此处以 base_province 为例配置文件内容如下{ job: { content: [ { reader: { name: mysqlreader, parameter: { column: [ id, name, region_id, area_code, iso_code, iso_3166_2 ], connection: [ { jdbcUrl: [ jdbc:mysql://hadoop101:3306/edu ], table: [ base_province ] } ], password: 123456, splitPk: , username: root } }, writer: { name: hdfswriter, parameter: { column: [ { name: id, type: bigint }, { name: name, type: string }, { name: region_id, type: string }, { name: area_code, type: string }, { name: iso_code, type: string }, { name: iso_3166_2, type: string } ], compress: gzip, defaultFS: hdfs://hadoop101:8020, fieldDelimiter: \t, fileName: base_province, fileType: text, path: ${targetdir}, path: /base_province, writeMode: append } } } ], setting: { speed: { channel: 1 } } } } 启动Hadoop后targetdir要先创建好然后在执行 hadoop fs -mkdir /base_province vim job/base_provice.json bin/datax.py job/base_provice.json 查看数据zcat查看解压后的数据 hadoop fs -cat /base_province/base_province__b51a0a8c_916a_4131_a6a9_9481f851880a.gz | zcat注由于目标路径包含一层日期用于对不同天的数据加以区分故path参数并未写死需在提交任务时通过参数动态传入参数名称为targetdir。2.3.3DataX配置文件生成脚本方便起见此处提供了DataX配置文件批量生成脚本脚本内容及使用方式如下。1在~/bin目录下创建gen_import_config.py脚本[atguiguhadoop102 bin]$ vim ~/bin/gen_import_config.py脚本内容如下# codingutf-8 import json import getopt import os import sys import MySQLdb #MySQL相关配置需根据实际情况作出修改 mysql_host hadoop101 mysql_port 3306 mysql_user root mysql_passwd 123456 #HDFS NameNode相关配置需根据实际情况作出修改 hdfs_nn_host hadoop101 hdfs_nn_port 8020 #生成配置文件的目标路径可根据实际情况作出修改 output_path /opt/module/datax/job/import def get_connection(): return MySQLdb.connect(hostmysql_host, portint(mysql_port), usermysql_user, passwdmysql_passwd) def get_mysql_meta(database, table): connection get_connection() cursor connection.cursor() sql SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA%s AND TABLE_NAME%s ORDER BY ORDINAL_POSITION cursor.execute(sql, [database, table]) fetchall cursor.fetchall() cursor.close() connection.close() return fetchall def get_mysql_columns(database, table): return map(lambda x: x[0], get_mysql_meta(database, table)) def get_hive_columns(database, table): def type_mapping(mysql_type): mappings { bigint: bigint, int: bigint, smallint: bigint, tinyint: bigint, decimal: string, double: double, float: float, binary: string, char: string, varchar: string, datetime: string, time: string, timestamp: string, date: string, text: string } return mappings[mysql_type] meta get_mysql_meta(database, table) return map(lambda x: {name: x[0], type: type_mapping(x[1].lower())}, meta) def generate_json(source_database, source_table): job { job: { setting: { speed: { channel: 3 }, errorLimit: { record: 0, percentage: 0.02 } }, content: [{ reader: { name: mysqlreader, parameter: { username: mysql_user, password: mysql_passwd, column: get_mysql_columns(source_database, source_table), splitPk: , connection: [{ table: [source_table], jdbcUrl: [jdbc:mysql:// mysql_host : mysql_port / source_database] }] } }, writer: { name: hdfswriter, parameter: { defaultFS: hdfs:// hdfs_nn_host : hdfs_nn_port, fileType: text, path: ${targetdir}, fileName: source_table, column: get_hive_columns(source_database, source_table), writeMode: append, fieldDelimiter: \t, compress: gzip } } }] } } if not os.path.exists(output_path): os.makedirs(output_path) with open(os.path.join(output_path, ..join([source_database, source_table, json])), w) as f: json.dump(job, f) def main(args): source_database source_table options, arguments getopt.getopt(args, -d:-t:, [sourcedb, sourcetbl]) for opt_name, opt_value in options: if opt_name in (-d, --sourcedb): source_database opt_value if opt_name in (-t, --sourcetbl): source_table opt_value generate_json(source_database, source_table) if __name__ __main__: main(sys.argv[1:])注1安装Python Mysql驱动由于需要使用Python访问Mysql数据库故需安装驱动命令如下[atguiguhadoop102 bin]$ sudo yum install MySQL-python2脚本使用说明python gen_import_config.py -d database -t table通过-d传入数据库名-t传入表名执行上述命令即可生成该表的DataX同步配置文件。2在~/bin目录下创建gen_import_config.sh脚本[atguiguhadoop102 bin]$ vim ~/bin/gen_import_config.sh脚本内容如下#!/bin/bash python ~/bin/gen_import_config.py -d edu -t base_category_info python ~/bin/gen_import_config.py -d edu -t base_source python ~/bin/gen_import_config.py -d edu -t base_province python ~/bin/gen_import_config.py -d edu -t base_subject_info python ~/bin/gen_import_config.py -d edu -t cart_info python ~/bin/gen_import_config.py -d edu -t chapter_info python ~/bin/gen_import_config.py -d edu -t course_info python ~/bin/gen_import_config.py -d edu -t knowledge_point python ~/bin/gen_import_config.py -d edu -t test_paper python ~/bin/gen_import_config.py -d edu -t test_paper_question python ~/bin/gen_import_config.py -d edu -t test_point_question python ~/bin/gen_import_config.py -d edu -t test_question_info python ~/bin/gen_import_config.py -d edu -t user_chapter_process python ~/bin/gen_import_config.py -d edu -t test_question_option python ~/bin/gen_import_config.py -d edu -t video_info3为gen_import_config.sh脚本增加执行权限[atguiguhadoop102 bin]$ chmod x ~/bin/gen_import_config.sh4执行gen_import_config.sh脚本生成配置文件[atguiguhadoop102 bin]$ gen_import_config.sh5观察生成的配置文件[atguiguhadoop102 bin]$ ll /opt/module/datax/job/import/总用量 60-rw-rw-r--. 1 atguigu atguigu 845 3月 2 21:06 edu2077.base_category_info.json-rw-rw-r--. 1 atguigu atguigu 867 3月 2 21:06 edu2077.base_province.json-rw-rw-r--. 1 atguigu atguigu 717 3月 2 21:06 edu2077.base_source.json-rw-rw-r--. 1 atguigu atguigu 899 3月 2 21:06 edu2077.base_subject_info.json-rw-rw-r--. 1 atguigu atguigu 1133 3月 2 21:06 edu2077.cart_info.json-rw-rw-r--. 1 atguigu atguigu 1047 3月 2 21:06 edu2077.chapter_info.json-rw-rw-r--. 1 atguigu atguigu 1431 3月 2 21:06 edu2077.course_info.json-rw-rw-r--. 1 atguigu atguigu 1059 3月 2 21:06 edu2077.knowledge_point.json-rw-rw-r--. 1 atguigu atguigu 939 3月 2 21:06 edu2077.test_paper.json-rw-rw-r--. 1 atguigu atguigu 943 3月 2 21:06 edu2077.test_paper_question.json-rw-rw-r--. 1 atguigu atguigu 897 3月 2 21:06 edu2077.test_point_question.json-rw-rw-r--. 1 atguigu atguigu 1075 3月 2 21:06 edu2077.test_question_info.json-rw-rw-r--. 1 atguigu atguigu 957 3月 2 21:06 edu2077.test_question_option.json-rw-rw-r--. 1 atguigu atguigu 1007 3月 2 21:06 edu2077.user_chapter_process.json-rw-rw-r--. 1 atguigu atguigu 1341 3月 2 21:06 edu2077.video_info.json2.3.4测试生成的DataX配置文件以base_province为例测试用脚本生成的配置文件是否可用。1创建目标路径由于DataX同步任务要求目标路径提前存在故需手动创建路径当前base_province表的目标路径应为/origin_data/edu/db/base_province_full/2022-02-21。[atguiguhadoop102 bin]$ hadoop fs -mkdir -p /origin_data/edu/db/base_province_full/2022-02-212执行DataX同步命令[atguiguhadoop102 bin]$ python /opt/module/datax/bin/datax.py-p-Dtargetdir/origin_data/edu/db/base_province_full/2022-02-21/opt/module/datax/job/import/edu.base_province.json3观察同步结果观察HFDS目标路径是否出现数据。2.3.5全量表数据同步脚本为方便使用以及后续的任务调度此处编写一个全量表数据同步脚本。1在~/bin目录创建mysql_to_hdfs_full.sh[atguiguhadoop102 bin]$ vim ~/bin/mysql_to_hdfs_full.sh脚本内容如下#!/bin/bash DATAX_HOME/opt/module/datax DATAX_DATA/opt/module/datax/job #清理脏数据 handle_targetdir() { hadoop fs -rm -r $1 /dev/null 21 hadoop fs -mkdir -p $1 } #数据同步 import_data() { local datax_config$1 local target_dir$2 handle_targetdir $target_dir echo 正在处理$1 python $DATAX_HOME/bin/datax.py -p-Dtargetdir$target_dir $datax_config /tmp/datax_run.log 21 if [ $? -ne 0 ] then echo 处理失败, 日志如下: cat /tmp/datax_run.log fi rm /tmp/datax_run.log } #接收表名变量 tab$1 # 如果传入日期则do_date等于传入的日期否则等于前一天日期 if [ -n $2 ] ;then do_date$2 else do_date$(date -d -1 day %F) fi case ${tab} in base_category_info | base_province | base_source | base_subject_info | cart_info | chapter_info | course_info | knowledge_point | test_paper | test_paper_question | test_point_question | test_question_info | test_question_option | user_chapter_process | video_info) import_data $DATAX_DATA/import/edu.${tab}.json /origin_data/edu/db/${tab}_full/$do_date ;; all) for tmp in base_category_info base_province base_source base_subject_info cart_info chapter_info course_info knowledge_point test_paper test_paper_question test_point_question test_question_info test_question_option user_chapter_process video_info do import_data $DATAX_DATA/import/edu.${tmp}.json /origin_data/edu/db/${tmp}_full/$do_date done ;; esac2为mysql_to_hdfs_full.sh增加执行权限[atguiguhadoop102 bin]$ chmod x ~/bin/mysql_to_hdfs_full.sh3测试同步脚本[atguiguhadoop102 bin]$ mysql_to_hdfs_full.sh all 2022-02-214检查同步结果查看HDFS目表路径是否出现全量表数据全量表共15张。2.3.6全量表同步总结全量表同步逻辑比较简单只需每日执行全量表数据同步脚本mysql_to_hdfs_full.sh即可。2.4增量表数据同步2.4.1数据通道2.4.2 Maxwell配置默认情况下Maxwell会同步binlog中的所有表的数据变更记录按照规划有cart_info、order_info等共计11张表需进行增量同步按理我们应对Maxwell进行配置令其只同步这特定的11张表但为了与实时数仓架构保持一致此处不做相应配置而令 Maxwell 对 binlog 中所有表的数据变更记录进行同步并将数据全部发往 topic_db 主题。Maxwell最终配置如下1修改Maxwell配置文件config.properties[atguiguhadoop102 maxwell]$ vim /opt/module/maxwell/config.properties2全部配置参数如下log_levelinfo producerkafka kafka.bootstrap.servershadoop102:9092,hadoop103:9092,hadoop104:9092 #kafka topic配置业务数据发往的目标主题 kafka_topictopic_db # mysql login info hosthadoop102 usermaxwell passwordmaxwell jdbc_optionsuseSSLfalseserverTimezoneAsia/Shanghai # 教学环境添加的配置项使 Maxwell 输出数据中时间戳对应日期与业务数据的日期相同 mock_date2022-02-213重新启动Maxwell[atguiguhadoop102 bin]$ mxw.sh restart4通道测试1启动Zookeeper以及Kafka集群2启动一个Kafka Console Consumer消费 topic_db 主题的数据[atguiguhadoop103 kafka]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topictopic_db3生成模拟数据[atguiguhadoop102 bin]$ cd /opt/module/data_mocker/[atguiguhadoop102 data_mocker]$ java -jar edu2021-mock-2022-06-18.jar4观察Kafka消费者是否能消费到数据