포시코딩

[Nest.js][동시성 문제] Bull Queue 본문

Node.js

[Nest.js][동시성 문제] Bull Queue

포시 2023. 2. 18. 21:56
728x90

개요

https://4sii.tistory.com/422

 

[동시성 문제] (1) Transaction 사용과 통 Lock 걸어버리기

코드 Back-End: Nest.js - boards.service.ts async joinGroup(boardId: number, userId: number) { const board = await this.getBoard(boardId); const boardJoinInfo = await this.joinRepository.find({ where: { boardId }, }); if (board.joinLimit { if (join.userId

4sii.tistory.com

이전 포스팅에서 이어진다.

 

이전에 고민했던 동시성 문제에 대해 Queue 디자인 패턴을 활용해 

해결할 수 있을 것 같다는 팀 의견이 모아졌다.

 

마침 Nest에서는 Node.js 기반 대기열 시스템 구현에 필요한 @nestjs/bull 패키지를 제공한다고 한다.

기본적으로 Redis 위에서 돌아가는데, 

Redis 사용법에 대해 공부해 놨으니 내가 시도해 보기에도 알맞아 보였다.

 

https://docs.nestjs.com/techniques/queues

 

Documentation | NestJS - A progressive Node.js framework

Nest is a framework for building efficient, scalable Node.js server-side applications. It uses progressive JavaScript, is built with TypeScript and combines elements of OOP (Object Oriented Progamming), FP (Functional Programming), and FRP (Functional Reac

docs.nestjs.com

공식 문서상의 Queues 적용 방법.

 

구현 과정

기본적인 Bull Queue 사용 방법은 아래 포스팅을 참고했고

https://donis-note.medium.com/nestjs-redis-message-queue-구현해보기-4a0da04f57b7

 

[NestJS] Redis Message Queue 구현해보기

Bull Queue를 이용한 메시지큐 튜토리얼

donis-note.medium.com

 

만들어진 Queue 대기열을 통해 게시글 그룹에 join을 순차적으로 진행하게끔 하는 로직은 

아래 GitHub를 참고해 만들었다.

https://github.com/Gosrock/Ticket-Backend-22nd

 

GitHub - Gosrock/Ticket-Backend-22nd: [고스락 티켓 2.0] 홍익대학교 컴퓨터공학과 밴드 고스락 • 티켓 예

[고스락 티켓 2.0] 홍익대학교 컴퓨터공학과 밴드 고스락 • 티켓 예매 서비스. Contribute to Gosrock/Ticket-Backend-22nd development by creating an account on GitHub.

github.com

 

Bull Queue를 통해 Redis에 데이터를 차례대로 넣는 거까진 구현했는데 이걸 어떻게 활용하면 좋을지

감이 안 잡히는 상황에서

 

내가 구현하려는 기능이랑 비슷한 예매 시스템을 구현한 경우에 대해 찾아보면 어떨까 해서

찾아봤더니 다행히 딱 Bull Queue로 예매 시스템을 만든 프로젝트를 찾을 수 있었다.

https://devnm.tistory.com/15

 

[고스락 티켓 2.0] 두번째 프로젝트는 어떻게 달라졌을까요?

우선 고스락 티켓 예매 프로젝트의 목적은 기존 종이티켓으로 표를 팔러다니던 OB시절에서 정산의 어려움이나 공연 홍보의 어려움 또한 비대면 시대로 돌입하면서 학교에 모이는 인원이 적다보

devnm.tistory.com

 

정리하자면 원래 로직은

  1. controller에서 받고
  2. service에 보내 transaction 감독 하에 데이터 저장

이라고 한다면 이번에 바뀐 로직은

 

  1. controller에서 받고
  2. queue에 저장
  3. redis에 대기열 저장
  4. consumer에서 대기 중인 job 가져옴
  5. service에 보내 transaction 감독 하에 데이터 저장

의 과정을 거친다. 

 

코드

boards.module.ts

// ...import는 생략
@Module({
  imports: [
    TypeOrmModule.forFeature([Board, Join]),
    BullModule.forRoot({
      redis: {
        host: 'localhost',
        port: 6379,
      }
    }),
    BullModule.registerQueue({
      name: 'joinQueue'
    })
  ],
  controllers: [BoardsController],
  providers: [BoardsService, JoinConsumer]
})
export class BoardsModule {}

Bull에 대해 Redis 세팅

providers의 JoinConsumer는 밑에서 JoinConsumer class를 추가한 후 추가 작성하면 되는 부분

 

boards.controller.ts

// ...생략
@Post(':id/join2')
async joinGroup2(@Param('id') boardId: number, @Body() body: {userId: string}) {
  const { userId } = body;
  return await this.boardsService.addJoinQueue(boardId, parseInt(userId));
}

요청 path에 대해 데이터를 받은 후 queue 로직이 있는 service의 메서드로 전달한다.

 

boards.service.ts

// ...import 생략
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';

@Injectable()
export class BoardsService {
  constructor(
    @InjectQueue('joinQueue') private joinQueue: Queue,
  ) {}
  
  async joinGroup(boardId: number, userId: number) {
    // ...생략. 그룹 join 로직이 위치하는 곳
    // 아래 addJoinQueue를 거친 후 consumer에 의해 실행되는 메서드이다.
  }

  async addJoinQueue(boardId: number, userId: number) {
    const job = await this.joinQueue.add('join', {
      boardId, userId
    }, 
    // { delay: 1000 }  // delay 주석처리 되있는 부분은 아래 시행착오 부분에서 설명
    );
  }
}

controller에서 호출한 addJoinQueue가 실행된다. 

위의 joinGroup 메서드는 이후 consumer에 의해 실행될 예정

 

join.consumer.ts

import { Process, Processor } from "@nestjs/bull";
import { Job } from "bull";
import { BoardsService } from "./boards.service";

@Processor('joinQueue')
export class JoinConsumer {
  constructor(private readonly boardsService: BoardsService) {}

  @Process('join')
  async getJoinQueue(job: Job) {
    await this.boardsService.joinGroup(job.data.boardId, job.data.userId);
  }
}

controller와 마찬가지로 생성자에서 BoardsService를 DI 해주었다.

위에서 말했듯 Queue에 의해 getJoinQueue() 메서드가 실행되고 job에 담긴 데이터를 

BoardsService의 joinGroup() 메서드로 전달해 원하던 로직을 실행하는 과정이다. 

 

join.consumer 생성 후 맨 위에서 언급했듯 module의 providers에 추가해 주는 것을 잊지 말자.

 

해당 코드는 아래 GitHub를 통해 더 자세히 확인할 수 있다. 

https://github.com/cchoseonghun/nestjs_sample

 

GitHub - cchoseonghun/nestjs_sample: Sample Back-End project using Nest.js. with Front-end using ejs or React.js

Sample Back-End project using Nest.js. with Front-end using ejs or React.js - GitHub - cchoseonghun/nestjs_sample: Sample Back-End project using Nest.js. with Front-end using ejs or React.js

github.com

 

해결해야 될 문제

1. 실패 응답 처리 오류

몇 번의 시도에서 모두 다섯 번의 요청 중 단 두 번의 요청만 받아들여져 DB에 저장되는 걸 확인했다.

하지만 나머지 요청에 대해선 정상적이라면 

throw new ForbiddenException('자리 부족');

위 코드에 의해 자리 부족이라는 응답을 받아야 되는데

 

위 사진과 같이 201 성공 응답을 받고 있다.

delay를 주면 영향이 있을까 했는데

DB에 늦게 들어오는 것만 빼곤 동일한 결과였다.

 

하지만 delay를 1초로 주니까 눈에 띄는 부분이 있었는데

아직 DB에는 들어오지도 않았는데

이미 위 사진과 같이 브라우저 콘솔에 'id: 0~ 5 성공 - status code: 201' 이 찍히는 것이었다.

 

이렇다는 건 일단 5건의 요청에 대해 서버에서 다 받은 다음

대기열에 보내놓고 처리된 거에 대한 응답을 받기도 전에 

service의 joinGroup 메서드를 지나가 201 응답을 하고 있다는 건데

이 부분에 대해서 좀 더 파보면 될 것 같은 느낌이 든다.

 

2. Redis에 쌓인 대기열 데이터 처리

5번씩 요청한 테스트들에 대해 DB 데이터를 지우며 몇 번 시도했었는데

그와 관련해서 쌓인 Redis의 데이터는 그대로 남아있는 걸 발견했다.

 

생각한 대로 구현하려면 대기열에서 나오며 처리된 값에 대해선 

Redis에서도 사라져 대기열이 비워져야 될 텐데

 

이러면 끝도 없이 쌓여 결국 좋지 않은 상황이 발생할 것이다.

 

consumer에서 boardsService에 요청함과 동시에

대기열에서 해당 job을 dequeue 하면 될 것으로 생각되는데 이 방법에 대해선 한 번 알아봐야겠다.

 

문제 해결

Redis에 쌓인 대기열 데이터 처리

Redis에 쌓인 대기열 처리를 먼저 진행하게 되었는데

해결 방법은 의외로 간단했다.

Queue에 add 할 때, 파라미터로 옵션을 줄 수 있는데 해당 옵션을 통해 손쉽게 

과정 종료 후 제거가 가능했다.

 

코드

async addJoinQueue(boardId: number, userId: number) {
  const job = await this.joinQueue.add(
    'join',
    { boardId, userId, },
    { removeOnComplete: true, removeOnFail: true },
  );
}

{ removeOnComplete: true, removeOnFail: true } 옵션을 통해 

성공이든 실패든 제거하게 하면 된다. 

 

테스트 결과 Redis에서 Queue 와 관련해 예전처럼 계속 데이터가 쌓여있지도 않았고

5개 요청 중 2개의 요청만 잘 저장되는 것까지 확인했다.

해결!

 

참고한 곳

https://stackoverflow.com/questions/67378770/how-to-remove-a-delayed-job-from-bull-js

 

how to remove a delayed job from bull js?

I am new to Bull and my use case is to run a job after 10 sec, for that, I am using the below code const options = { delay: 10000, // in ms jobId: myCustomUUID, }; myQueue.add(

stackoverflow.com

 

실패 응답 처리 오류

진행상황 (1)

여러 테스트를 진행해 본 결과

controller -> consumer -> service의 과정을 거칠 때

마지막 service에서 발생하는

throw new Exception()이 제대로 먹히지 않는다는 사실을 발견했다.

 

service.ts

if (board.joinLimit <= boardJoinInfo.length) {
  console.log('자리부족 err: ', boardId, userId);
  throw new ForbiddenException('자리 부족');
}

이미 자리가 꽉 찬 상태에서 참여 요청을 했을 때

consumer를 거치지 않고

controller -> service의 흐름으로 진행하면

 

정상적으로 403 에러를 응답받을 수 있는데

 

consumer를 거쳐 service에서 에러를 발생시키면

위 if문의 조건에 걸려 에러를 throw 했음에도 불구하고

 

status code 201로 응답을 하고 있었다.

 

여기까지 알아낸 사실을 종합하면 consumer를 통해 service에서 발생시키는 에러를

정상적으로 응답하게끔 해야한다는 사실

 

진행상황 (2)

join.consumer.ts

@Processor('joinQueue')
export class JoinConsumer {
  constructor(private readonly boardsService: BoardsService) {}

  @Process('join')
  async getJoinQueue(job: Job) {
    throw new NotFoundException('테스트!');
  }
}

여러 상황을 더 테스트하는 과정에서

consumer에서발생하는 에러가 무시되는게 원인인 걸 알아냈다.

 

관련해서 여러가지로 찾아보다 힌트를 발견했는데

https://github.com/nestjs/bull/issues/1076

여기서 @OnQueueError를 알게 되었고

https://stackoverflow.com/questions/71885898/onqueueerror-not-firing-on-task-queue-nestjs-bull

여기선 @OnQueueFailed를 사용하는걸 알게되었다.

 

해당 핸들러들에 대해 바로 적용해서 테스트

 

join.consumer.ts

// ...import 생략

@Processor('joinQueue')
export class JoinConsumer {
  constructor(private readonly boardsService: BoardsService) {}

  @OnQueueFailed()
  failHandler(job: Job, err: Error) {
    console.log('OnQueueFailed');
    throw err;
  }

  @OnQueueError()
  errorHandler(err: Error) {
    console.log('OnQueueError');
    throw err;
  }

  @Process('join')
  async getJoinQueue(job: Job) {
    return await this.boardsService.joinGroup(job.data.boardId, job.data.userId);
    // 들어가면 throw new ForbiddenException('자리 부족'); 발생
  }
}

예상과 달리 queue에서 발생한 에러에 대해서는 @OnQueueFailed() 데코레이터를 붙인 함수로 들어오고 있었다.

 

몇번의 시도 끝에 @OnQueueFailed()를 붙인 핸들러는 파라미터로 Job, Error를, 

@OnQueueError()를 붙인 핸들러는 Error를 얻는걸 확인할 수 있었는데

그에 대해 위와 같이 코드를 작성 후 에러를 내본 결과

 

failHandler에서 throw 한 에러는 errorHandler에서 받고 있었고, 

errorHandler에서 throw 한 에러는 요청을 보낸 클라이언트로 보내지지 않고 그냥 console에 출력되고 있었다.

 

errorHandler에서 갖고 있는 err를 클라이언트에 전달만 하면 해결될 거 같은데 

따로 출력하는 코드도 없는데 어딘가에서 console에 출력을 시키고 있다.. 이것도 문제고

 

어떻게 클라이언트에 전달할지도 감이 안온다.

큐에 들어간 시점에서 이미 클라이언트와의 연관성이 사라져 버리는걸까..

더 공부가 필요할듯

 

마치며

동시성 문제에 대해 지금까지 이런 상황이 있겠지~ 라고만 생각하고 

어떻게 해결해야 될지는 생각해 본 적이 없었는데

 

막상 구현하게 되니 

알아야 할 것도 많고 까다로운 조건도 많아서 애를 많이 먹었던 것 같다.

 

그래도 그 과정에서 배울 수 있었던 게 한두 가지가 아니라 정말 재밌게 진행할 수 있었는데

혼자만 고민하는 게 아닌

팀원들과 다 같이 하루하루 알아낸 정보들을 교환하며

해결 방법을 추적했던 과정이 있어 더 뜻깊었던 것 같다.

진행상황이 있을때마다 뿌듯해서 슬랙에 마구마구 도배했다.

 

아쉬웠던 건 도구(패키지)의 힘을 많이 빌렸다는 점인데

 

앞으로 비슷하게 새롭게 구현해야 될 기술이 있다면 

자료구조와 알고리즘을 써서

작동방식에 대해 머리를 써 뒤틀어 아름답게 해결할 수 있는 그런 기술이었음 내심 바란다.

 

추가사항

위 포스팅을 작성할 당시 해결하지 못하고 끝났었는데

프로젝트를 진행하며 Bull Queue 적용까지 진행한 상태에서 똑같은 문제를 겪다가

eventEmitter를 통해 해결을 할 수 있었다.

 

관련하여 포스팅한 글을 링크하니 참고하길!

https://4sii.tistory.com/456

 

[Nest.js] 동시성 문제 다루기 with. JMeter, Bull Queue, event-emitter

개요 https://4sii.tistory.com/423 [Nest.js][동시성 문제] Bull Queue - 작성중 개요 https://4sii.tistory.com/422 [동시성 문제] (1) Transaction 사용과 통 Lock 걸어버리기 코드 Back-End: Nest.js - boards.service.ts async joinGroup(bo

4sii.tistory.com

728x90