You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
268 lines
6.7 KiB
Lua
268 lines
6.7 KiB
Lua
local skynet = require "skynet"
|
|
require "skynet.manager"
|
|
local cluster = require "skynet.cluster.core"
|
|
|
|
-- Path of the cluster config file, read from the "cluster" env setting.
-- May be nil, in which case loadconfig() starts from an empty table.
local config_name = skynet.getenv "cluster"

-- Value of cluster.nodename(); handed to every clustersender that is launched.
local nodename = cluster.nodename()

-- node name -> "host:port" string, or false (see the assert in loadconfig).
local node_address = {}
-- node name -> clustersender service created for that node.
local node_sender = {}
-- node name -> true once the sender accepted a "changenode false" request.
local node_sender_closed = {}

-- Handlers for "lua" messages, looked up by command name at dispatch time.
local command = {}
-- Options collected from config keys that start with "__" (prefix removed).
local config = {}

-- node name -> record of the connection attempt in progress. The array part
-- holds coroutines waiting for the attempt; `.namequery` is the coroutine
-- waiting for the node's address; `.channel` is the resulting sender.
local connecting = {}
|
|
|
|
--- Resolve the sender service for cluster node `key` and cache it in `t`.
-- Used as the `__index` metamethod of `node_channel`, so it runs whenever a
-- node has no cached entry. Only one coroutine per node performs the
-- connection; later callers queue in `connecting[key]` and are woken when
-- that attempt finishes.
-- @param t   table that receives `t[key] = sender` on success
-- @param key cluster node name
-- @return the sender service; when the node is configured as `false` this is
--         whatever `node_sender[key]` currently holds (possibly nil)
-- Raises an error when the node is absent, its address is malformed, or the
-- "changenode" request fails.
local function open_channel(t, key)
	local ct = connecting[key]
	if ct then
		-- An attempt is already running: queue behind it instead of racing.
		local co = coroutine.running()
		local channel
		while ct do
			table.insert(ct, co)
			skynet.wait(co)
			channel = ct.channel
			ct = connecting[key]
			-- loop again if a new attempt started in the meantime
		end
		return assert(node_address[key] and channel)
	end
	ct = {}
	connecting[key] = ct
	local address = node_address[key]
	if address == nil and not config.nowaiting then
		-- Address unknown: sleep until loadconfig() wakes ct.namequery.
		local co = coroutine.running()
		assert(ct.namequery == nil)
		ct.namequery = co
		skynet.error("Waiting for cluster node [".. key.."]")
		skynet.wait(co)
		address = node_address[key]
	end
	local succ, err, c
	if address then
		-- Split at the LAST colon so a host that itself contains ':' is kept
		-- whole; this is the same pattern command.listen uses.
		local host, port = string.match(address, "(.+):([^:]+)$")
		if host == nil then
			-- Fail with a clear message rather than handing nil host/port
			-- to the sender service.
			err = string.format("cluster node [%s] has a malformed address (%s)", key, address)
		else
			c = node_sender[key]
			if c == nil then
				c = skynet.newservice("clustersender", key, nodename, host, port)
				if node_sender[key] then
					-- newservice yields; someone else registered a sender
					-- meanwhile, so discard ours and use theirs.
					skynet.kill(c)
					c = node_sender[key]
				else
					node_sender[key] = c
				end
			end

			succ = pcall(skynet.call, c, "lua", "changenode", host, port)

			if succ then
				t[key] = c
				ct.channel = c
				node_sender_closed[key] = nil
			else
				err = string.format("changenode [%s] (%s:%s) failed", key, host, port)
			end
		end
	elseif address == false then
		c = node_sender[key]
		if c == nil or node_sender_closed[key] then
			-- no sender or already closed, always succ
			succ = true
		else
			-- turn off the sender
			succ, err = pcall(skynet.call, c, "lua", "changenode", false)
			if succ then
				-- If the call failed the flag stays unset, so a later
				-- lookup tries to turn the sender off again.
				node_sender_closed[key] = true
			end
		end
	else
		err = string.format("cluster node [%s] is absent.", key)
	end
	-- Publish the outcome: drop the in-progress record, then wake the queue.
	connecting[key] = nil
	for _, co in ipairs(ct) do
		skynet.wakeup(co)
	end
	if node_address[key] ~= address then
		-- The address changed while we were connecting; start over.
		return open_channel(t,key)
	end
	assert(succ, err)
	return c
end

-- Cache of node name -> sender; a missing entry triggers open_channel.
local node_channel = setmetatable({}, { __index = open_channel })
|
|
|
|
--- Apply a cluster configuration.
-- @param tmp table of settings, or nil to load them from the file named by
--            `config_name` (an empty table when `config_name` is unset).
-- Keys starting with "__" are stored in `config` without the prefix; every
-- other key is a node name whose value must be an address string or false.
-- Nodes whose address changed and that already have a sender are reopened
-- in forked coroutines, because open_channel may block.
local function loadconfig(tmp)
	if tmp == nil then
		tmp = {}
		if config_name then
			local file = assert(io.open(config_name))
			local text = file:read "*a"
			file:close()
			-- Run the file as a text chunk with `tmp` as its environment.
			local chunk = assert(load(text, "@"..config_name, "t", tmp))
			chunk()
		end
	end

	local changed = {}
	for name, address in pairs(tmp) do
		if name:sub(1,2) == "__" then
			local option = name:sub(3)
			config[option] = address
			skynet.error(string.format("Config %s = %s", option, address))
		else
			assert(address == false or type(address) == "string")
			if node_address[name] ~= address then
				if node_sender[name] then
					-- Drop the cached channel so it is rebuilt below.
					node_channel[name] = nil
					changed[#changed + 1] = name
				end
				node_address[name] = address
			end
			local pending = connecting[name]
			local waiter = pending and pending.namequery
			if waiter and not config.nowaiting then
				skynet.error(string.format("Cluster node [%s] resloved : %s", name, address))
				skynet.wakeup(waiter)
			end
		end
	end

	if config.nowaiting then
		-- Release every coroutine still waiting for a node address.
		for _, pending in pairs(connecting) do
			local waiter = pending.namequery
			if waiter then
				skynet.wakeup(waiter)
			end
		end
	end

	for i = 1, #changed do
		-- open_channel would block, so run it in its own coroutine
		skynet.fork(open_channel, node_channel, changed[i])
	end
end
|
|
|
|
--- "reload" command: apply a new configuration, then reply with nil.
-- @param source caller address (unused)
-- @param config settings table forwarded to loadconfig; nil re-reads the
--               config file
function command.reload(source, config)
	loadconfig(config)
	skynet.retpack(nil)
end
|
|
|
|
--- "listen" command: launch a gate service and open a listening address.
-- @param source    caller address (unused)
-- @param addr      node name to look up in `node_address` when `port` is
--                  nil; otherwise the address handed to the gate
-- @param port      port handed to the gate, or nil to take host and port
--                  from the configured address of node `addr`
-- @param maxclient forwarded unchanged in the gate's "open" request
-- Replies with (address, port): the configured pair when looked up by node
-- name, otherwise the values returned by the gate's "open" call.
-- Raises an error when the node has no usable address.
function command.listen(source, addr, port, maxclient)
	if port == nil then
		-- Validate everything BEFORE launching the gate, so that a bad
		-- lookup does not leave an unused gate service behind.
		local address = assert(node_address[addr], tostring(addr) .. " is down")
		local host, portstr = string.match(address, "(.+):([^:]+)$")
		port = host and tonumber(portstr)
		if not port or port == 0 then
			-- Covers a missing colon, a non-numeric port and port 0.
			error(string.format("Invalid listen address (%s) for cluster node [%s]", address, tostring(addr)))
		end
		local gate = skynet.newservice("gate")
		skynet.call(gate, "lua", "open", { address = host, port = port, maxclient = maxclient })
		skynet.ret(skynet.pack(host, port))
	else
		local gate = skynet.newservice("gate")
		local realaddr, realport = skynet.call(gate, "lua", "open", { address = addr, port = port, maxclient = maxclient })
		skynet.ret(skynet.pack(realaddr, realport))
	end
end
|
|
|
|
--- "sender" command: reply with the sender service for `node`.
-- The lookup goes through `node_channel`, so it may block while the
-- channel is being opened and may raise if that fails.
-- @param source caller address (unused)
-- @param node   cluster node name
function command.sender(source, node)
	local sender = node_channel[node]
	skynet.retpack(sender)
end
|
|
|
|
--- "senders" command: reply with the whole node name -> sender table.
-- @param source caller address (unused)
function command.senders(source)
	skynet.ret(skynet.pack(node_sender))
end
|
|
|
|
-- "node.name" -> clusterproxy service already created for that target.
local proxy = {}

--- "proxy" command: reply with a clusterproxy service for a remote name.
-- @param source caller address (unused)
-- @param node   node name, or a combined "node@name" / "node.name" string
--               when `name` is nil
-- @param name   remote service name; nil to split it out of `node`
-- Raises "Invalid name <input>" when the combined form cannot be split.
function command.proxy(source, node, name)
	if name == nil then
		-- Keep the caller's input: a failed match returns nil for BOTH
		-- captures, which would otherwise erase it from the error message.
		local spec = node
		node, name = spec:match "^([^@.]+)([@.].+)"
		if name == nil then
			error ("Invalid name " .. tostring(spec))
		end
	end
	local fullname = node .. "." .. name
	local p = proxy[fullname]
	if p == nil then
		p = skynet.newservice("clusterproxy", node, name)
		-- newservice yields: another request may have registered a proxy
		-- for the same target meanwhile, so check again.
		if proxy[fullname] then
			skynet.kill(p)
			p = proxy[fullname]
		else
			proxy[fullname] = p
		end
	end
	skynet.ret(skynet.pack(p))
end
|
|
|
|
-- fd -> clusteragent service; holds a boolean while the agent is launching.
local cluster_agent = {}
-- Two-way registry in one table: name -> address and address -> name.
local register_name = {}

--- Send "namechange" to every cluster agent that is already running.
-- Entries that are not numbers (agents still launching) are skipped.
local function clearnamecache()
	for _, agent in pairs(cluster_agent) do
		local running = type(agent) == "number"
		if running then
			skynet.send(agent, "lua", "namechange")
		end
	end
end
|
|
|
|
--- "register" command: bind `name` to a service address.
-- @param source caller address; used as the address when `addr` is not given
-- @param name   name to register; must not be registered already
-- @param addr   optional address to bind instead of the caller
-- If the address already had a name, that name is released and the agents
-- are told to drop their cached names.
function command.register(source, name, addr)
	assert(register_name[name] == nil)
	if not addr then
		addr = source
	end
	local previous = register_name[addr]
	if previous then
		register_name[previous] = nil
		clearnamecache()
	end
	register_name[addr] = name
	register_name[name] = addr
	-- Reply first, then log.
	skynet.ret(nil)
	local line = string.format("Register [%s] :%08x", name, addr)
	skynet.error(line)
end
|
|
|
|
--- "unregister" command: remove `name` and its reverse entry.
-- Replies with nil whether or not the name was registered.
-- @param name name to remove
function command.unregister(_, name)
	local addr = register_name[name]
	if not addr then
		-- Nothing to remove.
		return skynet.ret(nil)
	end
	register_name[addr] = nil
	register_name[name] = nil
	clearnamecache()
	skynet.ret(nil)
	local line = string.format("Unregister [%s] :%08x", name, addr)
	skynet.error(line)
end
|
|
|
|
--- "queryname" command: reply with the registry entry for `name`
-- (nil when there is none).
-- @param source caller address (unused)
-- @param name   key to look up in register_name
function command.queryname(source, name)
	local entry = register_name[name]
	skynet.retpack(entry)
end
|
|
|
|
--- "socket" command: track the cluster agent that serves each connection.
-- @param source service that sent the event; passed on to the new agent
-- @param subcmd event name: "open", "close", "error", or anything else
-- @param fd     connection id used as the key in cluster_agent
-- @param msg    event payload (logged)
function command.socket(source, subcmd, fd, msg)
	if subcmd == "open" then
		skynet.error(string.format("socket accept from %s", msg))
		-- `false` marks "agent is launching"; a close or error that arrives
		-- during the launch flips the marker to `true`.
		cluster_agent[fd] = false
		local agent = skynet.newservice("clusteragent", skynet.self(), source, fd)
		if cluster_agent[fd] then
			-- The connection went away while the agent was starting.
			skynet.send(agent, "lua", "exit")
			cluster_agent[fd] = nil
		else
			cluster_agent[fd] = agent
		end
	elseif subcmd == "close" or subcmd == "error" then
		local agent = cluster_agent[fd]
		if type(agent) == "boolean" then
			-- Still launching: leave a note for the "open" branch.
			cluster_agent[fd] = true
		elseif agent then
			skynet.send(agent, "lua", "exit")
			cluster_agent[fd] = nil
		end
	else
		skynet.error(string.format("socket %s %d %s", subcmd, fd, msg or ""))
	end
end
|
|
|
|
--- Route a "lua" message to the matching entry of `command`.
-- Fails the assertion when no handler exists for `cmd`.
local function dispatch_command(session, source, cmd, ...)
	local handler = assert(command[cmd])
	handler(source, ...)
end

-- Service entry point: load the initial configuration, then start
-- accepting commands.
skynet.start(function()
	loadconfig()
	skynet.dispatch("lua", dispatch_command)
end)
|