You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

126 lines
3.2 KiB
Lua

local skynet = require "skynet"
local socket = require "skynet.socket"
--[[
The master manages the following data :
1. the addresses of all slaves : id -> ipaddr:port
2. all the global names : name -> address
The master holds the connections from the slaves.
protocol slave->master :
package size 1 byte
type 1 byte :
'H' : HANDSHAKE, report slave id, and address.
'R' : REGISTER name address
'Q' : QUERY name
protocol master->slave:
package size 1 byte
type 1 byte :
'W' : WAIT n
'C' : CONNECT slave_id slave_address
'N' : NAME globalname address
'D' : DISCONNECT slave_id
]]
-- Slaves that completed the handshake: slave_id -> { fd = ..., id = ..., addr = ... }.
-- An entry whose fd is 0 belongs to a slave whose connection has gone down.
local slave_node = {}
-- Registered global names: name -> address (only the first registration is stored).
local global_name = {}
--- Read one length-prefixed package from a connection and decode it.
-- The wire format is a single size byte followed by that many payload bytes.
-- @param fd socket id to read from
-- @return the values produced by skynet.unpack on the payload
-- Raises "closed" when either read fails.
local function read_package(fd)
	local header = socket.read(fd, 1)
	assert(header, "closed")
	local body_len = string.byte(header)
	local body = assert(socket.read(fd, body_len), "closed")
	return skynet.unpack(body)
end
--- Serialize the arguments into one length-prefixed package.
-- @param ... values handed to skynet.packstring
-- @return string: one size byte followed by the packed payload
-- Raises "too long" when the payload does not fit the 1-byte size field.
local function pack_package(...)
	local payload = skynet.packstring(...)
	local payload_len = #payload
	-- The size prefix is a single byte, so 255 is the hard upper bound.
	assert(payload_len <= 255 , "too long")
	local prefix = string.char(payload_len)
	return prefix .. payload
end
--- Announce a newly connecting slave to the slaves that are still connected.
-- Every registered slave with a non-zero fd receives a "C" package carrying
-- the newcomer's id and address; the newcomer then receives a "W" package
-- with the number of slaves that were notified.
-- @param fd socket id of the newly connecting slave
-- @param slave_id id reported by the new slave
-- @param slave_addr address reported by the new slave
local function report_slave(fd, slave_id, slave_addr)
	local connect_msg = pack_package("C", slave_id, slave_addr)
	local notified = 0
	for _, node in pairs(slave_node) do
		local node_fd = node.fd
		-- fd 0 marks a slave that is down; it gets no notification.
		if node_fd ~= 0 then
			socket.write(node_fd, connect_msg)
			notified = notified + 1
		end
	end
	socket.write(fd, pack_package("W", notified))
end
--- Perform the initial handshake with a freshly accepted slave connection.
-- Reads one package, which must be of type 'H' and carry the slave id and
-- address, announces the slave to the other slaves and records it in
-- slave_node.
-- @param fd socket id of the accepted connection
-- @return slave_id, slave_addr as reported by the slave
-- Raises an error when the package is not a valid handshake, the id is not a
-- non-zero number, or the id is already present in slave_node.
local function handshake(fd)
	local t, slave_id, slave_addr = read_package(fd)
	-- tostring() keeps the diagnostic intact even when t is nil or a boolean.
	assert(t == 'H', "Invalid handshake type " .. tostring(t))
	-- Validate the id before anything is broadcast: a nil or non-numeric id
	-- would otherwise be announced to every slave and only fail afterwards.
	assert(type(slave_id) == "number", "Invalid slave id " .. tostring(slave_id))
	assert(slave_id ~= 0 , "Invalid slave id 0")
	local registered = slave_node[slave_id]
	if registered then
		error(string.format("Slave %d already register on %s", slave_id, registered.addr))
	end
	report_slave(fd, slave_id, slave_addr)
	slave_node[slave_id] = {
		fd = fd,
		id = slave_id,
		addr = slave_addr,
	}
	return slave_id , slave_addr
end
--- Read and handle one request package from a connected slave.
-- 'R' (register): stores name -> address in global_name unless the name is
--     already registered, then broadcasts an "N" package carrying the name
--     and the address from this request to every slave that is still
--     connected.
-- 'Q' (query): answers the requesting slave with an "N" package when the
--     name is known; unknown names get no reply.
-- Any other type is logged.
-- @param fd socket id of the slave to read from
-- Raises an error when the connection is closed or the request is invalid.
local function dispatch_slave(fd)
	local t, name, address = read_package(fd)
	if t == 'R' then
		-- register name
		assert(type(address)=="number", "Invalid request")
		if not global_name[name] then
			global_name[name] = address
		end
		local message = pack_package("N", name, address)
		for _, node in pairs(slave_node) do
			-- fd 0 marks a slave that is down; skip it, as report_slave does.
			if node.fd ~= 0 then
				socket.write(node.fd, message)
			end
		end
	elseif t == 'Q' then
		-- query name
		local known_address = global_name[name]
		if known_address then
			socket.write(fd, pack_package("N", name, known_address))
		end
	else
		skynet.error("Invalid slave message type " .. t)
	end
end
--- Serve a registered slave until its connection fails, then report it down.
-- Requests are dispatched in a loop until dispatch_slave raises an error.
-- Afterwards the slave's entry is kept in slave_node with fd set to 0, a "D"
-- package is sent to the slaves that are still connected, and the socket is
-- closed.
-- @param slave_id id of the slave, as returned by handshake
-- @param slave_address address of the slave, used for logging only
local function monitor_slave(slave_id, slave_address)
	local fd = slave_node[slave_id].fd
	skynet.error(string.format("Harbor %d (fd=%d) report %s", slave_id, fd, slave_address))
	-- Any error from dispatch_slave ends the loop.
	while pcall(dispatch_slave, fd) do end
	skynet.error("slave " ..slave_id .. " is down")
	local message = pack_package("D", slave_id)
	slave_node[slave_id].fd = 0
	for _, node in pairs(slave_node) do
		-- Skip slaves marked as down (fd 0), including the one that just
		-- disconnected; writing to socket id 0 is never meaningful.
		if node.fd ~= 0 then
			socket.write(node.fd, message)
		end
	end
	socket.close(fd)
end
-- Service entry point: listen on the address taken from the "standalone"
-- environment setting and run the handshake for every accepted connection.
skynet.start(function()
	local listen_addr = skynet.getenv "standalone"
	skynet.error("master listen socket " .. tostring(listen_addr))
	local listen_fd = socket.listen(listen_addr)

	-- Called for each accepted connection with its socket id and peer address.
	local function on_accept(conn_fd, peer_addr)
		skynet.error("connect from " .. peer_addr .. " " .. conn_fd)
		socket.start(conn_fd)
		-- On failure the second pcall result is the error value, not a slave id.
		local ok, slave_or_err, slave_addr = pcall(handshake, conn_fd)
		if not ok then
			skynet.error(string.format("disconnect fd = %d, error = %s", conn_fd, slave_or_err))
			socket.close(conn_fd)
			return
		end
		skynet.fork(monitor_slave, slave_or_err, slave_addr)
	end

	socket.start(listen_fd , on_accept)
end)