使用canal实现MySQL到GreenPlum的增量同步
mysql开启binlog,确保binlog格式为row,并且创建复制的账号
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1
CREATE USER canal IDENTIFIED BY '123';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
mysql表结构转换为gp的格式
使用Navicat Premium,点击工具,结构同步,左边选择mysql,右边选择Greenplum,同时去掉选项中的创建记录,就能在gp中创建表了。
导出mysql的数据进行初始化
查看mysql可以导出的目录
show variables like 'secure_file_priv';
执行导出
mysqldump -uroot -P3306 -p --add-locks=0 --skip-lock-tables --single-transaction --skip-tz-utc --quick --tab=/var/lib/mysql-files/ --fields-terminated-by='|' --master-data=2 dbname tblname
导出完成时会显示binlog位置信息,记录下来
bin.001172', MASTER_LOG_POS=33983444
如果表里含有多行字符,需要额外处理把换行去掉,不然导入到greenplum会报错,先导到开发服把换行符删除
UPDATE tbl SET content = REPLACE(REPLACE(content, CHAR(10), ''), CHAR(13), '');
启动gp文件服务
把导出的数据拷贝到其中一台GP服务器10.230.130.2
scp -r root@10.160.1.1:/var/lib/mysql-files /tmp
mv /tmp/mysql-files/* /home/gpadmin/external_files
在10.230.130.2这台机器上启动gpfdist文件服务
nohup gpfdist -d /home/gpadmin/external_files -p 8081 -l gpfdist.log &
导入数据到GP
使用develop账号登录到gp master(10.230.130.1)导入初始化数据,可以直接用navicat也可以用psql命令行。
--------------------------------------------------------------------------------------------------- 创建修改更新时间的触发器
CREATE OR REPLACE FUNCTION dw_update_at() RETURNS TRIGGER AS
$$
BEGIN
NEW.dw_updated_at = current_timestamp;
RETURN NEW;
END;
$$
language plpgsql;
--------------------------------------------------------------------------------------------------- ods_order
CREATE TABLE ods_order (
id int PRIMARY KEY,
order_no bigint,
uid int,
amount numeric(10),
status smallint,
create_time timestamp,
created_at timestamp,
updated_at timestamp
) DISTRIBUTED BY (id);
COMMENT ON COLUMN "public"."ods_order"."id" IS '订单id';
COMMENT ON COLUMN "public"."ods_order"."order_no" IS '订单编号';
COMMENT ON COLUMN "public"."ods_order"."uid" IS '用户id';
COMMENT ON COLUMN "public"."ods_order"."amount" IS '总金额';
COMMENT ON COLUMN "public"."ods_order"."status" IS '状态';
COMMENT ON COLUMN "public"."ods_order"."create_time" IS '生成时间';
COMMENT ON TABLE "public"."ods_order" IS '订单表';
-- 创建外部表
CREATE READABLE EXTERNAL TABLE ext_ods_order
(LIKE ods_order)
LOCATION ('gpfdist://10.230.130.2/order.txt')
FORMAT 'TEXT' (DELIMITER '|')
LOG ERRORS SEGMENT REJECT LIMIT 5;
-- 从外部表导入数据到本地表
INSERT INTO ods_order (select * from ext_ods_order);
-- 添加创建时间和更新时间字段
ALTER TABLE ods_order ADD COLUMN dw_created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP;
ALTER TABLE ods_order ADD COLUMN dw_updated_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP;
-- 创建时间和更新时间利用触发器做到自动更新
CREATE TRIGGER update_ods_order_updated_at BEFORE UPDATE ON ods_order FOR EACH ROW EXECUTE PROCEDURE dw_update_at();
--授予develop权限
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA TO develop;
安装canal
按照官网教程配置好数据库和表映射就可以增量更新了。
常用命令
-- 查看导入外部表的错误信息
SELECT * from gp_read_error_log('ext_ods_order');
-- 清空导入外部表的错误信息
SELECT gp_truncate_error_log('ext_ods_order');
查看kafka消息
/workspace/app/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic egatee
查看kafka消费状态
/workspace/app/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.01:9092 --describe --group ods
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
组 主题 分区 当前已消费的条数 总条数 未消费的条数