You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
215 lines
6.9 KiB
Lua
215 lines
6.9 KiB
Lua
local Observer = require "rx.observer"
|
|
local util = require "rx.util"
|
|
|
|
--- @class Observable
|
|
-- @description Observables push values to Observers.
|
|
local Observable = {}
|
|
Observable.__index = Observable
|
|
Observable.__tostring = util.constant("Observable")
|
|
|
|
--- Creates a new Observable.
|
|
-- @arg {function} subscribe - The subscription function that produces values.
|
|
-- @returns {Observable}
|
|
function Observable.create(subscribe)
|
|
local self = {
|
|
_subscribe = subscribe,
|
|
}
|
|
|
|
return setmetatable(self, Observable)
|
|
end
|
|
|
|
--- Shorthand for creating an Observer and passing it to this Observable's subscription function.
|
|
-- @arg {function} onNext - Called when the Observable produces a value.
|
|
-- @arg {function} onError - Called when the Observable terminates due to an error.
|
|
-- @arg {function} onCompleted - Called when the Observable completes normally.
|
|
function Observable:subscribe(onNext, onError, onCompleted)
|
|
if type(onNext) == "table" then
|
|
return self._subscribe(onNext)
|
|
else
|
|
return self._subscribe(Observer.create(onNext, onError, onCompleted))
|
|
end
|
|
end
|
|
|
|
--- Returns an Observable that immediately completes without producing a value.
|
|
function Observable.empty()
|
|
return Observable.create(function(observer)
|
|
observer:onCompleted()
|
|
end)
|
|
end
|
|
|
|
--- Returns an Observable that never produces values and never completes.
|
|
function Observable.never()
|
|
return Observable.create(function()
|
|
end)
|
|
end
|
|
|
|
--- Returns an Observable that immediately produces an error.
|
|
function Observable.throw(message)
|
|
return Observable.create(function(observer)
|
|
observer:onError(message)
|
|
end)
|
|
end
|
|
|
|
--- Creates an Observable that produces a set of values.
|
|
-- @arg {*...} values
|
|
-- @returns {Observable}
|
|
function Observable.of(...)
|
|
local args = {...}
|
|
local argCount = select("#", ...)
|
|
return Observable.create(function(observer)
|
|
for i = 1, argCount do
|
|
observer:onNext(args[i])
|
|
end
|
|
|
|
observer:onCompleted()
|
|
end)
|
|
end
|
|
|
|
--- Creates an Observable that produces a range of values in a manner similar to a Lua for loop.
|
|
-- @arg {number} initial - The first value of the range, or the upper limit if no other arguments
|
|
-- are specified.
|
|
-- @arg {number=} limit - The second value of the range.
|
|
-- @arg {number=1} step - An amount to increment the value by each iteration.
|
|
-- @returns {Observable}
|
|
function Observable.fromRange(initial, limit, step)
|
|
if not limit and not step then
|
|
initial, limit = 1, initial
|
|
end
|
|
|
|
step = step or 1
|
|
|
|
return Observable.create(function(observer)
|
|
for i = initial, limit, step do
|
|
observer:onNext(i)
|
|
end
|
|
|
|
observer:onCompleted()
|
|
end)
|
|
end
|
|
|
|
--- Creates an Observable that produces values from a table.
|
|
-- @arg {table} table - The table used to create the Observable.
|
|
-- @arg {function=pairs} iterator - An iterator used to iterate the table, e.g. pairs or ipairs.
|
|
-- @arg {boolean} keys - Whether or not to also emit the keys of the table.
|
|
-- @returns {Observable}
|
|
function Observable.fromTable(t, iterator, keys)
|
|
iterator = iterator or pairs
|
|
return Observable.create(function(observer)
|
|
for key, value in iterator(t) do
|
|
observer:onNext(value, keys and key or nil)
|
|
end
|
|
|
|
observer:onCompleted()
|
|
end)
|
|
end
|
|
|
|
--- Creates an Observable that produces values when the specified coroutine yields.
|
|
-- @arg {thread|function} fn - A coroutine or function to use to generate values. Note that if a
|
|
-- coroutine is used, the values it yields will be shared by all
|
|
-- subscribed Observers (influenced by the Scheduler), whereas a new
|
|
-- coroutine will be created for each Observer when a function is used.
|
|
-- @returns {Observable}
|
|
function Observable.fromCoroutine(fn, scheduler)
|
|
return Observable.create(function(observer)
|
|
local thread = type(fn) == "function" and coroutine.create(fn) or fn
|
|
return scheduler:schedule(function()
|
|
while not observer.stopped do
|
|
local success, value = coroutine.resume(thread)
|
|
|
|
if success then
|
|
observer:onNext(value)
|
|
else
|
|
return observer:onError(value)
|
|
end
|
|
|
|
if coroutine.status(thread) == "dead" then
|
|
return observer:onCompleted()
|
|
end
|
|
|
|
coroutine.yield()
|
|
end
|
|
end)
|
|
end)
|
|
end
|
|
|
|
--- Creates an Observable that produces values from a file, line by line.
|
|
-- @arg {string} filename - The name of the file used to create the Observable
|
|
-- @returns {Observable}
|
|
function Observable.fromFileByLine(filename)
|
|
return Observable.create(function(observer)
|
|
local file = io.open(filename, "r")
|
|
if file then
|
|
file:close()
|
|
|
|
for line in io.lines(filename) do
|
|
observer:onNext(line)
|
|
end
|
|
|
|
return observer:onCompleted()
|
|
else
|
|
return observer:onError(filename)
|
|
end
|
|
end)
|
|
end
|
|
|
|
--- Creates an Observable that creates a new Observable for each observer using a factory function.
|
|
-- @arg {function} factory - A function that returns an Observable.
|
|
-- @returns {Observable}
|
|
function Observable.defer(fn)
|
|
if not fn or type(fn) ~= "function" then
|
|
error("Expected a function")
|
|
end
|
|
|
|
return setmetatable({
|
|
subscribe = function(_, ...)
|
|
local observable = fn()
|
|
return observable:subscribe(...)
|
|
end,
|
|
}, Observable)
|
|
end
|
|
|
|
--- Returns an Observable that repeats a value a specified number of times.
|
|
-- @arg {*} value - The value to repeat.
|
|
-- @arg {number=} count - The number of times to repeat the value. If left unspecified, the value
|
|
-- is repeated an infinite number of times.
|
|
-- @returns {Observable}
|
|
function Observable.replicate(value, count)
|
|
return Observable.create(function(observer)
|
|
while count == nil or count > 0 do
|
|
observer:onNext(value)
|
|
if count then
|
|
count = count - 1
|
|
end
|
|
end
|
|
observer:onCompleted()
|
|
end)
|
|
end
|
|
|
|
--- Subscribes to this Observable and prints values it produces.
|
|
-- @arg {string=} name - Prefixes the printed messages with a name.
|
|
-- @arg {function=tostring} formatter - A function that formats one or more values to be printed.
|
|
function Observable:dump(name, formatter)
|
|
name = name and (name .. " ") or ""
|
|
|
|
local onNext
|
|
if formatter then
|
|
onNext = function(...)
|
|
print(name .. "onNext: " .. formatter(...))
|
|
end
|
|
else
|
|
onNext = function(...)
|
|
print(name .. "onNext: ", ...)
|
|
end
|
|
end
|
|
local onError = function(e)
|
|
print(name .. "onError: " .. e)
|
|
end
|
|
local onCompleted = function()
|
|
print(name .. "onCompleted")
|
|
end
|
|
|
|
return self:subscribe(onNext, onError, onCompleted)
|
|
end
|
|
|
|
return Observable
|