Post

NestJS에서 AWS SQS 사용하기

  1. AWS SQS를 생성할 수 있다.
  2. NestJS 프로젝트에서 AWS SQS를 활용할 수 있다.

AWS SQS (Simple Queue Service)

Amazon 심플 큐 서비스란?

Amazon Simple Queue Service(Amazon SQS)는 AWS에서 제공하는 보안 호스팅 대기열 (QUEUE) 이다. 보안, 내구성, 가용성, 확장성, 신뢰성, 사용자 지정 등의 특장점을 갖고 있다.

https://choiblog.tistory.com/190 이미지 출처

Amazon SQS 대기열 유형

SQS 유형 표

Amazon SQS 대기열 유형 은 참고차 더했다.

AWS SQS 생성 및 테스트

SQS 만들기

기본값 그대로 두고 일반 큐로 생성해보았다.

SQS test

우상단의 메시지 전송 및 수신 을 이용해서 자유롭게 테스트가 가능하다.

SQS test

NestJS 에서 사용하기

@ssut/nestjs-sqs 라이브러리를 선택했다.

1
npm i @ssut/nestjs-sqs @aws-sdk/client-sqs

편해보여서 항상 썼는데, 알고보니 만드신 분이 전 재직자셔서 좀 신기했던 기억..

1
2
3
4
5
6
# .env.skel
AWS_SQS_QUEUE_NAME=
AWS_SQS_QUEUE_URL=
AWS_REGION='ap-northeast-2'
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=

AWS_ACCESS_KEY_ID은 IAM에서 사용자를 생성한 후 발급이 가능합니다. (정책은 AmazonSQSFullAccess만 부여)
다만 AWS에서는 이렇게 iam user의 access key 인증은 지양하기를 권장합니다. 액세스 키 보안

consumer (소비자)

consumer는 대기열에 자신이 처리할 메시지기 있는지 확인하고 메시지를 가져온다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// app.module.ts
@Module({
  imports: [
    ConfigModule.forRoot({
      isGlobal: true,
    }),
    SqsModule.registerAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      useFactory: (configService) => ({
        consumers: [
          {
            name: configService.get('AWS_SQS_QUEUE_NAME'),
            queueUrl: configService.get('AWS_SQS_QUEUE_URL'),
          },
        ],
        producers: [],
      }),
    }),
  ],
  providers: [AppConsumer],
})
export class AppModule {}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// app.consumer.ts
const QUEUE = process.env.AWS_SQS_QUEUE_NAME || '';

@Injectable()
export class AppConsumer {
  @SqsMessageHandler(QUEUE)
  async messageHandler(message: Message) {
    console.log(message);
    if (!message?.Body) {
      console.error('No message body');
      return message;
    }
    const body: object | null = JSON.parse(message.Body); // 실제 값 사용 부분
    return body;
  }
}

큐로부터 넘어오는 값 원본은 아래와 같다. console.log(message);

1
2
3
4
5
6
{
  Body: '{"test":"test"}',
  MD5OfBody: '828bcef8763c1bc616e25a06be4b90ff',
  MessageId: 'feb95320-eed8-4688-aa38-5f9b1f7b912e',
  ReceiptHandle: 'AQEBfD0NZ7S+gNoO6Mm6KC1IQZoyiZZCEHvwy3mtd6caLbfCAkJVz2izX6hYnpWomUZTRDZ1zbSRFVHILoeQNYVEq7PzlWzPKR00739ymYtf3UKbNqBxpA+8KK365MUdG4N2jjmuEYKhp84jMPVQlq0YQPPtSx530gWznTQiIhAvM8h+OQw/n9h+m7xSIB+5rXtlSuvCrnt67W4aNenXHoVbCpV9RnrYqChfddDG48F4gJrOTbVUz2EctfzNY3K9ALU7qnd6gr89rFVBjrwUV6LqJU55Ke3Mf+Ob8l+mGkzc2fgY1hxg/LTrQbQOvAJyK0A1Q5VDDFByBoTaR1Q1I++bFY7TunCWtVov55Y0BKfTb4K28gJpS0fcF4/dEZShJO4L6+nJcBDQ3a3jTUYBizeV0Q=='
}

실제로는 JSON.parse(message.Body); 를 해서 사용해야 한다.

Producer (생산자)

Producer 는 SQS 대기열로 메시지를 전송한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// app.module.ts

@Module({
  imports: [
    ConfigModule.forRoot({
      isGlobal: true,
    }),
    SqsModule.registerAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      useFactory: (configService) => ({
        consumers: [],
        producers: [
          {
            name: configService.get('AWS_SQS_QUEUE_NAME'),
            queueUrl: configService.get('AWS_SQS_QUEUE_URL'),
          },
        ],
      }),
    }),
  ],
  providers: [AppConsumer],
})
export class AppModule {}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// app.service.ts

export class AppService {
  constructor(
    private readonly configService: ConfigService,
    private readonly sqsService: SqsService,
  ) {}

  async sendSqsMessage() {
    const queueName = this.configService.get('AWS_SQS_QUEUE_NAME');
    let result: SendMessageBatchResultEntry[];

    try {
      result = await this.sqsService.send(queueName, {
        id: uuid(), // make unique id
        body: {
          test: 'test',
        },
      });
      console.log(result);
      return result;
    } catch (err) {
      console.error(err);
    }
  }
  }
}
1
2
3
4
5
6
7
8
9
10
// app.controller.ts

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}
  @Get('test')
  sendSqs() {
    return this.appService.sendSqsMessage();
  }
}
1
2
curl localhost:3000/test
[{"Id":"45ceec65-5592-4a00-b4ac-627806246ee2","MD5OfMessageBody":"828bcef8763c1bc616e25a06be4b90ff","MessageId":"8f760bc7-da41-4553-993c-0ec4ed9a99e4"}]%               

SQS 수신

MessageId8f760bc7-da41-4553-993c-0ec4ed9a99e4 를 메시지 수신ID 에서 찾을 수 있다.

SQS 수신2

보낸 내용도 동일함을 확인한다.

마무리

입사후 가장 먼저 구현한 기능이 SQS라, 각별히 기억에 남는다. 그 작업 덕분에 다른 작업의 worker 구현에서도 유용하게 써먹을 수 있었다. 다음 번에는 다른 메시지 브로커를 활용해보고싶다..!

This post is licensed under CC BY 4.0 by the author.

Trending Tags