기본 설정해보기

의존성 설정 파일

build.gradle

// kafka  
implementation 'org.springframework.kafka:spring-kafka'

application.yml

spring
	kafka:  
	  bootstrap-servers: localhost:9092

Producer 및 Consumer 설정

프로젝트 코드에서 설정

Producer 파트 설정

/**  
 * ProducerFactory 설정  
 */
@Bean  
public ProducerFactory<String, String> producerFactory() {  
    HashMap<String, Object> config = new HashMap<>();  
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);  
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);  // 직렬화 방식
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);  // 직렬화 방식
    config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);  // 파티션으로 전달하는 방법을 정해준다.
    return new DefaultKafkaProducerFactory<>(config);  
}
 
 
/**  
 * Kafka Template 
 */
@Bean  
public KafkaTemplate<String, String> kafkaTemplate() {  
    KafkaTemplate<String, String> stringKafkaTemplate = new KafkaTemplate<>(  
        producerFactory());  
 
	// 위에서 설정한 Producer를 링크해준다.
    stringKafkaTemplate.setProducerListener(kafkaProducerListener);  
  
    return stringKafkaTemplate;  
}

Comsumer 파트 설정

/**  
 * ComsumerFactory 설정  
 */  
@Bean  
public ConsumerFactory<String, String> consumerFactory() {  
    HashMap<String, Object> config = new HashMap<>();  
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);  
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);  
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);  
  
    return new DefaultKafkaConsumerFactory<>(config);  
}  
  
/**  
 * 카프카 리스너 설정  
 */  
@Bean  
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>  
kafkaListenerContainerFactory() {  
    ConcurrentKafkaListenerContainerFactory<String, String> factory =  
        new ConcurrentKafkaListenerContainerFactory<>();  
  
    factory.setConsumerFactory(consumerFactory());  
	// custom한 에러핸들러를 설정해서 지정해 줄 수 있다.
    factory.setCommonErrorHandler(kafkaErrorHandler);  
  
    return factory;  
}

직렬화 처리

카프카 데이터 직렬화 변환

public class KafkaEventSerializer {  
  
    private static final ObjectMapper objectMapper = new ObjectMapper();  
  
    private KafkaEventSerializer() {  
        throw new UnsupportedOperationException("이 클래스는 인스턴스 생성을 지원하지 않습니다.");  
    }  
  
    // 직렬화 (객체 -> JSON 문자열)  
    public static <T> String serialize(T object) {  
        try {  
            return objectMapper.writeValueAsString(object);  
        } catch (JsonProcessingException e) {  
            log.error("Failed to serialize object: {}", object, e);  
            throw new RuntimeException("Serialization error", e);  
        }  
    }  
  
    // 역직렬화 (JSON 문자열 -> 객체)  
    public static <T> T deserialize(String json, Class<T> clazz) {  
        try {  
            return objectMapper.readValue(json, clazz);  
        } catch (JsonProcessingException e) {  
            log.error("Failed to deserialize JSON: {}", json, e);  
            throw new RuntimeException("Deserialization error", e);  
        }  
    }  
}

KafkaListener 설정

예시

  • 아래의 예시에선 동일한 토픽을 다른 Consumer 그룹에서 독립적으면 처리한다.
// 주문 처리 서비스
@KafkaListener(
    groupId = "order-processing-service",
    topics = "order-events"
)
public void processOrder(String orderData) {
    // 주문 처리 로직
}
 
// 알림 서비스 (같은 메시지를 독립적으로 처리)
@KafkaListener(
    groupId = "notification-service", 
    topics = "order-events"
)
public void sendNotification(String orderData) {
    // 알림 발송 로직
}

Topcis 파라미터의 역할

  • Topics는 Kafka 메시지를 저장하는 논리적인 채널입니다.

특징

  • 메시지 분류
    • 특정 유형의 데이터나 이벤트를 분류하여 저장하는 카테고리 역할
  • 데이터 스트림 구분
    • 서로 다른 종류의 메시지들을 구분하여 관리
  • 구독 대상 지정
    • Consumer가 어떤 토픽의 메시지를 소비할지 결정

GroupId 파라미터의 역할

  • Kafka Consumer의 그룹 식별자로, 여러 Consumer 인스턴스들을 하나의 논리적 그룹으로 묶는 역할을 합니다.

특징

  • 메시지 분산 처리
    • 같은 groupId를 가진 Consumer들은 토픽의 파티션을 나누어서 처리
    • 각 파티션은 그룹 내에서 오작 하나의 Consumer만 담당
  • 장애 복구
    • 한 Consumer가 다운되면 같은 그룹의 다른 Consumer가 해당 파티션을 인계받음
    • 자동 리밸런싱을 통해서 고가용성 보장
  • 오프셋 관리
    • 그룹 단위로 메시지 Offset 추적
    • Consumer 재시작 시 마지막 처리 시점 부터 재개 가능
  • 독립적인 소비
    • 서로 다른 groupId를 가진 Consumer들은 동일한 메시지를 독립적으로 소비
    • 하나의 토픽을 여러 서비스에서 가각ㄱ

추후 해볼 것

  • 직접 작성한 직렬화를 사용하지 않고 카프카 자체 직렬화 기능을 사용해 보자