Capturing MySQL database changes using Debezium, Kafka, and Conduktor

This post shows how to use Kafka Connect in Conduktor, and specifically how to use Debezium to monitor changes in a MySQL database.

Lorcan Feb 14, 2022

What is Conduktor?

Conduktor is a native desktop application that is a graphical user interface for your Apache Kafka environment and supports the full Kafka ecosystem including Schema Registry, Kafka Connect, Kafka Streams, and ksqlDB.

If you use Apache Kafka, Conduktor will increase productivity, reduce costs, and ultimately accelerate delivery of your critical data streaming projects.

What is Kafka Connect?

Kafka Connect is a tool in the Apache Kafka ecosystem that allows users to move data between Apache Kafka and other data systems in a reliable way.

Kafka Connect makes it easy to quickly start a connector and integrate real-time data, either into Kafka through a source connector or out of Kafka through a sink connector. You can learn more about Kafka Connect and its use cases in this blog post by Stephane Derosiaux.

Kafka Connect Architecture

Figure 1 - Kafka Connect Architecture

What is Debezium?

Debezium is essentially a modern, distributed open source change data capture (CDC) platform that supports the monitoring of a number of database systems.

Debezium is built on top of Apache Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. It turns your existing databases into event streams, so applications can see and respond immediately to each row-level change in the databases.

Debezium records the history of data changes in Kafka logs, from where your application consumes them. This makes it possible for your application to easily consume all of the events correctly and completely. Even if your application stops unexpectedly, it will not miss anything: when the application restarts, it will resume consuming the events where it left off. As the data in the database changes, you will see the resulting change in an event stream.


Figure 2: Debezium in action. Source

Why Debezium?

Thanks to Kafka Connect and Debezium, change data capture (CDC) is now a common pattern for exposing database changes as events in Kafka. Whenever a change occurs in your database, you can track it directly in Kafka.

As illustrated in Figure 2, one way of achieving this is to capture the changelog of an upstream Postgres or MySQL database using the Debezium Kafka connectors. The changelog itself can be stored in Kafka, where a series of deployed programs can transform, aggregate, and join the data. The processed data can then be streamed out to a sink such as Elasticsearch or a data warehouse.

Debezium Use Cases

According to Gunnar Morling from Red Hat, tech lead for Debezium, CDC means "liberation for your data". He goes into more detail on the plans for Debezium going forward in this talk.

Ultimately, Debezium lets you track data changes, replicate data, update caches, sync data between microservices, create audit logs, and much more.

Tutorial

As an example, we are going to use Debezium in Conduktor, running everything with Docker. We will carry out the following four steps.

  • Step 1: Start Apache Kafka, Kafka Connect, and Debezium with Docker
  • Step 2: Open Conduktor
  • Step 3: Add MySQL Debezium connector in Conduktor
  • Step 4: View all created topics in Conduktor

Prerequisites

Before we can carry out the steps above, we need to have some prerequisites in place, including Conduktor and Docker.

For a helpful video on installing Conduktor, Docker, and the other prerequisites, please see this video from one of our founders, Stephane Maarek: Starting a local Apache Kafka Cluster on Conduktor in 3 minutes using Docker

Step 1: Start Apache Kafka, Kafka Connect, and Debezium with Docker

We want to use the docker-compose file below to start:

  • A Kafka broker instance
  • A Zookeeper instance
  • A Kafka Connect instance
  • A MySQL server
  • A Debezium connector for MySQL

i) Download the YAML file

To do this we will download the YAML file below and save it as docker-compose.yml.

YAML file


---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9999:9999"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: localhost

  connect:
    image: confluentinc/cp-kafka-connect:latest
    hostname: connect
    container_name: connect
    depends_on:
      - broker
    ports:
      - "8083:8083"
    command:
      - bash
      - -c
      - |
        confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
        confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:latest
        /etc/confluent/docker/run
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
      KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=connect -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=5555 -Dcom.sun.management.jmxremote.port=5555

  mysql:
    image: debezium/example-mysql:1.7
    hostname: mysql
    container_name: mysql
    depends_on:
      - broker
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
    ports:
      - '3306:3306'


ii) Run docker compose

After saving the above YAML file as docker-compose.yml, open your command line interface or terminal and run the following command:

docker-compose up -d

Kafka Debezium Console Screenshot 1

iii) Check it's running

To check that everything is up and running in Docker, enter the following command:

docker-compose ps

Kafka Debezium Console Screenshot 2
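Beyond checking the containers, it can be useful to confirm that the Debezium MySQL connector was actually installed into Kafka Connect. The sketch below is one possible way to do that from Python, assuming Kafka Connect is reachable at http://localhost:8083 as in the YAML file. The function names are illustrative and not part of Conduktor or Debezium. It reads the /connector-plugins endpoint of the Kafka Connect REST API, which lists the installed plugin classes.

```python
import json
import urllib.request

# Class name of the Debezium MySQL connector used later in this tutorial.
DEBEZIUM_MYSQL = "io.debezium.connector.mysql.MySqlConnector"


def has_plugin(plugins, class_name=DEBEZIUM_MYSQL):
    """Return True if a /connector-plugins response lists class_name."""
    return any(plugin.get("class") == class_name for plugin in plugins)


def fetch_plugins(connect_url="http://localhost:8083"):
    """GET /connector-plugins from the Kafka Connect REST API.

    The default URL is an assumption based on the port mapping in the
    docker-compose file; adjust it if your setup differs.
    """
    url = f"{connect_url}/connector-plugins"
    with urllib.request.urlopen(url, timeout=10) as response:
        return json.load(response)


# Usage, once the containers are up (Connect may need a minute to start):
#   print(has_plugin(fetch_plugins()))
```

If this returns False shortly after startup, the connect container may still be installing the connectors listed in its command section.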

Step 2: Opening Conduktor

Moving to the next step we will open up Conduktor and configure the cluster from our Docker compose file.

i) Open Conduktor and sign in

Kafka Debezium Conduktor 3

ii) Configuring your Kafka Cluster

Enter the bootstrap server for the Kafka cluster as localhost:29092, as specified in the YAML file. The cluster name can be anything.

Kafka Debezium Conduktor 4

Next, enter the URL for the Kafka Connect instance as http://localhost:8083, as specified in the YAML file.

Kafka Debezium Conduktor 5

Click on the Save button to save the cluster configuration. Then click on the cluster itself to connect.

Kafka Debezium Conduktor 6

You will come to the Overview tab inside Conduktor, with a view similar to the one below.

Kafka Debezium Conduktor 7
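The bootstrap address used in Conduktor (localhost:29092) differs from the one the containers use among themselves (broker:9092). Both come from the KAFKA_ADVERTISED_LISTENERS value in the YAML file. Conduktor runs on the host, so it uses the PLAINTEXT_HOST listener, while Kafka Connect runs inside the Docker network and uses the PLAINTEXT listener. The short sketch below only parses that string to make the mapping explicit; the helper name is illustrative.

```python
def parse_advertised_listeners(value):
    """Map listener name -> (host, port) from a KAFKA_ADVERTISED_LISTENERS string."""
    listeners = {}
    for entry in value.split(","):
        name, address = entry.strip().split("://", 1)
        host, port = address.rsplit(":", 1)
        listeners[name] = (host, int(port))
    return listeners


# Value copied from the broker service in the docker-compose file.
ADVERTISED = "PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092"
listeners = parse_advertised_listeners(ADVERTISED)

# Clients on the host machine (such as Conduktor) use this address.
print("from the host:", listeners["PLAINTEXT_HOST"])
# Containers on the Docker network (such as Kafka Connect) use this one.
print("inside Docker:", listeners["PLAINTEXT"])
```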

Step 3: Adding a Debezium Connector for Kafka Connect in Conduktor

i) Kafka Connect Tab

After configuring our cluster we want to add our Debezium connector in Conduktor.

To do this we will go to the Kafka Connect tab as shown below.

Kafka Debezium Conduktor 8

ii) Creating a connector in Conduktor

We now have two options to add a connector:

  • Option 1: Use the connect configuration wizard in Conduktor
  • Option 2: Import the connector in JSON format directly

Option 1: Using the connect configuration wizard, we will click on io.debezium.connector.mysql.MySqlConnector:1.7.1.Final and enter the information required in each of the connector wizard tabs.

Kafka Debezium Conduktor 9

Kafka Debezium Conduktor 10

Leave the default values for the wizard tabs, including Transforms, Predicates, Error Handling, and Topic Creation. When the MySQL tab shows up, provide the following information.

Field      Value
Hostname   mysql
Port       3306
User       debezium
Password   dbz
Namespace  dbserver1

Kafka Debezium Conduktor 11

In the History Storage tab, enter the following information.

Field                        Value
Kafka broker addresses       broker:9092
Database history topic name  schema-changes.inventory

Kafka Debezium Conduktor 12

Leave the Events and Custom Config tabs with the defaults. Finally, review the connector configuration before applying it to the cluster.

Kafka Debezium Conduktor 13

Option 2: Using the connector configuration in JSON format shown below, we will copy and paste it directly into Conduktor.

Kafka Debezium Conduktor 14

Kafka Debezium Conduktor 15

From this screen onwards, continue the wizard with the default values in the tabs.

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "6400",
    "database.server.name": "dbserver1",
    "database.include.list": "inventory",
    "database.history.kafka.bootstrap.servers": "broker:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}
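For readers who want to script this step rather than use the Conduktor wizard, the same JSON can in principle be sent to the Kafka Connect REST API with a POST to /connectors. The sketch below is a minimal illustration, assuming Kafka Connect is reachable at http://localhost:8083 as in the YAML file. The function names are hypothetical helpers and this is not how Conduktor itself is implemented.

```python
import json
import urllib.request

# Same connector definition as the JSON above.
CONNECTOR = {
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "6400",
        "database.server.name": "dbserver1",
        "database.include.list": "inventory",
        "database.history.kafka.bootstrap.servers": "broker:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
    },
}


def build_create_request(connector, connect_url="http://localhost:8083"):
    """Build (but do not send) the POST /connectors request."""
    return urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


def create_connector(connector, connect_url="http://localhost:8083"):
    """Send the request and return the parsed JSON response from Kafka Connect."""
    request = build_create_request(connector, connect_url)
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.load(response)


# Usage, once the containers are running:
#   create_connector(CONNECTOR)
```

Building the request separately from sending it makes it easy to inspect the payload before anything is applied to the cluster.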



iii) View Connector running

We can now see our Debezium MySQL connector with its current status and all topics. Initially, it might take a few seconds for the topics to get created.

Kafka Debezium Conduktor 16

Step 4: View all created topics in Conduktor

Open the Topics tab in Conduktor to see all of the newly created topics that carry the change data capture events from the Debezium connector.

Kafka Debezium Conduktor 17

Once the Debezium MySQL connector has been added in Conduktor, it starts monitoring the database for data change events. In this case, it is the inventory database, with a separate topic for each table.

The events are written to the following topics, all prefixed with dbserver1 (the logical server name set in database.server.name, not the connector name):

  • dbserver1: The schema change topic to which all of the data definition language (DDL) statements are written.
  • dbserver1.inventory.products: Captures change events for the products table in the inventory database.
  • dbserver1.inventory.products_on_hand: Captures change events for the products_on_hand table in the inventory database.
  • dbserver1.inventory.customers: Captures change events for the customers table in the inventory database.
  • dbserver1.inventory.orders: Captures change events for the orders table in the inventory database.
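The topic names in the list above follow the pattern serverName.databaseName.tableName, where serverName is the database.server.name value from the connector configuration. A small sketch of that naming rule, using illustrative helper names that are not part of Debezium:

```python
def table_topic(server_name, database, table):
    """Build the change-event topic name for one table."""
    return f"{server_name}.{database}.{table}"


def split_table_topic(topic):
    """Split a table topic back into (server_name, database, table).

    Assumes none of the three parts contains a dot.
    """
    server_name, database, table = topic.split(".")
    return server_name, database, table


# Tables taken from the list above.
for table in ["products", "products_on_hand", "customers", "orders"]:
    print(table_topic("dbserver1", "inventory", table))
```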

We can also look further into one of the topics by opening a consumer directly in Conduktor. For this example, we will explore the dbserver1.inventory.addresses topic.

To do this, click on “Consumer” in the top right corner to create a new consumer.

Kafka Debezium Conduktor 18

In the top left corner of this tab, click on “Pick a Topic to inspect its data” and then choose dbserver1.inventory.addresses.

Kafka Debezium Conduktor 19

We will then choose “Start from The Beginning” and click on “Start” on the bottom right. This will show the different types of change events associated with addresses as follows.

Kafka Debezium Conduktor 20
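Each record shown in the consumer is a Debezium change event. Its value carries a before and an after image of the row plus an op code: c for an insert, u for an update, d for a delete, and r for a row read during the initial snapshot. With the JsonConverter configured in the YAML file, these fields normally sit under a payload key next to a schema key. The sketch below shows one way an application might interpret such a value. The sample event is made up for illustration and is not taken from the tutorial database.

```python
import json

# Debezium operation codes and a human-readable label for each.
OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def describe_change(raw_value):
    """Summarize one change-event value given as a JSON string.

    Returns None for a record with no value (a tombstone).
    """
    if raw_value is None:
        return None
    event = json.loads(raw_value)
    # Fall back to the top level if the converter was set up without schemas.
    payload = event.get("payload", event)
    op = payload["op"]
    return {
        "operation": OPERATIONS.get(op, op),
        "before": payload.get("before"),
        "after": payload.get("after"),
    }


# Hypothetical event: a row inserted into a table with id and city columns.
SAMPLE = json.dumps(
    {"payload": {"before": None, "after": {"id": 10, "city": "Hamburg"}, "op": "c"}}
)
print(describe_change(SAMPLE))
```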

Conclusion

We have now run an Apache Kafka broker, a Zookeeper instance, Kafka Connect, and MySQL through Docker, created a Debezium connector directly in Conduktor, and seen the resulting change events in a consumer.

As shown in this tutorial, Conduktor makes it easier to manage your Kafka Connect instances and to add and edit connectors. When using Debezium, you can quickly keep track of all the topics involved and filter their records by time/date or offset.

References

  1. https://debezium.io/documentation/reference/stable/architecture.html
  2. https://subscription.packtpub.com/book/big_data_and_business_intelligence/9781787281349/7/ch07lvl1sec80/kafka-connect
  3. https://docs.conduktor.io/features/kafka-connect/features
  4. https://www.conduktor.io/why-use-kafka-connect
  5. https://www.sderosiaux.com/articles/2020/01/06/learnings-from-using-kafka-connect-debezium-postgresql/
  6. https://debezium.io/documentation/reference/stable/tutorial.html
  7. https://kafka.apache.org/documentation/#connectconfigs
  8. https://speakerdeck.com/gunnarmorling/practical-change-data-streaming-use-cases-with-apache-kafka-and-debezium-qcon-san-francisco-2019