Skip to content

canal需要注意的点

Posted on:April 2, 2021 at 11:48 AM

比较坑的点:

1 每次同步的内容会每秒持久化到file或者zk ,binlog一般只保留的几天,如果你持久化到文件/zk的配置的binlog文件在mysql已经不存在了会报错, 报错信息大概如下

java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not find first log file name in binary log index file

这个时候只能调整配置或者删除mate.dat 文件,然后重启canal , 这个时候他会使用mysql的语句show status去取最新位点

2 重启canal有个非常非常坑的点在于会读information_schema 这个库的内容去读表名和表id等信息 ,而这个往往会很久,不知道是不是测试环境原因,读了挺久的

mate刷新的逻辑

根据配置每秒刷新到mate信息 也就是文件或者zk上,所以重启会有重复消费

找到位点

加载顺序: 1 从mate中获取位点: getLatestIndexBy 也就是从 memeory/zk或者file的mate信息中读取位点 2 根据配置读取:

    protected EntryPosition findStartPositionInternal(ErosaConnection connection) {
        MysqlConnection mysqlConnection = (MysqlConnection) connection;
        LogPosition logPosition = logPositionManager.getLatestIndexBy(destination);
        if (logPosition == null) {// 找不到历史成功记录
            EntryPosition entryPosition = null;
            if (masterInfo != null && mysqlConnection.getConnector().getAddress().equals(masterInfo.getAddress())) {
                entryPosition = masterPosition;
            } else if (standbyInfo != null
                       && mysqlConnection.getConnector().getAddress().equals(standbyInfo.getAddress())) {
                entryPosition = standbyPosition;
            }

            if (entryPosition == null) {
                entryPosition = findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默认从当前最后一个位置进行消费
            }

            // 判断一下是否需要按时间订阅
            if (StringUtils.isEmpty(entryPosition.getJournalName())) {
                // 如果没有指定binlogName,尝试按照timestamp进行查找
                if (entryPosition.getTimestamp() != null && entryPosition.getTimestamp() > 0L) {
                    logger.warn("prepare to find start position {}:{}:{}",
                        new Object[] { "", "", entryPosition.getTimestamp() });
                    return findByStartTimeStamp(mysqlConnection, entryPosition.getTimestamp());
                } else {
                    logger.warn("prepare to find start position just show master status");
                    return findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默认从当前最后一个位置进行消费
                }
            } else {
                if (entryPosition.getPosition() != null && entryPosition.getPosition() > 0L) {
                    // 如果指定binlogName + offest,直接返回
                    entryPosition = findPositionWithMasterIdAndTimestamp(mysqlConnection, entryPosition);
                    logger.warn("prepare to find start position {}:{}:{}",
                        new Object[] { entryPosition.getJournalName(), entryPosition.getPosition(),
                                entryPosition.getTimestamp() });
                    return entryPosition;
                } else {
                    EntryPosition specificLogFilePosition = null;
                    if (entryPosition.getTimestamp() != null && entryPosition.getTimestamp() > 0L) {
                        // 如果指定binlogName +
                        // timestamp,但没有指定对应的offest,尝试根据时间找一下offest
                        EntryPosition endPosition = findEndPosition(mysqlConnection);
                        if (endPosition != null) {
                            logger.warn("prepare to find start position {}:{}:{}",
                                new Object[] { entryPosition.getJournalName(), "", entryPosition.getTimestamp() });
                            specificLogFilePosition = findAsPerTimestampInSpecificLogFile(mysqlConnection,
                                entryPosition.getTimestamp(),
                                endPosition,
                                entryPosition.getJournalName(),
                                true);
                        }
                    }

                    if (specificLogFilePosition == null) {
                        // position不存在,从文件头开始
                        entryPosition.setPosition(BINLOG_START_OFFEST);
                        return entryPosition;
                    } else {
                        return specificLogFilePosition;
                    }
                }
            }
        } else {
            if (logPosition.getIdentity().getSourceAddress().equals(mysqlConnection.getConnector().getAddress())) {
                if (dumpErrorCountThreshold >= 0 && dumpErrorCount > dumpErrorCountThreshold) {
                    // binlog定位位点失败,可能有两个原因:
                    // 1. binlog位点被删除
                    // 2.vip模式的mysql,发生了主备切换,判断一下serverId是否变化,针对这种模式可以发起一次基于时间戳查找合适的binlog位点
                    boolean case2 = (standbyInfo == null || standbyInfo.getAddress() == null)
                                    && logPosition.getPostion().getServerId() != null
                                    && !logPosition.getPostion().getServerId().equals(findServerId(mysqlConnection));
                    if (case2) {
                        EntryPosition findPosition = fallbackFindByStartTimestamp(logPosition, mysqlConnection);
                        dumpErrorCount = 0;
                        return findPosition;
                    }
                    // 处理 binlog 位点被删除的情况,提供自动重置到当前位点的功能
                    // 应用场景: 测试环境不稳定,位点经常被删。强烈不建议在正式环境中开启此控制参数,因为binlog 丢失调到最新位点也即意味着数据丢失
                    if (isAutoResetLatestPosMode()) {
                        dumpErrorCount = 0;
                        return findEndPosition(mysqlConnection);
                    }
                    Long timestamp = logPosition.getPostion().getTimestamp();
                    if (isRdsOssMode() && (timestamp != null && timestamp > 0)) {
                        // 如果binlog位点不存在,并且属于timestamp不为空,可以返回null走到oss binlog处理
                        return null;
                    }
                } else if (StringUtils.isBlank(logPosition.getPostion().getJournalName())
                        && logPosition.getPostion().getPosition() <= 0
                        && logPosition.getPostion().getTimestamp() > 0) {
                    return fallbackFindByStartTimestamp(logPosition,mysqlConnection);
                }
                // 其余情况
                logger.warn("prepare to find start position just last position\n {}",
                    JsonUtils.marshalToString(logPosition));
                return logPosition.getPostion();
            } else {
                // 针对切换的情况,考虑回退时间
                long newStartTimestamp = logPosition.getPostion().getTimestamp() - fallbackIntervalInSeconds * 1000;
                logger.warn("prepare to find start position by switch {}:{}:{}", new Object[] { "", "",
                        logPosition.getPostion().getTimestamp() });
                return findByStartTimeStamp(mysqlConnection, newStartTimestamp);
            }
        }

事件类型

事件有很多类型,我现在只对update和insert 感兴趣

public enum EventType
      implements com.google.protobuf.ProtocolMessageEnum {
    /**
     * <code>INSERT = 1;</code>
     */
    INSERT(0, 1),
    /**
     * <code>UPDATE = 2;</code>
     */
    UPDATE(1, 2),
    /**
     * <code>DELETE = 3;</code>
     */
    DELETE(2, 3),
    /**
     * <code>CREATE = 4;</code>
     */
    CREATE(3, 4),
    /**
     * <code>ALTER = 5;</code>
     */
    ALTER(4, 5),
    /**
     * <code>ERASE = 6;</code>
     */
    ERASE(5, 6),
    /**
     * <code>QUERY = 7;</code>
     */
    QUERY(6, 7),
    /**
     * <code>TRUNCATE = 8;</code>
     */
    TRUNCATE(7, 8),
    /**
     * <code>RENAME = 9;</code>
     */
    RENAME(8, 9),
    /**
     * <code>CINDEX = 10;</code>
     *
     * <pre>
     **CREATE INDEX*
     * </pre>
     */
    CINDEX(9, 10),
    /**
     * <code>DINDEX = 11;</code>
     */
    DINDEX(10, 11),
    /**
     * <code>GTID = 12;</code>
     */
    GTID(11, 12),
    /**
     * <code>XACOMMIT = 13;</code>
     *
     * <pre>
     ** XA *
     * </pre>
     */
    XACOMMIT(12, 13),
    /**
     * <code>XAROLLBACK = 14;</code>
     */
    XAROLLBACK(13, 14),
    /**
     * <code>MHEARTBEAT = 15;</code>
     *
     * <pre>
     ** MASTER HEARTBEAT *
     * </pre>
     */
    MHEARTBEAT(14, 15),
    ;

http://www.tianshouzhi.com/api/tutorials/canal