최근 수정이 적고 반복적으로 질의 되는 DB 데이터를 Cache할 일이 생겼습니다. 어떤 방식으로 할까 고민하다. 최근 많이 사용되는 Redis Cache, Kafka 방식을 선택하였고 그 과정을 간단하게 기록으로 남깁니다.
애플리케이션이 A, B 두개가 있다고 가정하고 B에서는 API를 통해 반복적으로 조회가 일어납니다. 반복적인 질의는 불필요하므로 캐시를 적용합니다. 이때, A서버에서 데이터의 저장, 수정, 삭제 등과 같은 트랜잭션이 일어나게 되면 Kafka를 통해 B 서버로 변경을 알리게 됩니다. B서버는 이를 감지하고 Redis 캐시를 무효화 하고 새롭게 데이터를 Redis Cache에 적재합니다.
Redis 설정
Redis 설치
Redis는 메모리 내 데이터 구조 저장소로, 캐시, 데이터베이스, 메시지 브로커 역할을 합니다.
- macOS
brew install redis
- Ubuntu
sudo apt-get update
sudo apt-get install redis-server
Redis 설정 및 실행
- Redis 서버 시작
redis-server
- Redis 클라이언트 접속
redis-cli
KEYS * ( 현재 저장 중인 전체 키 조회 )
FLUSHALL
Springboot에서 Redis 사용 설정
spring:
cache:
type: redis
redis:
host: localhost
port: 6379
Kafka 설정
Kafka 설치
Apache Kafka는 분산 이벤트 스트리밍입니다.
- 다운로드 페이지 : https://kafka.apache.org/downloads
Kafka 설정 및 실행
- Kafka 2.5 이상 버전에서는 Zookeeper를 별도로 설치해야 합니다.
- 바이너리 파일을 다운 받고 해당 폴더의 압축을 해제 합니다.
- Kafka를 실행하기 전에 Zookeeper를 먼저 실행해야 합니다
bin/zookeeper-server-start.sh config/zookeeper.properties
- Kafka 서버 실행 전 server.properties 파일을 열어서 설정 정보를 확인합니다.
- broker.id는 고유해야 하며, 단일 브로커인 경우 0으로 설정합니다.
- advertised.listeners 설정이 클라이언트에서 접근 가능한 주소로 설정되어 있는지 확인합니다.
broker.id=0
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
- Kafka 서버 실행
bin/kafka-server-start.sh config/server.properties
- Springboot Kafka 사용 설정
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: comnKeywordGroup
auto-offset-reset: earliest
- 사용할 토픽 추가
in/kafka-topics.sh --create --topic comnKeyword-updates --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
- 토픽 생성 확인
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
== 토픽 리스트 출력 ==
__consumer_offsets
comnKeyword-updates
test-topic
Springboot 애플리케이션 개발 ( 캐시를 적용할 서버 애플리케이션 )
의존성 추가
// Redis를 사용하기 위한 의존성 추가
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
// Spring Kafka 의존성 추가
implementation 'org.springframework.kafka:spring-kafka'
Redis 캐시 설정
@Configuration
@EnableCaching
public class RedisCacheConfig {
@Bean
public RedisCacheManager cacheManager(RedisConnectionFactory connectionFactory) {
// ObjectMapper 설정
ObjectMapper objectMapper = new ObjectMapper();
// 커스텀 TypeResolverBuilder 생성
ObjectMapper.DefaultTypeResolverBuilder typeResolverBuilder = new ObjectMapper.DefaultTypeResolverBuilder(ObjectMapper.DefaultTyping.NON_FINAL, LaissezFaireSubTypeValidator.instance) {
@Override
public boolean useForType(JavaType t) {
// 컬렉션 타입에 대해서도 타입 정보를 포함하도록 설정
return t.isCollectionLikeType() || super.useForType(t);
}
};
typeResolverBuilder.init(JsonTypeInfo.Id.CLASS, null);
typeResolverBuilder.inclusion(JsonTypeInfo.As.PROPERTY);
// ObjectMapper에 커스텀 TypeResolverBuilder 적용
objectMapper.setDefaultTyping(typeResolverBuilder);
// 기타 설정
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
objectMapper.enable(DeserializationFeature.READ_ENUMS_USING_TO_STRING);
objectMapper.enable(SerializationFeature.WRITE_ENUMS_USING_TO_STRING);
// Serializer 설정
GenericJackson2JsonRedisSerializer serializer = new GenericJackson2JsonRedisSerializer(objectMapper);
// RedisCacheConfiguration 설정
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofMinutes(30))
.serializeValuesWith(
RedisSerializationContext.SerializationPair.fromSerializer(serializer)
);
return RedisCacheManager.builder(connectionFactory)
.cacheDefaults(config)
.build();
}
}
Kafka 프로듀서 및 컨슈머 설정
@Configuration
public class KafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "comnKeywordGroup");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
캐시 무효화를 위한 Kafka 리스너 구현
@Service
@RequiredArgsConstructor
public class CommonKeywordUpdateListener {
private final CacheManager cacheManager;
@KafkaListener(topics = "comnKeyword-updates", groupId = "comnKeywordGroup")
public void listenComnKeywordUpdates(String message) {
// 캐시 무효화
Objects.requireNonNull(cacheManager.getCache("comnKeywordsCache")).clear();
}
}
적용할 캐시 함수 설정
@Builder
@Service
@RequiredArgsConstructor
@Transactional(readOnly = true)
public class AprExceptComnKeywordServiceImpl implements AutoApprovalKeywordExService {
private final AprExceptComnKeywordRepository aprExceptComnKeywordRepository;
private final AprExceptPcNuseKeywordRepository aprExceptPcNuseKeywordRepository;
private final AprExceptComnKeywordCacheBean aprExceptComnKeywordCacheBean;
/**
* 문장을 받아서 예외 처리된 키워드와 그렇지 않은 키워드를 반환하는 메서드
*/
@Override
public AprExceptComnKeywordSentenceResponse getAutoApprovalSentenceKeywordEx(String pcCode, String sentence) {
List<AprExceptComnKeyword> comnKeywords = aprExceptComnKeywordCacheBean.getAprExceptComnKeywords();
// 문장 내에서 공통 키워드가 포함되는지 확인하고, 예외 여부에 따라 분류
Map<Boolean, List<AprExceptComnKeywordDto>> keywordPartition = comnKeywords.stream()
.filter(comnKeyword -> AprExceptComnKeyword.containsKeyword(sentence, comnKeyword.getKeyword())) // 문장에 키워드 포함 여부 확인
.collect(partitioningBy(
comnKeyword -> isKeywordInPcNuseTable(pcCode, comnKeyword.getAprExceptComnKeywordIdx()), // 예외 처리 여부에 따른 분류
mapping(comnKeyword -> AprExceptComnKeywordDto.builder()
.keyword(comnKeyword.getKeyword())
.gubun(String.valueOf(comnKeyword.getGubun())) // gubun 정보 포함
.build(), toList())
));
// 응답 반환 (유효한 키워드와 예외 키워드를 포함)
return AprExceptComnKeywordSentenceResponse.toResponseWithKeywords(
keywordPartition.get(false), // 예외 처리되지 않은 유효한 키워드
keywordPartition.get(true), // 예외 처리된 키워드
AprExceptComnKeyword.isExcept(keywordPartition.get(false)),
pcCode);
}
위 메서드에서 getAprExceptComnKeywords(); 함수를 호출해야 하는데 이때 호출되는 쪽에 캐시를 적용한다 이때 호출하는 쪽과 다른 빈이여야 하므로 새로운 클래스를 만들어 적용한다.
@Builder
@Service
@RequiredArgsConstructor
@Transactional(readOnly = true)
public class AprExceptComnKeywordCacheBean {
private final AprExceptComnKeywordRepository aprExceptComnKeywordRepository;
// Redis Cache 설정
@Cacheable(value = "comnKeywordsCache", key = "'comnKeywords'")
public List<AprExceptComnKeyword> getAprExceptComnKeywords() {
return aprExceptComnKeywordRepository.findAll();
}
}
Springboot 애플리케이션 개발 ( 변경사항 발생을 전달하는 서버 )
@Service
@RequiredArgsConstructor
public class KafkaTest {
private final KafkaTemplate<String, String> kafkaTemplate;
public void saveOrUpdateComnKeyword() {
// 데이터 변경 이벤트 발행
kafkaTemplate.send("comnKeyword-updates", "update");
}
public void deleteComnKeyword(Long id) {
// 데이터 변경 이벤트 발행
kafkaTemplate.send("comnKeyword-updates", "delete");
}
}
'개발관련 이것저것' 카테고리의 다른 글
인텔리제이 Git clone (0) | 2024.11.22 |
---|---|
o.apache.kafka.clients.NetworkClient-1145 - [Consumer clientId=consumer-comnKeywordGroup-1, groupId=comnKeywordGroup] Error while fetching metadata with correlation id 157 : {comnKeyword-updates=LEADER_NOT_AVAILABLE} (0) | 2024.10.16 |
고수준 & 저수준 (0) | 2024.05.24 |
SOLID (0) | 2024.05.23 |
VO & DTO (0) | 2024.05.23 |