Kafka Connect 와 CDC 에 대해서

Kafka Connect 와 CDC 에 대해서
Photo by Alina Grubnyak / Unsplash

1.개요

CDC (Change Data Capture) 를 테스트 해보고자 MS SqlServer 내 테이블을 Debezium SqlServer Connector 로 Kafka Cluster 에 Topic 으로 변경 내역을 전송하는 "소스 커넥터" 를 구동 시키는 과정을 기록한다.

CDC는 Source 데이터의 변경을 감지하고 Kafka Cluster 에 이벤트 레코드로 표현하는 방법이며, CDC 를 활용하면 다양한 Source 에서 다양한 플랫폼으로 데이터를 일관성있게 전달할 수 있다. 이를 위해서 이 글에서는 Kafka Connect 를 활용하기로 했다.

Kafka Connect 를 가장 잘 표현하는 그림이 아래 그림인 듯 해서 가져와 봤다.

https://debezium.io/documentation/reference/stable/architecture.html

Kafka Connect 의 Connector에는 Source Connector와 Sink Connector 가 있는데 각각 Source Connector (데이터를 읽는 용도)와 Sink Connector (데이터를 쓰는 용도) 로 나누어 볼 수 있고 다양한 Connector 가 존재하고 있어서 다양한 소스 (RDB, DODB, Log 등) 에서 다양한 목적지(RDB, DODB, Hadoop, Elastic 등) 로 코드 없이 전달 할 수 있는 것이 장점이라 주로 CDC 를 하고자 하거나 ETL 이 필요한 상황에 많이 쓰이고 있다.

위 그림은 Debezium 의 기준으로만 일부의 소스와 싱크만 그려져 있는데 아래와 같이 정말 다양한 소스 커넥터와 싱크 커넥터가 준비되어 있다. (라이센스 확인 필요)

https://docs.tdengine.com/third-party/kafka/
TaskVibes: Daily Planner - Apps on Google Play
daily planner, todo list, check list, day planner

2.환경 구축

1) Kafka Cluster & Kafka Connect, Docker Compose

특이사항으로는 KafkaConnect 정의 부분에 command 를 보면 confluent-hub install 를 통해 필요한 connector 를 설치 하는 것을 볼 수 있고, 필요한 커넥터가 Source 인지 Sink 인지 잘 구분해서 설치하면 된​다.

Debezium SQL Server CDC Source Connector
Confluent, founded by the original creators of Apache Kafka®, delivers a complete execution of Kafka for the Enterprise, to help you run your business in real-time.
networks:
  kafka_network:

volumes:
  Kafka00:
    driver: local
  Kafka01:
    driver: local
  Kafka02:
    driver: local
    
services:
##Kafka 00    
  Kafka00Service:
    image: bitnami/kafka:3.5.1-debian-11-r44
    restart: unless-stopped
    container_name: Kafka00Container
    ports:
      - '10000:9094'
    environment:     
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka00Service:9092,EXTERNAL://127.0.0.1:10000
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 # https://cwiki.apache.org/confluence/display/KAFKA/KIP-115%3A+Enforce+offsets.topic.replication.factor+upon+__consumer_offsets+auto+topic+creation
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    networks:
      - kafka_network
    volumes:
      - "Kafka00:/bitnami/kafka"
##Kafka 01
  Kafka01Service:
    image: bitnami/kafka:3.5.1-debian-11-r44
    restart: always
    container_name: Kafka01Container
    ports:
      - '10001:9094'
    environment:
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka01Service:9092,EXTERNAL://127.0.0.1:10001
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 # https://cwiki.apache.org/confluence/display/KAFKA/KIP-115%3A+Enforce+offsets.topic.replication.factor+upon+__consumer_offsets+auto+topic+creation
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    networks:
      - kafka_network
    volumes:
      - "Kafka01:/bitnami/kafka"
##Kafka 02
  Kafka02Service:
    image: bitnami/kafka:3.5.1-debian-11-r44
    restart: always
    container_name: Kafka02Container
    ports:
      - '10002:9094'
    environment:
      - KAFKA_CFG_BROKER_ID=2
      - KAFKA_CFG_NODE_ID=2
      - KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka02Service:9092,EXTERNAL://127.0.0.1:10002
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 # https://cwiki.apache.org/confluence/display/KAFKA/KIP-115%3A+Enforce+offsets.topic.replication.factor+upon+__consumer_offsets+auto+topic+creation
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    networks:
      - kafka_network
    volumes:
      - "Kafka02:/bitnami/kafka"

  KafkaConnect:
    image: confluentinc/cp-kafka-connect:7.4.3
    container_name: KafkaConnect
    depends_on:
      - Kafka00Service
      - Kafka01Service
      - Kafka02Service
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
    #  ---------------
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
    # If you want to use the Confluent Hub installer to d/l component, but make them available
    # when running this offline, spin up the stack once and then run : 
    #   docker cp kafka-connect:/usr/share/confluent-hub-components ./data/connect-jars
    volumes:
      - $PWD/data:/data
    # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
    command:
      - bash
      - -c
      - |
        echo "Installing Connector"
        confluent-hub install --no-prompt debezium/debezium-connector-sqlserver:2.2.1
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0
        confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:11.1.3
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity
    networks:
      - kafka_network
      
  KafkaWebUiService:
    image: provectuslabs/kafka-ui:latest
    restart: always
    container_name: KafkaWebUiContainer
    ports:
      - '8080:8080'
    environment:
      - KAFKA_CLUSTERS_0_NAME=Local-Kraft-Cluster
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092
      - DYNAMIC_CONFIG_ENABLED=true
      - KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED=true
      - KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED=true
      #- KAFKA_CLUSTERS_0_METRICS_PORT=9999
    depends_on:
      - Kafka00Service
      - Kafka01Service
      - Kafka02Service
    networks:
      - kafka_network

docker-compose.yml

2) SqlServer, Docker Compose

azure-sql-edge 이미지도 사용 해봤는데, CLR 관련 SYS_PTRACE 사용에 어려움이 있어서 mcr.microsoft.com/mssql/server 를 사용하기로 했다. (MSSQL_AGENT_ENABLED 도 Kafka Connect 에 필수적인 요소이다.)

services:
  sql-server-db:
    image: mcr.microsoft.com/mssql/server:2022-latest 
    ports:
      - "1433:1433"
    environment:
      ACCEPT_EULA: "Y"
      SA_PASSWORD: "admin12#$"
      MSSQL_AGENT_ENABLED: "true"
      MSSQL_COLLATION: "LATIN1_GENERAL_100_CI_AS_SC_UTF8"
    cap_add:
      - SYS_PTRACE

mssql/server

3) SqlServer, Table 정의

test 라는 Database를 만들고 Members Schema 에 member Table을 아래와 같이 비슷하게 생성했다.

4) SqlServer, CDC 설정
-- test 라는 데이터베이스를 사용
USE test;
-- test 데이터베이스에 대해 CDC를 활성화
EXEC sys.sp_cdc_enable_db;

-- test 데이터베이스의 변경 추적 옵션을 설정
-- 변경 추적은 데이터베이스의 테이블에서 변경된 행을 식별하는 데 사용
-- 변경 추적 데이터는 3일 동안 보존되고, 자동으로 정리
ALTER DATABASE test
SET CHANGE_TRACKING = ON
(CHANGE_RETENTION = 3 DAYS, AUTO_CLEANUP = ON)

-- Members 스키마의 member 테이블에 대해 CDC를 활성화
EXEC sys.sp_cdc_enable_table
@source_schema = 'Members',
@source_name = 'member',
@role_name = 'sa';
Enable and Disable change data capture - SQL Server
Enable and Disable change data capture
TaskVibes: Daily Planner - Apps on Google Play
daily planner, todo list, check list, day planner

3.API 서버 수정사항

기존에 테스트 하던 API 프로젝트를 재활용하려고 Spring Boot, Webflux 기반의 API 를 기준으로 수정사항을 정리 한다.

1) Dependency

build.gradle 에 아래 드라이버를 추가한다

implementation 'io.r2dbc:r2dbc-mssql:1.0.2.RELEASE'
GitHub - r2dbc/r2dbc-mssql: R2DBC Driver for Microsoft SQL Server using TDS (Tabular Data Stream) Protocol
R2DBC Driver for Microsoft SQL Server using TDS (Tabular Data Stream) Protocol - GitHub - r2dbc/r2dbc-mssql: R2DBC Driver for Microsoft SQL Server using TDS (Tabular Data Stream) Protocol
2) application.properties
# MSSQL
spring.r2dbc.url=r2dbc:mssql://localhost:1433/test
spring.r2dbc.username=SA
spring.r2dbc.password=admin12#$
spring.r2dbc.pool.enabled=true
spring.r2dbc.pool.min-idle=30
spring.r2dbc.pool.max-size=30

application.properties

4.Connector 등록/조회/삭제

1) POST 등록

특이사항으로는 docker 로 mssql 을 올린 상황이라 database.encrypt 을 false 로 설정하였고 hostname 도 편의상 외부 IP를 설정했다.
database. 로 시작하는 속성은 mssql 설정에 맞게 설정하고 include.list 로 끝나는 속성은 데이터를 가져올 Schema 나 Table 을 설정할 수 있고 , 로 값을 추가 할 수 있다.
각 properties 에 대해서는 아래 링크해둔 Debezium Connector 문서를 보면 자세히 설명하고 있다.

{
    "name": "mssql-cdc-members-connector",
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max": "1",
        "database.encrypt": false,
        "database.hostname": "192.168.0.3",
        "database.port": "1433",
        "database.user": "SA",
        "database.password": "admin12#$",
        "database.names": "test",
        "schema.include.list": "Members",
        "table.include.list" : "Members.member",
        "database.history.kafka.bootstrap.servers":"Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092",
        "db.timezone": "Asia/Seoul",
        "topic.prefix": "cdc",
        "schema.history.internal.kafka.bootstrap.servers":"Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092",
        "schema.history.internal.kafka.topic":"schema-history-cdc"
    }
}

localhost:8083/connectors 에 POST (Content-Type: application/json) 로 위와 같은 설정을 전달하면 아래와 같은 결과값을 볼 수 있다.

Debezium connector for SQL Server :: Debezium Documentation
2) GET 조회

위에서 이름을 mssql-cdc-members-connector 로 설정을 했으니 GET 메소드로 localhost:8083/connectors/mssql-cdc-members-connector 로 호출하면 등록했던 Connector 의 설정을 조회 할 수 있다.

3) DELETE 삭제

당연하게도 삭제 또한 DELETE 메소드로 localhost:8083/connectors/mssql-cdc-members-connector 를 호출하면 connector 를 삭제 할 수 있다.

5.데이터 삽입 및 토픽 내 데이터 확인

1) SqlServer 데이터 삽입

부하 테스터기로 Member Create API 를 호출하여 임의의 데이터 159,091 개를 넣었다

2) Kafka Cluster 데이터 확인

Source connector 에서 Kafka Cluster 로 전달된 데이터 159,091 개를 확인했다.

3) 데이터 상세 확인

당연히도 Connector 등록시 설정한대로 cdc.test.Members.member 라는 Topic 이 생성되어 개수만큼 메시지가 들어가 있는 것을 확인 할 수 있었다.
해당 메시지의 Value 값을 확인해보면 데이터 구조가 내가 바라던 대로 데이터 오브젝트의 내용이 아닌 Kafka Connect 에서 Source 와 Sink 가 데이터를 주고받기 위한 규약으로 보이는 구조로 메시지가 전달 되었음을 알 수 있었고, 그대로 임의의 Consumer 를 만들어 쓰기에는 손이 많이 갈 듯 해서 가능하면 이미 만들어진 다양한 Sink Connector 를 연동하는 것이 좋을 것 같다는 생각이 든다.

{
	"schema": {
		"type": "struct",
		"fields": [
			{
				"type": "struct",
				"fields": [
					{
						"type": "int64",
						"optional": false,
						"field": "member_id"
					},
					{
						"type": "string",
						"optional": true,
						"field": "nickname"
					},
					{
						"type": "string",
						"optional": true,
						"field": "city"
					},
					{
						"type": "string",
						"optional": true,
						"field": "phone"
					},
					{
						"type": "int64",
						"optional": true,
						"name": "io.debezium.time.NanoTimestamp",
						"version": 1,
						"field": "modified_at"
					},
					{
						"type": "int64",
						"optional": true,
						"name": "io.debezium.time.NanoTimestamp",
						"version": 1,
						"field": "created_at"
					}
				],
				"optional": true,
				"name": "cdc.test.Members.member.Value",
				"field": "before"
			},
			{
				"type": "struct",
				"fields": [
					{
						"type": "int64",
						"optional": false,
						"field": "member_id"
					},
					{
						"type": "string",
						"optional": true,
						"field": "nickname"
					},
					{
						"type": "string",
						"optional": true,
						"field": "city"
					},
					{
						"type": "string",
						"optional": true,
						"field": "phone"
					},
					{
						"type": "int64",
						"optional": true,
						"name": "io.debezium.time.NanoTimestamp",
						"version": 1,
						"field": "modified_at"
					},
					{
						"type": "int64",
						"optional": true,
						"name": "io.debezium.time.NanoTimestamp",
						"version": 1,
						"field": "created_at"
					}
				],
				"optional": true,
				"name": "cdc.test.Members.member.Value",
				"field": "after"
			},
			{
				"type": "struct",
				"fields": [
					{
						"type": "string",
						"optional": false,
						"field": "version"
					},
					{
						"type": "string",
						"optional": false,
						"field": "connector"
					},
					{
						"type": "string",
						"optional": false,
						"field": "name"
					},
					{
						"type": "int64",
						"optional": false,
						"field": "ts_ms"
					},
					{
						"type": "string",
						"optional": true,
						"name": "io.debezium.data.Enum",
						"version": 1,
						"parameters": {
							"allowed": "true,last,false,incremental"
						},
						"default": "false",
						"field": "snapshot"
					},
					{
						"type": "string",
						"optional": false,
						"field": "db"
					},
					{
						"type": "string",
						"optional": true,
						"field": "sequence"
					},
					{
						"type": "string",
						"optional": false,
						"field": "schema"
					},
					{
						"type": "string",
						"optional": false,
						"field": "table"
					},
					{
						"type": "string",
						"optional": true,
						"field": "change_lsn"
					},
					{
						"type": "string",
						"optional": true,
						"field": "commit_lsn"
					},
					{
						"type": "int64",
						"optional": true,
						"field": "event_serial_no"
					}
				],
				"optional": false,
				"name": "io.debezium.connector.sqlserver.Source",
				"field": "source"
			},
			{
				"type": "string",
				"optional": false,
				"field": "op"
			},
			{
				"type": "int64",
				"optional": true,
				"field": "ts_ms"
			},
			{
				"type": "struct",
				"fields": [
					{
						"type": "string",
						"optional": false,
						"field": "id"
					},
					{
						"type": "int64",
						"optional": false,
						"field": "total_order"
					},
					{
						"type": "int64",
						"optional": false,
						"field": "data_collection_order"
					}
				],
				"optional": true,
				"name": "event.block",
				"version": 1,
				"field": "transaction"
			}
		],
		"optional": false,
		"name": "cdc.test.Members.member.Envelope",
		"version": 1
	},
	"payload": {
		"before": null,
		"after": {
			"member_id": 159091,
			"nickname": "조민준",
			"city": "인천광역시",
			"phone": "011-xxx-8667",
			"modified_at": 1702493422322207000,
			"created_at": 1702493422322207000
		},
		"source": {
			"version": "2.2.1.Final",
			"connector": "sqlserver",
			"name": "cdc",
			"ts_ms": 1702461481917,
			"snapshot": "last",
			"db": "test",
			"sequence": null,
			"schema": "Members",
			"table": "member",
			"change_lsn": null,
			"commit_lsn": null,
			"event_serial_no": null
		},
		"op": "r",
		"ts_ms": 1702461489302,
		"transaction": null
	}
}

메시지 Value 에 들어가 있는 데이터 구조

6.Create, Modify, Delete 에 대한 변경로그

CDC 라면 변경사항에 대한 데이터도 전달 받아야 할 것이고 그 데이터를 어떤방식으로 전달하는지 살펴봤다.
동일한 Row에 대해서 payload.before 와 payload.after 를 보면 생성, 변경, 삭제를 통해 어떤 변경사항이 있는지 확인할 수 있다.

1) Create Message

2) Modify Message

3) Delete Message