You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

251 lines
6.6 KiB
Lua

local skynet = require "skynet"
local crypt = require "skynet.crypt"
local cjson = require "cjson"
local cluster = require "skynet.cluster"
local msgutils = require "msgutils"
local settings = require "settings"
require "skynet.manager"
local name =...
local LOGOUT_TIME = 10 * 60
local CMD = {}
local SOCKET = {}
local gate
local agent = {}
local user_secret = {}
local session_map = {}
local subid=0
local agent_pool = {}
local function get_pool_size()
local n = 0
for _, v in pairs(agent_pool) do
n = n + 1
end
return n
end
local function del_agent_from_pool()
local removes = {}
for id, v in pairs(agent_pool) do
if v.logout_time and os.time() - v.logout_time >= LOGOUT_TIME then
table.insert(removes, id)
end
end
for _, id in ipairs(removes) do
local a = agent_pool[id].agent
agent_pool[id].is_offline = true
pcall(skynet.call, a, "lua", "exit")
agent_pool[id] = nil
end
end
local function checkPool()
del_agent_from_pool()
end
local function get_user_by_session(fd)
for k,v in pairs(session_map) do
if k == fd then
return v.userid
end
end
end
local function get_fd_by_userid(userid)
for k,v in pairs(session_map) do
if v.userid == userid then
return k
end
end
end
function SOCKET.open(fd, addr)
DEBUG("New client from : " , addr, " fd = ", fd)
skynet.call(gate, "lua", "accept", fd)
end
local function close_agent(fd)
skynet.error("close agent fd=",fd)
local id = get_user_by_session(fd)
session_map[fd]=nil
if agent_pool[id] then
local a = agent_pool[id].agent
if a then
skynet.call(gate, "lua", "kick", fd)
-- disconnect never return
skynet.send(a, "lua", "disconnect")
agent_pool[id].logout_time = os.time()
end
end
end
function SOCKET.close(fd)
skynet.error("socket close",fd)
close_agent(fd)
end
function SOCKET.error(fd, msg)
skynet.error("socket error",fd, msg)
close_agent(fd)
end
function SOCKET.warning(fd, size)
-- size K bytes havn't send out in fd
-- skynet.error("socket warning", fd, size)
end
function SOCKET.data(fd, msg)
skynet.error("=========> watchdog socket data", msg)
-- local ok, cmd = pcall(cjson.decode, msg)
local ok, cmd = msgutils.decode(msg)
if not ok then
skynet.error("命令数据错误87")
pcall(skynet.call, gate, "lua","kick",fd)
return
end
local data = cmd.data
local uid = data.uid
local mysubid = data.subid
local token = data.token
if not uid or not mysubid or not token then
skynet.error("命令数据错误100")
pcall(skynet.call, gate, "lua","kick",fd)
return
end
-- if not settings.debug then
-- local secret_info = user_secret[tostring(uid)]
-- if not secret_info then
-- skynet.error("命令数据错误101")
-- pcall(skynet.call, gate, "lua", "kick", fd)
-- return
-- end
-- local handshake = string.format("%s@%s#%s", crypt.base64encode(uid), crypt.base64encode(name),
-- crypt.base64encode(mysubid))
-- local hmac = crypt.hmac64(crypt.hashkey(handshake), secret_info.secret)
-- if crypt.base64encode(hmac) ~= token then
-- skynet.error("token 数据错误")
-- pcall(skynet.call, gate, "lua", "kick", fd)
-- return
-- end
-- end
-- local oldfd = get_fd_by_userid(id)
-- if oldfd then
-- skynet.error("id = ", id , " oldfd = ", oldfd)
-- close_agent(oldfd)
-- end
skynet.error(string.format("用户登录%s, %s成功!!!!!",tostring(uid),tostring(mysubid)))
session_map[fd] = { fd = fd, userid = uid, subid = mysubid }
--创建一个agent
if agent_pool[uid] then
if agent_pool[uid].is_offline then
ERROR("offline wait")
pcall(skynet.call, gate, "lua", "kick", fd)
return
end
local a = agent_pool[uid].agent
DEBUG("在agent 连接池中取一个agent uid = ", uid, " fd = ", fd, " a = ", a)
agent[fd] = a
agent_pool[uid].logout_time = nil
skynet.call(a, "lua", "reconnect", fd)
else
local size = get_pool_size()
if size >= 1000 then
del_agent_from_pool()
end
local a = skynet.newservice("agent", uid)
DEBUG("new一个 新的agent a = ", a, " fd = ", fd)
agent_pool[uid] = { agent = a, is_offline = false }
local ok = skynet.call(a, "lua", "start", {
server = name,
gate = gate,
client = fd,
watchdog = skynet.self(),
uid = uid,
})
end
end
function CMD.start(conf)
skynet.call(gate, "lua", "open" , conf)
end
function CMD.close(fd)
skynet.error("关闭 fd:",fd)
close_agent(fd)
end
function CMD.login(uid,secret)
-- you may use secret to make a encrypted data stream
-- skynet.error("uid=",uid,",secret=",secret)
-- skynet.error(string.format("%s is login secret is %s", uid,crypt.hexencode(secret)))
-- userid = uid
-- you may load user data from database
INFO(string.format("%s is login secret is %s", uid, secret))
subid=subid+1
user_secret[uid]={secret=secret,subid=subid,}
return subid
end
function CMD.kick( uid, last_subid )
skynet.error("watchdog kick: ", uid, ", ", last_subid)
pcall(cluster.call, "loginserver", ".sessionmgr", "del_user_online", uid)
if user_secret[uid] ~= nil and user_secret[uid].subid==last_subid then
local fd = get_fd_by_userid(uid)
if fd then
skynet.error("关闭之前登录用户得链接")
close_agent(fd)
end
user_secret[uid]=nil
end
skynet.error("kick over")
return "ok"
end
skynet.start(function()
skynet.dispatch("lua", function(session, source, cmd, subcmd, ...)
if cmd == "socket" then
skynet.error("watchdog socket subcmd = ", subcmd)
local f = SOCKET[subcmd]
f(...)
-- socket api don't need return
else
skynet.error("watchdog cmd = ", cmd)
local f = assert(CMD[cmd])
skynet.ret(skynet.pack(f(subcmd, ...)))
end
end)
skynet.fork(function()
while true do
checkPool()
skynet.sleep(300)
end
end)
-- gate = skynet.newservice("gate")
--切换websocket 协议
gate = skynet.newservice("wsgate")
skynet.register('.' .. SERVICE_NAME)
end)