You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

231 lines
6.4 KiB
Lua

local skynet = require "skynet"
require "skynet.manager"
local socket = require "skynet.socket"
local websocket = require "http.websocket"
local socketdriver = require "skynet.socketdriver"
local watchdog
local connection = {} -- fd -> connection : { fd , client, agent , ip, mode }
local forwarding = {} -- agent -> connection
local client_number = 0
local maxclient -- max client
local nodelay
local protocol
local master_name, slave_name = ...
master_name = master_name or ".ws_gate"
skynet.register_protocol {
name = "client",
id = skynet.PTYPE_CLIENT,
}
local function launch_master()
local CMD_MASTER = {}
local slave = {}
function CMD_MASTER.open(source, conf)
local instance = conf.instance or 8
assert(instance > 0)
local balance = 1
for i = 1, instance do
local _slave_name = string.format("%s-slave-%d", master_name, i)
table.insert(slave, skynet.newservice(SERVICE_NAME, master_name, _slave_name))
end
conf.watchdog = conf.watchdog or source
for i = 1, instance do
local s = slave[i]
skynet.call(s, "lua", "open", conf)
end
local address = conf.address or "0.0.0.0"
local port = assert(conf.port)
protocol = conf.protocol or "ws"
local fd = socket.listen(address, port)
skynet.error(string.format("Listen websocket port:%s protocol:%s", port, protocol))
socket.start(fd, function(fd, addr)
skynet.error(string.format("accept client socket_fd: %s addr:%s", fd, addr))
local s = slave[balance]
balance = balance + 1
if balance > #slave then
balance = 1
end
local ok, err = skynet.call(s, "lua", "accept", fd, addr)
if not ok then
skynet.error(string.format("invalid client (fd = %d) error = %s", fd, err))
end
end)
end
skynet.dispatch("lua", function(session, source, cmd, ...)
local f = CMD_MASTER[cmd]
if not f then
skynet.error("ws gate master can't dispatch cmd " .. (cmd or nil))
skynet.ret(skynet.pack({
ok = false,
}))
return
end
if session == 0 then
f(source, ...)
else
skynet.ret(skynet.pack(f(source, ...)))
end
end)
end
local function launch_slave()
local function unforward(c)
if c.agent then
forwarding[c.agent] = nil
c.agent = nil
c.client = nil
end
end
local function close_fd(fd)
local c = connection[fd]
if c then
unforward(c)
connection[fd] = nil
client_number = client_number - 1
end
end
local handler = {}
function handler.connect(fd)
skynet.error("ws connect from: " .. tostring(fd))
if client_number >= maxclient then
socketdriver.close(fd)
return
end
if nodelay then
socketdriver.nodelay(fd)
end
client_number = client_number + 1
local addr = websocket.addrinfo(fd)
local c = {
fd = fd,
ip = addr,
}
connection[fd] = c
skynet.send(watchdog, "lua", "socket", "open", fd, addr, skynet.self())
end
function handler.handshake(fd, header, url)
local addr = websocket.addrinfo(fd)
skynet.error("ws handshake from: " .. tostring(fd), "url", url, "addr:", addr)
skynet.error("----header-----")
for k, v in pairs(header) do
skynet.error(k, v)
end
skynet.error("--------------")
end
function handler.message(fd, msg)
skynet.error("ws ping from: " .. tostring(fd), msg .. "\n")
-- recv a package, forward it
local c = connection[fd]
local agent = c and c.agent
-- msg is string
if agent then
skynet.redirect(agent, c.client, "client", fd, msg)
else
skynet.send(watchdog, "lua", "socket", "data", fd, msg)
end
end
function handler.ping(fd)
skynet.error("ws ping from: " .. tostring(fd) .. "\n")
end
function handler.pong(fd)
skynet.error("ws pong from: " .. tostring(fd))
end
function handler.close(fd, code, reason)
skynet.error("ws close from: " .. tostring(fd), code, reason)
close_fd(fd)
skynet.send(watchdog, "lua", "socket", "close", fd)
end
function handler.error(fd)
skynet.error("ws error from: " .. tostring(fd))
close_fd(fd)
skynet.send(watchdog, "lua", "socket", "error", fd, msg)
end
function handler.warning(fd, size)
skynet.send(watchdog, "lua", "socket", "warning", fd, size)
end
local CMD_SLAVE = {}
function CMD_SLAVE.forward(source, fd, client, address)
local c = assert(connection[fd])
unforward(c)
c.client = client or 0
c.agent = address or source
forwarding[c.agent] = c
end
function CMD_SLAVE.response(source, fd, msg)
skynet.error("ws response: " .. tostring(fd), msg .. "\n")
-- forward msg
websocket.write(fd, msg)
end
function CMD_SLAVE.kick(source, fd)
websocket.close(fd)
end
function CMD_SLAVE.accept(source, fd, addr)
return websocket.accept(fd, handler, protocol, addr)
end
function CMD_SLAVE.open(source, conf)
maxclient = conf.maxclient or 1024
nodelay = conf.nodelay
protocol = conf.protocol or "ws"
watchdog = conf.watchdog
skynet.error("open", watchdog, source)
end
skynet.dispatch("lua", function(session, source, cmd, ...)
local f = CMD_SLAVE[cmd]
if not f then
skynet.error("ws gate slave can't dispatch cmd " .. (cmd or nil))
skynet.ret(skynet.pack({
ok = false,
}))
return
end
if session == 0 then
f(source, ...)
else
skynet.ret(skynet.pack(f(source, ...)))
end
end)
end
skynet.start(function()
if slave_name then
skynet.error("start ws_gate slave:", slave_name)
skynet.register(slave_name)
launch_slave()
else
skynet.error("start ws_gate master:", master_name)
skynet.register(master_name)
launch_master()
end
end)