A year and a half with Debezium: CDC With MySQL

I have just realized that we (at Bigbasket) have been using Debezium for a year and a half, building a customized change data pipeline from an AWS MySQL RDS instance for a reporting framework. I think it’s the right time to talk about it.

For those who don’t know what Debezium is: it’s an open source, distributed platform for change data capture (CDC). It uses Apache Kafka to store the captured data.

How does it work?

If I had to give a generic definition of how it works, I would put it this way: a tool that reads a database and produces a message for every transaction. On top of that, it brings the luxury of taking a snapshot of the existing data in the database.

It has two high-level modes of operation: a complete snapshot plus ongoing changes (known as the “initial” mode) and ongoing changes only (the “schema_only” mode).

Some important points to remember:

  • Debezium creates one Kafka topic per database table.
  • The Kafka messages can be in JSON, Avro, or Protobuf format (I chose JSON, so that’s what I’ll talk about here).
  • Where can we find the actual DB data? The JSON has a key called “payload”, and its value is again a hashmap (dictionary) containing two keys, “before” and “after”, which, as the names indicate, hold the previous and the latest version of the row respectively (see the sample message after this list).
  • In snapshot mode, it reads the data from the database using a “select *” query. Keep in mind that Debezium takes a lock to prevent any schema changes while the snapshot runs. We can override these queries at the table level, and I will cover that part later in this blog.
  • For the ongoing data changes, it reads the binlog, so it won’t hammer the database with any long-running queries (LRQs).
  • Debezium maintains a database history topic, which it uses to track schema changes and to recover the data flow after a failure.
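
To make the payload structure concrete, here is a simplified sketch of what a JSON change event for an update on a hypothetical table (with id and name columns) could look like; real events also carry a top-level “schema” section and a “source” block inside the payload, which I have omitted:

{
  "payload": {
    "before": {"id": 42, "name": "old value"},
    "after": {"id": 42, "name": "new value"},
    "op": "u",
    "ts_ms": 1600000000000
  }
}

For an insert, “before” is null and “op” is “c”; for a delete, “after” is null and “op” is “d”; snapshot reads come through with “op” set to “r”.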

Debezium supports multiple databases, including MySQL, PostgreSQL, MongoDB, SQL Server, and Oracle.

I have used it with MySQL databases, so I will explain the data flow in the context of MySQL.

Let me first get into the use case we were trying to solve. We had to set up a stitched table: a denormalized table that contains data from hundreds of source tables and has close to 100 columns to store those data pieces.

In general, we can classify the tables of a database into two main categories:

  • Reference Table: a table mainly referenced by other tables (through foreign keys) and used in queries to filter data. Data changes to it are limited, and the number of rows is usually small as well. For example, consider a table that stores information about countries: its data is rarely updated, and it is mainly used in queries to filter data geographically.
  • Data-Flowing Table: a table into which data flows frequently. Something like a purchase history table is an example.

The data transformation I was expecting looks something like this:

Data Transformation: Source DB to Stitched DB (diagram)

In the above example, you can clearly see that the tables countries and state are reference tables, while the tables customers and purchase are data-flowing tables.

So I decided to snapshot these reference tables using Debezium in the initial mode, with a connector configuration that looks something like the one below:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8084/connectors/ \
-d '{"name": "mysql-connector-db1",
"config":{
"name":"mysql-connector-db1",
"database.port":"3306",
"database.user":"db_user_name",
"connector.class":"io.debezium.connector.mysql.MySqlConnector",
"rows.fetch.size":"2",
"table.whitelist":"database1.countries,database1.state",
"database.hostname":"db_host1",
"database.password":"password",
"database.server.id":"1000",
"database.whitelist":"database1",
"database.server.name":"db1",
"decimal.handling.mode":"string",
"include.schema.changes":"true",
"database.history.kafka.topic":"dbhistory.db1",
"database.history.kafka.bootstrap.servers":"localhost:9092",
"database.history.kafka.recovery.attempts":"20",
"database.history.kafka.recovery.poll.interval.ms":"10000000",
"snapshot.mode": "initial"
}}'

I needed only the ongoing data for the tables customers and purchase, so I created a second connector in schema_only mode:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8084/connectors/ \
-d '{"name": "mysql-connector-db2",
"config":{
"name":"mysql-connector-db2",
"database.port":"3306",
"database.user":"db_user_name",
"connector.class":"io.debezium.connector.mysql.MySqlConnector",
"rows.fetch.size":"2",
"table.whitelist":"database1.customers,database1.purchase",
"database.hostname":"db_host1",
"database.password":"password",
"database.server.id":"1001",
"database.whitelist":"database1",
"database.server.name":"db2",
"decimal.handling.mode":"string",
"include.schema.changes":"true",
"database.history.kafka.topic":"dbhistory.db2",
"database.history.kafka.bootstrap.servers":"localhost:9092",
"database.history.kafka.recovery.attempts":"20",
"database.history.kafka.recovery.poll.interval.ms":"10000000",
"snapshot.mode": "schema_only"
}}'

We can actually combine these two connectors into a single one using snapshot.select.statement.overrides, by overriding the snapshot queries for the data-flowing tables so that they return no rows:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8084/connectors/ \
-d '{"name": "mysql-connector-db3",
"config":{
"name":"mysql-connector-db3",
"database.port":"3306",
"database.user":"db_user_name",
"connector.class":"io.debezium.connector.mysql.MySqlConnector",
"rows.fetch.size":"2",
"table.whitelist":"database1.countries,database1.state,database1.customers,database1.purchase",
"database.hostname":"db_host1",
"database.password":"password",
"database.server.id":"1002",
"database.whitelist":"database1",
"database.server.name":"db3",
"decimal.handling.mode":"string",
"include.schema.changes":"true",
"snapshot.select.statement.overrides": "database1.customers,database1.purchase",
"snapshot.select.statement.overrides.database1.customers": "select * from customers where id < 0",
"snapshot.select.statement.overrides.database1.purchase":
"select * from purchase where id < 0",
"database.history.kafka.topic":"dbhistory.db3",
"database.history.kafka.bootstrap.servers":"localhost:9092",
"database.history.kafka.recovery.attempts":"20",
"database.history.kafka.recovery.poll.interval.ms":"10000000",
"snapshot.mode": "initial"
}}'
High Level Architecture

Since Debezium runs on Kafka Connect, and Kafka Connect allows you to create multiple connectors, we can have multiple Debezium connectors running in parallel. As I mentioned earlier, Debezium produces messages for each snapshot row and each transaction to the Kafka broker(s).
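
Since the connectors are managed through the Kafka Connect REST API (the same endpoint used to create them above), a quick way to see what is registered and whether a given connector is healthy looks like this; the connector name is simply the one from the earlier examples:

curl -s http://localhost:8084/connectors
curl -s http://localhost:8084/connectors/mysql-connector-db1/status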

I have written a customized Python consumer script (I can open source this one, let me know in the comments) to consume the data from the Kafka brokers. The consumer takes care of parsing the Kafka messages and performs the data transformation shown in the earlier diagram. At the end, the consumer dumps the data into the stitched DB.
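
To give an idea of the shape of such a consumer, here is a minimal sketch using the kafka-python library; the topic name, the consumer group id, and the load_into_stitched_db helper are hypothetical placeholders, not the actual script:

import json
from kafka import KafkaConsumer  # kafka-python

# Debezium creates one topic per table: <server name>.<database>.<table>.
# The topic name and load_into_stitched_db() below are placeholders.
consumer = KafkaConsumer(
    "db1.database1.countries",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    group_id="stitch-consumer",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")) if v else None,
)

def load_into_stitched_db(row):
    # Placeholder: upsert the transformed row into the stitched table.
    pass

for message in consumer:
    event = message.value
    if event is None:           # tombstone message, nothing to process
        continue
    payload = event.get("payload", {})
    after = payload.get("after")
    if after is not None:       # snapshot read, insert, or update
        load_into_stitched_db(after)
    # When "after" is None the event is a delete; handle it as needed.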

As we all know, things don’t always work as expected. So before using any tool for a critical project, we need to check whether recovery is possible in case of a crash or a failure. How does Debezium tackle failures?

It has a “schema_only_recovery” mode!

Irrespective of the mode in which the connector(s) were running, we can recover them using the schema_only_recovery mode.

But you need to follow certain steps to start the recovery process:

  • Delete the crashed connector(s) using the command below:
curl -i -X DELETE http://localhost:8084/connectors/mysql-connector-db3
  • Delete the database history topic(s) (a sample command is given after the connector configuration below).
  • Start the connector(s) in “schema_only_recovery” mode; a sample connector is given below (it’s a recovery, so use the same configuration as the old connector, changing only snapshot.mode):
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8084/connectors/ \
-d '{"name": "mysql-connector-db3",
"config":{
"name":"mysql-connector-db3",
"database.port":"3306",
"database.user":"db_user_name",
"connector.class":"io.debezium.connector.mysql.MySqlConnector",
"rows.fetch.size":"2",
"table.whitelist":"database1.countries,database1.state,database1.customers,database1.purchase",
"database.hostname":"db_host1",
"database.password":"password",
"database.server.id":"1002",
"database.whitelist":"database1",
"database.server.name":"db3",
"decimal.handling.mode":"string",
"include.schema.changes":"true",
"snapshot.select.statement.overrides": "database1.customers,database1.purchase",
"snapshot.select.statement.overrides.database1.customers": "select * from customers where id < 0",
"snapshot.select.statement.overrides.database1.purchase": "select * from purchase where id < 0",
"database.history.kafka.topic":"dbhistory.db3",
"database.history.kafka.bootstrap.servers":"localhost:9092",
"database.history.kafka.recovery.attempts":"20",
"database.history.kafka.recovery.poll.interval.ms":"10000000",
"snapshot.mode": "schema_only_recovery"
}}'
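
For the history-topic deletion step mentioned above, assuming direct access to the Kafka broker(s) and a reasonably recent Kafka distribution, the standard topic tool can be used (topic name taken from the example configuration):

kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic dbhistory.db3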

I have faced some deserialization issues with Debezium. The connectors were configured to read data from an AWS RDS instance that had some non-standard statements in its binlog, and errors were being triggered while reading those logs. I solved it by setting “event.processing.failure.handling.mode” to “warn”, which makes the connector log the error and move on (the default value is “fail”, which crashes the connector when it encounters such an error).
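
In terms of configuration, this amounts to one extra key in the “config” section of the connector definitions shown earlier:

"event.processing.failure.handling.mode": "warn"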

That’s most of it! It’s one of the coolest tools to play with, and it can be handy in many CDC/data pipeline use cases, so do explore it. You can find the official documentation at https://debezium.io/documentation/.
