Schedulers

๐Ÿงญ Schedulers โ€” RxSwift์˜ โ€˜์‹คํ–‰ ์ปจํ…์ŠคํŠธโ€™ ๊ด€๋ฆฌ

โ€œsubscribeOn์€ ์‹œ์ž‘์ ์„, observeOn์€ ๊ด€์ฐฐ ์ง€์ ์„ ๋ฐ”๊พผ๋‹ค.โ€

SchedulerType์€ Observable ์—ฐ์‚ฐ์ด ์–ด๋А ์Šค๋ ˆ๋“œ/ํ์—์„œ ์‹คํ–‰๋ ์ง€ ๊ฒฐ์ •ํ•ฉ๋‹ˆ๋‹ค. ๋™์‹œ์„ฑ ์ฝ”๋“œ๋„ ์„ ์–ธํ˜• ์ฒด์ธ์— ๋…น์—ฌ ๊ฐ€๋…์„ฑ๊ณผ ์•ˆ์ „์„ฑ์„ ๋†’์ผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.


1๏ธโƒฃ ์ฃผ์š” Scheduler ์ข…๋ฅ˜

Scheduler
๊ธฐ๋ฐ˜
์šฉ๋„ & ํŠน์ง•

MainScheduler

๋ฉ”์ธ ์“ฐ๋ ˆ๋“œ

UI ์—…๋ฐ์ดํŠธ ํ•„์ˆ˜, Drive, bind(to:) ๊ธฐ๋ณธ ๊ฐ’

CurrentThreadScheduler

ํ˜„์žฌ ํ˜ธ์ถœ ์Šคํƒ

๋””ํดํŠธ(Cold Observable) โ€” ์žฌ๊ท€ ์‹œ ์Šคํƒ ์˜ค๋ฒ„ํ”Œ๋กœ์šฐ ์ฃผ์˜

SerialDispatchQueueScheduler

GCD Serial Queue

์‹œ๋ฆฌ์–ผ ์ž‘์—…, ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค I/O, ํŒŒ์ผ ์“ฐ๊ธฐ

ConcurrentDispatchQueueScheduler

GCD Concurrent Queue

CPUโ€‘bound ๊ณ„์‚ฐ, ํŒŒ์‹ฑ, ์ด๋ฏธ์ง€ ํ•„ํ„ฐ

OperationQueueScheduler

OperationQueue

์˜์กด์„ฑยท์šฐ์„ ์ˆœ์œ„ ์ œ์–ด ํ•„์š” ์‹œ

ImmediateSchedulerType

์ฆ‰์‹œ ์‹คํ–‰

ํ…Œ์ŠคํŠธยท๋™๊ธฐ ์ž‘์—… ์ตœ์ ํ™”

TestScheduler

๊ฐ€์ƒ ์‹œ๊ฐ„

๋‹จ์œ„ ํ…Œ์ŠคํŠธ, Marble ํ…Œ์ŠคํŠธ ๊ตฌ์ถ•


2๏ธโƒฃ subscribe(on:) vs observe(on:)

๋ฉ”์„œ๋“œ
๋ชฉ์ 
์˜ˆ์‹œ ๋น„์œ 

subscribe(on:)

์ƒ์‚ฐ ๋‹จ๊ณ„(Observable ์ƒ์„ฑ) ์Šค์ผ€์ค„๋Ÿฌ ์ง€์ •

โ€œ๋Œ€ํŒŒ ์†์งˆ์„ ์ฃผ๋ฐฉ์—์„œ ์‹œ์ž‘โ€

observe(on:)

์†Œ๋น„ ๋‹จ๊ณ„ ์ดํ›„ ์ฒด์ธ ์‹คํ–‰ ์Šค์ผ€์ค„๋Ÿฌ ๋ณ€๊ฒฝ

โ€œ์†์งˆ๋œ ์žฌ๋ฃŒ๋ฅผ ํ™€ ์„œ๋น™ ํ…Œ์ด๋ธ”์—์„œ ๋งˆ๋ฌด๋ฆฌโ€

URLSession.shared.rx.data(request: req)
    .subscribe(on: ConcurrentDispatchQueueScheduler(qos: .background)) // ๋„คํŠธ์›Œํฌ ๋ฐฑ๊ทธ๋ผ์šด๋“œ
    .map(parseJSON)
    .observe(on: MainScheduler.instance) // ๊ฒฐ๊ณผ UI ๋ฐ”์ธ๋”ฉ
    .bind(to: tableView.rx.items(cellIdentifier: "Cell")) { _, model, cell in
        cell.textLabel?.text = model.title
    }
    .disposed(by: bag)

3๏ธโƒฃ ์‹ค์ „ ํŒจํ„ด

A. ๋น„๋™๊ธฐ ์ด๋ฏธ์ง€ ์ฒ˜๋ฆฌ

imagePicker.rx.selectedImage
    .subscribe(on: ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .map(ImageProcessor.resize)
    .observe(on: MainScheduler.instance)
    .bind(to: imageView.rx.image)
    .disposed(by: bag)

B. CoreData ์ €์žฅ

saveTrigger
    .withLatestFrom(input)
    .observe(on: SerialDispatchQueueScheduler(qos: .utility))
    .flatMap(saveToCoreData)
    .observe(on: MainScheduler.instance)
    .subscribe(onNext: showToast)
    .disposed(by: bag)

4๏ธโƒฃ TestScheduler โ€” ๊ฐ€์ƒ ์‹œ๊ฐ„ ๋‹จ์œ„ ํ…Œ์ŠคํŠธ

let scheduler = TestScheduler(initialClock: 0)
let hot = scheduler.createHotObservable([
    .next(100, 1), .next(200, 2), .completed(300)
])

let res = scheduler.start { hot.map { $0 * 2 } }
XCTAssertEqual(res.events, [
    .next(100, 2), .next(200, 4), .completed(300)
])

๊ฐ€์ƒ ์‹œ๊ฐ„์œผ๋กœ ๋™๊ธฐ ํ…Œ์ŠคํŠธ ์ž‘์„ฑ โ†’ ํ…Œ์ŠคํŠธ ์†๋„ โฌ†๏ธ, ์žฌํ˜„์„ฑ โฌ†๏ธ.


5๏ธโƒฃ Deadlock & Race Condition ๋ฐฉ์ง€

  1. ๋ฉ”์ธ โ†’ ๋ฉ”์ธ ์žฌ์ž…๋ ฅ: Observable ์ฒด์ธ ์ค‘ ๋‹ค์‹œ MainScheduler๋กœ ์ „ํ™˜ ์‹œ ์ค‘์ฒฉ DispatchQueue ํ˜ธ์ถœ ์ง€์–‘.

  2. Shared Mutable State: SerialScheduler๋กœ ๋ณดํ˜ธํ•˜๊ฑฐ๋‚˜ Actor(Swift Concurrency) ์‚ฌ์šฉ.

  3. subscribeOn ์ค‘๋ณต: ๊ฐ€์žฅ ์ฒ˜์Œ์— ์œ„์น˜ํ•œ subscribeOn๋งŒ ์ ์šฉ.


6๏ธโƒฃ Best Practices Checklist โœ…


7๏ธโƒฃ Mini Quiz

  1. observe(on:)๋ฅผ ์—ฐ์† ๋‘ ๋ฒˆ ํ˜ธ์ถœํ•˜๋ฉด ์–ด๋–ป๊ฒŒ ๋™์ž‘ํ• ๊นŒ?

  2. subscribe(on:)์œผ๋กœ MainScheduler, observe(on:)์œผ๋กœ Background ์ง€์ • ์‹œ UI ์ž‘์—…์„ ํ•˜๋ฉด ๋ฌด์Šจ ๋ฌธ์ œ๊ฐ€?

  3. TestScheduler ์‚ฌ์šฉ ์‹œ Cold Observable๊ณผ Hot Observable์˜ ์ฐจ์ด๋Š”?

์ •๋‹ต
  1. ๋งˆ์ง€๋ง‰ observe(on:)๊ฐ€ ์šฐ์„  โ€” ์ฒด์ธ ์ง„ํ–‰ ์ค‘ ๊ฐ€์žฅ ์ตœ๊ทผ์— ์ง€์ •ํ•œ Scheduler๊ฐ€ ์ดํ›„ ์—ฐ์‚ฐ์— ์ ์šฉ๋ฉ๋‹ˆ๋‹ค.

  2. UI ์—…๋ฐ์ดํŠธ๊ฐ€ ๋ฐฑ๊ทธ๋ผ์šด๋“œ ์Šค๋ ˆ๋“œ์—์„œ ์‹คํ–‰๋ผ ํฌ๋ž˜์‹œ(UI API called on background thread) ์œ„ํ—˜.

  3. TestScheduler์—์„œ

    • Cold: createColdObservable โ†’ ๊ตฌ๋… ์‹œ ์ด๋ฒคํŠธ ์Šค์ผ€์ค„๋ง ์‹œ์ž‘.

    • Hot: createHotObservable โ†’ ํ…Œ์ŠคํŠธ ์‹œ๊ฐ„ 0๋ถ€ํ„ฐ ์ด๋ฒคํŠธ ํ๋ฆ„, ๊ตฌ๋… ํƒ€์ด๋ฐ ๋”ฐ๋ผ ์ˆ˜์‹  ์ด๋ฒคํŠธ ๋‹ฌ๋ผ์ง.


๐ŸŽ‰ Schedulers ์„น์…˜ ์™„๋ฃŒ! ์ด์ œ Sequences ์นดํ…Œ๊ณ ๋ฆฌ๋กœ ๋„˜์–ด๊ฐ€ ์ŠคํŠธ๋ฆผ ์ƒ์„ ๋งˆ์Šคํ„ฐํ•ด ๋ณด์„ธ์š”. ๐Ÿš€

Last updated