
Capturing MySQL database changes using Debezium, Kafka, and Conduktor


Using Kafka Connect in Conduktor and specifically how to use Debezium to monitor the changes in a MySQL database.

Written by
Lorcan
Published on Feb 14, 2022

    What is Conduktor?

    Conduktor is a native desktop application that provides a graphical user interface for your Apache Kafka environment and supports the full Kafka ecosystem, including Schema Registry, Kafka Connect, Kafka Streams, and ksqlDB.

    If you use Apache Kafka, Conduktor will increase productivity, reduce costs, and ultimately accelerate delivery of your critical data streaming projects. Download Conduktor now

    What is Kafka Connect?

    Kafka Connect is a tool in the Apache Kafka ecosystem that lets users reliably integrate Apache Kafka with other data systems.

    Kafka Connect makes it easy to quickly start a connector and stream real-time data either into Kafka from a source connector or out of Kafka through a sink connector. You can learn more about Kafka Connect and its use cases in this blog post by Stephane Derosiaux.

    Kafka Connect Architecture

    Figure 1 - Kafka Connect Architecture

    What is Debezium?

    Debezium is a modern, distributed, open-source change data capture (CDC) platform that supports monitoring a number of database systems.

    Debezium is built on top of Apache Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. It turns your existing databases into event streams, so applications can see and respond immediately to each row-level change in the databases.

    Debezium records the history of data changes in Kafka logs, from where your application consumes them. This makes it possible for your application to easily consume all of the events correctly and completely. Even if your application stops unexpectedly, it will not miss anything: when the application restarts, it will resume consuming the events where it left off. As the data in the database changes, you will see the resulting change in an event stream.
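
    To make this concrete, here is a minimal sketch using the standard Kafka console consumer (assuming the Kafka CLI tools are on your path or run inside the broker container, and using a Debezium topic name, dbserver1.inventory.customers, that we only create later in this tutorial). Because the consumer joins a named consumer group, Kafka stores its committed offsets, so re-running the same command resumes from where the previous run stopped:

    # Consume Debezium change events as part of a consumer group; the group's
    # committed offsets let a restarted consumer resume where it left off
    kafka-console-consumer \
      --bootstrap-server localhost:29092 \
      --topic dbserver1.inventory.customers \
      --group inventory-reader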


    Figure 2: Debezium in action. Source

    Why Debezium?

    Thanks to Kafka Connect and Debezium, change data capture (CDC) is now a common pattern for exposing database changes as events in Kafka. As changes occur in your database, you can track them directly in Kafka.

    As illustrated in figure 2, one way to achieve this is to capture the changelog of an upstream database, such as PostgreSQL or MySQL, using the Debezium Kafka connectors. The changelog itself can be stored in Kafka, where a series of deployed programs can transform, aggregate, and join the data together. The processed data can then be streamed out to a sink such as Elasticsearch or a data warehouse.

    Debezium Use Cases

    According to Gunnar Morling of Red Hat, the project lead for Debezium, CDC means "liberation for your data". He goes into more detail on the plans for Debezium in this talk.

    Ultimately, Debezium lets you track data changes, replicate data, update caches, sync data between microservices, create audit logs, and much more.

    Tutorial

    As an example, we are going to use Debezium with Conduktor, running everything in Docker. We will carry out the following four steps:

    • Step 1: Start Apache Kafka, Kafka Connect, and Debezium with Docker
    • Step 2: Open Conduktor
    • Step 3: Add the Debezium MySQL connector in Conduktor
    • Step 4: View all created topics in Conduktor

    Prerequisites

    Before we can carry out the above, we need a few prerequisites in place: Docker (with Docker Compose) and Conduktor installed.

    For a helpful video on installing Conduktor, Docker, and the other prerequisites, see this one from one of our founders, Stephane Maarek: Starting a local Apache Kafka Cluster on Conduktor in 3 minutes using Docker

    Step 1: Start Apache Kafka, Kafka Connect, and Debezium with Docker

    We want to use the docker-compose file below to start:

    • A Kafka broker instance
    • A Zookeeper instance
    • A Kafka Connect instance
    • A MySQL server
    • The Debezium connector for MySQL

    i) Download the YAML file

    To do this we will download the YAML file below and save it as docker-compose.yml.

    YAML file

    
    ---
    version: '2'
    
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:6.0.0
        hostname: zookeeper
        container_name: zookeeper
        ports:
          - "2181:2181"
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
    
      broker:
        image: confluentinc/cp-kafka:6.0.0
        hostname: broker
        container_name: broker
        depends_on:
          - zookeeper
        ports:
          - "29092:29092"
          - "9999:9999"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_JMX_PORT: 9999
          KAFKA_JMX_HOSTNAME: localhost
    
      connect:
        image: confluentinc/cp-kafka-connect:latest
        hostname: connect
        container_name: connect
        depends_on:
          - broker
        ports:
          - "8083:8083"
        command:
          - bash
          - -c
          - |
            confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
            confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
            confluent-hub install --no-prompt debezium/debezium-connector-mysql:latest
            /etc/confluent/docker/run
        environment:
          CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
          CONNECT_REST_ADVERTISED_HOST_NAME: connect
          CONNECT_REST_PORT: 8083
          CONNECT_GROUP_ID: compose-connect-group
          CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
          CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
          CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
          CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
          CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
          CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
          CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
          CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
          CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
          CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
          CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
          KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=connect -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=5555 -Dcom.sun.management.jmxremote.port=5555
    
      mysql:
        image: debezium/example-mysql:1.7
        hostname: mysql
        container_name: mysql
        depends_on:
          - broker
        environment:
          - MYSQL_ROOT_PASSWORD=debezium
          - MYSQL_USER=mysqluser
          - MYSQL_PASSWORD=mysqlpw
        ports:
          - '3306:3306'
    


    ii) Run docker-compose

    After saving the above YAML file as docker-compose.yml, open your command line interface or terminal and run the following command:

    docker-compose up -d
    

    Kafka Debezium Console Screenshot 1

    iii) Check it's running

    To check that everything is up and running in Docker, enter the following command:

    docker-compose ps
    

    Kafka Debezium Console Screenshot 2
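
    You can also verify the services from the command line. Below is a sketch assuming the hostnames and ports from the docker-compose.yml above; note that the connect container may take a minute or two to finish installing the connector plugins before its REST API responds.

    # List topics from inside the broker container (broker:9092 is the internal listener)
    docker exec broker kafka-topics --bootstrap-server broker:9092 --list

    # Ask the Kafka Connect REST API for registered connectors; [] means none yet
    curl -s http://localhost:8083/connectors

    # Confirm the Debezium MySQL connector plugin was installed
    curl -s http://localhost:8083/connector-plugins | grep -i mysql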

    Step 2: Open Conduktor

    Next, we will open Conduktor and configure the cluster from our Docker Compose file.

    i) Open Conduktor and sign in

    Kafka Debezium Conduktor 3

    ii) Configure your Kafka cluster

    Enter the bootstrap server for the Kafka cluster as localhost:29092, as specified in the YAML file. The cluster name can be anything.

    Kafka Debezium Conduktor 4

    Next, enter the URL for the Kafka Connect instance as http://localhost:8083, as specified in the YAML file.

    Kafka Debezium Conduktor 5

    Click on the Save button to save the cluster configuration.

    Then click on the cluster itself to connect.

    Kafka Debezium Conduktor 6

    You will land on the Overview tab inside Conduktor, with a view similar to the one below.

    Kafka Debezium Conduktor 7

    Step 3: Adding a Debezium Connector for Kafka Connect in Conduktor

    i) Kafka Connect Tab

    After configuring our cluster we want to add our Debezium connector in Conduktor.

    To do this, we will go to the Kafka Connect tab as shown below.

    Kafka Debezium Conduktor 8

    ii) Creating a connector in Conduktor

    We now have two options for adding a connector:

    • Option 1: Use the connect configuration wizard in Conduktor
    • Option 2: Import the connector configuration in JSON format directly

    Option 1: Using the connect configuration wizard, we will click on io.debezium.connector.mysql.MySqlConnector:1.7.1.Final and enter the required information in each of the connector wizard tabs.

    Kafka Debezium Conduktor 9

    Kafka Debezium Conduktor 10

    Leave the default values in the wizard tabs, including Transforms, Predicates, Error Handling, and Topic Creation. When the MySQL tab shows up, provide the following information.

    Field      Value
    Hostname   mysql
    Port       3306
    User       debezium
    Password   dbz
    Namespace  dbserver1

    Kafka Debezium Conduktor 11

    In the History Storage tab, enter the following information.

    Field                        Value
    Kafka broker addresses       broker:9092
    Database history topic name  schema-changes.inventory

    Kafka Debezium Conduktor 12

    Leave the Events and Custom Config tabs with the defaults. Finally, review the connector configuration before applying it to the cluster.

    Kafka Debezium Conduktor 13

    Option 2: Using the connector configuration in JSON format below, we will copy and paste it directly into Conduktor.

    Kafka Debezium Conduktor 14

    Kafka Debezium Conduktor 15

    From this screen onwards, continue the wizard with the default values in the tabs.

    {
      "name": "inventory-connector",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "6400",
        "database.server.name": "dbserver1",
        "database.include.list": "inventory",
        "database.history.kafka.bootstrap.servers": "broker:9092",
        "database.history.kafka.topic": "schema-changes.inventory"
      }
    }
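
    Alternatively, if you prefer the command line over the Conduktor wizard, the same JSON can be posted directly to the Kafka Connect REST API. A sketch, assuming the configuration above is saved as inventory-connector.json (a filename chosen here for illustration) and Kafka Connect is reachable on localhost:8083:

    # Register the connector with Kafka Connect
    curl -s -X POST -H "Content-Type: application/json" \
      --data @inventory-connector.json \
      http://localhost:8083/connectors

    # Check that the connector and its task are RUNNING
    curl -s http://localhost:8083/connectors/inventory-connector/status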
    
    


    iii) View Connector running

    We can now see our Debezium MySQL connector with its current status and all topics. Initially, it might take a few seconds for the topics to get created.

    Kafka Debezium Conduktor 16

    Step 4: View all created topics in Conduktor

    Open the Topics tab in Conduktor to see all of the newly created topics holding the change data captured by the Debezium connector.

    Kafka Debezium Conduktor 17

    After adding the Debezium MySQL connector in Conduktor, we can see that it starts monitoring the database for data change events; in this case, it is an inventory database with specific topics for the different tables.

    The events are written to the following topics with the dbserver1 prefix (the logical server name configured for the connector):

    • dbserver1: The schema change topic to which all of the Data definition language (DDL) statements are written.
    • dbserver1.inventory.products: Captures change events for the products table in the inventory database.
    • dbserver1.inventory.products_on_hand: Captures change events for the products_on_hand table in the inventory database.
    • dbserver1.inventory.customers: Captures change events for the customers table in the inventory database.
    • dbserver1.inventory.orders: Captures change events for the orders table in the inventory database.
    • dbserver1.inventory.addresses: Captures change events for the addresses table in the inventory database.
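
    To see new change events arrive in these topics, you can modify a row in the source database and watch the corresponding topic. A sketch, assuming the sample data shipped with the debezium/example-mysql image (the customer id used here is an assumption and may differ between image versions):

    # Update a row in the customers table; the root password (debezium) is set
    # in the docker-compose.yml above. This should produce an update event on
    # the dbserver1.inventory.customers topic.
    docker exec mysql mysql -uroot -pdebezium inventory \
      -e "UPDATE customers SET first_name = 'Anne Marie' WHERE id = 1004;"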

    We can also look further into one of the topics by opening a consumer directly in Conduktor. For this example, we will explore the dbserver1.inventory.addresses topic.

    To do this, click on “Consumer” in the top right corner to create a new consumer.

    Kafka Debezium Conduktor 18

    In the top left corner of this tab, click on “Pick a Topic to inspect its data” and then choose dbserver1.inventory.addresses.

    Kafka Debezium Conduktor 19

    We will then choose “Start from The Beginning” and click on “Start” on the bottom right. This will show the different types of change events associated with addresses as follows.

    Kafka Debezium Conduktor 20
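
    The same inspection can also be done from the command line with the Kafka console consumer. A sketch, assuming the containers from the docker-compose.yml above:

    # Read the addresses change topic from the beginning, inside the broker container
    docker exec broker kafka-console-consumer \
      --bootstrap-server broker:9092 \
      --topic dbserver1.inventory.addresses \
      --from-beginning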

    Conclusion

    We have now run an Apache Kafka broker, a ZooKeeper instance, Kafka Connect, MySQL, and Debezium through Docker, created a Debezium connector directly through Conduktor, and seen the resulting changes in a consumer.

    As shown in this tutorial, Conduktor makes it easy to manage your Kafka Connect instances and to add and edit connectors. When using Debezium, you can quickly keep track of all the topics involved and filter their data by date/time or offset.

    References

    1. https://debezium.io/documentation/reference/stable/architecture.html
    2. https://subscription.packtpub.com/book/big_data_and_business_intelligence/9781787281349/7/ch07lvl1sec80/kafka-connect
    3. https://docs.conduktor.io/features/kafka-connect/features
    4. https://www.conduktor.io/why-use-kafka-connect
    5. https://www.sderosiaux.com/articles/2020/01/06/learnings-from-using-kafka-connect-debezium-postgresql/
    6. https://debezium.io/documentation/reference/stable/tutorial.html
    7. https://kafka.apache.org/documentation/#connectconfigs
    8. https://speakerdeck.com/gunnarmorling/practical-change-data-streaming-use-cases-with-apache-kafka-and-debezium-qcon-san-francisco-2019