포시코딩

[Nest.js] 동시성 문제 다루기 with. JMeter, Bull Queue, event-emitter 본문

Node.js

[Nest.js] 동시성 문제 다루기 with. JMeter, Bull Queue, event-emitter

포시 2023. 3. 8. 10:37
728x90

개요

https://4sii.tistory.com/423

 

[Nest.js][동시성 문제] Bull Queue - 작성중

개요 https://4sii.tistory.com/422 [동시성 문제] (1) Transaction 사용과 통 Lock 걸어버리기 코드 Back-End: Nest.js - boards.service.ts async joinGroup(boardId: number, userId: number) { const board = await this.getBoard(boardId); const boar

4sii.tistory.com

위 글에서 이어진다.

프로젝트 시작 전 최대한 생길 수 있는 문제에 대해 기술적인 해답을 얻고 들어가기로 했으나

내가 담당했던 동시성 문제는 끝내 완벽하게 해결하지 못했었다.

 

동시성 문제를 고려하지 않은 관련 기능이 완성됨에 따라

원래 계획이었던 동시성 문제가 발생할 수 있는 상황에 대해 재현해보고

Bull Queue를 적용해 해결 방법을 찾아보자.

 

동시성 문재 재현 세팅

https://4sii.tistory.com/428

 

[동시성 문제] Apache JMeter를 이용한 테스트

개요 https://4sii.tistory.com/422 [동시성 문제] (1) Transaction 사용과 통 Lock 걸어버리기 코드 Back-End: Nest.js - boards.service.ts async joinGroup(boardId: number, userId: number) { const board = await this.getBoard(boardId); const boar

4sii.tistory.com

 

먼저 테스트하려는 환경을 세팅해보자.

위처럼 참여하기가 있는 기능에 대해

각기 다른 아이디로 로그인한 사람들이 동시에 누른다고 가정할 것이다.

 

유저 목록은 위와 같고, 참여 인원이 3명 까지지만 주최자로 한명이 들어가기 때문에 

참여 가능 인원은 2명. 여기서 test용 계정 5개가 동시에 눌러

2명만 참여에 성공하고 나머지 3명은 실패하게끔 하는게 목표이다.

 

참여를 시도하면 DB join 테이블에 meetupId 58에 대해 주최자 id인 13을 제외한 

테스트 계정들의 idrk 쌓일 것이므로 이것을 통해 확인할 수 있다.

(혹은 모임 상세 조회에서 58번에 대해 API 조회)

 

테스트에는 JMeter를 사용할 것이며 각기 다른 유저라는 것에 대한 증명은

쿠키를 직접 세팅해 로그인한 것과 동일하게 만들어 진행

 

쿠키는 로그인 API를 통해 직접 얻어서 사용했다.

 

동시성 문제 재현 (시행착오)

위에서 세팅한 결과로 동시성 문제가 재현되야 됐는데 그냥 정상적으로 처리되는 문제가 있었다.

알고보니 각 테스트를 Thread 별로 진행했어야 했는데

한 Thread 안에서 테스트를 진행하고 있었기 때문에 발생한 문제였다. (진짜 한참 찾음)

이러니 당연히 순서대로 요청하지 ..

 

이렇게 만들면서 listener는 View Results Tree만 있으면 될 것 같아 나머지는 지워버렸다.

다시 테스트를 해보니

 

성공적으로 기존 userId가 13인 유저를 제외하고 2명만 더 추가되어야 하는 상황에서

3명이 추가된 것을 볼 수 있었다.

transaction을 포함한 코드도 마찬가지로 제한된 인원을 넘어서는 문제를 확인할 수 있었는데

비로소 내가 원하던 상황을 맞이할 수 있었다.

 

이제 Bull Queue를 적용해보자.

 

Bull Queue 적용

queue.consumer.ts

import { Process, Processor } from "@nestjs/bull";
import { Job } from "bull";
import { MeetupsService } from "./meetups.service";

@Processor('joinQueue')
export class QueueConsumer {
  constructor(private readonly meetupsService: MeetupsService) {}

  @Process('addJoinQueue')
  async handleAddJoinQueue(job: Job) {
    return await this.meetupsService.addJoin(job.data.meetupId, job.data.userId);
  }
}

redis 대기열에 넣은 걸 차례대로 꺼내는 consumer 부터 만들어준다. 

대기열에 실행하지 않은 job이 있다면 꺼내서 

job을 만들 때 등록한 meetupId, userId를 갖고

아래에서 나올 meetupsService.addJoin() 메서드를 실행하는 코드이다.

 

meetups.service.ts

import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';

@Injectable()
export class MeetupsService {
  constructor(
    @InjectQueue('joinQueue') private joinQueue: Queue
  ) {}

  async addJoinQueue(meetupId: number, userId: number) {
    await this.joinQueue.add(
      'addJoinQueue',
      { meetupId, userId },
      { removeOnComplete: true, removeOnFail: true },
    );
  }
  
  async addJoin(meetupId: number, userId: number) {
    const join = await this.getJoin(meetupId, userId);
    if (!_.isNil(join)) {
      throw new ConflictException(`이미 참여하고 있는 유저입니다.`);
    }
    const meetup = await this.getMeetup(meetupId);
    if (meetup.headcount <= meetup.joins.length) {
      throw new ForbiddenException('정원이 다 찼습니다.');
    }
    await this.joinRepository.insert({
      meetupId, userId
    });
  }

이제 controller에서 위 service의 메서드로 진행되게 한 후 

위 메서드에 도착하면 joinQueue.add를 통해 addJoinQueue 라는 대기열에 

{ meetupId, userId }

를 job으로 만들어 등록한다. 여기서

{ removeOnComplete: true, removeOnFail: true },

위 옵션을 추가해줘야 대기열에서 빠져나가 실행된 후

redis에 남아있지 않게 된다.

 

meetups.module.ts

import { BullModule } from '@nestjs/bull';
// ...다른 import 생략

@Module({
  imports: [
    BullModule.registerQueue({
      name: 'joinQueue'
    }),
  ],
  controllers: [MeetupsController],
  providers: [MeetupsService, MeetupsRepository, QueueConsumer]
})
export class MeetupsModule {}

module에도 사용할 queue를 BullModule에 등록해주고 

providers에 QueueConsumer를 등록해준다.

 

app.module.ts

import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    BullModule.forRoot({
      redis: {
        host: 'localhost',
        port: 6379,
      }
    }),
    
// ...생략

app.module에서는 BullModule에 대한 세팅을 등록해주면 끝이다.

 

결과 확인

적용 후 몇 번의 테스트 과정을 거치면서 확인해보면

DB에는 원하던대로 단 두개의 데이터만 들어오지만 

들어간(성공한) 요청을 제외한 요청도 201 성공 응답을 받고 있다. 

 

위 사진에서도 15, 16이 성공한 요청인데 18 요청도 201 응답을 받은걸 볼 수 있다. 

하나하나 클릭 후 보여줘야 해서 18 하나만 올렸는데 14, 17도 마찬가지로 201 응답을 받았다.

 

이에 대한 원인은 클라이언트에 대한 요청이 service에서 addJoinQueue로 들어가

queue 대기열에 등록하는 순간 로직이 끝나 그대로 성공 응답을 반환하게 되는 것이다.

 

대기열에서 나온 job이 addJoin 메서드를 실행하고 Exception을 throw 해도

이미 응답을 리턴했기 때문에 길을 잃는 것이다.

 

여기까지가 이전에도 Bull Queue를 연습하며 겪었던 문제의 막다른 길이었고

이번에도 똑같이 한참 헤매면서 해결 방법 모색했다.

 

팀원들과의 회의를 통해 다양한 의견이 나왔는데 다음과 같다.

  1. 요청하는 순간 socket을 연결하고 응답받은 결과를 버리고
    consumer에서 실행한 메서드가 응답하는 결과를 socket으로 받아들이는 방법
  2. setTimeOut을 통해 응답을 보내지 않고 있다가 consumer에서 실행한 메서드가 응답을 보내게..
    안될 것 같지만 일단 나온 의견 중 하나
  3. eventEmitter 사용

이중에서 eventEmitter가 우리가 원하던 기능의 역할을 할 수 있을 것이라 판단했는데

쉽게 설명하자면 메서드와 메서드를 socket 통신처럼 연결할 수 있게 해주는 기능이라고 보면 된다.

 

그래서 eventEmitter를 써보기로 결정되었고

앵귤러에서 eventEmitter를 써본 팀원의 도움을 통해 진행할 수 있었다.

 

EventEmitter

EventEmitter를 사용하며 바뀔 로직은 다음과 같다.

  1. queue에 넣고 setTimeOut 등으로 대기시키면서 eventEmitter의 addListener를 통해 받을 준비
  2. consumer에서 실행된 addJoin 메서드에서 성공하거나 에러가 발생할 경우
    eventEmitter의 emit을 통해 addListener가 받을 수 있게 전달

 

설치

npm i @nestjs/event-emitter

 

코드

app.module.ts

import { EventEmitterModule } from '@nestjs/event-emitter';

@Module({
  imports: [
    BullModule.forRoot({
      redis: {
        host: 'localhost',
        port: 6379,
      }
    }),
    EventEmitterModule.forRoot({
      global: true
    }),
    
// ...생략

app.module에 eventEmitter를 세팅

 

meetups.service.ts

import { EventEmitter2 } from '@nestjs/event-emitter';

@Injectable()
export class MeetupsService {
  constructor(
    @InjectQueue('joinQueue') private joinQueue: Queue,
    private eventEmitter: EventEmitter2
  ) {}
    
  async addJoinQueue(meetupId: number, userId: number) {
    const eventName = `finishJoin-${userId}-${Math.floor(Math.random() * 89999) + 1}`
    await this.joinQueue.add(
      'addJoinQueue',
      { meetupId, userId, eventName },
      { removeOnComplete: true, removeOnFail: true },
    );
    return this.waitFinish(eventName, 2);
  }

  private waitFinish(eventName: string, sec: number) {
    return new Promise((resolve, reject) => {
      const wait = setTimeout(() => {
        this.eventEmitter.removeAllListeners(eventName);
        resolve({ 
          message: '다시 시도해주세요.',
        });
      }, sec * 1000);
      const listenFn = ({ success, exception }: { success: boolean, exception?: HttpException }) => {
        clearTimeout(wait)
        this.eventEmitter.removeAllListeners(eventName);
        success ? 
          resolve({ message: '참여 성공' }) : 
          reject(exception);
      };
      this.eventEmitter.addListener(eventName, listenFn);
    });
  }

  async addJoin(meetupId: number, userId: number, eventName: string) {
    try {
      const join = await this.getJoin(meetupId, userId);
      if (!_.isNil(join)) {
        throw new ConflictException(`이미 참여하고 있는 유저입니다.`);
      }
      const meetup = await this.getMeetup(meetupId);
      if (meetup.headcount <= meetup.joins.length) {
        throw new ForbiddenException('정원이 다 찼습니다.');
      }
      await this.joinRepository.insert({
        meetupId, userId
      });
      return this.eventEmitter.emit(eventName, { success: true });
    } catch (err) {
      return this.eventEmitter.emit(eventName, { success: false, exception: err });
    }
  }

 

queue.consumer.ts

import { Process, Processor } from "@nestjs/bull";
import { Job } from "bull";
import { MeetupsService } from "./meetups.service";

@Processor('joinQueue')
export class QueueConsumer {
  constructor(private readonly meetupsService: MeetupsService) {}

  @Process('addJoinQueue')
  async handleAddJoinQueue(job: Job) {
    return await this.meetupsService.addJoin(job.data.meetupId, job.data.userId, job.data.eventName);
  }
}

바뀐 코드는 이렇게가 끝이다.

consumer에 eventName이 추가되었고

 

addJoinQueue에서 대기열에 job을 추가시킨 후 waitFinish() 함수를 실행시키는데

여기서 eventEmitterListener로 emit을 받게끔 대기시킨다.

 

consumer에서 실행될 addJoin에서 성공하거나 Exception이 발생해 실패하면

eventEmitter.emit()이 실행되어 위의 eventEmitterListener에서 결과를 받게 되고

 

이걸 다시 addJoinQueue에서 받아 클라이언트로 return 하는 구조이다.

 

정리

이렇게 길고 긴 동시성 문제와의 씨름이 끝났다.

정말 시간도 많이 걸렸고 참고한 코드도 많았는데

결국 해결한건 팀원끼리 머리를 맞대서 나온 아이디어란 점에서 너무 뿌듯하게 끝난 것 같다.

 

이번 문제를 해결하면서 내가 새로 알게된 사실은

모르는 부분에 대해 도와주는 것은

구글링도, 다른 잘하는 외부 개발자도 아닌 팀원이라는 것이다.

 

이러한 사실을 생각하며

나도 그들에게 그러한 팀원이 될 수 있게 계속 성장하며 도움을 주는 사람이 되도록 노력해야겠다.

 

 

* 추가로 위 코드만으로 정보가 부족하다면 아래 GitHub repo를 통해 확인할 수 있다.

https://github.com/chalkak2023/Chalkak-Backend

 

GitHub - chalkak2023/Chalkak-Backend

Contribute to chalkak2023/Chalkak-Backend development by creating an account on GitHub.

github.com

728x90