部署canal server 1.1.5,消费mysql信息,订阅测试
一、Canal Server 的核心架构
Canal Server 是阿里巴巴开源的 MySQL binlog 增量订阅与消费组件,其架构设计围绕 高可用、高性能、低延迟 三大目标构建,主要包含以下核心组件:
Canal Server(主服务)
- 负责管理多个 Canal Instance,每个 Instance 对应一个 MySQL 数据库实例的 binlog 解析任务。
- 支持 集群化部署,依赖 Zookeeper 进行状态管理,确保高可用性(HA)。
- 采用 Netty 实现高性能网络通信,支持 TCP/HTTP 协议与下游系统交互。
Canal Instance(数据队列)
- 每个 Instance 独立解析一个 MySQL 的 binlog 流,包含以下子模块:
- EventParser:模拟 MySQL Slave 协议,从 Master 拉取并解析 binlog。
- EventSink:过滤、加工数据(如按表名/字段过滤),并分发给下游。
- EventStore:暂存未消费的变更事件(默认基于内存 RingBuffer,支持持久化到 ZK/File)。
- MetaManager:管理消费位点(binlog position),确保故障恢复后数据不丢失。
高可用设计
- Zookeeper 协调:管理 Server 实例状态,故障时自动切换(临时节点 + 选举机制)。
- 位点持久化:消费进度保存到 ZK,宕机后可从断点继续同步。
Canal 的核心价值
Canal 在数据同步领域的关键价值体现在以下场景:
数据库实时同步
支持 MySQL → MySQL/Oracle/其他数据库 的异地多活、读写分离架构。
比传统 ETL 工具(如 DataX)更轻量,延迟可控制在毫秒级。
大数据与实时计算
将 binlog 变更推送到 Kafka/RocketMQ,供 Flink/Spark 实时处理。
用于 数据仓库(Hive/HBase)实时更新,替代批量拉取模式。
缓存与搜索优化
缓存预热:数据库变更后,自动更新 Redis/Elasticsearch,避免缓存穿透。
价格/库存实时同步:电商场景中保障数据一致性。
监控与告警
基于数据变更触发业务规则(如风控系统实时检测异常交易)。
技术优势
- 低侵入性:仅依赖 MySQL binlog,不影响主库性能。
- 灵活性:支持过滤特定表/字段,减少无效数据传输。
- 扩展性:可通过增加 Canal Server 实例横向扩展吞吐量。
二、这次测试采用的方案
通过云服务器搭建canal集群,在canal 集群上部署ZK集群,在源端建设mysql 数据库,把数据库同步到canal集群,在目标端通过java 客户端进行消费。
三、整体的效果如下:
四、部署的过程
1、部署基础ZK环境:
这里:10.0.0.8、10.0.0.12、10.0.0.16 分别是3个节点的内网IP
一、选择操作系统 7.9
二、准备好前提条件
#2.1 更新系统
sudo yum update -y
# 2.2 安装 OpenJDK 8
sudo yum install -y java-1.8.0-openjdk-devel
三、安装zookeeper 集群
部署 Zookeeper 集群(3节点)
步骤:
1. 下载 Zookeeper 3.5.9 :
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.9/apache-zookeeper-3.5.9-bin.tar.gz
2.
tar -zxvf apache-zookeeper-3.5.9-bin.tar.gz -C /opt/
3.
mv /opt/apache-zookeeper-3.5.9-bin /opt/zookeeper
4. 创建数据与日志目录 :
mkdir -p /opt/zookeeper/data
mkdir -p /opt/zookeeper/logs
5. 配置 zoo.cfg (所有节点):
# 创建文件并写入内容
cat > /opt/zookeeper/conf/zoo1.cfg << 'EOF'
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/opt/zookeeper/data
dataLogDir=/opt/zookeeper/logs
# the port at which the clients will connect
clientPort=2181
clientPortAddress=0.0.0.0
# the maximum number of client connections.
#maxClientCnxns=60
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
#autopurge.purgeInterval=1
server.1=10.0.0.8:2888:3888
server.2=10.0.0.12:2888:3888
server.3=10.0.0.16:2888:3888
EOF
6. 创建 myid 文件 (每个节点唯一):
# 节点1:
echo "1" > /opt/zookeeper/data/myid
# 节点2:
echo "2" > /opt/zookeeper/data/myid
# 节点3:
echo "3" > /opt/zookeeper/data/myid
7. 启动 Zookeeper (所有节点):
/opt/zookeeper/bin/zkServer.sh start
/opt/zookeeper/bin/zkServer.sh status # 检查状态(Leader/Follower)
安装之后的效果图如下,表示3个集群的ZK节点已经部署好了, 10.0.0.12 是leader、10.0.0.8 和 10.0.0.16 是follower。
2、部署canal-admin 节点
在1台服务器上部署canal-admin 节点,此节点IP 是 10.0.0.16
另外一个前提条件,预装了一个mysql 数据库(内网IP 地址10.0.0.2),先配置了用户名“canal”,并为canal 配置了admin的权限,密码为 tencent@123456.
1、部署Canal Admin(节点1)
2.1 下载与解压
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz
mkdir -p /opt/canal-admin && tar -zxvf canal.admin-1.1.5.tar.gz -C /opt/canal-admin
yum install mysql -y
mysql -h10.0.0.2 -ucanal -ptencent@123456
2.2 初始化元数据库
在MySQL中执行初始化脚本:
CREATE DATABASE canal_manager;
USE canal_manager;
SOURCE /opt/canal-admin/conf/canal_manager.sql;
2.3 修改配置
# 覆盖写入application.yml
cat > /opt/canal-admin/conf/application.yml << 'EOF'
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: 10.0.0.2:3306
database: canal_manager
username: canal
password: tencent@123456
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: admin
EOF
2.4 启动
/opt/canal-admin/bin/startup.sh
访问 http://节点1:8089(默认账号:admin/123456).
配置好后可以正常登陆到canal-admin 节点了。
3、登录 Canal Admin WebUI
登陆 http://节点1:8089(默认账号:admin/123456)
3.1、进入【集群管理】> 配置集群名称(如 canal)
3.2、修改集群配置,主配置:
3.3、修改canal.properties的参数
核对 canal_local.properties:
canal.admin.manager = 10.0.0.12:8089 # Canal Admin地址
canal.admin.user = admin # 用户名需与Canal Admin一致
canal.admin.passwd = [正确密码] # 确保密码与Canal Admin的`adminUser`加密值匹配
canal.zkServers=10.0.0.12:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
3.4、 在admin server 重启服务
cd /opt/canal-server/
./bin/stop.sh && ./bin/startup.sh local
4、配置canal server信息
节点1 :
4.部署Canal Server(3节点)
4.1 下载与解压
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
mkdir -p /opt/canal-server && tar -zxvf canal.deployer-1.1.5.tar.gz -C /opt/canal-server
4.2 配置集群
# 覆盖写入canal_local.properties(所有节点):
cat > /opt/canal-server/conf/canal_local.properties << 'EOF'
#register ip
canal.register.ip = 10.0.0.8
# canal admin config
canal.admin.manager = 10.0.0.12:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster = canal
canal.admin.register.name = canal_server_03
# zookeeper config
canal.zkServers = 10.0.0.12:2181
# instance mode (cluster)
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
canal.id=3 # 节点1设为1,节点2设为2,以此类推
EOF
4.3 启动Server
/opt/canal-server/bin/startup.sh local # 关键参数`local`启用集群模式
节点2 :
4.部署Canal Server(3节点)
4.1 下载与解压
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
mkdir -p /opt/canal-server && tar -zxvf canal.deployer-1.1.5.tar.gz -C /opt/canal-server
4.2 配置集群
# 覆盖写入canal_local.properties(所有节点):
cat > /opt/canal-server/conf/canal_local.properties << 'EOF'
#register ip
canal.register.ip = 10.0.0.12
# canal admin config
canal.admin.manager = 10.0.0.12:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster = canal
canal.admin.register.name = canal_server_02
# zookeeper config
canal.zkServers = 10.0.0.12:2181
# instance mode (cluster)
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
canal.id=2 # 节点1设为1,节点2设为2,以此类推
EOF
4.3 启动Server
/opt/canal-server/bin/startup.sh local # 关键参数`local`启用集群模式
节点3 :
4.部署Canal Server(3节点)
4.1 下载与解压
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
mkdir -p /opt/canal-server && tar -zxvf canal.deployer-1.1.5.tar.gz -C /opt/canal-server
4.2 配置集群
# 覆盖写入canal_local.properties(所有节点):
cat > /opt/canal-server/conf/canal_local.properties << 'EOF'
#register ip
canal.register.ip = 10.0.0.16
# canal admin config
canal.admin.manager = 10.0.0.12:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster = canal
canal.admin.register.name = canal_server_01
# zookeeper config
canal.zkServers = 10.0.0.12:2181
# instance mode (cluster)
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
canal.id=1 # 节点1设为1,节点2设为2,以此类推
EOF
4.3 启动Server
/opt/canal-server/bin/startup.sh local # 关键参数`local`启用集群模式
配置好后可以自动发现,效果如下图:
添加Server:自动注册后,在“Server管理”中确认3个节点状态为Active/Standby.
在canal admin 界面,新建Instance:
名称example,绑定集群canal.
绑定后效果如下:
5、配置canal,订阅mysql 的数据
1.开启binlog并设为ROW模式
修改MySQL配置文件(如my.cnf):
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1 # 需唯一,不与Canal的slaveId重复
重启MySQL后验证:
SHOW VARIABLES LIKE 'binlog_format%';
2.创建Canal专用账户
授权Canal作为MySQL Slave的权限:
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'canal'@'%';
FLUSH PRIVILEGES;
Canal Server配置
3. 修改canal.properties
设置运行模式(如TCP或MQ)和Zookeeper地址(集群模式需配置):
canal.serverMode = tcp # 或kafka/rocketMQ
canal.zkServers = 127.0.0.1:2181 # 集群模式需配置
4. 配置实例文件instance.properties (/opt/canal-server/conf/example 这里example 为之前在admin创建的实例名)
指定MySQL连接信息和过滤规则:
canal.instance.mysql.slaveId=123456 # 需唯一
canal.instance.master.address=10.0.0.2:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=tencent@123456
canal.instance.filter.regex=.\\.. # 监听所有库表
5、启动与验证
启动Canal Server
sh bin/startup.sh local
6、检查日志确认无报错:
tail -f logs/example/example.log
7. 客户端消费数据
使用Java代码连接Canal Server(TCP模式示例):
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
connector.connect();
connector.subscribe(".\\..");
while (true) {
Message message = connector.getWithoutAck(100);
// 处理message.getEntries()
connector.ack(message.getId());
8. zk 里面创建一个持久节点(这条命令是在ZooKeeper中创建了一个持久节点,用于存储Canal客户端的消费位点信息。)
8.1
/opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181
8.2 命令行操作
[zk: 127.0.0.1:2181(CONNECTED) 5] create /otter/canal/destinations/example/1001/cursor '{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","postion":{"journalName":"mysql-bin.000001","position":120}}'
Created /otter/canal/destinations/example/1001/cursor
效果如下:
[zk: 127.0.0.1:2181(CONNECTED) 5] create /otter/canal/destinations/example/1001/cursor '{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","postion":{"journalName":"mysql-bin.000001","position":120}}'
Created /otter/canal/destinations/example/1001/cursor
6、 在源端插入一条数据,观察效果如下
这里贴下java 的代码
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.google.protobuf.InvalidProtocolBufferException;
public class CanalClientDemo {
public static void main(String[] args) {
// 创建连接器,指定clientId=100
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("159.75.*.*", 11111), // Canal服务端地址
"example", // 实例名(与Canal服务端配置一致)
"", // 用户名(空表示无认证)
"" // 密码
// 指定clientId
);
try {
connector.connect();
connector.subscribe(".*\\..*"); // 订阅所有表
while (true) {
Message message = connector.getWithoutAck(1000); // 批量获取数据
long batchId = message.getId();
if (batchId != -1) {
// 处理数据逻辑
System.out.println(message.getEntries());
for (Entry entry : message.getEntries()) {
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
System.out.println("捕获变更: " + rowChange); // 打印变更详情
}
}
connector.ack(batchId); // 确认消费
}
}
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
} finally {
connector.disconnect();
}
}
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>canalclient</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.6</version> <!-- 需与服务端版本一致 -->
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.6</version>
</dependency>
</dependencies>
</project>