canal 是阿里出的一个MySQL 数据库增量日志解析的工具。
我主要用它来监控mysql数据内容的变化。这在重构一个很乱的项目的时候非常有用。
大概流程。下载
https://github.com/alibaba/canal/releases
执行canal\bin
下面的启动工具。我是
.\startup.bat
然后客户端使用的是nodejs,在这里下载
https://github.com/marmot-z/canal-nodejs
# 下载项目依赖
npm install
# 启动 example 文件
npm run example
这样就可以了。中间如果遇到什么问题,根据报错内容一一解决即可。
最重看到的结果是这样的
阅读体验不好。我需要的是更改记录。所以不需要这么详细。所以我改了一下nodejs的客户端的输出代码
位置在
example\example.ts
import CanalConnector from "../src/canal-connector";
import CanalConnectors from "../src/canal-connectors";
import Message from "../src/Message";
import { com } from '../src/gen/canal-proto';
import protocol = com.alibaba.otter.canal.protocol;
const chalk = require('chalk');
(async function () {
const host: string = '127.0.0.1';
const port: number = 11111;
const destination: string = 'example';
const username: string = '';
const password: string = '';
let connector: CanalConnector = CanalConnectors.newSingleConnector({
host,
port,
destination,
username,
password,
filter: '.*\\..*' // 注意这里使用了反斜杠转义点号
});
try {
await connector.connect();
while (true) {
let message: Message = await connector.getWithoutAck(1);
let batchId: number = message.id;
if (batchId !== -1 && message.entries?.length) {
try {
printEntry(message.entries);
} finally {
await connector.ack(batchId);
}
}
await sleep(3000); // 等待 3 秒
}
} catch (e) {
console.error('Occur error', e as Error);
} finally {
if (connector.isConnect()) {
connector.disconnect();
}
}
})();
function sleep(millis: number): Promise<void> {
return new Promise((resolve, _) => setTimeout(resolve, millis));
}
function printEntry(entries: protocol.Entry[]): void {
for (let entry of entries) {
if (
entry.entryType === protocol.EntryType.TRANSACTIONBEGIN ||
entry.entryType === protocol.EntryType.TRANSACTIONEND
) {
console.log('监听事务继续中');
continue;
}
let rowChange = protocol.RowChange.decode(entry.storeValue);
let tableName = entry.header?.tableName ?? ''; // 确保 tableName 是一个字符串
if (rowChange.isDdl) {
console.log('Ddl entry, sql:', rowChange.sql);
}
for (let rowData of rowChange.rowDatas) {
switch (rowChange.eventType) {
case protocol.EventType.DELETE:
printChangedColumns(tableName, rowData.beforeColumns, null, 'DELETE');
break;
case protocol.EventType.INSERT:
printInsertColumns(tableName, rowData.afterColumns);
break;
case protocol.EventType.UPDATE:
printChangedColumns(tableName, rowData.beforeColumns, rowData.afterColumns, 'UPDATE');
break;
default:
console.log('Unsupported event type:', rowChange.eventType);
break;
}
}
}
}
// 处理 INSERT 操作的特殊函数
function printInsertColumns(
tableName: string,
afterColumns: protocol.IColumn[] | null | undefined
): void {
if (afterColumns) {
// 获取 id 列
const idColumn = afterColumns.find((column) => column.name === 'id');
const idValue = idColumn?.value;
// 打印插入操作的开始分隔线,包含表名和 ID
console.log(chalk.green(`------------------- 插入开始 ${tableName}.id=${idValue} -------------------`));
// 打印列内容,不重复包含表名和 ID,并保留颜色
for (const column of afterColumns) {
const output = `${column.name} : ${chalk.green(column.value)}`;
console.log(output);
}
// 打印插入操作的结束分隔线
console.log(chalk.green('------------------- 插入结束 -------------------'));
}
}
// 处理 DELETE 和 UPDATE 操作的函数
function printChangedColumns(
tableName: string,
beforeColumns: protocol.IColumn[] | null | undefined,
afterColumns: protocol.IColumn[] | null | undefined,
eventType: string
): void {
if (eventType === 'DELETE' && beforeColumns) {
// DELETE 操作只显示一次 id 信息
const idColumn = beforeColumns.find((column) => column.name === 'id');
const output = `[${eventType} ${tableName}.id=${idColumn?.value}]`;
console.log(chalk.red(output)); // 删除操作用红色
} else if (beforeColumns && afterColumns) {
const beforeColumnMap = new Map(beforeColumns.map((column) => [column.name, column]));
const afterColumnMap = new Map(afterColumns.map((column) => [column.name, column]));
// 假设 id 字段总是存在
const idColumn = beforeColumnMap.get('id') || afterColumnMap.get('id');
for (const [columnName, beforeColumn] of beforeColumnMap) {
const afterColumn = afterColumnMap.get(columnName);
if (afterColumn && beforeColumn.value !== afterColumn.value) {
// 根据事件类型设置不同的颜色
const output = `[${eventType} ${tableName}.id=${idColumn?.value}] ${columnName} : ${beforeColumn.value} -> ${afterColumn.value}`;
if (eventType === 'DELETE') {
console.log(chalk.red(output));
} else if (eventType === 'INSERT') {
console.log(chalk.green(output));
} else if (eventType === 'UPDATE') {
console.log(chalk.yellow(output));
} else {
console.log(output); // 未知事件类型,保持默认颜色
}
}
}
}
}
最终样子变成了下面这样