RxJS 정리
RxJS란?
RxJS는 Reactive Extensions For JavaScript 라이브러리입니다.
Reactive Extensions는 ReactiveX 프로젝트에서 출발한 리액티브 프로그래밍을 지원하기 위해 확장했다는 뜻입니다.
ReactiveX는 Observer 패턴, Iterator 패턴, 함수형 프로그래밍을 조합하여 제공합니다.
Observer 패턴이란?
객체의 상태 변화를 관찰하는 관찰자들, 즉 옵저버들의 목록을 객체에 등록하여 상태 변화가 있을 때마다 메서드 등을 통해 객체가 직접 목록의 각 옵저버에게 통지하도록 하는 디자인 패턴입니다.
위의 정의만 보면 이해하기 힘들수 있는데 간단히 말하자면, 어떤 객체의 상태가 변할 때 그와 연관된 객체들에게 알림을 보내는 디자인 패턴이라고 할 수 있습니다. 예를 들면, 자바스크립트의 addEventListener 메서드와 같이 이벤트를 전파하는 방식이라고 생각할 수 있습니다. 즉, 특정 이벤트를 관찰할 옵저버(리스너)를 등록한 후 해당 이벤트가 발생할 때 알려주는 것입니다.
RxJS는 이벤트 스트림을 Observable 이라는 객체로 표현한 후 비동기 이벤트 기반의 프로그램 작성을 돕습니다.
리액티브 프로그래밍이란?
리액티브 프로그래밍이란 이벤트나 배열 같은 데이터 스트림을 비동기로 처리해 변화에 유연하게 반응하는 프로그래밍 패러다임입니다.
외부와 통신하는 방식은 Pull과 Push 시나리오가 있습니다.
- Pull : 외부에서 명령하여 응답받고 처리합니다. 데이터를 가지고 오기 위해서는 계속 호출해야 합니다.
- Push : 외부에서 명령하고 기다리지 않고, 응답이 오면 그때 반응하여 처리합니다. 데이터를 가지고 오기 위해서는 구독해야 합니다.
리액티브 프로그래밍은 Push 시나리오를 채택하고 있습니다.
옵저버블과 옵저버
옵저버블은 특정 객체를 관찰하는 옵저버에게 여러 이벤트나 값을 보내는 역할을 합니다.
옵저버블에 subscribe가 호출되어야 옵저버블과 옵저버가 연결되어 실행됩니다. 실행된 결과는 옵저버블의 next가 실행되면서 확인할 수 있습니다.
const { Observable } = require("rxjs");
const observableCreated$ = Observable.create(function(observer) {
console.log("BEGIN Observable");
observer.next(1);
observer.next(2);
observer.complete();
console.log("END Observable");
});
observableCreated$.subscribe(
function next(item) {
console.log(item);
},
function error(e) {
console.log("error: ", e);
},
function complete() {
console.log("complete");
}
)
실행 결과는 다음과 같다
> BEGIN Observable
> 1
> 2
> complete
> END Observable
subscribe 함수의 호출 부분을 처리하면 아무 일도 일어나지 않습니다. 즉, 생성한 옵저버블은 옵저버에게 값을 전달하는 함수가 있지만 subscribe 함수가 호출되어야 옵저버블과 옵저버를 연결해 실행합니다.
Observable
비동기적이고 이벤트 베이스의 프로그램 구성을 위한 라이브러리 집합 특적 객체를 관찰하는 Observer에게 여러 이벤트나 값을 보내는 역할을 합니다.
생성 → 구독 → 실행 → 구독 해제의 라이프사이클을 가집니다.
Observable.create((observer) => {
try{
observer.next('item');
} catch(e) {
observer.errer(e)
} finally {
observer.complete();
}
}).subscribe(
(x)=>console.log(x),
(err) => console.error(err),
() => console.log("complele")
)
Observer
Observable에 의해 전달받은 value를 소비하는 Consumer입니다.
Observer는 Observable을 구독합니다.
옵저버에는 3가지 메서드가 존재합니다.
- next : 옵저버블 구독자에게 데이터를 전달합니다.
- error : 에러나 예외가 발생하면 이를 전달받습니다. 이후에 next 및 complete 이벤트가 발생하지 않습니다.
- complete : 정상적으로 옵저버블 구독을 완료하면 호출합니다. next 이벤트가 발생하지 않습니다.
const observer = {
next: x => console.log("asdfasdfas")
error: err => console.error(err)
complete:()=> console.log("Observer got a complete notification")
}
observable.subscribe(observer)
Subject
subject는 observer나 observable처럼 행동하지만 값을 여러 관찰자에게 멀티 캐스트 할 수 있는 특수한 유형입니다. (Observable은 유니캐스트입니다.) subject는 EventEmitters와 같습니다.
import { Subject, from } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
const observable = from([1, 2, 3]);
observable.subscribe(subject); // 주제를 제공하여 구독 할 수 있습니다.
// Logs:
// 옵저버 A : 1
// 옵저버 B : 1
// 관찰자 A : 2
// 관찰자 B : 2
// 관찰자 A : 3
// 관찰자 B : 3
import { Subject } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(1);
subject.next(2);
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
subject는 observer이기도하므로 다른 observable을 구독할 수 도 있고 관찰자역할을 할 수도 있습니다.
종류
- AysncsSubject
- BehaviorSubject
- PublishSubject
- ReplaySubject
ReplaySubject는 Observable 실행으로부터 여러개의 값을 기록해 두고 이를 새 구독자에게 다시 전송합니다.이전 값을 신규 구독자에게 보낼 수 있다는 점에서 BehaviorSubject와 비슷하지만 Observable의 실행의 일부를 기록하는 것도 가능합니다.
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // 새로운 구독자에게 전송할 값의 버퍼를 3으로 지정합니다.
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(5);
// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5
Operator
자바스크립트의 map, filter와 같은 메서드처럼 RxJS에서도 옵저버블에 대한 연산 메서드를 지원합니다. 다만 옵저버블에만 적용할 수 있고 사용하려면 옵저버블을 생성해야 합니다.
of
인자로 받은 수를 emit하는 Observable을 생성합니다. 예를 들어, of(1, 2, 3) 은 1, 2, 3을 하나씩 차례로 내보내는 Observable을 생성합니다.
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
map(x => x * x)(of(1, 2, 3)).subscribe((v) => console.log(`value: ${v}`));
// Logs:
// value: 1
// value: 4
// value: 9
pipe
Operator를 순서대로 적용하는 역할
const observable = new Observable(subscribe: subscriber => {
subscriber.next(value:1);
subscriber.next(value:2);
subscriber.next(value:3);
});
observable.pipe(
map(project:(x:any) => x*x),
)
.subscribe(observer: {
next(x){ console.log('value ' + x);},
})
map
map operator는 js의 array나 java의 stream과 비슷합니다. 다른 API들과 동일하게 발생하는 요소들을 다른 값으로 변환해 줍니다.
map operator는 아래 예제와 같이 발생하는 값을 다른 값으로 변환, 대체해야 하는 경우에 적합합니다.
import { fromEvent } from 'rxjs';
import { map } from 'rxjs/operators';
const clicks = fromEvent(document, 'click');
const positions = clicks.pipe(map(ev => ev.clientX));
positions.subscribe(x => console.log(x));
switchMap
구독중이던 Observable이 끝나기 전에 새로운 Observable을 구독하게 되면 이전에 구독중이던 Observable의 구독을 취소하고 다음 Observable 구독을 시작합니다.
switchMap의 경우 서버에 요청을 했다가 취소하고 다른 요청을 보내야 하는 상황처럼 진행중인 작업을 취소하는 경우에 적합하다고 합니다.
import { of, timer, interval } from 'rxjs';
import { switchMap, concatMap, delay, map, take, toArray } from 'rxjs/operators';
const delayTime = { 1: 100, 3: 300, 5: 200 };
const letters = of(1, 3, 5).pipe(concatMap(x => of(x).pipe(delay(delayTime[x]))));
letters.pipe(
switchMap(x => interval(100).pipe(map(_ => x * 10), take(3))),
toArray()
).subscribe(console.log);
// console : [10, 10, 10, 30, 30, 50, 50, 50]
'프론트엔드 스터디 > Javascript' 카테고리의 다른 글
[RxJS] Operator - iif() (0) | 2021.07.28 |
---|---|
[RxJS] Operator - pipe, zip (0) | 2021.07.28 |
프로미스(Promise)란? (0) | 2021.07.16 |
자바스크립트의 이벤트 (0) | 2021.07.14 |
자바스크립트에서 객체지향 프로그래밍이란? (0) | 2021.07.14 |