es01-canal-由工作内容引入


es01-canal-由工作内容引入

工作中用到了es,需求是将一些文章放到es里便于检索,对于如何从数据库同步数据到es有一些不同的选择,同事说用cannal来同步数据。
我之前只接触过mq发送到es,及logstash的方案,对于这种实现方案还没有试过。
那什么是cannal呢。

.canal

  1. canal是什么,官网上显示如下。

    canal,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
    这里我们可以简单地把canal理解为一个用来同步增量数据的一个工具。

  2. canal的工作原理就是把自己伪装成MySQL slave

  • 模拟MySQL slave的交互协议向MySQL Mater发送 dump协议,
  • MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,比如MySQL,Kafka,Elastic Search等等。
    es01

可以用canal来做

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护
  • 业务cache(缓存)刷新
  • 带业务逻辑的增量数据处理
  1. 搭建一个canal
    前提:
    • jvm
    • mysql
    • mq es 等
  • 去官网下载页面进行下载:https://github.com/alibaba/canal/releases
    在自己电脑上解压运行 canal.deployer

  • 可以使用java客户端来操作canal

     <dependency>
       <groupId>com.alibaba.otter</groupId>
       <artifactId>canal.client</artifactId>
       <version>1.1.4</version>
    </dependency>
  • 使用客户端的demo

    搭建一个springboot项目,然后建立下面的类。

    @Component
    public class CannalClient implements InitializingBean {
    
        private final static int BATCH_SIZE = 1000;
    
        @Override
        public void afterPropertiesSet() throws Exception {
            // 创建链接
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
            try {
                //打开连接
                connector.connect();
                //订阅数据库表,全部表
                connector.subscribe(".*\\..*");
                //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
                connector.rollback();
                while (true) {
                    // 获取指定数量的数据
                    Message message = connector.getWithoutAck(BATCH_SIZE);
                    //获取批量ID
                    long batchId = message.getId();
                    //获取批量的数量
                    int size = message.getEntries().size();
                    //如果没有数据
                    if (batchId == -1 || size == 0) {
                        try {
                            //线程休眠2秒
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    } else {
                        //如果有数据,处理数据
                        printEntry(message.getEntries());
                    }
                    //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
                    connector.ack(batchId);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                connector.disconnect();
            }
        }
    
        /**
         * 打印canal server解析binlog获得的实体类信息
         */
        private static void printEntry(List<Entry> entrys) {
            for (Entry entry : entrys) {
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    //开启/关闭事务的实体类型,跳过
                    continue;
                }
                //RowChange对象,包含了一行数据变化的所有特征
                //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
                RowChange rowChage;
                try {
                    rowChage = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
                }
                //获取操作类型:insert/update/delete类型
                EventType eventType = rowChage.getEventType();
                //打印Header信息
                System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                        eventType));
                //判断是否是DDL语句
                if (rowChage.getIsDdl()) {
                    System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
                }
                //获取RowChange对象里的每一行数据,打印出来
                for (RowData rowData : rowChage.getRowDatasList()) {
                    //如果是删除语句
                    if (eventType == EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                        //如果是新增语句
                    } else if (eventType == EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList());
                        //如果是更新的语句
                    } else {
                        //变更前的数据
                        System.out.println("------->; before");
                        printColumn(rowData.getBeforeColumnsList());
                        //变更后的数据
                        System.out.println("------->; after");
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
        }
    
        private static void printColumn(List<Column> columns) {
            for (Column column : columns) {
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            }
        }
    }
  • canal Adapter
    为了便于大家的使用,官方做了一个独立的组件Adapter,Adapter是可以将canal server端获取的数据转换成几个常用的中间件数据源,现在支持kafka、rocketmq、rebitmq、hbase、elasticsearch,针对这几个中间件的支持,直接配置即可,无需开发。上文中,如果需要将mysql的数据同步到elasticsearch,直接运行 canal Adapter,修改相关的配置即可。

canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ
本项目使用的即为canal 使用了mq发送数据来处理。
本质上和之前接触的没有太大的差距
https://github.com/alibaba/canal/wiki/
wiki写的很棒

es02

如上图所示

  • 3是mq消费者,取到数据后实例化为2所示dto类
  • 4是一个接口,定义了一些同步的增删改查的方法,如果要实现基本上是一个表对应一个实现。
  • 1会将4的实现类放到一个map中,key为实现类对应的表名。在3中根据消息的表名,调用这个map中的实现类处理数据。

Author: 向天歌
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint policy. If reproduced, please indicate source 向天歌 !
  TOC