Spring RabbitMQ 적용하기

2025. 3. 30. 17:44·framework/spring

RabbitMQ는 docker를 이용해 container를 띄우고 테스트했습니다.

docker run -d --hostname my-rabbit \
  --name rabbitmq \
  -p 5672:5672 -p 15672:15672 \
  rabbitmq:3-management

 

Spring에서 RabbitMQ를 사용하기 위해서는 먼저 의존성을 추가해주어야 합니다.

implementation 'org.springframework.boot:spring-boot-starter-amqp'

 

RabbitMQ에는 exchange와 queue, binding이 있습니다. exchange는 producer로부터 받은 데이터를 큐에 전달하는 역할을 합니다.

이제 application.yml과 RabbitMQConfig에서 하나씩 설정을 해주면 됩니다.

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest1
    password: password
  application:
    name: HelloMessageQueue

간단하게 rabbitmq에 대한 연결 정보만을 설정해주었습니다. 추가로 virtual-host나 timeout, heartbeat 등을 설정해줄 수 있습니다.

 

Producer

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    public static final String QUEUE_NAME = "first-queue";

    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, false);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(messageConverter);
        template.setMandatory(true);
        template.setReturnsCallback(returned -> {
            log.warn("Message returned: {}, Reply code: {}, Reply text: {}, Exchange: {}, Routing key: {}",
                    returned.getMessage(),
                    returned.getReplyCode(),
                    returned.getReplyText(),
                    returned.getExchange(),
                    returned.getRoutingKey());
        });

        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("Message sent successfully: {}", correlationData);
            } else {
                log.error("Message failed to send: {}, cause: {}", correlationData, cause);
            }
        });

        return template;
    }

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

 

하나의 큐를 설정해주고 별도의 exchange를 설정해주지 않았습니다.

큐의 첫 번째 인자는 큐의 이름을 정의하고, 두 번째 인자는 내구성(durable)을 정의합니다. true인 경우, 영속성(Persistent)을 가지므로 RabbitMQ를 재시작해도 메세지가 저장된 상태로 남습니다. false인 경우, RabbitMQ를 재시작하는 경우 메세지가 사라지는 Transient 큐로 정의합니다.

 

RabbitMQ는 큐만 정의하고 exchange에 큐를 매핑하지 않은 경우 사용할 default exchange를 제공해줍니다. default exchange는 direct exchange 방식으로 설정된 routing key와 동일한 큐로 메세지를 전달하게 됩니다.

RabbitTemplate에서 제공하는 DEFAULT_EXCHANGE

 

RabbitTemplate은 application.yml의 설정을 ConnectionFactory를 통해 받아오지만 직렬화 설정이나 메세지 성공/실패에 대한 콜백은 @Configuration에서 해주어야 합니다. RedisTemplate이나 다른 Template처럼 비슷하게 설정해줄 수 있습니다.

 

public record MessageDto(String title, String content) {}
@RestController
@RequestMapping("/api/messages")
@RequiredArgsConstructor
public class MessageController {

    private final Producer producer;

    @PostMapping
    public ResponseEntity<ResponseDto> postMessage(@RequestBody MessageDto dto) {
        producer.send(dto);
        return ResponseEntity.ok(ResponseDto.builder()
                .success(true)
                .build());
    }
}
@Component
@RequiredArgsConstructor
public class Producer {

    private final RabbitTemplate rabbitTemplate;

    public void send(MessageDto dto) {
        rabbitTemplate.convertAndSend(RabbitMQConfig.QUEUE_NAME, dto);
        System.out.println("Sent <" + dto + ">");
    }
}

api를 호출하면 controller는 Dto를 Producer에게 넘기게 됩니다. Producer는 config에서 설정한 Jackson2JsonMessageConverter를 이용해 직렬화한 값을 메세지 큐로 보내게 됩니다.

 

Consumer

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class RabbitMQConfig {

    public static final String QUEUE_NAME = "first-queue";

    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, false);
    }

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
                                                                         MessageConverter messageConverter) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(messageConverter);
        return factory;
    }
}

Consumer 설정에서는 메세지 큐로 메세지를 보내기 위한 RabbitTemplate을 정의하지 않습니다. 대신, Consumer 설정에서는 ListenerContainerFactory를 정의해줍니다. @RabbitListener가 붙은 메서드들이 메세지를 어떻게 받을지를 설정하는 컨테이너를 생성해주는 팩토리입니다. 여기에 설정된 값은 @RabbitListener 애노테이션이 붙은 메서드들에 적용됩니다. 즉, 받은 메세지의 값은 역직렬화하기 위해 설정을 해주어야 합니다.

 

@Slf4j
@Component
public class Consumer {

    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void receiveMessage(MessageDto dto) {
    	log.info("Received <{}>", dto);
    }
}

Consumer는 받은 메세지를 출력하도록 해줍니다.

 

생성한 API 호출
Sent = Producer, Received = Consumer

API를 호출하면 Producer가 메세지를 RabbitMQ로 보내고, Consumer가 정상적으로 값을 가져오는 것을 확인할 수 있습니다.

 

GUI 모니터링 툴

15672 port로 접속했을 때의 모니터링 화면

RabbitMQ의 좋은 점 중 하나는 GUI를 이용해 현재 브로커의 상태나 큐, exchange를 모니터링할 수 있다는 점입니다. 처음에 docker로 RabbitMQ를 띄웠을 때, 포트를 하나 더 열어놨는데 브라우저에서 해당 포트로 접속하게 되면 GUI 화면을 확인할 수 있습니다.

 

default exchange와 first-queue를 확인했을 때 상태

default exchange를 확인하면 정상적으로 메세지가 들어온 것을 확인할 수 있습니다. direct로 설정되어 있기 때문에 default exchange는 메세지를 send했을 때의 큐로 메세지를 라우팅하게 됩니다.

 

마찬가지로, first-queue를 확인하면 정상적으로 메세지가 큐로 라우팅된 것을 확인할 수 있습니다. queue에는 Ready, Unacked, Total이 있습니다.

  1. Ready : 큐에 메세지가 정상적으로 저장된 상태, 아직 consumer에게 전달되지 못한 상태를 나타냅니다.
  2. Unacked : consumer에게 메세지를 전송했지만, 아직 consumer로부터 ack를 받지 못한 상태입니다. 즉, consumer가 처리하고 있는 상태를 나타냅니다.
  3. Total : Ready와 Unacked에 있는 메세지 개수의 총합을 나타냅니다.

Ready의 값이 높다면 consumer의 수를 늘리거나 consumer의 메세지 처리 속도를 높여주어야 합니다.

Unacked의 값이 높다면 consumer의 처리 로직이나 네트워크 연결 상태를 확인해주어야 합니다.

'framework > spring' 카테고리의 다른 글

RabbitMQ의 Exchange  (0) 2025.03.30
Spring DispatcherServlet은 왜 필요할까?  (1) 2025.02.17
'framework/spring' 카테고리의 다른 글
  • RabbitMQ의 Exchange
  • Spring DispatcherServlet은 왜 필요할까?
koosco
koosco
웹 개발 공부하고 있어요 :)
  • koosco
    koosdata
    koosco
  • 전체
    오늘
    어제
    • ROOT (28)
      • WEB (2)
      • language (4)
        • java (3)
        • pug (1)
      • framework (3)
        • spring (3)
        • react (0)
      • database (1)
        • mysql (1)
      • DevOps (4)
        • Linux (4)
      • Error Log (2)
      • 독서 (3)
      • Computer Science (9)
        • Data Structure (2)
        • Network (2)
        • Design Pattern (3)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    React
    자료구조
    amqp
    독서
    Container
    Network
    DATABASE
    WSL
    가상면접 사례로 배우는 대규모 시스템 설계
    Java
    pug
    HTML
    linux
    rabbitmq
    Spring
    DNS
    github pages
    mysql
    docker
    AWS
    vercel
    Design Pattern
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.3
koosco
Spring RabbitMQ 적용하기
상단으로

티스토리툴바