最新文章专题视频专题问答1问答10问答100问答1000问答2000关键字专题1关键字专题50关键字专题500关键字专题1500TAG最新视频文章推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37视频文章20视频文章30视频文章40视频文章50视频文章60 视频文章70视频文章80视频文章90视频文章100视频文章120视频文章140 视频2关键字专题关键字专题tag2tag3文章专题文章专题2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章专题3
当前位置: 首页 - 科技 - 知识百科 - 正文

Mysql数据库监听binlog的开启步骤

来源:动视网 责编:小采 时间:2020-11-09 21:13:12
文档

Mysql数据库监听binlog的开启步骤

Mysql数据库监听binlog的开启步骤:前言 我们经常需要根据用户对自己数据的一些操作来做一些事情. 比如如果用户删除了自己的账号,我们就给他发短信骂他,去发短信求他回来. 类似于这种功能,当然可以在业务逻辑层实现,在收到用户的删除请求之后执行这一操作,但是数据库的binlog为我们提供了
推荐度:
导读Mysql数据库监听binlog的开启步骤:前言 我们经常需要根据用户对自己数据的一些操作来做一些事情. 比如如果用户删除了自己的账号,我们就给他发短信骂他,去发短信求他回来. 类似于这种功能,当然可以在业务逻辑层实现,在收到用户的删除请求之后执行这一操作,但是数据库的binlog为我们提供了


根据自己的业务,封装一个更好使,更定制的工具类

开始的时候打算贴代码的,,,但是代码越写越多,索性传在github上了,这里只贴部分的实现.代码传送门

实现思路

  1. 支持对单个表的监听,因为我们不想真的对所有数据库中的所有数据表进行监听.
  2. 可以多线程消费.
  3. 把监听到的内容转换成我们喜闻乐见的形式(文中的数据结构不一定很好,我没想到更加合适的了).

所以实现思路大致如下:

  1. 封装个客户端,对外只提供获取方法,屏蔽掉初始化的细节代码.
  2. 提供注册监听器(伪)的方法,可以注册对某个表的监听(重新定义一个监听接口,所有注册的监听器实现这个就好).
  3. 真正的监听器只有客户端,他将此数据库实例上的所有操作,全部监听到并转换成我们想要的格式LogItem放进阻塞队列里面.
  4. 启动多个线程,消费阻塞队列,对某一个LogItem调用对应的数据表的监听器,做一些业务逻辑.

初始化代码:

 public MysqlBinLogListener(Conf conf) {
 BinaryLogClient client = new BinaryLogClient(conf.host, conf.port, conf.username, conf.passwd);
 EventDeserializer eventDeserializer = new EventDeserializer();
 eventDeserializer.setCompatibilityMode(
 EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
 EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
 );
 client.setEventDeserializer(eventDeserializer);
 this.parseClient = client;
 this.queue = new ArrayBlockingQueue<>(1024);
 this.conf = conf;
 listeners = new ConcurrentHashMap<>();
 dbTableCols = new ConcurrentHashMap<>();
 this.consumer = Executors.newFixedThreadPool(consumerThreads);
 }

注册代码:

 public void regListener(String db, String table, BinLogListener listener) throws Exception {
 String dbTable = getdbTable(db, table);
 Class.forName("com.mysql.jdbc.Driver");
 // 保存当前注册的表的colum信息
 Connection connection = DriverManager.getConnection("jdbc:mysql://" + conf.host + ":" + conf.port, conf.username, conf.passwd);
 Map<String, Colum> cols = getColMap(connection, db, table);
 dbTableCols.put(dbTable, cols);

 // 保存当前注册的listener
 List<BinLogListener> list = listeners.getOrDefault(dbTable, new ArrayList<>());
 list.add(listener);
 listeners.put(dbTable, list);
 }

在这个步骤中,我们在注册监听者的同时,获得了该表的schema信息,并保存到map里面去,方便后续对数据进行处理.

监听代码:

 @Override
 public void onEvent(Event event) {
 EventType eventType = event.getHeader().getEventType();

 if (eventType == EventType.TABLE_MAP) {
 TableMapEventData tableData = event.getData();
 String db = tableData.getDatabase();
 String table = tableData.getTable();
 dbTable = getdbTable(db, table);
 }

 // 只处理添加删除更新三种操作
 if (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) {
 if (isWrite(eventType)) {
 WriteRowsEventData data = event.getData();
 for (Serializable[] row : data.getRows()) {
 if (dbTableCols.containsKey(dbTable)) {
 LogItem e = LogItem.itemFromInsert(row, dbTableCols.get(dbTable));
 e.setDbTable(dbTable);
 queue.add(e);
 }
 }
 }
 }
 }

我偷懒了,,,这里面只实现了对添加操作的处理,其他操作没有写.

消费代码:

 public void parse() throws IOException {
 parseClient.registerEventListener(this);

 for (int i = 0; i < consumerThreads; i++) {
 consumer.submit(() -> {
 while (true) {
 if (queue.size() > 0) {
 try {
 LogItem item = queue.take();
 String dbtable = item.getDbTable();
 listeners.get(dbtable).forEach(l -> {
 l.onEvent(item);
 });

 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 Thread.sleep(1000);
 }
 });
 }
 parseClient.connect();
 }

消费时,从队列中获取item,之后获取对应的一个或者多个监听者,分别消费这个item.

测试代码:

 public static void main(String[] args) throws Exception {
 Conf conf = new Conf();
 conf.host = "hostname";
 conf.port = 3306;
 conf.username = conf.passwd = "hhsgsb";

 MysqlBinLogListener mysqlBinLogListener = new MysqlBinLogListener(conf);
 mysqlBinLogListener.parseArgsAndRun(args);
 mysqlBinLogListener.regListener("pf", "student", item -> {
 System.out.println(new String((byte[])item.getAfter().get("name")));
 logger.info("insert into {}, value = {}", item.getDbTable(), item.getAfter());
 });
 mysqlBinLogListener.regListener("pf", "teacher", item -> System.out.println("teacher ===="));

 mysqlBinLogListener.parse();
 }

在这段很少的代码里,注册了两个监听者,分别监听student和teacher表,并分别进行打印处理,经测试,在teacher表插入数据时,可以独立的运行定义的业务逻辑.

注意:这里的工具类并不能直接投入使用,因为里面有许多的异常处理没有做,且功能仅监听了插入语句,可以用来做实现的参考.

参考文章

  • github.com/shyiko/mysq…
  • https://www.gxlcms.com/article/166761.htm
  • 总结

    文档

    Mysql数据库监听binlog的开启步骤

    Mysql数据库监听binlog的开启步骤:前言 我们经常需要根据用户对自己数据的一些操作来做一些事情. 比如如果用户删除了自己的账号,我们就给他发短信骂他,去发短信求他回来. 类似于这种功能,当然可以在业务逻辑层实现,在收到用户的删除请求之后执行这一操作,但是数据库的binlog为我们提供了
    推荐度:
    标签: 开启 步骤 方法
    • 热门焦点

    最新推荐

    猜你喜欢

    热门推荐

    专题
    Top