일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- 게임
- AWS
- flask
- dfs
- class
- TypeScript
- mongoose
- Sequelize
- Bull
- Express
- 정렬
- Queue
- JavaScript
- 공룡게임
- typeORM
- nestjs
- MongoDB
- Nest.js
- OCR
- Dinosaur
- MySQL
- nodejs
- Python
- react
- cookie
- 자료구조
- GIT
- game
- jest
- Today
- Total
포시코딩
[Nest.js] 모임 참여 - 동시성 문제 해결 과정 정리 with. Bull & Event-Emitter 본문
개요
이전에 작성됐던 동시성 문제 관련 글들은 적용하며 겪은 시행착오에 대한 내용이라면
이번 글은 성공적으로 완성된 모임 참여 로직에 대한 전체적인 설명이 내용이 되겠다.
흐름을 돕기 위해 코드 중간중간 console.log()로 설명을 추가했다.
들어가기 앞서
이제부터 코드를 볼건데
코드만 보면 이해하기 어렵기 때문에 이해를 돕기 위해 그림으로도 그려봤다.
보다가 흐름이 이해가 가지 않는다면 위로 올라와 어떤 메서드에서 어떤 흐름으로 진행되고 있는지
한번 보고나서 코드를 보면 도움이 될 것이다.
클라이언트의 참여 요청
meetups.Controller.ts
@Post(':meetupId/join')
@UserGuard
async addJoin(@Param('meetupId') meetupId: number, @InjectUser() userDTO: decodedAccessTokenDTO) {
console.log('1. controller에서 service의 addJoinQueue 메서드 호출');
return await this.meetupsService.addJoinQueue(meetupId, userDTO.id);
}
제일 먼저 클라이언트는 위 컨트롤러를 향해 API 요청을 보낸다.
클라이언트에서 로그인 상태로 /api/meetups/{모임id}/join 에 대해 POST 요청을 보내면
모임 id인 meetupId, 사용자 로그인 정보인 userDTO에 각각 데이터가 담겨
service의 addJoinQueue로 meetupId와 userDTO.id를 전달한다.
잠시 짚고 넘어가는 module 세팅
meetups.module.ts
@Module({
imports: [
TypeOrmModule.forFeature([Meetup, Join]),
BullModule.registerQueue({
name: 'joinQueue'
}),
],
controllers: [MeetupsController],
providers: [MeetupsService, MeetupsRepository, QueueConsumer]
})
export class MeetupsModule {}
service로 넘어가기 전
service의 생성자에서 나올 joinQueue에 대해 잠깐 짚고 넘어가자.
joinQueue는 Bull을 세팅하며 등록한 queue에 대한 이름이다.
그 밖에도 대기열에 새로 등록된 Job을 감지해 꺼내오는 QueueConsumer도
provider로 세팅되고 있는 것을 볼 수 있다.
Service 내부 로직
meetups.service.ts - addJoinQueue
constructor(
@InjectQueue('joinQueue') private joinQueue: Queue,
private eventEmitter: EventEmitter2
) {}
async addJoinQueue(meetupId: number, userId: number) {
console.log('2. addJoinQueue 진입');
const eventName = `finishJoin-${userId}-${Math.floor(Math.random() * 89999) + 1}`
console.log('3. joinQueue에 Job 추가');
await this.joinQueue.add(
'addJoinQueue',
{ meetupId, userId, eventName },
{ removeOnComplete: true, removeOnFail: true },
);
console.log('4. waitFinish 호출');
return this.waitFinish(eventName, 2);
}
service에 넘어와 임의의 eventName을 세팅한 후
joinQueue를 통해 addJoinQueue라는 이름의 대기열에
{ meetupId, userId, eventName }
위 세개의 값을 세팅해준다.
이후부터 대기열에 추가된 위 데이터는 Job이라는 이름으로 지칭한다.
{ removeOnComplete: true, removeOnFail: true }
그 후 removeOnComplete, removeOnFail에 대한 옵션을 세팅하는 것을 볼 수 있는데
이후 대기열에서 Job을 처리할 때
처리했음에도 그대로 Redis에 쌓여있는걸 방지하기 위함이다.
return this.waitFinish(eventName, 2);
대기열 큐에 Job을 넣은 후 service 내부에서 waitFinish 메서드를 호출한다.
meetups.service.ts - waitFinish
private waitFinish(eventName: string, sec: number) {
console.log('5. waitFinish 진입');
return new Promise((resolve, reject) => {
console.log('6. promise 진입');
const wait = setTimeout(() => {
console.log('** setTimeout 진입');
this.eventEmitter.removeAllListeners(eventName);
resolve({
message: '다시 시도해주세요.',
});
}, sec * 1000);
const listenFn = ({ success, exception }: { success: boolean; exception?: HttpException }) => {
console.log('8. listenFn 진입');
clearTimeout(wait);
this.eventEmitter.removeAllListeners(eventName);
success ? resolve({ message: '참여 성공' }) : reject(exception);
};
console.log('7. this.eventEmitter.addListener 세팅');
this.eventEmitter.addListener(eventName, listenFn);
});
}
호출된 waitFinish는 위와 같다.
waitFinish가 호출되면 Promise가 생성되는데
여기서 약간 헷갈릴 수 있다.
쉽게 설명하자면 Promise 내부는 크게 세가지로 구분할 수 있다.
1. wait (setTimeOut)
2. listenFn (콜백함수)
3. this.eventEmitter.addListener (이벤트리스너)
들어오자마자 전달받은 sec의 2가 wait으로 들어와 2초짜리 setTimeOut 함수가 설정된다.
그와 별개로 listenFn 가 선언되어
this.eventEmitter에 전달받은 eventName에 대해 콜백함수로 세팅된다.
이 상태로 eventName으로 이벤트가 발생하면 이벤트 호출과 같이 전달받은 데이터를 받아
resolve 또는 reject 하거나
2초동안 이벤트를 전달받지 못하면 setTimeOut에 의해
resolve({
message: '다시 시도해주세요.',
});
위 코드가 실행될 것이다.
자 이제 waitFinish 함수는
1. event를 기다리거나
2. setTimeOut에 의해 2초를 기다리거나
둘 중 하나의 조건을 만족할 때 까지 '대기' 하는 상태가 된다.
Consumer
Bull을 사용할 때 Consumer를 세팅할 수 있는데
Consumer란 큐에 들어있는 작업을 처리하는 컴포넌트이다.
큐에 새로운 작업이 추가되면, Consumer는 해당 작업을 가져와 처리한다.
meetups.consumer.ts
@Process('addJoinQueue')
async handleAddJoinQueue(job: Job) {
console.log('*1 handleAddJoinQueue 진입');
return await this.meetupsService.addJoin(job.data.meetupId, job.data.userId, job.data.eventName);
}
Consumer의 로직은 간단하다.
queue에 쌓인 Job들을 FIFO(First In First Out)로 가져와 meetupsService의 addJoin 메서드를 호출한다.
다시 Service로
meetups.service.ts - addJoin
async addJoin(meetupId: number, userId: number, eventName: string) {
console.log('*2 addJoin 진입');
try {
const join = await this.getJoin(meetupId, userId);
if (!_.isNil(join)) {
throw new ConflictException(`이미 참여하고 있는 유저입니다.`);
}
const meetup = await this.getMeetup(meetupId);
if (meetup.headcount <= meetup.joins.length) {
throw new ForbiddenException('정원이 다 찼습니다.');
}
await this.joinRepository.insert({
meetupId, userId
});
return this.eventEmitter.emit(eventName, { success: true });
} catch (err) {
return this.eventEmitter.emit(eventName, { success: false, exception: err });
}
}
addJoin 메서드는 평소에 보던 메서드와 같다.
한가지 다른점은 참여 성공/실패의 여부를 확인 후 리턴할 때
eventEmitter를 통해 이벤트를 emit한다는 점이다.
덕분에 성공이든 실패든
위에서 기다리고 있던 waitFinish 메서드의 이벤트리스너가 이벤트를 전달받게 되며
클라이언트까지 무사히 응답을 보낼 수 있게 되는 것이다.
정리
위에서 일부러 흐름을 설명하기 위해 console.log를 적었는데
참여 성공 시 아래와 같이 출력된다.
근데 보면 setTimeOut이 실행되는 console.log가 보이지 않을 것이다.
2초안에 이벤트가 잘 전달됐기 때문에 실행되지 않은 것.
async addJoin(meetupId: number, userId: number, eventName: string) {
console.log('*2 addJoin 진입');
try {
const join = await this.getJoin(meetupId, userId);
if (!_.isNil(join)) {
throw new ConflictException(`이미 참여하고 있는 유저입니다.`);
}
const meetup = await this.getMeetup(meetupId);
if (meetup.headcount <= meetup.joins.length) {
throw new ForbiddenException('정원이 다 찼습니다.');
}
await this.joinRepository.insert({
meetupId, userId
});
// return this.eventEmitter.emit(eventName, { success: true });
} catch (err) {
// return this.eventEmitter.emit(eventName, { success: false, exception: err });
}
}
만약 대기열이 밀려 2초 넘게 이벤트가 전달되지 않거나
일부러 위 코드와 같이 이벤트를 전달하지 않는다면
아래와 같이 로그가 출력될 것이다.
이렇게 Bull을 사용해 대기열 큐를 만들어
동시 다발적인 참여 요청을 한줄로 세워 처리할 수 있게 되었고
메시지 큐에 job을 넣는 과정에서 정상적인 응답을 클라이언트로 전달하지 못해 발생한 문제도
Event-Emitter를 통해 해결할 수 있게 되었다.
* 설명이 부족한 부분이 있거나 잘못된 부분이 있다면 댓글이나 메일 부탁드립니다.
'Node.js' 카테고리의 다른 글
4월3일 - Nest.js의 Class와 Factory (0) | 2023.04.04 |
---|---|
[Nest.js] Gateway와 socket.io (0) | 2023.04.02 |
[Nest.js] CI/CD (0) | 2023.03.28 |
[Nest.js, React] socket.io를 통한 실시간 채팅 구현 (포스팅 추천) (0) | 2023.03.22 |
[Nest.js] 쿠키를 받지 못하는 문제 (상용 서버) (0) | 2023.03.13 |