We face a large data-processing task involving two big MySQL tables, holding roughly 100 million and 450 million rows respectively. At this scale, reading directly from MySQL creates a severe performance bottleneck. We therefore use Sqoop to run scheduled incremental imports into Hive, migrating the data to Hive storage so it can be processed and analyzed with Spark SQL.
# Delete the previous job
sqoop job -delete torderincrementjob

# Create the Sqoop job
sqoop job --create torderincrementjob \
--connect jdbc:mysql://172.16.*.*:3306/*?useCursorFetch=true \
--username root \
--password-file /input/sqoop/pwd/109mysql.pwd \
--target-dir /mysqldb/t_order \
--table t_order \
--fields-terminated-by "\t" \
--lines-terminated-by "\n" \
--null-string '\\N' \
--null-non-string '\\N' \
--incremental append \
--check-column id \
--last-value 1281 \
-m 1
On the first run of the job, Sqoop logs the bounds it uses:
Lower bound value: 1281
Upper bound value: 100701508
On each subsequent run, Sqoop records a new upper bound, for example:
Lower bound value: 100701508
Upper bound value: 100703035

Lower bound value: 100703035
Upper bound value: 100704475
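The bookkeeping behind these log lines can be sketched as a small shell simulation. The variables here are illustrative stand-ins for Sqoop's saved job state; the real stored value can be inspected with `sqoop job --show torderincrementjob`:

```shell
# Sketch of how --incremental append advances the check-column bound.
# 1281 and 100701508 come from the first run's log above; "last_value"
# stands in for the state Sqoop persists between runs.
last_value=1281                # lower bound saved from the previous run
run_max=100701508              # max(id) among the newly imported rows
echo "Lower bound value: $last_value"
echo "Upper bound value: $run_max"
last_value=$run_max            # saved back; becomes the next run's lower bound
echo "Next run lower bound: $last_value"
```

Only rows with `id` strictly greater than the stored lower bound are imported, which is why each run's lower bound equals the previous run's upper bound.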
Create a Hive external table over the data stored on HDFS:
CREATE EXTERNAL TABLE `t_order` (
  `id` bigint,
  `serial` string,
  `product_id` int,
  `product_type` tinyint,
  `product_name` string,
  `quantity` double,
  `buyer_id` bigint,
  `payer_id` bigint,
  `price` double,
  `vip_price` double,
  `settle_price` double,
  `currency` string,
  `payer_level` tinyint,
  `status` tinyint,
  `pay_mode` tinyint,
  `payment_serial` string,
  `client_type` string,
  `app_type` tinyint,
  `seller_id` string,
  `partner_id` int,
  `reference` string,
  `channel_source` string,
  `note` string,
  `expiration_time` string,
  `operator` string,
  `create_time` string,
  `pay_time` string,
  `update_time` string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
LOCATION 'hdfs://golive-master:8020/mysqldb/t_order';
The HDFS data can now be queried through Hive:
select * from golivecms20.t_order limit 10;
A shell script runs both incremental jobs in sequence:

#!/bin/sh
current_time=$(date +%Y%m%d%H%M%S)
echo "$current_time" > /data/bigdata/app/sqoopjob/timermysqltohdfs.log
echo "............................" >> /data/bigdata/app/sqoopjob/timermysqltohdfs.log

# Sync the t_order table
/data/bigdata/sqoop-1.4.6-cdh5.7.0/bin/sqoop job -exec torderincrementjob

# Sync the t_userlogout table
/data/bigdata/sqoop-1.4.6-cdh5.7.0/bin/sqoop job -exec tuserlogoutincrementjob
The script is scheduled via crontab to run four times a day:

00 1,7,13,19 * * * /bin/bash /data/bigdata/app/sqoopjob/timermysqltohdfs.sh >> /data/bigdata/app/sqoopjob/timermysqltohdfs.log 2>&1
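The hour field `1,7,13,19` in that crontab entry expands to four daily run times; a quick way to sanity-check a comma-separated hour field:

```shell
# Expand a crontab hour field into human-readable fire times.
hours="1,7,13,19"
schedule=""
for h in $(echo "$hours" | tr ',' ' '); do
  schedule="$schedule$(printf '%02d:00 ' "$h")"
done
schedule="${schedule% }"   # trim trailing space
echo "$schedule"           # 01:00 07:00 13:00 19:00
```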
Create the Sqoop job for t_userlogout:
sqoop job --create tuserlogoutincrementjob \
--connect jdbc:mysql://172.16.*.*:3306/*?useCursorFetch=true \
--username root \
--password-file /input/sqoop/pwd/68mysql.pwd \
--target-dir /mysqldb/t_userlogout \
--table t_userlogout \
--fields-terminated-by "\t" \
--lines-terminated-by "\n" \
--null-string '\\N' \
--null-non-string '\\N' \
--incremental append \
--check-column ID \
--last-value 1 \
-m 1
Run the job for the first time:
sqoop job -exec tuserlogoutincrementjob
Create the Hive external table:
CREATE EXTERNAL TABLE `t_userlogout` (
  `ID` bigint,
  `GoliveId` string,
  `InstalmentCode` string,
  `ManufacturerCode` string,
  `MacAddress` string,
  `AreaCode` string,
  `IpAddress` string,
  `LoginTime` string,
  `LogoutTime` string,
  `DeviceID` string,
  `VersionType` string,
  `Version` string,
  `Platform` string,
  `PartnerID` int,
  `BranchType` int,
  `LicenseProviderCode` string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
LOCATION 'hdfs://golive-master:8020/mysqldb/t_userlogout';
Instead of importing a whole table, the --query option can supply a specific SQL query:
--query "select ID,GoliveId,InstalmentCode,ManufacturerCode,MacAddress,IpAddress,LoginTime,VersionType,Version,PartnerID from t_userlogout where $CONDITIONS"
Likewise, --query can restrict the import to specific columns of t_order:
--query "select id,serial,product_id,product_type,product_name,buyer_id,price,vip_price,settle_price,status,pay_mode,client_type,seller_id,partner_id,channel_source,expiration_time,create_time,pay_time,update_time from t_order where $CONDITIONS"
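One shell-quoting detail worth noting: Sqoop replaces the literal token `$CONDITIONS` with each mapper's split predicate at run time, so the token must reach Sqoop unexpanded. Inside double quotes the shell would expand `$CONDITIONS` to an empty string, so escape it as `\$CONDITIONS` (or single-quote the whole query). A minimal sketch with an illustrative query:

```shell
# $CONDITIONS must reach Sqoop literally; inside double quotes the shell
# would expand it to an empty string, so escape it (or use single quotes).
query="select id, serial from t_order where \$CONDITIONS"
echo "$query"
# → select id, serial from t_order where $CONDITIONS
```

Also note that with more than one mapper, a free-form --query import requires --split-by; with `-m 1`, as in the jobs above, it is not needed.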
In summary, this setup uses Sqoop's scheduled incremental imports to migrate large MySQL tables into Hive efficiently, avoiding the performance problems of querying the big tables directly. With Hive as the storage layer, Spark SQL can then process and analyze the data efficiently.