본문 바로가기
개발관련 이것저것

Springboot + Redis + Kafka 설치부터 설정, 실행 과정

by 이상한나라의개발자 2024. 10. 16.

최근 수정이 적고 반복적으로 질의 되는 DB 데이터를 Cache할 일이 생겼습니다. 어떤 방식으로 할까 고민하다. 최근 많이 사용되는 Redis Cache, Kafka 방식을 선택하였고 그 과정을 간단하게 기록으로 남깁니다.

 

애플리케이션이 A, B 두개가 있다고 가정하고 B에서는 API를 통해 반복적으로 조회가 일어납니다. 반복적인 질의는 불필요하므로 캐시를 적용합니다. 이때, A서버에서 데이터의 저장, 수정, 삭제 등과 같은 트랜잭션이 일어나게 되면 Kafka를 통해 B 서버로 변경을 알리게 됩니다. B서버는 이를 감지하고 Redis 캐시를 무효화 하고 새롭게 데이터를 Redis Cache에 적재합니다.

 

Redis 설정

Redis 설치

Redis는 메모리 내 데이터 구조 저장소로, 캐시, 데이터베이스, 메시지 브로커 역할을 합니다.

 

  • macOS
brew install redis

 

  • Ubuntu
sudo apt-get update
sudo apt-get install redis-server

 

Redis 설정 및 실행

  • Redis 서버 시작
redis-server
  • Redis 클라이언트 접속
redis-cli
KEYS * ( 현재 저장 중인 전체 키 조회 )
FLUSHALL

 

Springboot에서 Redis 사용 설정

spring:
  cache:
    type: redis
  redis:
    host: localhost
    port: 6379

 

Kafka 설정

Kafka 설치

Apache Kafka는 분산 이벤트 스트리밍입니다.

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

 

Kafka 설정 및 실행

  • Kafka 2.5 이상 버전에서는 Zookeeper를 별도로 설치해야 합니다.
  • 바이너리 파일을 다운 받고 해당 폴더의 압축을 해제 합니다.
  • Kafka를 실행하기 전에 Zookeeper를 먼저 실행해야 합니다
bin/zookeeper-server-start.sh config/zookeeper.properties

 

  • Kafka 서버 실행 전 server.properties 파일을 열어서 설정 정보를 확인합니다.
  • broker.id는 고유해야 하며, 단일 브로커인 경우 0으로 설정합니다.
  • advertised.listeners 설정이 클라이언트에서 접근 가능한 주소로 설정되어 있는지 확인합니다.
broker.id=0
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092

 

  • Kafka 서버 실행
bin/kafka-server-start.sh config/server.properties
 
  • Springboot Kafka 사용 설정
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: comnKeywordGroup
      auto-offset-reset: earliest

 

  • 사용할 토픽 추가 
in/kafka-topics.sh --create --topic comnKeyword-updates --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1

 

  • 토픽 생성 확인
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

== 토픽 리스트 출력 ==
__consumer_offsets
comnKeyword-updates
test-topic

 

Springboot 애플리케이션 개발 ( 캐시를 적용할 서버 애플리케이션 )

의존성 추가

// Redis를 사용하기 위한 의존성 추가
implementation 'org.springframework.boot:spring-boot-starter-data-redis'

// Spring Kafka 의존성 추가
implementation 'org.springframework.kafka:spring-kafka'

 

Redis 캐시 설정

@Configuration
@EnableCaching
public class RedisCacheConfig {

    @Bean
    public RedisCacheManager cacheManager(RedisConnectionFactory connectionFactory) {
        // ObjectMapper 설정
        ObjectMapper objectMapper = new ObjectMapper();

        // 커스텀 TypeResolverBuilder 생성
        ObjectMapper.DefaultTypeResolverBuilder typeResolverBuilder = new ObjectMapper.DefaultTypeResolverBuilder(ObjectMapper.DefaultTyping.NON_FINAL, LaissezFaireSubTypeValidator.instance) {
            @Override
            public boolean useForType(JavaType t) {
                // 컬렉션 타입에 대해서도 타입 정보를 포함하도록 설정
                return t.isCollectionLikeType() || super.useForType(t);
            }
        };
        typeResolverBuilder.init(JsonTypeInfo.Id.CLASS, null);
        typeResolverBuilder.inclusion(JsonTypeInfo.As.PROPERTY);

        // ObjectMapper에 커스텀 TypeResolverBuilder 적용
        objectMapper.setDefaultTyping(typeResolverBuilder);

        // 기타 설정
        objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        objectMapper.enable(DeserializationFeature.READ_ENUMS_USING_TO_STRING);
        objectMapper.enable(SerializationFeature.WRITE_ENUMS_USING_TO_STRING);

        // Serializer 설정
        GenericJackson2JsonRedisSerializer serializer = new GenericJackson2JsonRedisSerializer(objectMapper);

        // RedisCacheConfiguration 설정
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofMinutes(30))
                .serializeValuesWith(
                        RedisSerializationContext.SerializationPair.fromSerializer(serializer)
                );

        return RedisCacheManager.builder(connectionFactory)
                .cacheDefaults(config)
                .build();
    }
}

 

Kafka 프로듀서 및 컨슈머 설정 

@Configuration
public class KafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "comnKeywordGroup");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

 

캐시 무효화를 위한 Kafka 리스너 구현

@Service
@RequiredArgsConstructor
public class CommonKeywordUpdateListener {

    private final CacheManager cacheManager;

    @KafkaListener(topics = "comnKeyword-updates", groupId = "comnKeywordGroup")
    public void listenComnKeywordUpdates(String message) {
        // 캐시 무효화
        Objects.requireNonNull(cacheManager.getCache("comnKeywordsCache")).clear();
    }
}

 

적용할 캐시 함수 설정

@Builder
@Service
@RequiredArgsConstructor
@Transactional(readOnly = true)
public class AprExceptComnKeywordServiceImpl implements AutoApprovalKeywordExService {

    private final AprExceptComnKeywordRepository aprExceptComnKeywordRepository;
    private final AprExceptPcNuseKeywordRepository aprExceptPcNuseKeywordRepository;
    private final AprExceptComnKeywordCacheBean aprExceptComnKeywordCacheBean;

    /**
     * 문장을 받아서 예외 처리된 키워드와 그렇지 않은 키워드를 반환하는 메서드
     */
    @Override
    public AprExceptComnKeywordSentenceResponse getAutoApprovalSentenceKeywordEx(String pcCode, String sentence) {
        List<AprExceptComnKeyword> comnKeywords = aprExceptComnKeywordCacheBean.getAprExceptComnKeywords();

        // 문장 내에서 공통 키워드가 포함되는지 확인하고, 예외 여부에 따라 분류
        Map<Boolean, List<AprExceptComnKeywordDto>> keywordPartition = comnKeywords.stream()
                .filter(comnKeyword -> AprExceptComnKeyword.containsKeyword(sentence, comnKeyword.getKeyword())) // 문장에 키워드 포함 여부 확인
                .collect(partitioningBy(
                        comnKeyword -> isKeywordInPcNuseTable(pcCode, comnKeyword.getAprExceptComnKeywordIdx()), // 예외 처리 여부에 따른 분류
                        mapping(comnKeyword -> AprExceptComnKeywordDto.builder()
                                .keyword(comnKeyword.getKeyword())
                                .gubun(String.valueOf(comnKeyword.getGubun())) // gubun 정보 포함
                                .build(), toList())
                ));

        // 응답 반환 (유효한 키워드와 예외 키워드를 포함)
        return AprExceptComnKeywordSentenceResponse.toResponseWithKeywords(
                keywordPartition.get(false),  // 예외 처리되지 않은 유효한 키워드
                keywordPartition.get(true),   // 예외 처리된 키워드
                AprExceptComnKeyword.isExcept(keywordPartition.get(false)),
                pcCode);
    }

 

위 메서드에서 getAprExceptComnKeywords(); 함수를 호출해야 하는데 이때 호출되는 쪽에 캐시를 적용한다 이때 호출하는 쪽과 다른 빈이여야 하므로 새로운 클래스를 만들어 적용한다.

 

@Builder
@Service
@RequiredArgsConstructor
@Transactional(readOnly = true)
public class AprExceptComnKeywordCacheBean {

    private final AprExceptComnKeywordRepository aprExceptComnKeywordRepository;

    // Redis Cache 설정
    @Cacheable(value = "comnKeywordsCache", key = "'comnKeywords'")
    public List<AprExceptComnKeyword> getAprExceptComnKeywords() {
        return aprExceptComnKeywordRepository.findAll();
    }
}

 

 

Springboot 애플리케이션 개발 ( 변경사항 발생을 전달하는 서버 )

@Service
@RequiredArgsConstructor
public class KafkaTest {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void saveOrUpdateComnKeyword() {
        // 데이터 변경 이벤트 발행
        kafkaTemplate.send("comnKeyword-updates", "update");
    }

    public void deleteComnKeyword(Long id) {
        // 데이터 변경 이벤트 발행
        kafkaTemplate.send("comnKeyword-updates", "delete");
    }
}