You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

41 lines
1.2 KiB
Lua

local Observable = require "rx.observable"
local Subscription = require "rx.subscription"
--- Returns a new Observable that subscribes to the Observables produced by the original and
-- produces their values.
-- @returns {Observable}
function Observable:flatten()
return Observable.create(function(observer)
local subscriptions = {}
local remaining = 1
local function onError(message)
return observer:onError(message)
end
local function onCompleted()
remaining = remaining - 1
if remaining == 0 then
return observer:onCompleted()
end
end
local function onNext(observable)
local function innerOnNext(...)
observer:onNext(...)
end
remaining = remaining + 1
local subscription = observable:subscribe(innerOnNext, onError, onCompleted)
subscriptions[#subscriptions + 1] = subscription
end
subscriptions[#subscriptions + 1] = self:subscribe(onNext, onError, onCompleted)
return Subscription.create(function()
for i = 1, #subscriptions do
subscriptions[i]:unsubscribe()
end
end)
end)
end