diff --git a/framework/lualib/zeus/skynet/rpc.lua b/framework/lualib/zeus/skynet/rpc.lua new file mode 100644 index 0000000..cf9f1ec --- /dev/null +++ b/framework/lualib/zeus/skynet/rpc.lua @@ -0,0 +1,22 @@ +local class = class +-- local skynet = require "skynet" + +local Rpc = class("Rpc") + +function Rpc:initialize() +end + +function Rpc:parallels() +end + +function Rpc:call() + -- return skynet.call() +end + +function Rpc:send() +end + +function Rpc:timeout_call() +end + +return Rpc diff --git a/framework/lualib/zeus/skynet/skynetimer.lua b/framework/lualib/zeus/skynet/timer.lua similarity index 98% rename from framework/lualib/zeus/skynet/skynetimer.lua rename to framework/lualib/zeus/skynet/timer.lua index 373a0b0..a540667 100755 --- a/framework/lualib/zeus/skynet/skynetimer.lua +++ b/framework/lualib/zeus/skynet/timer.lua @@ -150,7 +150,6 @@ function mt:__update() end end self.state = state.INIT - -- print("timer update running=", self.state) end local M = {} diff --git a/framework/service/ws_agent.lua b/framework/service/ws_agent.lua new file mode 100755 index 0000000..472584f --- /dev/null +++ b/framework/service/ws_agent.lua @@ -0,0 +1,45 @@ +local skynet = require "skynet" +local socket = require "skynet.socket" + +local WATCHDOG +local host +local send_request + +local CMD = {} +local client_fd +local gate + +skynet.register_protocol { + name = "client", + id = skynet.PTYPE_CLIENT, + unpack = skynet.tostring, + dispatch = function(fd, address, msg) + assert(fd == client_fd) -- You can use fd to reply message + skynet.ignoreret() -- session is fd, don't call skynet.ret + -- skynet.trace() + -- echo simple + skynet.send(gate, "lua", "response", fd, msg) + skynet.error(address, msg) + end, +} + +function CMD.start(conf) + local fd = conf.client + gate = conf.gate + WATCHDOG = conf.watchdog + client_fd = fd + skynet.call(gate, "lua", "forward", fd) +end + +function CMD.disconnect() + -- todo: do something before exit + skynet.exit() +end + +skynet.start(function() + skynet.dispatch("lua", function(_, _, command, ...) + -- skynet.trace() + local f = CMD[command] + skynet.ret(skynet.pack(f(...))) + end) +end) diff --git a/framework/service/ws_gate.lua b/framework/service/ws_gate.lua new file mode 100755 index 0000000..df1b090 --- /dev/null +++ b/framework/service/ws_gate.lua @@ -0,0 +1,230 @@ +local skynet = require "skynet" +require "skynet.manager" +local socket = require "skynet.socket" +local websocket = require "http.websocket" +local socketdriver = require "skynet.socketdriver" + +local watchdog +local connection = {} -- fd -> connection : { fd , client, agent , ip, mode } +local forwarding = {} -- agent -> connection + +local client_number = 0 +local maxclient -- max client +local nodelay +local protocol + +local master_name, slave_name = ... +master_name = master_name or ".ws_gate" + +skynet.register_protocol { + name = "client", + id = skynet.PTYPE_CLIENT, +} + +local function launch_master() + local CMD_MASTER = {} + local slave = {} + + function CMD_MASTER.open(source, conf) + local instance = conf.instance or 8 + assert(instance > 0) + local balance = 1 + + for i = 1, instance do + local _slave_name = string.format("%s-slave-%d", master_name, i) + table.insert(slave, skynet.newservice(SERVICE_NAME, master_name, _slave_name)) + end + + conf.watchdog = conf.watchdog or source + for i = 1, instance do + local s = slave[i] + skynet.call(s, "lua", "open", conf) + end + + local address = conf.address or "0.0.0.0" + local port = assert(conf.port) + protocol = conf.protocol or "ws" + + local fd = socket.listen(address, port) + skynet.error(string.format("Listen websocket port:%s protocol:%s", port, protocol)) + socket.start(fd, function(fd, addr) + skynet.error(string.format("accept client socket_fd: %s addr:%s", fd, addr)) + + local s = slave[balance] + balance = balance + 1 + if balance > #slave then + balance = 1 + end + local ok, err = skynet.call(s, "lua", "accept", fd, addr) + if not ok then + skynet.error(string.format("invalid client (fd = %d) error = %s", fd, err)) + end + end) + end + + skynet.dispatch("lua", function(session, source, cmd, ...) + local f = CMD_MASTER[cmd] + if not f then + skynet.error("ws gate master can't dispatch cmd " .. (cmd or nil)) + skynet.ret(skynet.pack({ + ok = false, + })) + return + end + if session == 0 then + f(source, ...) + else + skynet.ret(skynet.pack(f(source, ...))) + end + end) +end + +local function launch_slave() + + local function unforward(c) + if c.agent then + forwarding[c.agent] = nil + c.agent = nil + c.client = nil + end + end + + local function close_fd(fd) + local c = connection[fd] + if c then + unforward(c) + connection[fd] = nil + client_number = client_number - 1 + end + end + + local handler = {} + + function handler.connect(fd) + skynet.error("ws connect from: " .. tostring(fd)) + if client_number >= maxclient then + socketdriver.close(fd) + return + end + if nodelay then + socketdriver.nodelay(fd) + end + + client_number = client_number + 1 + local addr = websocket.addrinfo(fd) + local c = { + fd = fd, + ip = addr, + } + connection[fd] = c + + skynet.send(watchdog, "lua", "socket", "open", fd, addr, skynet.self()) + end + + function handler.handshake(fd, header, url) + local addr = websocket.addrinfo(fd) + skynet.error("ws handshake from: " .. tostring(fd), "url", url, "addr:", addr) + skynet.error("----header-----") + for k, v in pairs(header) do + skynet.error(k, v) + end + skynet.error("--------------") + end + + function handler.message(fd, msg) + skynet.error("ws ping from: " .. tostring(fd), msg .. "\n") + -- recv a package, forward it + local c = connection[fd] + local agent = c and c.agent + -- msg is string + if agent then + skynet.redirect(agent, c.client, "client", fd, msg) + else + skynet.send(watchdog, "lua", "socket", "data", fd, msg) + end + end + + function handler.ping(fd) + skynet.error("ws ping from: " .. tostring(fd) .. "\n") + end + + function handler.pong(fd) + skynet.error("ws pong from: " .. tostring(fd)) + end + + function handler.close(fd, code, reason) + skynet.error("ws close from: " .. tostring(fd), code, reason) + close_fd(fd) + skynet.send(watchdog, "lua", "socket", "close", fd) + end + + function handler.error(fd) + skynet.error("ws error from: " .. tostring(fd)) + close_fd(fd) + skynet.send(watchdog, "lua", "socket", "error", fd, msg) + end + + function handler.warning(fd, size) + skynet.send(watchdog, "lua", "socket", "warning", fd, size) + end + + local CMD_SLAVE = {} + function CMD_SLAVE.forward(source, fd, client, address) + local c = assert(connection[fd]) + unforward(c) + c.client = client or 0 + c.agent = address or source + forwarding[c.agent] = c + end + + function CMD_SLAVE.response(source, fd, msg) + skynet.error("ws response: " .. tostring(fd), msg .. "\n") + -- forward msg + websocket.write(fd, msg) + end + + function CMD_SLAVE.kick(source, fd) + websocket.close(fd) + end + + function CMD_SLAVE.accept(source, fd, addr) + return websocket.accept(fd, handler, protocol, addr) + end + + function CMD_SLAVE.open(source, conf) + maxclient = conf.maxclient or 1024 + nodelay = conf.nodelay + protocol = conf.protocol or "ws" + watchdog = conf.watchdog + skynet.error("open", watchdog, source) + end + + skynet.dispatch("lua", function(session, source, cmd, ...) + local f = CMD_SLAVE[cmd] + if not f then + skynet.error("ws gate slave can't dispatch cmd " .. (cmd or nil)) + skynet.ret(skynet.pack({ + ok = false, + })) + return + end + if session == 0 then + f(source, ...) + else + skynet.ret(skynet.pack(f(source, ...))) + end + end) +end + +skynet.start(function() + if slave_name then + skynet.error("start ws_gate slave:", slave_name) + skynet.register(slave_name) + launch_slave() + else + skynet.error("start ws_gate master:", master_name) + skynet.register(master_name) + launch_master() + end +end) + diff --git a/framework/service/ws_watchdog.lua b/framework/service/ws_watchdog.lua new file mode 100755 index 0000000..1fada2b --- /dev/null +++ b/framework/service/ws_watchdog.lua @@ -0,0 +1,79 @@ +local skynet = require "skynet" + +local CMD = {} +local SOCKET = {} +local master_gate +local agent = {} +local protocol +local fd2gate = {} + +function SOCKET.open(fd, addr, gate) + skynet.error("New client from : " .. addr) + fd2gate[fd] = gate + agent[fd] = skynet.newservice("ws_agent") + skynet.call(agent[fd], "lua", "start", { + gate = gate, + client = fd, + watchdog = skynet.self(), + protocol = protocol, + addr = addr, + }) +end + +local function close_agent(fd) + local a = agent[fd] + agent[fd] = nil + if a then + local gate = fd2gate[fd] + if gate then + skynet.call(gate, "lua", "kick", fd) + fd2gate[fd] = nil + end + -- disconnect never return + skynet.send(a, "lua", "disconnect") + end +end + +function SOCKET.close(fd) + print("socket close",fd) + close_agent(fd) +end + +function SOCKET.error(fd, msg) + print("socket error",fd, msg) + close_agent(fd) +end + +function SOCKET.warning(fd, size) + -- size K bytes havn't send out in fd + print("socket warning", fd, size) +end + +function SOCKET.data(fd, msg) + print("socket data", fd, msg) +end + +function CMD.start(conf) + protocol = conf.protocol + skynet.call(master_gate, "lua", "open" , conf) +end + +function CMD.close(fd) + close_agent(fd) +end + +skynet.start(function() + skynet.dispatch("lua", function(session, source, cmd, subcmd, ...) + if cmd == "socket" then + local f = SOCKET[subcmd] + f(...) + -- socket api don't need return + else + local f = assert(CMD[cmd]) + skynet.ret(skynet.pack(f(subcmd, ...))) + end + end) + + master_gate = skynet.newservice("ws_gate") +end) +