Spring Boot, Reactor Kafka 에서의 순서보장과 중복방지에 대한 기록 (feat. redis)

작성한 글의 프로젝트 Webflux 환경에서 Reactor Kafka 기본 연동 에서 Consumer 가 더 고려해야 하는 상황에 대해서 정리를 해본다.

Kafka의 사용 케이스 중에 간단히 구현해 볼만한 예시로 아래 그림과 같은 프로세스를 구성해보기로 한다.

TaskVibes: ADHD Simple Manage - Apps on Google Play
ADHD 를 겪는 사람들을 위한 일정관리 앱

내용을 잠시 살펴보자면, 사용자 생성 API 를 호출해서 DB 사용자 테이블에 요청받은 사용자 정보를 저장하고, 이를 Kafka에 전달하고 동일한 Topic 을 구독하는 Consumer 는 메시지를 전달받아 또 다른 DB 테이블에 저장하여 Partition 별로 Offset 이 순서에 맞게 저장이 됐는지 DB 사용자 테이블과 비교하여 유실은 없는지 중복은 없는지를 살펴 보는 것으로 한다.

A. 순서보장에 대하여

Kafka의 Topic은 기본적으로 여러개의 Partition 을 구성할 수 있고 하나의 파티션 내에서는 당연히도 Offset 순서대로 저장이 되고 소비도 Offset 순서대로 된다.

문제는 일반적으로 처리량을 높이기 위해 Partition 수를 늘리고 그 수만큼 Consumer Group 내 Consumer를 배치하게 되는 데, 이렇게 되는 경우에는 여러개의 Partition 에서 데이터를 소비하게 되니 메시지 순서가 보장이 안되는 상황이 된다.

따라서 이러한 경우에는 순서보장이 필요한 메시지는 파티션 수를 1개로 지정을 하거나 메시지를 발송할때 파티션을 지정해서 보내서 하나의 파티션을 사용하도록 해서 순서를 보장하도록 한다.

TaskVibes: ADHD Simple Manage - Apps on Google Play
ADHD 를 겪는 사람들을 위한 일정관리 앱

1. Producer 구현

해당 프로젝트에서는 회원가입시 10대, 20대 가입자의 경우 순서가 보장되어야 한다고 가정을 해보도록 한다. 다소 억지 스럽지만 이를테면 10대 가입자 중 10번째까지, 20번째까지 그룹별로 실시간으로 알림 메시지를 다르게 보내야 하는 상황을 생각해보면 되겠다.

Producer 가 되는 사용자 가입 API 부분은 아래 내용과 같이 지난 글에 작성했던 코드를 아래 내용에 따라 수정 해준다.

1) Service

메시지 발송시에 partition 과 key 를 지정하도록 수정한다. (key만 지정하면 partition이 바르게 나누어지지 않아서 partition을 수동으로 지정했다)

Mono<SenderResult<Void>> KafkaSendMessage(String topic, int partition, String key, MemberDTO.Item message);

service/MemberService.java

public Mono<SenderResult<Void>> KafkaSendMessage(String topic, int partition, String key, MemberDTO.Item member) {
	return kafkaSender.send(topic, partition, key, member);
}

service/impl/MemberServiceDB.java

2) Controller

메시지 발송시에 partition 값을 10대는 0, 20대는 1, 그 이후는 2로 지정해서 보내서 0과 1 partition 은 순서대로 저장 되도록 한다.

    @PostMapping("/member")
    public Mono<ApiResponse> create(
            @Validated
            @RequestBody
            MemberDTO.Create memberDTO
    ) {

        Mono<MemberDTO.Item> dto = memberService.create(memberDTO);
        return dto.doOnSuccess(member -> {
                    log.info("db insert success");

                    memberService.KafkaSendMessage(
                            memberProducerTopic,
                            // 파티션 분할
                            member.getAge() < 20 ? 0 : member.getAge() < 30 ? 1 : 2,
                            // 나이:InsertId 를 키로 사용
                            String.format("%d:%d", member.getAge(), member.getId()),
                            member
                    )
                    .publishOn(Schedulers.boundedElastic())
                    .doOnSuccess(result -> {
                        RecordMetadata meta = result.recordMetadata();
                        log.info("kafka send success : topic {} / {}", meta.topic(), meta.offset());
                    })
                    .doOnError(error -> {
                        log.info("kafka send error");
                        log.info(error.toString());
                    })
                    .subscribe();
                })
                .doOnError(error -> {
                    log.info("db insert error");
                    log.info(error.toString());
                })
                .map(member -> {
                    log.info("member reponse");
                    return ApiResponse.builder()
                            .code(200)
                            .message("ok")
                            .data(member)
                            .build();
                });

    }

controller/MemberController.java

3) 데이터 삽입

데이터는 locust 로 아래와 같이 1분간 넣어 주도록 한다.

locust -H http://127.0.0.1:8888 -u 100 -r 20 -t 60s --autostart -f tests/locust_write.py
from locust import FastHttpUser, task, between
import random
from faker import Faker

fake = Faker()

class MyTest(FastHttpUser):
    wait_time = between(0, 1)
    @task
    def index(self):
        r = random.randint(10, 99)
        self.client.post("/v1/api/member", json={
            "name": fake.name(),
            "profile_url":f"https://pbs.twimg.com/profile_images/{r}/p7AMiPpV_400x400.jpg",
            "age": r,
        })

locust_write.py

4) 데이터 확인
Partition 별 메시지 수 및 전체 메시지 수

Key 값이 "나이:Id" 값이니 각 3개의 Partition 에 올바른 연령대의 가입자가 DB 내 Insert Id 값에 맞추어 순차적으로 쌓였는지 확인

0 번째 Partition 내 키값 확인
1 번째 Partition 내 키값 확인
TaskVibes: ADHD Simple Manage - Apps on Google Play
ADHD 를 겪는 사람들을 위한 일정관리 앱

2. Consumer 구현

해당 Consumer 에서는 Partition 별로 묶어서 Record를 Offset 순서대로 처리 할 수 있도록 하고 Record를 받아 Copy 테이블에 Partition 및 Offset 와 함께 저장하도록 한다.
이때 Consumer 가 어떤 이유로 동작을 못하는 상황에 대비하여, Record 를 받아 DB 에 저장을 하는 상황이나 외부 API 를 호출 하는 상황 등을 하는 프로세스가 "실행완료" 됐는지 확인 후 수동으로 Record 에 대한 Commit 처리를 해주도록 해서 외부 연동 프로세스가 유실 되지 않도록 한다.

1) Entity
package com.example.consumer.entity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
import org.springframework.data.annotation.CreatedDate;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Column;
import org.springframework.data.relational.core.mapping.Table;

import java.time.LocalDateTime;

@Table("copy")
@Builder
@Getter
@ToString
@AllArgsConstructor
public class Copy {
    @Id
    @Column("id")
    private Integer id;
    @Column("kafka_offset")
    private Integer kafkaOffset;
    @Column("kafka_partition")
    private Integer kafkaPartition;
    @Column("origin_id")
    private Integer originId;
    @Column("name")
    private String name;
    @Column("profile_url")
    private String profileUrl;
    @Column("age")
    private Integer age;
    @CreatedDate
    @Column("created_at")
    private LocalDateTime createdAt;
}

entity/Copy.java

2) DTO
package com.example.consumer.dto;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import org.hibernate.validator.constraints.Length;

import java.time.LocalDateTime;

public class CopyDTO {

    @Builder
    @Setter
    @Getter
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Create {
        @NotNull
        @JsonProperty("origin_id")
        private Integer originId;

        @NotNull
        @JsonProperty("kafka_offset")
        private Integer kafkaOffset;

        @NotNull
        @JsonProperty("kafka_partition")
        private Integer kafkaPartition;

        @NotEmpty
        @Length(min = 5, max = 128)
        @JsonProperty("name")
        private String name;

        @NotEmpty
        @Length(min = 13, max = 1024)
        @JsonProperty("profile_url")
        private String profileUrl;

        @Min(10)
        @Max(90)
        @JsonProperty("age")
        private Integer age;

        @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss")
        @JsonProperty("created_at")
        private LocalDateTime createdAt;
    }

    @Builder
    @Setter
    @Getter
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Item {
        @JsonProperty("id")
        private Integer id;
        @JsonProperty("kafka_offset")
        private Integer kafkaOffset;
        @JsonProperty("kafka_partition")
        private Integer kafkaPartition;
        @JsonProperty("origin_id")
        private Integer originId;
        @JsonProperty("name")
        private String name;
        @JsonProperty("profile_url")
        private String profileUrl;
        @JsonProperty("age")
        private Integer age;
        @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss")
        @JsonProperty("created_at")
        private LocalDateTime createdAt;
    }

}

dto/CopyDTO.java

3) Mapper
package com.example.consumer.mapper;

import com.example.consumer.dto.CopyDTO;
import com.example.consumer.entity.Copy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class CopyMapper {
    public CopyDTO.Item ToDTOItem(Copy entity) {
        return CopyDTO.Item.builder()
                .id(entity.getId())
                .kafkaOffset(entity.getKafkaOffset())
                .kafkaPartition(entity.getKafkaPartition())
                .originId(entity.getOriginId())
                .name(entity.getName())
                .age(entity.getAge())
                .profileUrl(entity.getProfileUrl())
                .createdAt(entity.getCreatedAt())
                .build();
    }


    public Copy ToCreateEntity(CopyDTO.Create dto) {
        return Copy.builder()
                .kafkaOffset(dto.getKafkaOffset())
                .kafkaPartition(dto.getKafkaPartition())
                .originId(dto.getOriginId())
                .name(dto.getName())
                .profileUrl(dto.getProfileUrl())
                .age(dto.getAge())
                .createdAt(dto.getCreatedAt())
                .build();
    }

}

mapper/CopyMapper.java

4) Repository
package com.example.consumer.repository;

import com.example.consumer.entity.Copy;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;

public interface CopyRepository extends ReactiveCrudRepository<Copy, Integer> {
}

repository/CopyRepository.java

5) Service
package com.example.consumer.service;


import com.example.consumer.dto.CopyDTO;
import reactor.core.publisher.Mono;

public interface CopyService {
    Mono<CopyDTO.Item> create(CopyDTO.Create dto);
}

service/CopyService.java

package com.example.consumer.service.impl;

import com.example.consumer.dto.CopyDTO;
import com.example.consumer.mapper.CopyMapper;
import com.example.consumer.repository.CopyRepository;
import com.example.consumer.service.CopyService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

import java.time.LocalDateTime;

@Slf4j
@Service
public class CopyServiceDB implements CopyService {

    private final CopyRepository copyRepository;
    private final CopyMapper copyMapper;


    @Autowired
    public CopyServiceDB(CopyRepository copyRepository, CopyMapper copyMapper) {
        this.copyRepository = copyRepository;
        this.copyMapper = copyMapper;
    }

    @Override
    public Mono<CopyDTO.Item> create(CopyDTO.Create dto) {
        dto.setCreatedAt(LocalDateTime.now());
        return copyRepository.save(copyMapper.ToCreateEntity(dto)).map(copyMapper::ToDTOItem);
    }

}

service/impl/CopyServiceDB.java

6) MemberConsumer
package com.example.consumer.service;

import com.example.consumer.dto.CopyDTO;
import com.example.consumer.dto.MemberDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class MemberConsumer implements ApplicationRunner {

    private final ReactiveKafkaConsumerTemplate<String, MemberDTO.Item> memberConsumerTemplate;

    private final CopyService copyService;

    @Autowired
    public MemberConsumer(ReactiveKafkaConsumerTemplate<String, MemberDTO.Item> memberConsumerTemplate, CopyService copyService) {
        this.memberConsumerTemplate = memberConsumerTemplate;
        this.copyService = copyService;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        this.memberConsumerTemplate
                .receive()
                // Partition 별로 groupBy
                .groupBy(message -> message.receiverOffset().topicPartition())
                .flatMap(partitions -> {
                	// Partition 별로 record 를 순차적으로 소비
                    return partitions.concatMap(record -> {

                        MemberDTO.Item member = record.value();

                        long offset = record.offset();
                        int partition = record.partition();

                        CopyDTO.Create item = CopyDTO.Create.builder()
                                .kafkaPartition(partition)
                                .kafkaOffset((int) offset)
                                .originId(member.getId())
                                .name(member.getName())
                                .age(member.getAge())
                                .profileUrl(member.getProfileUrl())
                                .build();

                        // Copy 테이블에 저장
                        return copyService.create(item)
                        .publishOn(Schedulers.boundedElastic())
                        .doOnSuccess(result -> {
                        	// 저장 후 수동 Commit
                        	record.receiverOffset().acknowledge();
                        }).subscribe();

                    });
                }).subscribe();

    }
}

service/MemberConsumer.java

TaskVibes: ADHD Simple Manage - Apps on Google Play
ADHD 를 겪는 사람들을 위한 일정관리 앱

B. 중복방지에 대하여

Consumer 를 만들면서 네트워크 장애 또는 시스템 재시작 같은 상황으로 Application이 죽는 상황을 고려 했는데 이러한 경우 메시지 소비를 비동기로 처리를 하다 보니 메시지를 Offset 순서대로 수신해도 Commit 하는 순서가 달라져서 다시 Application을 실행할때 유실이나 중복의 문제가 발생할 수 있다.

예를 들어 Offset 을 1 2 3 4 5 로 순서대로 받았는데 5 를 commit 을 하고 1 2 3 4 가 각각 처리를 하는 도중에 Application이 죽은 뒤에 다시 Application이 재시작을 하게 되는 경우 6 부터 Offset 을 가져오게 되니 1 2 3 4 는 처리를 못하니 유실이 발생한다.

반대의 상황으로 Offset 을 1 2 3 4 5 로 순서대로 받았는데 5 3 4 를 commit 을 하고 마지막으로 1을 Commit 을 한 뒤 Application이 죽으면 2 부터 Offset 을 가져오게 되니 앞서 이미 처리한 5 3 4 는 중복 처리를 하게 된다.

처리하는 데이터의 중요도에 따라 다르겠지만 주문 데이터와 같이 중요한 데이터의 유실이 의심 될 때는 Offset 을 Reset 해서 중복없이 처리해야 하는 상황이 있을 것이라 생각했고 이를 위해 Redis 의 Hash 를 사용해보기로 했다.

TaskVibes: ADHD Simple Manage - Apps on Google Play
ADHD 를 겪는 사람들을 위한 일정관리 앱
1) Service
package com.example.consumer.service;


import com.example.consumer.dto.CopyDTO;
import reactor.core.publisher.Mono;

public interface CopyService {
    // Redis, Get Key Name
    String getKeyName(String topic, int partition);

    // Redis, Save Offset
    Mono<Boolean> saveOffset(String key, String offset);

    // Redis, IsSaved Offset
    Mono<Boolean> isSavedOffset(String key, String offset);

    Mono<CopyDTO.Item> create(CopyDTO.Create dto);
}

service/CopyService.java

package com.example.consumer.service.impl;

import com.example.consumer.dto.CopyDTO;
import com.example.consumer.mapper.CopyMapper;
import com.example.consumer.repository.CopyRepository;
import com.example.consumer.service.CopyService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.ReactiveHashOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

import java.time.LocalDateTime;

@Slf4j
@Service
public class CopyServiceDB implements CopyService {

    private final ReactiveHashOperations<String, String, String> hashOperations;
    private final CopyRepository copyRepository;
    private final CopyMapper copyMapper;
    @Value("${config.consumer.member.group-id:consumer-group}")
    private String memberGroupId;


    @Autowired
    public CopyServiceDB(ReactiveRedisTemplate<String, String> redisTemplate, CopyRepository copyRepository, CopyMapper copyMapper) {
        this.hashOperations = redisTemplate.opsForHash();
        this.copyRepository = copyRepository;
        this.copyMapper = copyMapper;
    }

    @Override
    public String getKeyName(String topic, int partition) {
        return String.format("%s:%s:%d", memberGroupId, topic, partition);
    }

    @Override
    public Mono<Boolean> saveOffset(String key, String offset) {
        return this.hashOperations.put(key, offset, "1");
    }

    @Override
    public Mono<Boolean> isSavedOffset(String key, String offset) {
        return this.hashOperations.hasKey(key, offset);
    }

    @Override
    public Mono<CopyDTO.Item> create(CopyDTO.Create dto) {
        dto.setCreatedAt(LocalDateTime.now());
        return copyRepository.save(copyMapper.ToCreateEntity(dto)).map(copyMapper::ToDTOItem);
    }

}

service/impl/CopyServiceDB.java

2) MemberConsumer

아래 코드의 내용은 보면 알겠지만 Redis Hash 값으로 consumer-group-id:topic-id:partition 을 키값으로 가지는 Hash 를 하나 만들어서 Kafka Offset 을 Field 값으로 등록해서, "무언가 중요한 프로세스"를 처리하기 전에 이미 값이 존재 하는 지 여부를 확인하고 없으면 "무언가 중요한 프로세스" 를 처리하고 Commit 처리를 하면서 Hash Field 값 또한 등록 시키는 단순한 과정이다.

이렇게 처리를 하면 해당 Topic 의 특정 Partition 의 Offset 값을 초기화 해도 유실없이 중복처리를 방지 할 수 있겠다. 물론 Redis 키값의 처리는 일 별로 하던 월 별로 나누어 저장을 해서 필요없는 키값을 지워주는 작업이 필요할 수도 있다. (Redis 의 다른 데이터구조를 사용하는 것도 괜찮겠다. SortedSet 로 Offset 을 Score 로 등록해서 특정구간의 Offset 의 범위를 가져와서 재처리 할 수도 있겠다.)

package com.example.consumer.service;

import com.example.consumer.dto.CopyDTO;
import com.example.consumer.dto.MemberDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.ReceiverRecord;

@Service
@Slf4j
public class MemberConsumer implements ApplicationRunner {

    private final ReactiveKafkaConsumerTemplate<String, MemberDTO.Item> memberConsumerTemplate;

    private final CopyService copyService;

    @Autowired
    public MemberConsumer(ReactiveKafkaConsumerTemplate<String, MemberDTO.Item> memberConsumerTemplate, CopyService copyService) {
        this.memberConsumerTemplate = memberConsumerTemplate;
        this.copyService = copyService;
    }

	// Copy Table 저장 후 Kafka Offset Commit
    private Mono<CopyDTO.Item> fireAndCommit(ReceiverRecord<String, MemberDTO.Item> record, CopyDTO.Create item, String topic, int partition, long offset) {
        return copyService.create(item)
                .publishOn(Schedulers.boundedElastic())
                .doOnSuccess(result -> {
                    String keyName = copyService.getKeyName(topic, partition);
                    copyService.saveOffset(keyName, String.valueOf(offset))
                            .doOnSuccess(
                                    res -> {
                                        log.info("## Offset Save Success: {} {} {}", topic, partition, offset);
                                        // Record Commit
                                        record.receiverOffset().acknowledge();
                                    }
                            )
                            .subscribe();
                });
    }

	// Record 처리 
    private Mono<CopyDTO.Item> processRecord(ReceiverRecord<String, MemberDTO.Item> record) {
        MemberDTO.Item member = record.value();

        String topic = record.topic();
        int partition = record.partition();
        long offset = record.offset();

		// Redis Key 값
        String keyName = copyService.getKeyName(record.topic(), partition);

        return copyService.isSavedOffset(keyName, String.valueOf(offset))
                .publishOn(Schedulers.boundedElastic())
                .flatMap(saved -> {
                    if (!saved) {
                        CopyDTO.Create item = CopyDTO.Create.builder()
                                .kafkaPartition(partition)
                                .kafkaOffset((int) offset)
                                .originId(member.getId())
                                .name(member.getName())
                                .age(member.getAge())
                                .profileUrl(member.getProfileUrl())
                                .build();

                        return fireAndCommit(record, item, topic, partition, offset);
                    }
                    log.info("## Offset Already Saved: {} {} {}", topic, partition, offset);
                    return Mono.empty();
                });

    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        this.memberConsumerTemplate
                .receive()
                .groupBy(message -> message.receiverOffset().topicPartition())
                .flatMap(partitions -> {
                    return partitions.concatMap(this::processRecord);
                }).subscribe();

    }

}

service/MemberConsumer.java

DB 테이블 member 에 쌓인 데이터가 copy에도 동일하게 저장 되어 있는지 연령별로 파티션은 잘 나뉘어 있는지, Offset 을 초기화 했을때 copy 테이블에 데이터가 다시 들어가지는 않았는지 확인해야 한다.

Backpressure 를 고민한다면 다음과 같이 처리해도 될 듯 하다.

package com.example.consumer.service;

import com.example.consumer.dto.CopyDTO;
import com.example.consumer.dto.MemberDTO;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.ReceiverRecord;

@Service
@Slf4j
public class MemberConsumer extends BaseSubscriber<MemberDTO.Item> implements ApplicationRunner {

    private final ReactiveKafkaConsumerTemplate<String, MemberDTO.Item> memberConsumerTemplate;

    private final CopyService copyService;

    @Autowired
    public MemberConsumer(ReactiveKafkaConsumerTemplate<String, MemberDTO.Item> memberConsumerTemplate, CopyService copyService) {
        this.memberConsumerTemplate = memberConsumerTemplate;
        this.copyService = copyService;
    }

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        log.info("## HookOnSubscribe 10 / GetName : {}", Thread.currentThread().getName());
        subscription.request(10);
    }

    @Override
    protected void hookOnNext(MemberDTO.Item item) {
        log.info("## HookOnNext 1 / GetName : {}", Thread.currentThread().getName());
        request(1);
    }

    private Mono<CopyDTO.Item> fireAndCommit(ReceiverRecord<String, MemberDTO.Item> record, CopyDTO.Create item, String topic, int partition, long offset) {
        return copyService.create(item)
                .publishOn(Schedulers.boundedElastic())
                .doOnSuccess(result -> {
                    String keyName = copyService.getKeyName(topic, partition);
                    copyService.saveOffset(keyName, String.valueOf(offset))
                            .doOnSuccess(
                                    res -> {
                                        log.info("## Offset Save Success: {} {} {}", topic, partition, offset);
                                        record.receiverOffset().acknowledge();
                                    }
                            )
                            .subscribe();
                });
    }

    private Mono<MemberDTO.Item> processRecord(ReceiverRecord<String, MemberDTO.Item> record) {
        MemberDTO.Item member = record.value();

        String topic = record.topic();
        int partition = record.partition();
        long offset = record.offset();

        String keyName = copyService.getKeyName(record.topic(), partition);

        return copyService.isSavedOffset(keyName, String.valueOf(offset))
                .publishOn(Schedulers.boundedElastic())
                .flatMap(saved -> {
                    if (!saved) {
                        CopyDTO.Create item = CopyDTO.Create.builder()
                                .kafkaPartition(partition)
                                .kafkaOffset((int) offset)
                                .originId(member.getId())
                                .name(member.getName())
                                .age(member.getAge())
                                .profileUrl(member.getProfileUrl())
                                .build();

                        return fireAndCommit(record, item, topic, partition, offset);
                    }
                    log.info("## Offset Already Saved: {} {} {}", topic, partition, offset);
                    return Mono.empty();
                })
                .thenReturn(member);

    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        this.memberConsumerTemplate
                .receive()
                .groupBy(message -> message.receiverOffset().topicPartition())
                .flatMap(partitions -> {
                    return partitions.concatMap(this::processRecord);
                }).subscribe(this);

    }

}

service/MemberConsumer.java

3) Kafka, Offset Reset

Offset 초기화를 위한 명령어도 기록 해두기로 한다.

./kafka-consumer-groups.sh --bootstrap-server Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092 --group member-consumer-group --describe

kafka-consumer-groups 의 Consumer Group 상세 확인 예시

./kafka-consumer-groups.sh --bootstrap-server Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092 --topic member --group member-consumer-group --reset-offsets --to-earliest --execute

kafka-consumer-groups 의 Offset 초기화 예시

TaskVibes: ADHD Simple Manage - Apps on Google Play
ADHD 를 겪는 사람들을 위한 일정관리 앱