Kafka Streams, KTable-KStream Join

Kafka Streams, KTable-KStream Join
Photo by Marc Sendra Martorell / Unsplash

1. 개요

Stateful 한 처리를 해야 하는 상황에 Kafka 를 활용할 수 있는 방법으로 Kafka Streams 를 사용해보기로 했다.

Stateful, Stateless 하다는 표현이 있는데 여기서 Stateless 하다는 것은 다른 데이터 없이 하나의 이벤트나 데이터로 해당 프로세스를 마감 가능하다는 의미이고, 반대로 Stateful 하다는 것은 이전의 여러 상태의 값들을 저장하고 있다가 이벤트나 데이터가 들어왔을때 활용하는 경우를 이야기 하는 것으로 이해했다.

다양한 유즈케이스가 있겠지만, 가령 여러 사용자가 쇼핑몰에 들어와서 이런 저런 상품을 둘러보고 특정 상품을 구매 하는 경험(둘러본 관심상품과 최종 구매상품)을 상태로서 저장하고 관리 한다면, 사용자가 상품 상세페이지를 둘러보는 행위가 되는 Stream 과 합쳐졌을때, 목적이 광고든 추천이든 그 무엇이 되었든 다른 사용자의 경험을 고객에게 제안 할 수 있는 도구도 될 수 있고, 다른 다양한 목적으로도 사용할 수 있지 않을까 하는 생각도 해본다. 물론 실시간 통계나 모니터링에서도 빛을 발할 것이라 생각한다.

이 글을 통해 남기고자 하는 경험의 과정을 간단히 정리하면 아래와 같다.

  • Producer 역할을 하는 사용자 가입 API 를 통하여 가입시 RDB Table과 Kafka Topic "member" 로 사용자 정보를 흘려 보낸다.
  • 게시물 작성 API 을 통해서도 DODB Collection 과 Kafka Topic "board" 로 게시물 정보를 보낸다.
  • 이 두 테이블 간의 접점은 member_id 이며 게시물 작성 정보가 들어오는 Stream (KStream) 을 member_id 키를 기준으로 또 다른 Stream (KTable) 인 Member 와 JOIN 을 하여 게시물 작성 정보에 구체적인 사용자 정보를 추가해서 새로운 Topic으로 발행을 할 것이다.
  • 마지막으로 이 새로운 Topic 을 또 RDB 로 전달하여 유실되는 데이터는 없었는 지 최종 확인을 하려고 한다.

2.공통 설정

1) Dependency

Kafka Streams 를 사용하기 위한 추가적인 dependency 를 설정하기 위해 org.apache.kafka:kafka-streams 를 추가하여 사용하기로 한다.

implementation 'org.apache.kafka:kafka-streams:3.5.1'
implementation 'io.projectreactor.kafka:reactor-kafka:1.3.19'
implementation 'org.springframework.kafka:spring-kafka:3.0.9'

build.gradle

2) 메시지 유실에 대해서

부하 테스터기로 랜덤 데이터를 RDB 에 넣고 Kafka Topic 에도 전송을 하고 있는데 Kafka Topic 쪽에 메시지 유실이 발생해서 아래와 같이  Producer 속성값을 조정했다.
acks 의 경우 -1 로 설정했다가 브로커의 응답을 모두 기다리는 설정 all 로 변경했다. 로컬 기준(클러스터 3대)의 벤치마크시 성능에도 지연이 발생 한다던지 하는 영향이 전혀 없어보인다. 그 외에 다양한 속성값이 있는데 이는 따로 정리 해보기로 한다. 아마도 환경에 맞추어 따로 튜닝을 해야하는 사안으로 보인다.

spring.kafka.bootstrap-servers=127.0.0.1:10000,127.0.0.1:10001,127.0.0.1:10002
# 메시지 헤더 내 __typeid__ 제거
spring.kafka.producer.properties.spring.json.add.type.headers=false
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
# 메시지 발송 후 확인 
spring.kafka.producer.acks=all
# 메시지 발송 실패 후 재시도, -1 로 설정하고 재시도가 길어지면 순서가 바뀔 수 있다
spring.kafka.producer.retries=8
# 메시지 압축
spring.kafka.producer.compression-type=snappy

application.properties, producer 설정

3) TypeId 헤더에 대하여

스프링 카프카에서 자동으로 추가 되어 카프카 토픽으로 발송 되며, 메시지를 받는 쪽에서는 이 헤더를 통헤 Payload 를 적절한 타입으로 변환하게 하는 목적이다.
이 헤더가 붙으면 발송서버의 패키지의 클래스를 참조하는 관계로 자바 라이브러리를 사용하는 메시지 컨슈머 입장에서는 해당 패키지 경로에 클래스가 없으면 이를 계속 시도하는 경향이 있어서 발송 자체에서 제거 하도록 했다.

spring.kafka.producer.properties.spring.json.add.type.headers=false

application.properties, producer 단에서 __typeid__ 헤더 제거 설정

메시지 헤더에 __TypeId__ 라는 키값을 가지는 해당 Value 메시지를 만든 클래스 경로가 붙는다

1. Member Producer

1) member 적재 프로세스

단순히 RDB, member 테이블에 가입정보를 저장하고 나서 member 라는 topic 에 메시지를 보내는 프로세스이다. 여기서 차후에 Stream 간의 JOIN 을 하는 기준이 레코드의 키 값이 되므로 null 이 되서는 안되고 명확하게 키 값을 정의해서 전달해야한다. 여기서는 키값을 RDB 에 적재된 member_id 값을 입력했다.

	@Override
	public Mono<MemberDTO.Create> createItem(MemberDTO.Create dto) {
		LocalDateTime now = LocalDateTime.now();
		dto.setModifiedAt(now);
		dto.setCreatedAt(now);

		return memberRepository.save(memberMapper.itemToEntity(dto))
			.publishOn(Schedulers.boundedElastic())
			.doOnSuccess(
				item -> kafkaSender.send("member", String.valueOf(item.getMemberId()), memberMapper.toCreateDTO(item))
					.doOnError(e -> log.error("kafka send error: {}", e.getMessage()))
					.subscribe()
			)
			.map(memberMapper::toCreateDTO);
	}

Member, Create Item

2) RDB 적재

부하 테스터기로 가짜 이름과 도시 전화번호를 생성하였고 총 442,128 건 RDB 테이블에 적재되었다.

Member RDB
3) Kafka Topic, member 적재

RDB 테이블의 member_id 값을 키 값으로 가지는 Kafka Topic (member)에도 유실없이 총 442,128 건 적재되었다.

Member Topic
4) Kafka Topic, member 적재 상세

member 의 payload 는 아래와 같고 payload의 member_id 가 79 이면서 동시에 레코드의 키값 또한 79 인것을 볼 수 있다.

2. Board Producer

1) board 적재 프로세스

DODB, board 컬렉션에 게시물 작성정보를 저장하고 나서 board 라는 topic 에 메시지를 보내는 프로세스이다. 키값을 컬렉션에 적재된 _id 값을 입력했다.

@Override
	public Mono<BoardDocumentDTO.Create> createItem(BoardDocumentDTO.Create dto) {
		long key = System.currentTimeMillis();
		LocalDateTime now = LocalDateTime.now();
		// dto.setScore(counter);
		dto.setModifiedAt(now);
		dto.setCreatedAt(now);
		dto.setCreatedTs(key);
		return boardRepository.save(boardMapper.itemToEntity(dto))
			.publishOn(Schedulers.boundedElastic())
			.doOnSuccess(board -> {
				dto.setId(board.getId());

				kafkaSender.send("board", board.getId(), dto)
					.timeout(java.time.Duration.ofMillis(1600), Mono.error(new RuntimeException("Kafka Send Timeout")))
					.subscribe(result -> {
						if (result.exception() != null) {
							log.error("Kafka Send Error", result.exception());
						}
					});

			}).map(boardMapper::toCreateDTO);
	}
2) DODB 적재

부하 테스터기로 임의의 제목과 본문 그리고 member_id 를 RDB에 넣은 member 테이블의 크기 범위 내에서 랜덤으로 생성하여 다수의 사용자가 게시물을 작성한것으로 구성하였고 총 447,017 건 DODB 테이블에 적재되었다.

Board DODB
3) Kafka Topic, board 적재

DODB 컬렉션의 _id 값을 키 값으로 가지는 Kafka Topic (board)에도 유실없이 총 447,017 건 적재되었다.

Board Topic
4) Kafka Topic, board 적재 상세

board 의 payload 는 아래와 같고 payload의 id 가 레코드의 키값과 동일한 것 또한 볼 수 있다.

3. KTable - KStream Join

1) KTable-KStream Join 프로세스

어플리케이션이 구동되면 Bean 으로 등록되어 자동으로 두 스트림을 조인하고 새로운 Topic 에 결과물을 발송하도록 했다. (두 토픽의 파티션수가 달라서 리파티셔닝이 발생했다. 가능하면 코파티셔닝을 고려해야한다.) 어떻게 토픽을 KTable 또는 KStream 으로 변환하고 직렬화/역직렬화 하는 지 그리고 Key 값이 다른 두 Stream 을 어떻게 Join 하는지는 주석으로 남겨두었다.

package com.example.consumer.config;

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.kafka.support.serializer.JsonSerializer;

import com.example.consumer.dto.BoardDTO;
import com.example.consumer.dto.BoardDetailDTO;
import com.example.consumer.dto.MemberDTO;

import lombok.extern.slf4j.Slf4j;

@Configuration
@Slf4j
public class KafkaStreamsConfig {

	public static final String TABLE_TOPIC = "member";
	public static final String STREAM_TOPIC = "board";
	public static final String JOINED_TOPIC = "board-detail";
	@Value("${spring.kafka.bootstrap-servers}")
	public String BOOTSTRAP_SERVERS;
	@Value("${spring.kafka.streams.application-id}")
	public String APPLICATION_ID;

	@Bean
	public void initBoardDetailStream() {
		// Kafka Streams의 속성을 설정하는 객체
		Properties props = new Properties();
		props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
		// 문자열 데이터를 직렬화하고 역직렬화
		props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		props.put(
			StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
			LogAndContinueExceptionHandler.class
		);

		// Kafka Streams의 토폴로지를 정의하는 객체
		StreamsBuilder builder = new StreamsBuilder();

		// Member 역직렬화 설정
		JsonDeserializer<MemberDTO.Create> memberDeserializer = new JsonDeserializer<>(MemberDTO.Create.class);
		memberDeserializer.addTrustedPackages("com.example.*");

		// "member" 토픽에서 문자열 데이터를 읽어서 KTable로 변환
		KTable<String, MemberDTO.Create> table = builder.table(
			TABLE_TOPIC,
			Consumed.with(
				Serdes.String(),
				Serdes.serdeFrom(new JsonSerializer<>(), memberDeserializer)
			)
		);

		// Board 역직렬화 설정
		JsonDeserializer<BoardDTO.Create> boardDeserializer = new JsonDeserializer<>(BoardDTO.Create.class);
		boardDeserializer.addTrustedPackages("com.example.*");

		// 스트림 토픽에서 문자열 데이터를 읽어서 KStream으로 변환
		KStream<String, BoardDTO.Create> stream = builder.stream(
			STREAM_TOPIC,
			Consumed.with(
				Serdes.String(),
				Serdes.serdeFrom(new JsonSerializer<>(), boardDeserializer)
			)
		);

		// KTable과 KStream을 inner join
		KStream<String, BoardDetailDTO.Create> joined = stream
			// board stream의 키를 memberId로 변경
			.selectKey((key, value) -> value.getMemberId().toString())
			// board stream과 member table을 member_id 키 값으로 조인
			.join(table, (streamValue, tableValue) -> {
				if (streamValue.getMemberId().equals(tableValue.getMemberId())) {
					return BoardDetailDTO.Create.builder()
						.boardId(streamValue.getId())
						.memberId(streamValue.getMemberId())
						.nickname(tableValue.getNickName())
						.city(tableValue.getCity())
						.phone(tableValue.getPhone())
						.title(streamValue.getTitle())
						.body(streamValue.getBody())
						.modifiedAt(streamValue.getModifiedAt())
						.createdAt(streamValue.getCreatedAt())
						.build();
				} else {
					return null;
				}
			});

		// 조인된 스트림을 BoardDetailDTO.Create로 직렬화해서 joined 토픽으로 전송
		joined.to(
			JOINED_TOPIC,
			Produced.with(
				Serdes.String(),
				new JsonSerde<>(BoardDetailDTO.Create.class)
			)
		);

		// 토폴로지를 빌드하여 Kafka Streams 객체 생성
		KafkaStreams streams = new KafkaStreams(builder.build(), props);

		// Kafka Streams를 실행
		streams.start();

		// 애플리케이션 종료 시 Kafka Streams를 정지
		Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
	}

}
2) Kafka Topic, board-detail 적재

위에 board 에 대한 설명을 보면 Kafka Topic "board" 의 메시지 수가 총 447,017 건이었고, 447,017 건의 KStream board 와 KTable member 를 조인한 결과인 board-detail 토픽의 개수 또한 447,017 건으로 유실없이 board 개수만큼 적재 되었다.

0
3) Kafka Topic, board-detail 적재 상세

아래는 Kafka Streams 로 member 와 board 를 join 한 결과이며 null 인 것들은 board-detail consumer 를 통해 채워서 RDB에 전달 해보고자 한다.

4. Board Detail, Consumer

1) Consumer 프로세스

단순히 board-detail Topic 을 구독하고 파티션값, 오프셋값을 채워 RDB에 전달하는 역할을 한다.

@Service
@Slf4j
public class BoardDetailService implements ApplicationRunner {

	private final ReactiveKafkaConsumerTemplate<String, BoardDetailDTO.Create> boardDetailTemplate;

	private final BoardDetailRepository boardDetailRepository;

	private final BoardDetailMapper boardDetailMapper;

	@Autowired
	public BoardDetailService(
		ReactiveKafkaConsumerTemplate<String, BoardDetailDTO.Create> getBoardDetailConsumerTemplate,
		BoardDetailRepository boardDetailRepository,
		BoardDetailMapper boardDetailMapper
	) {
		this.boardDetailTemplate = getBoardDetailConsumerTemplate;
		this.boardDetailRepository = boardDetailRepository;
		this.boardDetailMapper = boardDetailMapper;
	}

	@Override
	public void run(ApplicationArguments args) throws Exception {
		this.boardDetailTemplate
			.receive()
			.groupBy(m -> m.receiverOffset().topicPartition())
			.flatMap(partition -> partition.concatMap(msg -> {
				BoardDetailDTO.Create dto = msg.value();

				dto.setKafkaPartition(msg.partition());
				dto.setKafkaOffset(msg.offset());

				return this.boardDetailRepository.save(boardDetailMapper.createToEntity(dto))
					.publishOn(Schedulers.boundedElastic())
					.doOnSuccess(item -> {
						msg.receiverOffset().commit();
					});
			}))
			.subscribe();
	}

}
2) board_detail 적재

RDB에 Topic 에 존재하던 총 447,017 건이 그대로 적재 되었다.

이로서 두개의 스트림을 받아 조금 정적일 수 있는 Member 정보를 KTable 로 정의 하고 동적인 Board 정보를 KStream 으로 정의해서 두 개의 Stream 을 Join 하여 기본적인 Kafka Streams 의 사용예시를 작성해봤다.

전체의 과정을 거치며 다소 오해가 있을 수 있는 것이 Kafka Streams 가 마치 Kafka Cluster 에서 제공하는 서버의 일부분 처럼 느껴질 수 있는데, Kafka Streams 는 Java 기준이라면 JVM 에서 동작하는 일반적인 클라이언트 쪽의 어플리케이션 이라는 점이다.

또한 이 때문에 불안정하게 느껴질 수 있는데 Streams 에는 RocksDB 라는 Key Value Store 인 Persistent Store가 있어서 데이터를 디스크에 적재 할 수 있고 쿠버네티스 환경에서 서비스 컨테이너가 셧다운 되거나 삭제 후 재생성 되거나 혹은 정말 오랜만에 서버가 올라가도, Kafka 에 저장되어 있는 Chagelog Topic을 기준으로 다시 복구가 된다.
다만 이 Chagelog Topic 을 기록하는 것도 Streams 어플리케이션 이므로 오로지 1대의 Streams 어플리케이션을 유지하고 있다가 컨테이너가 죽거나 하는 문제가 생기면 연계된 프로세스에 변경되지 않은 데이터를 전달하게 되는 상황이 발생하므로 Streams 어플리케이션의 고가용성을 보장해야 한다.

Kafka Streams 어플리케이션의 고가용성에 대해서 잠깐 이야기 하자면 예를 들어 연령대 성별등 사용자 정보에 최근 조회한 상품 정보를 JOIN 해서 새로운 Topic "liking"을 작성하는 어플리케이션 A 가 있고, 이러한 "liking" 정보를 토대로 최근 30분간 연령대 성별 기준으로 어떤 상품이 주목받고 있는지를 판단해서 Redis 에 적재하는 어플리케이션 B 가 있다고 하자.
이때 어플리케이션 A 가 돌연 중단 되는 상황이 발생하면 어플리케이션 B는 계속 오래된 데이터를 바라보고 있을 것이고 서비스에 문제가 생길여지가 충분하다. 또한 A 는 다시 서버가 올라와도 ChangeLog 또한 중단 전까지만 기록되어 있는 상황이라 StateStore 는 복구 되지 않을 것이다.
B도 마찬가지지만 이를 위해서 A 어플리케이션을 여러 컨테이너로 증설해야 하고 아래와 같은 항목을 점검해야 한다.

  • A 어플리케이션의 ID 를 모든 컨테이너가 동일하게 가져야 한다
    : 사용자정보 토픽과 조회상품정보 토픽을 구독해서 가져올때 모든 어플리케이션 컨테이너의 서버가 동일한 Consumer Group 으로 묶여 있음을 정의하고 이에 따라 각 컨테이너 어플리케션은 서로 다른 파티션을 소비하게 될 것이다. (어플리케이션 ID는 StreamsConfig 의 APPLICATION_ID_CONFIG 을 의미한다)
  • A 어플리케이션은 컨테이너 환경에서 Persistent Volumes 을 공유해야 한다.
    : A, B 어플리케이션 서버의 컨테이너도 여러가지 상황으로 중단되고 삭제 될 수 있다. 따라서 새로운 이미지의 컨테이너가 올라가도 이전 컨테이너에서 사용하던 Persistent Volumes 로 지정된 Kafka Streams의 RocksDB 를 사용할 수 있다.

이렇게 A 어플리케이션을 A1, A2, A3 으로 고가용성을 보장한 상황에서 A2 서버가 중단되는 상황이라면 아래와 같이 동작하게 된다.

  • A2 가 소비하던 사용자정보, 조회상품정보 토픽의 파티션을 나누어 가지게 되고 ChangeLog 토픽을 통해서 A2가 StateStore 까지 A1, A3에게 재할당한다.

다시 A2 가 올라온다면 아래와 같이 동작한다.

  • A2 가 다시 컨슈머 그룹에 참여하고 A1, A3 는 A2에게 받은 파티션과 StateStore 를 분리하고 A2 는 ChangeLog 토픽에서 StateStore 를 복구한다.

따라서 고가용성을 보장하면 늘 최신의 데이터를 안정적으로 어플리케이션 B에게 전달 할 수 있는 상황을 만들게 된다.