본문 바로가기
  • _^**_
무근본 IT 지식 공유/무근본 스프링(Spring Framework)

[무근본 스프링(Spring Framework) 공부] 카프카 큐(Kafka Queue)를 스프링 프레임워크와 연결하는 예제

by 크리드로얄워터 2023. 3. 26.
반응형

카프카는 대규모 데이터 처리를 위한 분산 메시징 시스템으로, 스프링 프레임워크와 함께 사용하면 더욱 편리한 애플리케이션 개발이 가능합니다.

 

이 글은 스프링 부트와 스프링 카프카를 사용하여 카프카와 연결하는 예제 코드를 포함하고 있습니다.

 

1. Maven 또는 Gradle을 사용하여 스프링 카프카 및 카프카 클라이언트를 프로젝트에 추가합니다.

 

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.6</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>

 

 

2. 카프카 프로듀서와 컨슈머를 생성할 때 사용할 설정을 정의합니다.

 

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.consumer.group-id}")
    private String consumerGroupId;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        return props;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

 

 

 

3. 카프카 프로듀서를 생성하고 메시지를 전송합니다.

 

@RestController
public class KafkaProducerController {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    public KafkaProducerController(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestBody Message message) {
        kafkaTemplate.send("my-topic", message);
        
        return ResponseEntity.ok("Message sent successfully");
}

 

 

 

 

4. 카프카 컨슈머를 생성하고 메시지를 수신합니다.

 


@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my-topic")
    public void receiveMessage(Message message) {
        // Process the message
        System.out.println("Received message: " + message);
    }

}

 

예제에서는 KafkaConfig 클래스에서 카프카 프로듀서와 컨슈머를 위한 설정을 정의하고,

 

KafkaProducerController 클래스에서 KafkaTemplate을 사용하여 메시지를 전송하고,

 

KafkaConsumerService 클래스에서 @KafkaListener 어노테이션을 사용하여 메시지를 수신하고 처리합니다.

 

이와 같은 방식으로 스프링 프레임워크와 카프카를 연동하여 데이터를 신속하게 처리할 수 있습니다.

반응형

댓글