Spring Boot, WebFlux 에서 Reactive Redis Sorted Set 연동

Spring Boot WebFlux 를 학습하면서 Redis 의 Sorted Set 을 연동한 기록을 남겨둔다.

이 글은 WebFlux 로 API 를 작성한 지난 글의 소스를 토대로 작성 되었으며 Member API 를 통해 사용자 정보를 DB 와 Redis에 입력하고 입력된 데이터는 Redis에서 조회할 수 있도록 구현하는데 집중했다.

1. 구성 및 설정

1) build.gradle
plugins {
    id 'java'
    id 'org.springframework.boot' version '3.1.4'
    id 'io.spring.dependency-management' version '1.1.3'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'

java {
    sourceCompatibility = '17'
}

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

dependencies {
	// LocalDateTime JSON Serialize/UnSerialize 에 사용
    implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.2'
    // Reactive Redis 라이브러리
    implementation 'org.springframework.boot:spring-boot-starter-data-redis-reactive:3.1.2'
    implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
    implementation 'org.springframework.boot:spring-boot-starter-validation'
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'io.netty:netty-resolver-dns-native-macos:4.1.95.Final:osx-aarch_64'
    compileOnly 'org.projectlombok:lombok'
    runtimeOnly 'org.mariadb:r2dbc-mariadb:1.1.3'
    runtimeOnly 'org.mariadb.jdbc:mariadb-java-client'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'io.projectreactor:reactor-test'
}

tasks.named('test') {
    useJUnitPlatform()
}
2) application.properties
# Redis 사용여부
redis.cache=true

# R2DBC
spring.r2dbc.url=r2dbc:mariadb://localhost:3306/spring_test
spring.r2dbc.username=root
spring.r2dbc.password=root
spring.r2dbc.pool.enabled=true
spring.r2dbc.pool.initial-size=50
spring.r2dbc.pool.max-size=50

# Redis
spring.data.redis.host=localhost
spring.data.redis.database=0
spring.data.redis.port=6379
spring.data.redis.password=root

# 재시작시 데이터 유지 
spring.session.redis.flush-mode=on_save
# Redis 서버 접속 대기시간 (서버 접속이 어려운 상황을 감안하도록 한다)
spring.data.redis.timeout=500ms
3) schema.sql
CREATE TABLE `member` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(128) NOT NULL,
  `age` int(2) NOT NULL,
  `created_at` datetime NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

Member Table Schema

2. Member Service

1) 디렉토리 구성
src
└── main
    └── java
        └── com.example.webfluxapi
         	├── common
         	├── controller
          	├── dto
        	├── entity
         	├── repository
         	└── service
		        └── impl

패키지 내 디렉토리 구성

2) MemberService Interface

아래 인터페이스를 상속 받아서 두개의 클래스를 작성할 것이고 하나는 DB에 대한 기능을 구현하고 하나는 Redis SortedSet 을 사용해서 저장, 조회, 목록 기능을 구현 할 것이다.

package com.example.webfluxapi.service;

import com.example.webfluxapi.dto.MemberDTO;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public interface MemberService {
    public Mono<MemberDTO.Crud> create(MemberDTO.Crud dto);
    public Mono<MemberDTO.Crud> item(Integer id);
    public Flux<MemberDTO.Crud> list(Integer page, Integer limit);
}

service/MemberService.java

3) MemberServiceDB 구현

아래는 DB 를 사용하기 위한 클래스 구현인데, 특이사항으로 application.properties 에 설정한 redis.cache 설정을 @ConditionalOnProperty 확인하는 부분이 있는데, 이 어노테이션은 설정값에 따라 빈을 생성하거나 제외할 수 있게 해주는 어노테이션이다.

package com.example.webfluxapi.service.impl;

import com.example.webfluxapi.dto.MemberDTO;
import com.example.webfluxapi.mapper.MemberMapper;
import com.example.webfluxapi.repository.MemberRepository;
import com.example.webfluxapi.service.MemberService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

@Slf4j
@Service
@ConditionalOnProperty(name = "redis.cache", havingValue = "false")
public class MemberServiceDB implements MemberService {

    private final MemberRepository memberRepository;
    private final MemberMapper memberMapper;

    @Autowired
    public MemberServiceDB(MemberRepository memberRepository, MemberMapper memberMapper) {
        this.memberRepository = memberRepository;
        this.memberMapper = memberMapper;
    }

    @Override
    public Mono<MemberDTO.Item> create(MemberDTO.Create dto) {
        dto.setCreatedAt(LocalDateTime.now());
        return memberRepository.save(memberMapper.ToCreateEntity(dto)).map(memberMapper::ToDTOItem);
    }

    @Override
    public Mono<MemberDTO.Item> item(Integer id) {
        return memberRepository.findById(id).map(memberMapper::ToDTOItem);
    }

    @Override
    public Flux<MemberDTO.Item> list(Integer page, Integer limit) {
        return memberRepository.findAllBy((page - 1) * limit, limit).map(memberMapper::ToDTOItem);
    }
}

service/imple/MemberServiceDB.java

4) MemberServiceCache 구현

MemberService Cache 클래스는 기본적으로 MemberService DB 클래스를 상속 받고 있고, redis.cache 가 true 가 설정 되었을 때 구동 하도록 한다.

3-1) 생성자

Reactive Redis Template 은 Spring Data Redis 모듈에서 제공하는 클래스로 ReactiveRedisOperations 인터페이스를 구현하고 있으며 String, List, Zset, Hash 등의 타입에 대한 인터페이스를 제공한다.

    private final ReactiveZSetOperations<String, String> reactiveRedisZSet;

    @Autowired
    public MemberServiceCache(
            MemberRepository memberRepository,
            MemberMapper memberMapper,
            ReactiveRedisTemplate<String, String> reactiveRedisTemplate
    ) {
        super(memberRepository, memberMapper);
        
        this.objectMapper.registerModule(new com.fasterxml.jackson.datatype.jsr310.JavaTimeModule());

        this.reactiveRedisZSet = reactiveRedisTemplate.opsForZSet();
    }
3-2) 사용자 정보 저장
    @Override
    public Mono<MemberDTO.Crud> create(MemberDTO.Crud dto) {
        return super.create(dto).flatMap(
                member -> {
                    String encode;
                    try {
                        encode = objectMapper.writeValueAsString(member);
                    } catch (JsonProcessingException e) {
                        return Mono.error(new RuntimeException(e));
                    }
                    return this.reactiveRedisZSet
                            .add(KEY, encode, member.getId())
                            .then(Mono.just(member));
                }
        );
    }
3-3) 사용자 정보 조회

아래 구문중 reverseRangeByScore 함수는 아래 Redis 명령 예시와 같은 구현이다.

ZRANGEBYSCORE member 11 11
	@Override
    public Mono<MemberDTO.Crud> item(Integer id) {
        return Mono.from(this.reactiveRedisZSet
                .rangeByScore(KEY, Range.from(Range.Bound.inclusive((double) id)).to(Range.Bound.inclusive((double) id)))
                .handle((member, sink) -> {
                    try {
                        sink.next(objectMapper.readValue(member, MemberDTO.Crud.class));
                    } catch (JsonProcessingException e) {
                        sink.error(new RuntimeException(e));
                    }
                })
        );
    }
3-4) 사용자 목록 구현

아래 구문중 reverseRangeByScore 함수는 아래 Redis 명령 예시와 같은 구현이다.

ZREVRANGEBYSCORE member +inf -inf LIMIT 0 9
    @Override
    public Flux<MemberDTO.Crud> list(Integer page, Integer limit) {
        return this.reactiveRedisZSet
                .reverseRangeByScore(
                        KEY,
                        Range.unbounded(),
                        Limit.limit().offset((page-1)*limit).count(limit)
                ).handle((member, sink) -> {
                    try {
                        sink.next(objectMapper.readValue(member, MemberDTO.Crud.class));
                    } catch (JsonProcessingException e) {
                        sink.error(new RuntimeException(e));
                    }
                });

    }
3-5) MemberServiceCache 전체 소스
package com.example.webfluxapi.service.impl;

import com.example.webfluxapi.dto.MemberDTO;
import com.example.webfluxapi.mapper.MemberMapper;
import com.example.webfluxapi.repository.MemberRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.ReactiveZSetOperations;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
@Service
@ConditionalOnProperty(name = "redis.cache", havingValue = "true")
public class MemberServiceCache extends MemberServiceDB {

    private final ObjectMapper objectMapper = new ObjectMapper();
    private final String KEY = "member";
    private final ReactiveZSetOperations<String, String> reactiveRedisZSet;

    @Autowired
    public MemberServiceCache(
            MemberRepository memberRepository,
            MemberMapper memberMapper,
            ReactiveRedisTemplate<String, String> reactiveRedisTemplate
    ) {
        super(memberRepository, memberMapper);

        this.objectMapper.registerModule(new com.fasterxml.jackson.datatype.jsr310.JavaTimeModule());

        this.reactiveRedisZSet = reactiveRedisTemplate.opsForZSet();
    }

    @Override
    public Mono<MemberDTO.Crud> create(MemberDTO.Crud dto) {
        return super.create(dto).flatMap(
                member -> {
                    String encode;
                    try {
                        encode = objectMapper.writeValueAsString(member);
                    } catch (JsonProcessingException e) {
                        return Mono.error(new RuntimeException(e));
                    }
                    return this.reactiveRedisZSet
                            .add(KEY, encode, member.getId())
                            .then(Mono.just(member));
                }
        );
    }

    @Override
    public Mono<MemberDTO.Crud> item(Integer id) {
        return Mono.from(this.reactiveRedisZSet
                .rangeByScore(KEY, Range.from(Range.Bound.inclusive((double) id)).to(Range.Bound.inclusive((double) id)))
                .handle((member, sink) -> {
                    try {
                        sink.next(objectMapper.readValue(member, MemberDTO.Crud.class));
                    } catch (JsonProcessingException e) {
                        sink.error(new RuntimeException(e));
                    }
                })
        );
    }

    @Override
    public Flux<MemberDTO.Crud> list(Integer page, Integer limit) {
        return this.reactiveRedisZSet
                .reverseRangeByScore(
                        KEY,
                        Range.unbounded(),
                        Limit.limit().offset((page-1)*limit).count(limit)
                ).handle((member, sink) -> {
                    try {
                        sink.next(objectMapper.readValue(member, MemberDTO.Crud.class));
                    } catch (JsonProcessingException e) {
                        sink.error(new RuntimeException(e));
                    }
                });

    }
}

service/impl/MemberServiceCache.java