You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

215 lines
6.9 KiB
Lua

local Observer = require "rx.observer"
local util = require "rx.util"
--- @class Observable
-- @description Observables push values to Observers.
local Observable = {}
Observable.__index = Observable
Observable.__tostring = util.constant("Observable")
--- Creates a new Observable.
-- @arg {function} subscribe - The subscription function that produces values.
-- @returns {Observable}
function Observable.create(subscribe)
local self = {
_subscribe = subscribe,
}
return setmetatable(self, Observable)
end
--- Shorthand for creating an Observer and passing it to this Observable's subscription function.
-- @arg {function} onNext - Called when the Observable produces a value.
-- @arg {function} onError - Called when the Observable terminates due to an error.
-- @arg {function} onCompleted - Called when the Observable completes normally.
function Observable:subscribe(onNext, onError, onCompleted)
if type(onNext) == "table" then
return self._subscribe(onNext)
else
return self._subscribe(Observer.create(onNext, onError, onCompleted))
end
end
--- Returns an Observable that immediately completes without producing a value.
function Observable.empty()
return Observable.create(function(observer)
observer:onCompleted()
end)
end
--- Returns an Observable that never produces values and never completes.
function Observable.never()
return Observable.create(function()
end)
end
--- Returns an Observable that immediately produces an error.
function Observable.throw(message)
return Observable.create(function(observer)
observer:onError(message)
end)
end
--- Creates an Observable that produces a set of values.
-- @arg {*...} values
-- @returns {Observable}
function Observable.of(...)
local args = {...}
local argCount = select("#", ...)
return Observable.create(function(observer)
for i = 1, argCount do
observer:onNext(args[i])
end
observer:onCompleted()
end)
end
--- Creates an Observable that produces a range of values in a manner similar to a Lua for loop.
-- @arg {number} initial - The first value of the range, or the upper limit if no other arguments
-- are specified.
-- @arg {number=} limit - The second value of the range.
-- @arg {number=1} step - An amount to increment the value by each iteration.
-- @returns {Observable}
function Observable.fromRange(initial, limit, step)
if not limit and not step then
initial, limit = 1, initial
end
step = step or 1
return Observable.create(function(observer)
for i = initial, limit, step do
observer:onNext(i)
end
observer:onCompleted()
end)
end
--- Creates an Observable that produces values from a table.
-- @arg {table} table - The table used to create the Observable.
-- @arg {function=pairs} iterator - An iterator used to iterate the table, e.g. pairs or ipairs.
-- @arg {boolean} keys - Whether or not to also emit the keys of the table.
-- @returns {Observable}
function Observable.fromTable(t, iterator, keys)
iterator = iterator or pairs
return Observable.create(function(observer)
for key, value in iterator(t) do
observer:onNext(value, keys and key or nil)
end
observer:onCompleted()
end)
end
--- Creates an Observable that produces values when the specified coroutine yields.
-- @arg {thread|function} fn - A coroutine or function to use to generate values. Note that if a
-- coroutine is used, the values it yields will be shared by all
-- subscribed Observers (influenced by the Scheduler), whereas a new
-- coroutine will be created for each Observer when a function is used.
-- @returns {Observable}
function Observable.fromCoroutine(fn, scheduler)
return Observable.create(function(observer)
local thread = type(fn) == "function" and coroutine.create(fn) or fn
return scheduler:schedule(function()
while not observer.stopped do
local success, value = coroutine.resume(thread)
if success then
observer:onNext(value)
else
return observer:onError(value)
end
if coroutine.status(thread) == "dead" then
return observer:onCompleted()
end
coroutine.yield()
end
end)
end)
end
--- Creates an Observable that produces values from a file, line by line.
-- @arg {string} filename - The name of the file used to create the Observable
-- @returns {Observable}
function Observable.fromFileByLine(filename)
return Observable.create(function(observer)
local file = io.open(filename, "r")
if file then
file:close()
for line in io.lines(filename) do
observer:onNext(line)
end
return observer:onCompleted()
else
return observer:onError(filename)
end
end)
end
--- Creates an Observable that creates a new Observable for each observer using a factory function.
-- @arg {function} factory - A function that returns an Observable.
-- @returns {Observable}
function Observable.defer(fn)
if not fn or type(fn) ~= "function" then
error("Expected a function")
end
return setmetatable({
subscribe = function(_, ...)
local observable = fn()
return observable:subscribe(...)
end,
}, Observable)
end
--- Returns an Observable that repeats a value a specified number of times.
-- @arg {*} value - The value to repeat.
-- @arg {number=} count - The number of times to repeat the value. If left unspecified, the value
-- is repeated an infinite number of times.
-- @returns {Observable}
function Observable.replicate(value, count)
return Observable.create(function(observer)
while count == nil or count > 0 do
observer:onNext(value)
if count then
count = count - 1
end
end
observer:onCompleted()
end)
end
--- Subscribes to this Observable and prints values it produces.
-- @arg {string=} name - Prefixes the printed messages with a name.
-- @arg {function=tostring} formatter - A function that formats one or more values to be printed.
function Observable:dump(name, formatter)
name = name and (name .. " ") or ""
local onNext
if formatter then
onNext = function(...)
print(name .. "onNext: " .. formatter(...))
end
else
onNext = function(...)
print(name .. "onNext: ", ...)
end
end
local onError = function(e)
print(name .. "onError: " .. e)
end
local onCompleted = function()
print(name .. "onCompleted")
end
return self:subscribe(onNext, onError, onCompleted)
end
return Observable