canal的入门
从mysql开始说起,mysql可以进行主从的配置,主节点内部有一个binlog日志是用来做持久化处理,如果是row格式下就是对应每一条记录的修改,然后主从同步可以在主节点插入后进行从节点同步操作(dump sync)。也就是开启一个线程去发送binlog日志流事件给slave节点,slave收到后同步然后完成ack,整个主从同步的流程就是这样。
canal就是通过模拟slave节点接收主节点的修改记录从而接收到监听记录。如果要实现高可用就得去搭建zookeeper canal集群,还有kafka等mq队列,如果不是就是基于内存的队列,容易丢失。
场景
- 实现mysql与redis数据一致性
- 完成多数据库如es等数据同步操作
canal
整体大的机构如上
- instance 数据库实例,每个会使用socket连接到mysql模拟一个slave
- parset,解析binlog二进制流事件
- sink过滤
- store存储
当 Canal Server 启动后,会根据配置启动 N 个 CanalInstance, 每个 CanalInstance 都会使用 socket 连接 mysql,dump binlog,然后将数据交给 parser 解析,sink 过滤,store 存储,当 client 连接时,会从 zk 上读取该 client 的信息,而 metaManager 元数据管理就是管理 zk(当然有多种实现,比如存储在文件中) 信息的,如果发生错误了,就调用 Alarm 发送报警信息。
一些具体细节后面再仔细学习
安装canal
-
https://github.com/alibaba/canal/releases ,下载deployer版
-
修改配置文件
一个文件夹就是一个监听库,我们需要到里面修改配置,也就是instance.properties,主要就是修改数据库信息而已,要是有消息可靠性需要可以去canal总配置文件修改队列配置,可以看出基本的队列还是支持的
如果是集群模式,还得用zk去协调节点
后面直接启动就行
代码
1.依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
2.编写监听类
根据实际编写
public CanalConnector getConn() {
String host = "127.0.0.1";
int port = 11111;
String username = "admin";
String password = "4ACFE3202A5FF5CF467898FC58AAB1D615029441";
String instance = "example";
return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password);
}
循环阻塞获取
public void run( ) throws Exception {
CanalConnector conn = getConn();
conn.connect();
//订阅实例中所有的数据库和表
// conn.subscribe(".*\\..");
conn.subscribe("d-0.bean_1");
// 回滚到未进行ack的地方
conn.rollback();
try {
while (true) {
// 获取数据 每次获取一百条改变数据
Message message = conn.getWithoutAck(1);
long id = message.getId();
int size = message.getEntries().size();
if (id != -1 && size > 0) {
// 数据解析
analysis(message.getEntries());
// 确认消息
conn.ack(message.getId());
} else {
Thread.sleep(1000);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭连接
conn.disconnect();
}
}
解析数据
private void analysis(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
// 只解析mysql事务的操作,其他的不解析
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN) {
continue;
}
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
// 解析binlog
CanalEntry.RowChange rowChange = null;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("解析出现异常 data:" + entry.toString(), e);
}
if (rowChange != null) {
// 获取操作类型
CanalEntry.EventType eventType = rowChange.getEventType();
// 获取当前操作所属的数据库
String dbName = entry.getHeader().getSchemaName();
// 获取当前操作所属的表
String tableName = entry.getHeader().getTableName();
// 事务提交时间
long timestamp = entry.getHeader().getExecuteTime();
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
dataDetails(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), dbName, tableName, eventType, timestamp);
System.out.println("-------------------------------------------------------------");
}
}
}
}
分发操作
private static void dataDetails(List<CanalEntry.Column> beforeColumns,
List<CanalEntry.Column> afterColumns,
String dbName,
String tableName,
CanalEntry.EventType eventType,
long timestamp) {
System.out.println("数据库:" + dbName);
System.out.println("表名:" + tableName);
System.out.println("操作类型:" + eventType);
if (CanalEntry.EventType.INSERT.equals(eventType)) {
System.out.println("新增数据:");
printColumn(afterColumns);
} else if (CanalEntry.EventType.DELETE.equals(eventType)) {
System.out.println("删除数据:");
printColumn(beforeColumns);
} else {
System.out.println("更新数据:更新前数据--");
printColumn(beforeColumns);
System.out.println("更新数据:更新后数据--");
printColumn(afterColumns);
}
System.out.println("操作时间:" + timestamp);
}
然后在具体操作编写对应业务就就行