Observables

๐ŸŒŠ Observables โ€” RxSwift์˜ ํ•ต์‹ฌ ์ŠคํŠธ๋ฆผ

โ€œEverything is a sequence over time.โ€ โ€” Rx ๋””์ž์ธ ์ฒ ํ•™

์ด ๋ฌธ์„œ์—์„œ๋Š” Observable์ด ๋ฌด์—‡์ธ์ง€, ์–ด๋–ป๊ฒŒ ์ƒ์„ฑํ•˜๊ณ (Factory), ๊ตฌ๋…ํ•˜๋ฉฐ(Subscribe), ๊ด€๋ฆฌ(Dispose)ํ•˜๋Š”์ง€ ๋‹จ๊ณ„๋ณ„๋กœ ์‚ดํŽด๋ด…๋‹ˆ๋‹ค.


1๏ธโƒฃ Observable ์ƒ๋ช…์ฃผ๊ธฐ

Observable<Element> ๋Š” ์ด๋ฒคํŠธ(Event) ์ŠคํŠธ๋ฆผ์ž…๋‹ˆ๋‹ค. ๊ฐ ์ŠคํŠธ๋ฆผ์€ ์ตœ๋Œ€ ๋„ค ์ข…๋ฅ˜์˜ ์‹œ๊ทธ๋„์„ ์ˆœ์„œ๋Œ€๋กœ ๋ฐฉ์ถœํ•ฉ๋‹ˆ๋‹ค.

์ด๋ฒคํŠธ
์„ค๋ช…
Swift ์ฝœ๋ฐฑ
์ค‘๋‹จ ์—ฌ๋ถ€

next

์š”์†Œ ๊ฐ’ ์ „๋‹ฌ

onNext(T)

โŒ

error

์˜ค๋ฅ˜ ๋ฐœ์ƒ

onError(Error)

โ›” ์ŠคํŠธ๋ฆผ ์ข…๋ฃŒ

completed

์ •์ƒ ์ข…๋ฃŒ

onCompleted()

โ›” ์ŠคํŠธ๋ฆผ ์ข…๋ฃŒ

disposed

๋ฆฌ์†Œ์Šค ํ•ด์ œ

onDisposed()

โ€”

next๋Š” ์—ฌ๋Ÿฌ ๋ฒˆ ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ์ง€๋งŒ, error ๋˜๋Š” completed๋Š” ๋‹จ ํ•œ ๋ฒˆ๋งŒ ๋ฐœ์ƒํ•˜๋ฉฐ ๋‘˜ ์ค‘ ํ•˜๋‚˜๋งŒ ๋ฐฉ์ถœ๋ฉ๋‹ˆ๋‹ค.


2๏ธโƒฃ Observables ๋งŒ๋“ค๊ธฐ (Factory Methods)

๋ฉ”์„œ๋“œ
์„ค๋ช…
์˜ˆ์‹œ ์ฝ”๋“œ

just(_:)

ํ•˜๋‚˜์˜ ์š”์†Œ๋งŒ ๋ฐฉ์ถœ ํ›„ ์™„๋ฃŒ

Observable.just("๐ŸŽ")

of(_:)

์—ฌ๋Ÿฌ ์š”์†Œ ์ˆœ์ฐจ ๋ฐฉ์ถœ

Observable.of(1,2,3)

from(_:)

๋ฐฐ์—ดยท์‹œํ€€์Šค โ†’ ์ŠคํŠธ๋ฆผ

Observable.from(["A","B"])

create(_:)

์ปค์Šคํ…€ ์ด๋ฒคํŠธ ์ •์˜

Observable<String>.create { observer in ... }

deferred(_:)

๊ตฌ๋… ์‹œ์ ๋งˆ๋‹ค ์ƒˆ ์ŠคํŠธ๋ฆผ ์ƒ์„ฑ

Observable.deferred { Bool.random() ? .just("โœ…") : .error(err) }

interval

์ผ์ • ์ฃผ๊ธฐ ์ˆซ์ž ๋ฐฉ์ถœ

Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)

timer

์ง€์—ฐ ํ›„ 1ํšŒ ๋˜๋Š” ์ฃผ๊ธฐ ๋ฐฉ์ถœ

Observable<Int>.timer(.seconds(3), period: .seconds(1), scheduler: Main)

// ์˜ˆ: 0.5์ดˆ๋งˆ๋‹ค ์นด์šดํŠธ ์—… ์ŠคํŠธ๋ฆผ
let counter = Observable<Int>
    .interval(.milliseconds(500), scheduler: MainScheduler.instance)

๐Ÿ’ก Tip: ํ”Œ๋ ˆ์ด๊ทธ๋ผ์šด๋“œ์—์„œ PlaygroundPage.current.needsIndefiniteExecution = true ์„ค์ • ํ›„ ์‹คํ–‰ํ•ด ๋ณด์„ธ์š”.


3๏ธโƒฃ Cold vs Hot Observables

๊ตฌ๋ถ„
ํŠน์ง•
์˜ˆ์‹œ

Cold

๊ตฌ๋…๋งˆ๋‹ค ์ƒˆ ๋ฐ์ดํ„ฐ ์ƒ์‚ฐ

Observable<Int>.range(0, 5), URLSession.rx.response

Hot

์ด๋ฏธ ํ๋ฅด๊ณ  ์žˆ๋Š” ๊ณต์œ  ๋ฐ์ดํ„ฐ

PublishSubject, NotificationCenter.default.rx.notification

// Cold ์˜ˆ์‹œ
let range = Observable.range(start: 0, count: 3)
range.subscribe(onNext: { print("๐Ÿ…ฐ๏ธ", $0) })
range.subscribe(onNext: { print("๐Ÿ…ฑ๏ธ", $0) }) // ๋‘ ๊ตฌ๋…์ž ๋ชจ๋‘ 0,1,2 ๋ฐ›์Œ

// Hot ์˜ˆ์‹œ
let subject = PublishSubject<Int>()
subject.subscribe(onNext: { print("1st ->", $0) })
subject.onNext(1)
subject.subscribe(onNext: { print("2nd ->", $0) }) // ๋‘ ๋ฒˆ์งธ๋Š” 2๋ถ€ํ„ฐ ์ˆ˜์‹ 
subject.onNext(2)

4๏ธโƒฃ ๊ตฌ๋… & DisposeBag

let bag = DisposeBag()

Observable.from(["๐ŸŽ","๐ŸŒ","๐Ÿ‡"])
    .subscribe(
        onNext: { print("fruit:",$0) },
        onError: { print("error:",$0) },
        onCompleted: { print("completed") },
        onDisposed: { print("disposed") }
    )
    .disposed(by: bag)
  • DisposeBag: ๊ตฌ๋…(disposable)์„ ๋‹ด์•„ ์Šค์ฝ”ํ”„ ๋‹จ์œ„๋กœ ๋ฉ”๋ชจ๋ฆฌ๋ฅผ ํ•ด์ œํ•ฉ๋‹ˆ๋‹ค. ํด๋ž˜์Šค๋ณ„๋กœ let bag = DisposeBag()๋ฅผ ๋ณด์œ ํ•˜๋Š” ํŒจํ„ด์ด ์ผ๋ฐ˜์ ์ž…๋‹ˆ๋‹ค.

rx.deallocated ์ŠคํŠธ๋ฆผ์„ ํ™œ์šฉํ•˜๋ฉด ๋ทฐ ์†Œ๋ฉธ ์‹œ ์ž๋™ ํ•ด์ œ ํŒจํ„ด์„ ๊ตฌ์„ฑํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.


5๏ธโƒฃ ์‹ค์ „ ์˜ˆ์ œ โ€” URLSession + JSON ๋””์ฝ”๋”ฉ

struct Post: Decodable { let id: Int; let title: String }

func fetchPosts() -> Observable<[Post]> {
    let url = URL(string: "https://jsonplaceholder.typicode.com/posts")!

    return URLSession.shared.rx.data(request: URLRequest(url: url))
        .map { data in try JSONDecoder().decode([Post].self, from: data) }
}

fetchPosts()
    .observe(on: MainScheduler.instance)
    .subscribe(onNext: { print($0.first?.title ?? "-") })
    .disposed(by: bag)

6๏ธโƒฃ Cheat Sheet

์นดํ…Œ๊ณ ๋ฆฌ
๋ฉ”์„œ๋“œ
ํ•œ ์ค„ ์„ค๋ช…

Factory

just, of, from, create, deferred, empty, never, error, interval, timer, range

Filtering

filter, distinctUntilChanged, take, skip, debounce, throttle

Transform

map, flatMap, flatMapLatest, buffer, scan

Combining

merge, concat, zip, combineLatest, withLatestFrom

Error

catchError, retry, retryWhen


7๏ธโƒฃ Mini Quiz

  1. flatMapLatest์™€ switchMap(Kotlin Flow) ์ฐจ์ด๋ฅผ ์„ค๋ช…ํ•ด ๋ณด์„ธ์š”.

  2. Cold Observable์—์„œ ๋‘ ๋ฒˆ์งธ ๊ตฌ๋…์ด ์ด์ „ ๊ฐ’์˜ ์žฌ์ƒ์‚ฐ์„ ๋ง‰์œผ๋ ค๋ฉด ์–ด๋–ค ์˜คํผ๋ ˆ์ดํ„ฐ๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์„๊นŒ์š”?

  3. DisposeBag ์—†์ด๋„ ๋ฉ”๋ชจ๋ฆฌ ๋ˆ„์ˆ˜๊ฐ€ ๋ฐœ์ƒํ•˜์ง€ ์•Š๋Š” ๊ฒฝ์šฐ๋Š”?

์ •๋‹ต
  1. flatMapLatest vs switchMap

    • RxSwift์˜ flatMapLatest(a.k.a switchMap)๋Š” ๊ฐ€์žฅ ์ตœ๊ทผ์— ์ƒ์„ฑ๋œ ๋‚ด๋ถ€ Observable๋งŒ ๊ตฌ๋…ํ•˜๊ณ , ์ด์ „ ์ŠคํŠธ๋ฆผ์€ ๊ตฌ๋…์„ ํ•ด์ œํ•ฉ๋‹ˆ๋‹ค. Kotlin FlowยทRxJS์˜ switchMap๊ณผ ๋™์ž‘์ด ๋™์ผํ•˜๋ฉฐ ๋‹จ์ˆœํžˆ ๋„ค์ด๋ฐ ์ฐจ์ด์ž…๋‹ˆ๋‹ค.

  2. Cold Observable ์žฌ์ƒ์‚ฐ ๋ฐฉ์ง€

    • share() ๋˜๋Š” share(replay:scope:), publish().refCount() ๋“ฑ ๊ณต์œ  ์˜คํผ๋ ˆ์ดํ„ฐ๋กœ ์ŠคํŠธ๋ฆผ์„ Hotํ•˜๊ฒŒ ๋ณ€ํ™˜ํ•ด ๋‘ ๋ฒˆ์งธ ๊ตฌ๋… ์‹œ ์žฌ์‹คํ–‰์„ ๋ง‰์„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  3. DisposeBag ์—†์ด ๋ฉ”๋ชจ๋ฆฌ ๋ˆ„์ˆ˜๊ฐ€ ์—†๋Š” ๊ฒฝ์šฐ

    • Observable.just, of, from ๊ฐ™์ด ๋™๊ธฐ์ ์œผ๋กœ ์ฆ‰์‹œ ์™„๊ฒฐ(completed)๋˜๋Š” ์‹œํ€€์Šค๋Š” ๊ตฌ๋… ์งํ›„ ์ข…๋ฃŒ๋˜๋ฏ€๋กœ ๋ช…์‹œ์  dispose๊ฐ€ ํ•„์š” ์—†์Šต๋‹ˆ๋‹ค. ๋˜ํ•œ take(1)์ฒ˜๋Ÿผ ์ผ์ฐ ์™„๋ฃŒ์‹œํ‚ค๋Š” ์˜คํผ๋ ˆ์ดํ„ฐ๋ฅผ ์‚ฌ์šฉํ–ˆ์„ ๋•Œ๋„ ๋ฉ”๋ชจ๋ฆฌ ๋ˆ„์ˆ˜ ์œ„ํ—˜์ด ๋‚ฎ์Šต๋‹ˆ๋‹ค.

Last updated