文章目录
- 一. MySQL的准备
- 1.1 binlog格式
- 1.2 创建库表
- 1.3 赋权限
- 二. Canal安装及配置
- 2.1 Canal下载及安装
- https://github.com/alibaba/canal/releases
- 2.2 修改 canal.properties 的配置
- 2.3 修改 instance.properties
- 2.4 启动 Canal
- 2.5 看到 CanalLauncher 你表示启动成功 , 同时会创建 canal_test 主题
- 2.6 启动 Kafka 消费客户端测试 , 查看消费情况
- 2.7 向 MySQL 中插入数据后查看消费者控制台
- 参考:
一. MySQL的准备 MySQL版本 5.7.31
1.1 binlog格式 vim /etc/my.cnf
server-id=1log-bin=mysql-bin binlog_format=row binlog-do-db=canal_test 重启mysql服务测试记录:
mysql> show variables like '%log_bin%';+---------------------------------+-------------------------------------+| Variable_name| Value|+---------------------------------+-------------------------------------+| log_bin| ON|| log_bin_basename| /home/mysql/data/3306/hp8-bin|| log_bin_index| /home/mysql/data/3306/hp8-bin.index || log_bin_trust_function_creators | OFF|| log_bin_use_v1_row_events| OFF|| sql_log_bin| ON|+---------------------------------+-------------------------------------+6 rows in set (0.01 sec)mysql> show variables like '%binlog_format%';+---------------+-------+| Variable_name | Value |+---------------+-------+| binlog_format | ROW|+---------------+-------+1 row in set (0.00 sec)mysql> 1.2 创建库表 代码:create database canal_test default character set utf8;show create database canal_test\Guse canal_test;CREATE TABLE user_info(`id` VARCHAR(255),`name` VARCHAR(255),`sex` VARCHAR(255)); 测试记录:mysql> create database canal_test default character set utf8;Query OK, 1 row affected (0.00 sec)mysql> show create database canal_test\G*************************** 1. row ***************************Database: canal_testCreate Database: CREATE DATABASE `canal_test` /*!40100 DEFAULT CHARACTER SET utf8 */1 row in set (0.00 sec)mysql> mysql> use canal_test;Database changedmysql> CREATE TABLE user_info(-> `id` VARCHAR(255),-> `name` VARCHAR(255),-> `sex` VARCHAR(255)-> );Query OK, 0 rows affected (0.01 sec)mysql> show create table user_info\G*************************** 1. row ***************************Table: user_infoCreate Table: CREATE TABLE `user_info` (`id` varchar(255) DEFAULT NULL,`name` varchar(255) DEFAULT NULL,`sex` varchar(255) DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf81 row in set (0.00 sec)mysql> 1.3 赋权限 set global validate_password_length=4; set global validate_password_policy=0;GRANT SELECT,REPLICATION SLAVE,REPLICATION CLIENT ON *.* TO'canal'@'%' IDENTIFIED BY 'canal' ; 二. Canal安装及配置 2.1 Canal下载及安装 下载地址:https://github.com/alibaba/canal/releases
cd /usr/local/srcgunzip canal.deployer-1.1.2.tar.gztar -xvf canal.deployer-1.1.2.tar 2.2 修改 canal.properties 的配置 cd /usr/local/src/confvim canal.propertiescanal.id = 1 canal.ip = canal.port = 11111canal.metrics.pull.port = 11112 canal.zkServers =# flush data to zk canal.zookeeper.flush.period = 1000 canal.withoutNetty = false# tcp, kafka, RocketMQ canal.serverMode = kafka# flush meta cursor/parse position to filecanal.mq.servers = hp2:9092,hp3:9092,hp4:9092 说明:这个文件是 canal 的基本通用配置 , canal 端口号默认就是 11111 , 修改 canal 的 输出 model , 默认 tcp , 改为输出到 kafka多实例配置如果创建多个实例 , 通过前面 canal 架构 , 我们可以知道 , 一个 canal 服务 中可以有多个 instance , conf/下的每一个 example 即是一个实例 , 每个实例下面都有独立的 配置文件 。默认只有一个实例 example , 如果需要多个实例处理不同的 MySQL 数据的话 , 直 接拷贝出多个 example , 并对其重新命名 , 命名和配置文件中指定的名称一致 , 然后修改 canal.properties 中的 canal.destinations=实例 1 , 实例 2 , 实例 3 。
2.3 修改 instance.properties 我们这里只读取一个 MySQL 数据 , 所以只有一个实例 , 这个实例的配置文件在conf/example 目录下
cd /usr/local/src/conf/examplevim instance.propertiescanal.instance.master.address=127.0.0.1:3306canal.instance.dbUsername=canalcanal.instance.dbPassword=canalcanal.instance.connectionCharset = UTF-8canal.instance.defaultDatabaseName =canal_test# mq config canal.mq.topic=canal_test canal.mq.partitionsNum=2canal.mq.partition=2# hash partition config#canal.mq.partitionHash=mytest.person:id,mytest.role:id 备注:canal.mq.partitionsNum 不能操作kafka主题的分区数 , 不然会报错
ERROR com.alibaba.otter.canal.kafka.CanalKafkaProducer - Invalid partition given with record: 3 is not in the range [0…1).
kafka默认的分区数是1 , 此时需要修改
kafka-topics.sh --zookeeperlocalhost:2181 --alter --partitions 3 --topiccanal_test 2.4 启动 Canal cd /usr/local/src/bin/startup.sh2.5 看到 CanalLauncher 你表示启动成功 , 同时会创建 canal_test 主题
[root@hp8 ~]# jps 11498 BrokerBootstrap15662 Jps[root@hp8 ~]# 2.6 启动 Kafka 消费客户端测试 , 查看消费情况 cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin./kafka-console-consumer.sh --bootstrap-server hp2:9092 --topic canal_test 2.7 向 MySQL 中插入数据后查看消费者控制台 MySQLINSERT INTO user_info VALUES('1001','zhangsan','male');INSERT INTO user_info VALUES('1002','lishi','male'); Kafka{"data":[{"id":"1002","name":"lishi","sex":"male"}],"database":"canal_test","es":1646041639000,"id":1,"isDdl":false,"mysqlType":{"id":"varchar(255)","name":"varchar(255)","sex":"varchar(255)"},"old":null,"sql":"","sqlType":{"id":12,"name":12,"sex":12},"table":"user_info","ts":1646042284004,"type":"INSERT"} 【Canal系列2-Canal同步到Kafka】参考:
- https://blog.csdn.net/libizhide/article/details/109555562
- https://www.dazhuanlan.com/wendyemir/topics/1422034
- 春季老年人吃什么养肝?土豆、米饭换着吃
- 三八妇女节节日祝福分享 三八妇女节节日语录
- 老人谨慎!选好你的“第三只脚”
- 校方进行了深刻的反思 青岛一大学生坠亡校方整改校规
- 脸皮厚的人长寿!有这特征的老人最长寿
- 长寿秘诀:记住这10大妙招 100%增寿
- 春季老年人心血管病高发 3条保命要诀
- 眼睛花不花要看四十八 老年人怎样延缓老花眼
- 香槟然能防治老年痴呆症? 一天三杯它人到90不痴呆
- 老人手抖的原因 为什么老人手会抖
