Adding a changelog to a legacy MySQL-based application using Kafka and Debezium (without modifying the codebase)
Mission:
- Emit events every time data changes in an existing application, without touching the original codebase.
- Use those events to create a live changelog of the app.
High-level milestones:
- Get a legacy app running (MySQL based)
- Set up Kafka and Debezium
Let's start
STEP #1 - Get a legacy app based on a MySQL database
The application should follow a couple of rules:
- Have a MySQL database
- Have the MySQL binlog enabled (the mysql image used below ships with it enabled, as we will verify in step e)
If you don't have an app like that, here is a simple Drupal example you can load with docker-compose:
a. Create a file named docker-compose.yml with the following content:
version: '3.1'

services:

  drupal:
    image: drupal
    ports:
      - 8080:80
    volumes:
      - /var/www/html/modules
      - /var/www/html/profiles
      - /var/www/html/themes
      - /var/www/html/sites
    restart: always

  mysql:
    image: mysql
    ports:
      - 23306:3306
    command: --default-authentication-plugin=mysql_native_password
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_ALLOW_EMPTY_PASSWORD: "true"
      MYSQL_DATABASE: drupal_db
b. Run docker-compose up -d in the same folder where the file lives.
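If you want to make sure both containers came up before moving on, these standard (and entirely optional) docker-compose checks will do:
# list the services defined in the compose file and their current state
docker-compose ps
# follow the MySQL logs until it reports it is ready for connections
docker-compose logs -f mysql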
c. When the Drupal container is up, you can access the setup page at http://localhost:8080 and complete the setup with:
profile: "Demo"
db_type: "MySQL"
db_name: "drupal_db"
user: "root"
pass: "root"
host: "mysql"
d. After finishing this experiment, we can clean up all the containers we just created by running: docker-compose stop && docker-compose rm -f
e. Validate that the database is accessible from the host machine (I'm using mycli, but you can use any other client):
# connect
mycli mysql://root@127.0.0.1:23306/drupal_db
# check the tables are there
SHOW TABLES;
# check the binlog is enabled
SHOW VARIABLES LIKE 'log_bin';
# the output should look like:
+-----------------+---------+
| Variable_name | Value |
|-----------------+---------|
| log_bin | ON |
+-----------------+---------+
1 row in set
Time: 0.019s
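Debezium's MySQL connector reads row-based binlog events, so while you are in the same mycli session it is also worth confirming the binlog format (on recent MySQL versions ROW is already the default):
# check the binlog format (Debezium requires ROW)
SHOW VARIABLES LIKE 'binlog_format';
# the Value column should show ROW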
Once the setup wizard finishes, the Drupal demo site is up and running. This is a good moment to get up, stretch your legs, and grab another cup of coffee, because we're just about to start the fun part!
STEP #2 - Set up Kafka and Debezium
For our purposes we will use the Confluent Open Source Kafka quick start bundle https://www.confluent.io/download/
- Download and unzip the Confluent package (in my case confluent-oss-5.0.1-2.11.zip).
- Add the Debezium plugin (this will also be useful later to apply transformations to the streamed data). Download the latest MySQL connector from https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/ (in my case debezium-connector-mysql-0.9.0.Alpha2-plugin.tar.gz), unzip it, and move the extracted folder to share/java/ (see the command sketch after this list).
- Start Kafka: ./bin/confluent start
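For reference, the plugin installation can be scripted roughly like this, assuming you are inside the unzipped Confluent directory and picked the same connector version I did (adjust the file name otherwise):
# download the connector plugin archive
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/0.9.0.Alpha2/debezium-connector-mysql-0.9.0.Alpha2-plugin.tar.gz
# unpack it and move the extracted folder into Connect's plugin path
tar -xzf debezium-connector-mysql-0.9.0.Alpha2-plugin.tar.gz
mv debezium-connector-mysql share/java/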
You can also open http://localhost:8083/ to check that the Kafka Connect REST API is working.
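The same REST API can be queried from the command line as well; the connector-plugins endpoint is a quick way to confirm that Connect picked up the Debezium plugin we just dropped into share/java/:
# the io.debezium.connector.mysql.MySqlConnector class should appear in this list
curl -s http://localhost:8083/connector-plugins | jq "."
# no connectors are loaded yet, so this returns an empty list
curl -s http://localhost:8083/connectors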
Testing producer/consumer with simple messages
Let's use two terminal sessions to test that Kafka is working fine:
In one terminal, start the Kafka console consumer: ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
Then, in another terminal, start the producer: ./bin/kafka-console-producer --broker-list localhost:9092 --topic test
At this point you can type anything; every time you hit Enter the message is sent to the test topic and appears in the console consumer.
To stop the producer, hit Ctrl + C.
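Since the console producer simply reads from stdin, you can also run the same smoke test non-interactively by piping a few messages into it (same broker and topic as above):
# send three test messages and exit
seq 3 | sed 's/^/test message /' | ./bin/kafka-console-producer --broker-list localhost:9092 --topic test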
Configure Debezium to listen for database changes
First, create a new configuration file /tmp/debezium-source.json with the connection details and the whitelist of what Debezium should capture:
{
  "name": "debezium-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "23306",
    "database.user": "root",
    "database.password": "root",
    "database.server.id": "184054",
    "database.whitelist": "drupal_db",
    "database.server.name": "changelogExample",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.drupal_db",
    "snapshot.mode": "when_needed"
  }
}
Then load that configuration into Kafka Connect:
./bin/confluent load debezium-source -d /tmp/debezium-source.json
Other useful commands are:
# get the connector status
./bin/confluent status debezium-source
# unload the config
./bin/confluent unload debezium-source
# reload the config
./bin/confluent config debezium-source -d /tmp/debezium-source.json
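The status is also exposed by the Kafka Connect REST API directly, which is handy if you are not using the confluent CLI:
# connector and task state as reported by Kafka Connect
curl -s http://localhost:8083/connectors/debezium-source/status | jq "."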
Let’s see the topics list:
./bin/kafka-topics --zookeeper localhost:2181 --list
__confluent.support.metrics __consumer_offsets _confluent-ksql-default__command_topic _schemas changelogExample changelogExample.drupal_db.block_content changelogExample.drupal_db.block_content__field_banner_image changelogExample.drupal_db.block_content__field_content_link changelogExample.drupal_db.block_content__field_copyright changelogExample.drupal_db.block_content__field_disclaimer changelogExample.drupal_db.block_content__field_promo_image changelogExample.drupal_db.block_content__field_summary changelogExample.drupal_db.block_content__field_title changelogExample.drupal_db.block_content_field_data changelogExample.drupal_db.block_content_field_revision changelogExample.drupal_db.block_content_revision changelogExample.drupal_db.block_content_revision__field_banner_image changelogExample.drupal_db.block_content_revision__field_content_link changelogExample.drupal_db.block_content_revision__field_copyright changelogExample.drupal_db.block_content_revision__field_disclaimer changelogExample.drupal_db.block_content_revision__field_promo_image changelogExample.drupal_db.block_content_revision__field_summary changelogExample.drupal_db.block_content_revision__field_title changelogExample.drupal_db.cache_bootstrap changelogExample.drupal_db.cache_config changelogExample.drupal_db.cache_container changelogExample.drupal_db.cache_data changelogExample.drupal_db.cache_default changelogExample.drupal_db.cache_discovery changelogExample.drupal_db.cache_entity changelogExample.drupal_db.cache_menu changelogExample.drupal_db.cache_render changelogExample.drupal_db.cachetags changelogExample.drupal_db.config changelogExample.drupal_db.content_moderation_state changelogExample.drupal_db.content_moderation_state_field_data changelogExample.drupal_db.content_moderation_state_field_revision changelogExample.drupal_db.content_moderation_state_revision changelogExample.drupal_db.file_managed changelogExample.drupal_db.file_usage changelogExample.drupal_db.history changelogExample.drupal_db.key_value changelogExample.drupal_db.key_value_expire changelogExample.drupal_db.menu_tree changelogExample.drupal_db.node changelogExample.drupal_db.node__body changelogExample.drupal_db.node__field_cooking_time changelogExample.drupal_db.node__field_difficulty changelogExample.drupal_db.node__field_image changelogExample.drupal_db.node__field_ingredients changelogExample.drupal_db.node__field_number_of_servings changelogExample.drupal_db.node__field_preparation_time changelogExample.drupal_db.node__field_recipe_category changelogExample.drupal_db.node__field_recipe_instruction changelogExample.drupal_db.node__field_summary changelogExample.drupal_db.node__field_tags changelogExample.drupal_db.node_access changelogExample.drupal_db.node_field_data changelogExample.drupal_db.node_field_revision changelogExample.drupal_db.node_revision changelogExample.drupal_db.node_revision__body changelogExample.drupal_db.node_revision__field_cooking_time changelogExample.drupal_db.node_revision__field_difficulty changelogExample.drupal_db.node_revision__field_image changelogExample.drupal_db.node_revision__field_ingredients changelogExample.drupal_db.node_revision__field_number_of_servings changelogExample.drupal_db.node_revision__field_preparation_time changelogExample.drupal_db.node_revision__field_recipe_category changelogExample.drupal_db.node_revision__field_recipe_instruction changelogExample.drupal_db.node_revision__field_summary changelogExample.drupal_db.node_revision__field_tags changelogExample.drupal_db.router 
changelogExample.drupal_db.search_dataset changelogExample.drupal_db.search_index changelogExample.drupal_db.search_total changelogExample.drupal_db.semaphore changelogExample.drupal_db.sequences changelogExample.drupal_db.sessions changelogExample.drupal_db.shortcut changelogExample.drupal_db.shortcut_field_data changelogExample.drupal_db.taxonomy_index changelogExample.drupal_db.taxonomy_term__parent changelogExample.drupal_db.taxonomy_term_data changelogExample.drupal_db.taxonomy_term_field_data changelogExample.drupal_db.url_alias changelogExample.drupal_db.user__roles changelogExample.drupal_db.users changelogExample.drupal_db.users_field_data changelogExample.drupal_db.watchdog connect-configs connect-offsets connect-statuses dbhistory.drupal_db
And now check the changelog of one of the tables:
# Please note I'm using the jq tool to format the JSON output:
./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic changelogExample.drupal_db.node__body --from-beginning | jq "."
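If you only want a terse, human-readable changelog, you can pipe the consumer through a jq filter over the Debezium change-event envelope. This is just a sketch assuming the default (unflattened) envelope, whose top-level fields include op, ts_ms, before and after; depending on the Avro JSON encoding, nullable fields may appear wrapped in an extra type object:
# op is c (create), u (update) or d (delete); after holds the new row state
./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic changelogExample.drupal_db.node__body --from-beginning | jq '{op: .op, ts_ms: .ts_ms, after: .after}'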
Final Notes
Although this example is only a proof of concept around relational databases and changelogs, I want to highlight the power and flexibility of Kafka and Debezium.
Other uses of events generated by Debezium:
(extracted from https://debezium.io/docs/faq/#what_are_some_uses_of_debezium)
- Update search indexes with the data.
- Update a derived data store with the same information or with information computed from the changing data, such as with Command Query Responsibility Separation (CQRS).
- Send a push notification to one or more mobile devices.
- Aggregate the changes and produce a stream of patches for entities.
Resources and sites I used to build this:
- https://docs.confluent.io/current/quickstart/cos-quickstart.html#cos-quickstart
- https://rmoff.net/2018/03/24/streaming-data-from-mysql-into-kafka-with-kafka-connect-and-debezium/
- https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/
- https://debezium.io/docs/configuration/event-flattening/
- https://kafka.apache.org/quickstart
- https://iamninad.com/how-debezium-kafka-stream-can-help-you-write-cdc/