canal
本文最后更新于 133 天前,如有失效请评论区留言。

canal的入门

从mysql开始说起,mysql可以进行主从的配置,主节点内部有一个binlog日志是用来做持久化处理,如果是row格式下就是对应每一条记录的修改,然后主从同步可以在主节点插入后进行从节点同步操作(dump sync)。也就是开启一个线程去发送binlog日志流事件给slave节点,slave收到后同步然后完成ack,整个主从同步的流程就是这样。

canal就是通过模拟slave节点接收主节点的修改记录从而接收到监听记录。如果要实现高可用就得去搭建zookeeper canal集群,还有kafka等mq队列,如果不是就是基于内存的队列,容易丢失。

场景

  1. 实现mysql与redis数据一致性
  2. 完成多数据库如es等数据同步操作

canal

img

整体大的机构如上

  • instance 数据库实例,每个会使用socket连接到mysql模拟一个slave
  • parset,解析binlog二进制流事件
  • sink过滤
  • store存储

当 Canal Server 启动后,会根据配置启动 N 个 CanalInstance, 每个 CanalInstance 都会使用 socket 连接 mysql,dump binlog,然后将数据交给 parser 解析,sink 过滤,store 存储,当 client 连接时,会从 zk 上读取该 client 的信息,而 metaManager 元数据管理就是管理 zk(当然有多种实现,比如存储在文件中) 信息的,如果发生错误了,就调用 Alarm 发送报警信息。

一些具体细节后面再仔细学习

安装canal

  1. https://github.com/alibaba/canal/releases ,下载deployer版

  2. 修改配置文件

    image-20240911132646942

​ 一个文件夹就是一个监听库,我们需要到里面修改配置,也就是instance.properties,主要就是修改数据库信息而已,要是有消息可靠性需要可以去canal总配置文件修改队列配置,可以看出基本的队列还是支持的

image-20240911132846727

如果是集群模式,还得用zk去协调节点

image-20240911133039463

后面直接启动就行

image-20240911133726273

代码

1.依赖

 <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>

2.编写监听类

根据实际编写

public CanalConnector getConn() {
        String host = "127.0.0.1";
        int port = 11111;
        String username = "admin";
        String password = "4ACFE3202A5FF5CF467898FC58AAB1D615029441";
        String instance = "example";
        return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password);
    }

循环阻塞获取

public void run( ) throws Exception {
        CanalConnector conn = getConn();
        conn.connect();
        //订阅实例中所有的数据库和表
//        conn.subscribe(".*\\..");
        conn.subscribe("d-0.bean_1");
        // 回滚到未进行ack的地方
        conn.rollback();
        try {
            while (true) {
                // 获取数据 每次获取一百条改变数据
                Message message = conn.getWithoutAck(1);
                long id = message.getId();
                int size = message.getEntries().size();
                if (id != -1 && size > 0) {
                    // 数据解析
                    analysis(message.getEntries());
                    // 确认消息
                    conn.ack(message.getId());
                } else {
                    Thread.sleep(1000);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭连接
            conn.disconnect();
        }
    }

解析数据

    private void analysis(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // 只解析mysql事务的操作,其他的不解析
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN) {
                continue;
            }
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            // 解析binlog
            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("解析出现异常 data:" + entry.toString(), e);
            }
            if (rowChange != null) {
                // 获取操作类型
                CanalEntry.EventType eventType = rowChange.getEventType();
                // 获取当前操作所属的数据库
                String dbName = entry.getHeader().getSchemaName();
                // 获取当前操作所属的表
                String tableName = entry.getHeader().getTableName();
                // 事务提交时间
                long timestamp = entry.getHeader().getExecuteTime();
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    dataDetails(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), dbName, tableName, eventType, timestamp);
                    System.out.println("-------------------------------------------------------------");
                }
            }
        }
    }

分发操作

private static void dataDetails(List<CanalEntry.Column> beforeColumns,
                                    List<CanalEntry.Column> afterColumns,
                                    String dbName,
                                    String tableName,
                                    CanalEntry.EventType eventType,
                                    long timestamp) {

        System.out.println("数据库:" + dbName);
        System.out.println("表名:" + tableName);
        System.out.println("操作类型:" + eventType);
        if (CanalEntry.EventType.INSERT.equals(eventType)) {
            System.out.println("新增数据:");
            printColumn(afterColumns);
        } else if (CanalEntry.EventType.DELETE.equals(eventType)) {
            System.out.println("删除数据:");
            printColumn(beforeColumns);
        } else {
            System.out.println("更新数据:更新前数据--");
            printColumn(beforeColumns);
            System.out.println("更新数据:更新后数据--");
            printColumn(afterColumns);
        }
        System.out.println("操作时间:" + timestamp);
    }

然后在具体操作编写对应业务就就行

image-20240911134147342

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇