local util = require "rx.util" --- @class Observer -- @description Observers are simple objects that receive values from Observables. local Observer = {} Observer.__index = Observer Observer.__tostring = util.constant("Observer") --- Creates a new Observer. -- @arg {function=} onNext - Called when the Observable produces a value. -- @arg {function=} onError - Called when the Observable terminates due to an error. -- @arg {function=} onCompleted - Called when the Observable completes normally. -- @returns {Observer} function Observer.create(onNext, onError, onCompleted) local self = { _onNext = onNext or util.noop, _onError = onError or error, _onCompleted = onCompleted or util.noop, stopped = false, } return setmetatable(self, Observer) end --- Pushes zero or more values to the Observer. -- @arg {*...} values function Observer:onNext(...) if not self.stopped then self._onNext(...) end end --- Notify the Observer that an error has occurred. -- @arg {string=} message - A string describing what went wrong. function Observer:onError(message) if not self.stopped then self.stopped = true self._onError(message) end end --- Notify the Observer that the sequence has completed and will produce no more values. function Observer:onCompleted() if not self.stopped then self.stopped = true self._onCompleted() end end return Observer