1.개요 CDC (Change Data Capture) 를 테스트 해보고자 MS SqlServer 내 테이블을 Debezium SqlServer Connector 로 Kafka Cluster 에 Topic 으로 변경 내역을 전송하는 "소스 커넥터" 를 구동 시키는 과정을 기록한다.
CDC는 Source 데이터의 변경을 감지하고 Kafka Cluster 에 이벤트 레코드로 표현하는 방법이며, CDC 를 활용하면 다양한 Source 에서 다양한 플랫폼으로 데이터를 일관성있게 전달할 수 있다. 이를 위해서 이 글에서는 Kafka Connect 를 활용하기로 했다.
Kafka Connect 를 가장 잘 표현하는 그림이 아래 그림인 듯 해서 가져와 봤다.
https://debezium.io/documentation/reference/stable/architecture.html Kafka Connect 의 Connector에는 Source Connector와 Sink Connector 가 있는데 각각 Source Connector (데이터를 읽는 용도)와 Sink Connector (데이터를 쓰는 용도) 로 나누어 볼 수 있고 다양한 Connector 가 존재하고 있어서 다양한 소스 (RDB, DODB, Log 등) 에서 다양한 목적지(RDB, DODB, Hadoop, Elastic 등) 로 코드 없이 전달 할 수 있는 것이 장점이라 주로 CDC 를 하고자 하거나 ETL 이 필요한 상황에 많이 쓰이고 있다.
위 그림은 Debezium 의 기준으로만 일부의 소스와 싱크만 그려져 있는데 아래와 같이 정말 다양한 소스 커넥터와 싱크 커넥터가 준비되어 있다. (라이센스 확인 필요)
https://docs.tdengine.com/third-party/kafka/ TaskVibes: Daily Planner - Apps on Google Play
daily planner, todo list, check list, day planner
2.환경 구축 1) Kafka Cluster & Kafka Connect, Docker Compose 특이사항으로는 KafkaConnect 정의 부분에 command 를 보면 confluent-hub install 를 통해 필요한 connector 를 설치 하는 것을 볼 수 있고, 필요한 커넥터가 Source 인지 Sink 인지 잘 구분해서 설치하면 된다.
Debezium SQL Server CDC Source Connector
Confluent, founded by the original creators of Apache Kafka®, delivers a complete execution of Kafka for the Enterprise, to help you run your business in real-time.
networks:
kafka_network:
volumes:
Kafka00:
driver: local
Kafka01:
driver: local
Kafka02:
driver: local
services:
##Kafka 00
Kafka00Service:
image: bitnami/kafka:3.5.1-debian-11-r44
restart: unless-stopped
container_name: Kafka00Container
ports:
- '10000:9094'
environment:
- KAFKA_CFG_BROKER_ID=0
- KAFKA_CFG_NODE_ID=0
- KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka00Service:9092,EXTERNAL://127.0.0.1:10000
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 # https://cwiki.apache.org/confluence/display/KAFKA/KIP-115%3A+Enforce+offsets.topic.replication.factor+upon+__consumer_offsets+auto+topic+creation
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
networks:
- kafka_network
volumes:
- "Kafka00:/bitnami/kafka"
##Kafka 01
Kafka01Service:
image: bitnami/kafka:3.5.1-debian-11-r44
restart: always
container_name: Kafka01Container
ports:
- '10001:9094'
environment:
- KAFKA_CFG_BROKER_ID=1
- KAFKA_CFG_NODE_ID=1
- KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka01Service:9092,EXTERNAL://127.0.0.1:10001
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 # https://cwiki.apache.org/confluence/display/KAFKA/KIP-115%3A+Enforce+offsets.topic.replication.factor+upon+__consumer_offsets+auto+topic+creation
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
networks:
- kafka_network
volumes:
- "Kafka01:/bitnami/kafka"
##Kafka 02
Kafka02Service:
image: bitnami/kafka:3.5.1-debian-11-r44
restart: always
container_name: Kafka02Container
ports:
- '10002:9094'
environment:
- KAFKA_CFG_BROKER_ID=2
- KAFKA_CFG_NODE_ID=2
- KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka02Service:9092,EXTERNAL://127.0.0.1:10002
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 # https://cwiki.apache.org/confluence/display/KAFKA/KIP-115%3A+Enforce+offsets.topic.replication.factor+upon+__consumer_offsets+auto+topic+creation
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
networks:
- kafka_network
volumes:
- "Kafka02:/bitnami/kafka"
KafkaConnect:
image: confluentinc/cp-kafka-connect:7.4.3
container_name: KafkaConnect
depends_on:
- Kafka00Service
- Kafka01Service
- Kafka02Service
ports:
- 8083:8083
environment:
CONNECT_BOOTSTRAP_SERVERS: Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: kafka-connect
CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: _connect-status
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
# ---------------
CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
# If you want to use the Confluent Hub installer to d/l component, but make them available
# when running this offline, spin up the stack once and then run :
# docker cp kafka-connect:/usr/share/confluent-hub-components ./data/connect-jars
volumes:
- $PWD/data:/data
# In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
command:
- bash
- -c
- |
echo "Installing Connector"
confluent-hub install --no-prompt debezium/debezium-connector-sqlserver:2.2.1
confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0
confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:11.1.3
#
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
#
sleep infinity
networks:
- kafka_network
KafkaWebUiService:
image: provectuslabs/kafka-ui:latest
restart: always
container_name: KafkaWebUiContainer
ports:
- '8080:8080'
environment:
- KAFKA_CLUSTERS_0_NAME=Local-Kraft-Cluster
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092
- DYNAMIC_CONFIG_ENABLED=true
- KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED=true
- KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED=true
#- KAFKA_CLUSTERS_0_METRICS_PORT=9999
depends_on:
- Kafka00Service
- Kafka01Service
- Kafka02Service
networks:
- kafka_network
docker-compose.yml
2) SqlServer, Docker Compose azure-sql-edge 이미지도 사용 해봤는데, CLR 관련 SYS_PTRACE 사용에 어려움이 있어서 mcr.microsoft.com/mssql/server 를 사용하기로 했다. (MSSQL_AGENT_ENABLED 도 Kafka Connect 에 필수적인 요소이다.)
services:
sql-server-db:
image: mcr.microsoft.com/mssql/server:2022-latest
ports:
- "1433:1433"
environment:
ACCEPT_EULA: "Y"
SA_PASSWORD: "admin12#$"
MSSQL_AGENT_ENABLED: "true"
MSSQL_COLLATION: "LATIN1_GENERAL_100_CI_AS_SC_UTF8"
cap_add:
- SYS_PTRACE
mssql/server
3) SqlServer, Table 정의 test 라는 Database를 만들고 Members Schema 에 member Table을 아래와 같이 비슷하게 생성했다.
4) SqlServer, CDC 설정 -- test 라는 데이터베이스를 사용
USE test;
-- test 데이터베이스에 대해 CDC를 활성화
EXEC sys.sp_cdc_enable_db;
-- test 데이터베이스의 변경 추적 옵션을 설정
-- 변경 추적은 데이터베이스의 테이블에서 변경된 행을 식별하는 데 사용
-- 변경 추적 데이터는 3일 동안 보존되고, 자동으로 정리
ALTER DATABASE test
SET CHANGE_TRACKING = ON
(CHANGE_RETENTION = 3 DAYS, AUTO_CLEANUP = ON)
-- Members 스키마의 member 테이블에 대해 CDC를 활성화
EXEC sys.sp_cdc_enable_table
@source_schema = 'Members',
@source_name = 'member',
@role_name = 'sa';
Enable and Disable change data capture - SQL Server
Enable and Disable change data capture
TaskVibes: Daily Planner - Apps on Google Play
daily planner, todo list, check list, day planner
3.API 서버 수정사항 기존에 테스트 하던 API 프로젝트 를 재활용하려고 Spring Boot, Webflux 기반의 API 를 기준으로 수정사항을 정리 한다.
1) Dependency build.gradle 에 아래 드라이버를 추가한다
implementation 'io.r2dbc:r2dbc-mssql:1.0.2.RELEASE'
GitHub - r2dbc/r2dbc-mssql: R2DBC Driver for Microsoft SQL Server using TDS (Tabular Data Stream) Protocol
R2DBC Driver for Microsoft SQL Server using TDS (Tabular Data Stream) Protocol - GitHub - r2dbc/r2dbc-mssql: R2DBC Driver for Microsoft SQL Server using TDS (Tabular Data Stream) Protocol
2) application.properties # MSSQL
spring.r2dbc.url=r2dbc:mssql://localhost:1433/test
spring.r2dbc.username=SA
spring.r2dbc.password=admin12#$
spring.r2dbc.pool.enabled=true
spring.r2dbc.pool.min-idle=30
spring.r2dbc.pool.max-size=30
application.properties
4.Connector 등록/조회/삭제 1) POST 등록 특이사항으로는 docker 로 mssql 을 올린 상황이라 database.encrypt 을 false 로 설정하였고 hostname 도 편의상 외부 IP를 설정했다. database. 로 시작하는 속성은 mssql 설정에 맞게 설정하고 include.list 로 끝나는 속성은 데이터를 가져올 Schema 나 Table 을 설정할 수 있고 , 로 값을 추가 할 수 있다. 각 properties 에 대해서는 아래 링크해둔 Debezium Connector 문서를 보면 자세히 설명하고 있다.
{
"name": "mssql-cdc-members-connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": "1",
"database.encrypt": false,
"database.hostname": "192.168.0.3",
"database.port": "1433",
"database.user": "SA",
"database.password": "admin12#$",
"database.names": "test",
"schema.include.list": "Members",
"table.include.list" : "Members.member",
"database.history.kafka.bootstrap.servers":"Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092",
"db.timezone": "Asia/Seoul",
"topic.prefix": "cdc",
"schema.history.internal.kafka.bootstrap.servers":"Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092",
"schema.history.internal.kafka.topic":"schema-history-cdc"
}
}
localhost:8083/connectors 에 POST (Content-Type: application/json) 로 위와 같은 설정을 전달하면 아래와 같은 결과값을 볼 수 있다.
Debezium connector for SQL Server :: Debezium Documentation
2) GET 조회 위에서 이름을 mssql-cdc-members-connector 로 설정을 했으니 GET 메소드로 localhost:8083/connectors/mssql-cdc-members-connector 로 호출하면 등록했던 Connector 의 설정을 조회 할 수 있다.
3) DELETE 삭제 당연하게도 삭제 또한 DELETE 메소드로 localhost:8083/connectors/mssql-cdc-members-connector 를 호출하면 connector 를 삭제 할 수 있다.
5.데이터 삽입 및 토픽 내 데이터 확인 1) SqlServer 데이터 삽입 부하 테스터기로 Member Create API 를 호출하여 임의의 데이터 159,091 개를 넣었다
2) Kafka Cluster 데이터 확인 Source connector 에서 Kafka Cluster 로 전달된 데이터 159,091 개를 확인했다.
3) 데이터 상세 확인 당연히도 Connector 등록시 설정한대로 cdc.test.Members.member 라는 Topic 이 생성되어 개수만큼 메시지가 들어가 있는 것을 확인 할 수 있었다. 해당 메시지의 Value 값을 확인해보면 데이터 구조가 내가 바라던 대로 데이터 오브젝트의 내용이 아닌 Kafka Connect 에서 Source 와 Sink 가 데이터를 주고받기 위한 규약으로 보이는 구조로 메시지가 전달 되었음을 알 수 있었고, 그대로 임의의 Consumer 를 만들어 쓰기에는 손이 많이 갈 듯 해서 가능하면 이미 만들어진 다양한 Sink Connector 를 연동하는 것이 좋을 것 같다는 생각이 든다.
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int64",
"optional": false,
"field": "member_id"
},
{
"type": "string",
"optional": true,
"field": "nickname"
},
{
"type": "string",
"optional": true,
"field": "city"
},
{
"type": "string",
"optional": true,
"field": "phone"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.NanoTimestamp",
"version": 1,
"field": "modified_at"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.NanoTimestamp",
"version": 1,
"field": "created_at"
}
],
"optional": true,
"name": "cdc.test.Members.member.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int64",
"optional": false,
"field": "member_id"
},
{
"type": "string",
"optional": true,
"field": "nickname"
},
{
"type": "string",
"optional": true,
"field": "city"
},
{
"type": "string",
"optional": true,
"field": "phone"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.NanoTimestamp",
"version": 1,
"field": "modified_at"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.NanoTimestamp",
"version": 1,
"field": "created_at"
}
],
"optional": true,
"name": "cdc.test.Members.member.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "string",
"optional": true,
"field": "change_lsn"
},
{
"type": "string",
"optional": true,
"field": "commit_lsn"
},
{
"type": "int64",
"optional": true,
"field": "event_serial_no"
}
],
"optional": false,
"name": "io.debezium.connector.sqlserver.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "cdc.test.Members.member.Envelope",
"version": 1
},
"payload": {
"before": null,
"after": {
"member_id": 159091,
"nickname": "조민준",
"city": "인천광역시",
"phone": "011-xxx-8667",
"modified_at": 1702493422322207000,
"created_at": 1702493422322207000
},
"source": {
"version": "2.2.1.Final",
"connector": "sqlserver",
"name": "cdc",
"ts_ms": 1702461481917,
"snapshot": "last",
"db": "test",
"sequence": null,
"schema": "Members",
"table": "member",
"change_lsn": null,
"commit_lsn": null,
"event_serial_no": null
},
"op": "r",
"ts_ms": 1702461489302,
"transaction": null
}
}
메시지 Value 에 들어가 있는 데이터 구조
6.Create, Modify, Delete 에 대한 변경로그 CDC 라면 변경사항에 대한 데이터도 전달 받아야 할 것이고 그 데이터를 어떤방식으로 전달하는지 살펴봤다. 동일한 Row에 대해서 payload.before 와 payload.after 를 보면 생성, 변경, 삭제를 통해 어떤 변경사항이 있는지 확인할 수 있다.
1) Create Message
2) Modify Message
3) Delete Message