You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
251 lines
6.6 KiB
Lua
251 lines
6.6 KiB
Lua
local skynet = require "skynet"
|
|
local crypt = require "skynet.crypt"
|
|
local cjson = require "cjson"
|
|
local cluster = require "skynet.cluster"
|
|
local msgutils = require "msgutils"
|
|
local settings = require "settings"
|
|
require "skynet.manager"
|
|
-- First argument passed to this chunk; used below as the `server` field given
-- to agents and inside the (currently disabled) login handshake string.
local name = ...

-- Seconds an agent may stay logged out before the pool sweep shuts it down.
local LOGOUT_TIME = 60 * 10

-- Handler tables: CMD for plain "lua" commands, SOCKET for "socket" events.
local CMD, SOCKET = {}, {}

-- Address of the gate service; assigned inside skynet.start.
local gate

-- fd -> agent service address.
local agent = {}

-- uid -> { secret = ..., subid = ... }, written by CMD.login.
local user_secret = {}
-- fd -> { fd = ..., userid = ..., subid = ... }, written by SOCKET.data.
local session_map = {}

-- Running counter from which CMD.login hands out sub-ids.
local subid = 0

-- uid -> { agent = address, is_offline = boolean, logout_time = os.time() | nil }.
local agent_pool = {}
|
|
|
|
|
|
--- Count the entries currently held in agent_pool.
-- agent_pool is keyed by uid rather than being a sequence, so the length
-- operator cannot be used; the table is walked key by key instead.
-- @treturn number number of pooled agents
local function get_pool_size()
    local count = 0
    local key = next(agent_pool)
    while key ~= nil do
        count = count + 1
        key = next(agent_pool, key)
    end
    return count
end
|
|
|
|
|
|
--- Shut down pooled agents whose user has been logged out for at least
-- LOGOUT_TIME seconds and remove them from agent_pool.
--
-- skynet.call suspends this coroutine, so other handlers (a new login, another
-- sweep) may run while an agent is exiting. Every candidate is therefore
-- re-validated immediately before it is shut down: an entry that has vanished,
-- is already being shut down, or whose logout_time was reset is skipped.
local function del_agent_from_pool()
    local now = os.time()

    -- Collect first: entries must not be removed while pairs() is walking the
    -- table across a yielding call.
    local expired = {}
    for id, entry in pairs(agent_pool) do
        if entry.logout_time and now - entry.logout_time >= LOGOUT_TIME then
            expired[#expired + 1] = id
        end
    end

    for _, id in ipairs(expired) do
        local entry = agent_pool[id]
        if entry
            and not entry.is_offline
            and entry.logout_time
            and os.time() - entry.logout_time >= LOGOUT_TIME then
            -- Flag the entry before yielding so that logins for this uid are
            -- refused while the agent is going away.
            entry.is_offline = true
            -- Best effort: the agent may already be dead.
            pcall(skynet.call, entry.agent, "lua", "exit")
            -- Only drop the entry we actually shut down.
            if agent_pool[id] == entry then
                agent_pool[id] = nil
            end
        end
    end
end
|
|
|
|
|
|
--- Periodic pool maintenance hook; at present it only sweeps expired agents.
local function checkPool()
    return del_agent_from_pool()
end
|
|
|
|
--- Look up the user id bound to a client socket.
-- session_map is keyed by fd, so this is a direct lookup rather than a scan.
-- @param fd client socket
-- @return the session's userid, or nothing when fd has no session
local function get_user_by_session(fd)
    local session = session_map[fd]
    if session then
        return session.userid
    end
end
|
|
|
|
--- Find a client socket whose session belongs to the given user.
-- session_map is keyed by fd, so the sessions have to be scanned. If several
-- sessions carry the same userid, whichever one the traversal meets first is
-- returned.
-- @param userid user id to search for
-- @return the matching fd, or nothing when no session matches
local function get_fd_by_userid(userid)
    local fd, session = next(session_map)
    while fd ~= nil do
        if session.userid == userid then
            return fd
        end
        fd, session = next(session_map, fd)
    end
end
|
|
|
|
|
|
|
|
--- Socket event "open": log the new client and send the gate an "accept"
-- request for its fd.
-- @param fd   client socket
-- @param addr peer address as reported with the event
function SOCKET.open(fd, addr)
    DEBUG("New client from : ", addr, " fd = ", fd)
    skynet.call(gate, "lua", "accept", fd)
end
|
|
|
|
--- Tear down the session bound to fd and park its agent in the pool.
--
-- The session and the fd -> agent binding are always dropped. If the session's
-- user has a pooled agent, the fd is kicked from the gate, the agent is sent
-- "disconnect", and logout_time is stamped so a later pool sweep can retire it.
-- @param fd client socket
local function close_agent(fd)
    skynet.error("close agent fd=",fd)
    local id = get_user_by_session(fd)
    session_map[fd] = nil
    -- Release the fd -> agent binding as well; it was never cleared before.
    agent[fd] = nil

    if id == nil then
        -- No session was registered for this fd, so no agent is involved.
        return
    end

    local entry = agent_pool[id]
    local a = entry and entry.agent
    if not a then
        return
    end

    skynet.call(gate, "lua", "kick", fd)
    -- The call above yields; if the entry left the pool meanwhile there is
    -- nothing more to do for it.
    if agent_pool[id] ~= entry then
        return
    end

    -- Original note: "disconnect never return", hence send rather than call.
    skynet.send(a, "lua", "disconnect")
    entry.logout_time = os.time()
    -- NOTE(review): if the same user already has a live session on another fd,
    -- this still parks the shared agent; confirm whether that is intended.
end
|
|
|
|
--- Socket event "close": log it and release the session/agent bound to fd.
-- @param fd client socket
function SOCKET.close(fd)
    skynet.error("socket close",fd)
    return close_agent(fd)
end
|
|
|
|
--- Socket event "error": log the error and release the session/agent bound to
-- fd, exactly as for a regular close.
-- @param fd  client socket
-- @param msg error description delivered with the event
function SOCKET.error(fd, msg)
    skynet.error("socket error",fd, msg)
    return close_agent(fd)
end
|
|
|
|
--- Socket event "warning": deliberately a no-op.
-- @param fd   client socket
-- @param size per the original note, `size` K bytes have not been sent out on fd yet
function SOCKET.warning(fd, size)
    -- Logging is disabled:
    -- skynet.error("socket warning", fd, size)
end
|
|
|
|
--- Socket event "data": handle a client packet as a login request.
--
-- The packet is decoded with msgutils.decode and must carry data.uid,
-- data.subid and data.token. On success the fd is recorded in session_map and
-- bound to an agent: an agent already pooled for that uid is sent "reconnect",
-- otherwise a new "agent" service is launched and sent "start". A malformed
-- request, or a login for a uid whose agent is shutting down, gets the fd
-- kicked from the gate.
--
-- NOTE(review): the token is only checked for presence; the HMAC verification
-- further down is commented out.
-- @param fd  client socket
-- @param msg raw message delivered with the event
function SOCKET.data(fd, msg)
    -- Pool size at which an extra sweep runs before a new agent is created.
    local MAX_POOL_SIZE = 1000

    -- Log the reason and kick the client. The kick is best effort: the fd may
    -- already be gone, so its failure is ignored.
    local function reject(reason)
        skynet.error(reason)
        pcall(skynet.call, gate, "lua", "kick", fd)
    end

    skynet.error("=========> watchdog socket data", msg)
    -- local ok, cmd = pcall(cjson.decode, msg)
    local ok, cmd = msgutils.decode(msg)
    if not ok then
        return reject("命令数据错误87")
    end

    -- The payload comes from the network: check its shape before indexing so a
    -- malformed packet is rejected instead of raising inside the dispatcher.
    local data = type(cmd) == "table" and cmd.data
    if type(data) ~= "table" then
        return reject("命令数据错误100")
    end

    local uid = data.uid
    local mysubid = data.subid
    local token = data.token
    if not uid or not mysubid or not token then
        return reject("命令数据错误100")
    end

    -- Disabled token verification, kept for reference:
    -- if not settings.debug then
    --     local secret_info = user_secret[tostring(uid)]
    --
    --     if not secret_info then
    --         skynet.error("命令数据错误101")
    --         pcall(skynet.call, gate, "lua", "kick", fd)
    --         return
    --     end
    --
    --     local handshake = string.format("%s@%s#%s", crypt.base64encode(uid), crypt.base64encode(name),
    --         crypt.base64encode(mysubid))
    --     local hmac = crypt.hmac64(crypt.hashkey(handshake), secret_info.secret)
    --
    --     if crypt.base64encode(hmac) ~= token then
    --         skynet.error("token 数据错误")
    --         pcall(skynet.call, gate, "lua", "kick", fd)
    --         return
    --     end
    -- end

    -- Disabled eviction of a previous connection of the same user:
    -- local oldfd = get_fd_by_userid(id)
    -- if oldfd then
    --     skynet.error("id = ", id , " oldfd = ", oldfd)
    --     close_agent(oldfd)
    -- end

    skynet.error(string.format("用户登录%s, %s成功!!!!!",tostring(uid),tostring(mysubid)))

    local pooled = agent_pool[uid]
    if pooled and pooled.is_offline then
        -- The agent is being shut down by the pool sweep. Refuse the login
        -- before any session is registered, so that closing this fd does not
        -- touch the dying agent.
        ERROR("offline wait")
        pcall(skynet.call, gate, "lua", "kick", fd)
        return
    end

    session_map[fd] = { fd = fd, userid = uid, subid = mysubid }

    if pooled then
        -- Reuse the agent kept in the pool for this uid.
        local a = pooled.agent
        DEBUG("在agent 连接池中取一个agent uid = ", uid, " fd = ", fd, " a = ", a)
        agent[fd] = a
        -- Clearing logout_time takes the entry out of the sweep's reach.
        pooled.logout_time = nil
        skynet.call(a, "lua", "reconnect", fd)
    else
        -- Create a new agent, sweeping expired ones first when the pool is full.
        if get_pool_size() >= MAX_POOL_SIZE then
            del_agent_from_pool()
        end
        -- NOTE(review): newservice yields; two concurrent logins for the same
        -- uid can both reach this point and the later one overwrites the pool
        -- entry of the earlier one.
        local a = skynet.newservice("agent", uid)
        DEBUG("new一个 新的agent a = ", a, " fd = ", fd)
        agent_pool[uid] = { agent = a, is_offline = false }
        skynet.call(a, "lua", "start", {
            server = name,
            gate = gate,
            client = fd,
            watchdog = skynet.self(),
            uid = uid,
        })
    end
end
|
|
|
|
--- Command "start": send the gate an "open" request with the given
-- configuration and wait for it to complete.
-- @param conf value forwarded unchanged to the gate
function CMD.start(conf)
    skynet.call(gate, "lua", "open", conf)
end
|
|
|
|
--- Command "close": log the request and release the session/agent bound to fd.
-- Returns nothing, so the caller receives an empty reply.
-- @param fd client socket
function CMD.close(fd)
    skynet.error("关闭 fd:",fd)
    close_agent(fd)
end
|
|
|
|
|
|
--- Command "login": remember the secret for a user and issue a fresh sub-id.
--
-- Original notes: the secret may be used to build an encrypted data stream,
-- and user data may be loaded from the database at this point.
-- @param uid    user id
-- @param secret secret associated with this login
-- @treturn number the sub-id issued for this login
function CMD.login(uid, secret)
    -- Earlier logging variants, kept for reference:
    -- skynet.error("uid=",uid,",secret=",secret)
    -- skynet.error(string.format("%s is login secret is %s", uid,crypt.hexencode(secret)))
    -- userid = uid

    -- NOTE(review): this writes the secret to the log in plaintext; confirm
    -- that is acceptable outside of debugging.
    INFO(string.format("%s is login secret is %s", uid, secret))

    local issued = subid + 1
    subid = issued
    user_secret[uid] = { secret = secret, subid = issued }
    return issued
end
|
|
|
|
|
|
--- Command "kick": log a user out.
--
-- Asks the login server's session manager to drop the user's online record
-- (best effort), then, if last_subid matches the sub-id stored for uid, closes
-- the user's current connection and forgets the stored secret.
-- @param uid        user id
-- @param last_subid sub-id the caller believes is current for that user
-- @treturn string always "ok"
function CMD.kick(uid, last_subid)
    skynet.error("watchdog kick: ", uid, ", ", last_subid)

    -- Best effort: a failure must not abort the kick, but it is logged rather
    -- than silently dropped.
    local ok, err = pcall(cluster.call, "loginserver", ".sessionmgr", "del_user_online", uid)
    if not ok then
        skynet.error("watchdog kick: del_user_online failed: ", tostring(err))
    end

    local record = user_secret[uid]
    if record ~= nil and record.subid == last_subid then
        local fd = get_fd_by_userid(uid)
        if fd then
            skynet.error("关闭之前登录用户得链接")
            close_agent(fd)
        end
        -- close_agent may yield; only forget the secret if it is still the one
        -- that was checked above, so a login that arrived meanwhile survives.
        if user_secret[uid] == record then
            user_secret[uid] = nil
        end
    end

    skynet.error("kick over")
    return "ok"
end
|
|
|
|
-- Service entry point: install the "lua" dispatcher, start the periodic pool
-- sweep, launch the gate and register this service under '.' .. SERVICE_NAME.
skynet.start(function()
    skynet.dispatch("lua", function(session, source, cmd, subcmd, ...)
        if cmd == "socket" then
            skynet.error("watchdog socket subcmd = ", subcmd)
            local f = SOCKET[subcmd]
            if f then
                f(...)
            else
                -- An unknown event is logged instead of calling nil.
                skynet.error("watchdog unknown socket subcmd = ", subcmd)
            end
            -- socket events do not send a reply
        else
            skynet.error("watchdog cmd = ", cmd)
            local f = assert(CMD[cmd])
            skynet.ret(skynet.pack(f(subcmd, ...)))
        end
    end)

    skynet.fork(function()
        while true do
            -- Guard the sweep: an error here would otherwise end this loop and
            -- stop pool maintenance for the rest of the service's life.
            local ok, err = pcall(checkPool)
            if not ok then
                skynet.error("watchdog checkPool failed: ", tostring(err))
            end
            -- skynet.sleep counts in 1/100 s, so this is about 3 seconds.
            skynet.sleep(300)
        end
    end)

    -- gate = skynet.newservice("gate")
    -- Switched to the websocket protocol gate.
    gate = skynet.newservice("wsgate")
    skynet.register('.' .. SERVICE_NAME)
end)
|