You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

121 lines
2.8 KiB
Lua

-- Message dispatch module
-- jack
-- 2017-06-20
--
-- Semaphore API; used below to signal whoever waits on a session.
local semaphore = require "ngx.semaphore"
-- Module table returned at the end of the file.
local _M = { _VERSION = '0.01' }
-- NOTE(review): `mt` is not referenced anywhere else in the visible code.
local mt = {__index = _M}
-- Id of the nginx worker that loaded this module; embedded in session ids.
local workerId = ngx.worker.id()
local cjson = require "cjson"
-- Session id layout (see getSessionId):
--   server_id * server_mask + worker_id * mask + counter
-- `mask` bounds the per-worker counter and `server_mask` bounds the
-- per-server range, which leaves 2^38 / 2^34 = 16 worker slots per server.
local mask = 2^34
local server_mask = 2^38
-- NOTE(review): `redis`, `util` and `cmsgpack` are required here but are not
-- used by any function in the visible code.
local redis = require "redis"
local util = require "utils"
local cmsgpack = require "cmsgpack"
local global = require "global"
-- Id of this server as configured in the `global` module.
local SERVER_ID = global.server_id
-- Loaded only for its side effects (the return value is discarded); what it
-- defines cannot be seen from this file.
require "functions"
-- session id -> array of messages waiting to be fetched with getMessage.
local messageList = {}
-- Next counter value used by getSessionId; wraps around at `mask`.
local incrId = 0
-- Shared dict that `dispatch` uses to hand messages to other workers.
-- NOTE(review): presumably declared as `lua_shared_dict msgqueue ...` in the
-- nginx configuration; it is nil otherwise — confirm.
local smd = ngx.shared.msgqueue
-- session id -> semaphore, created lazily by getSemaphore.
local semaMap = {}
--- Extract the worker id encoded in a session id.
-- Drops the server portion (everything at or above `server_mask`) and then
-- divides by `mask` to discard the per-worker counter.
-- @param sessionId session id as produced by getSessionId
-- @return worker id (integral number)
local function getWorkerId(sessionId)
    local withinServer = tonumber(sessionId % server_mask)
    return math.floor(withinServer / mask)
end
--- Extract the server id encoded in a session id.
-- The server id is the part of the id at or above `server_mask`.
-- @param sessionId session id as produced by getSessionId
-- @return server id (integral number)
local function getServerId(sessionId)
    local numericId = tonumber(sessionId)
    return math.floor(numericId / server_mask)
end
-- Expose the session-id decoders on the module table.
_M.getWorkerId = getWorkerId
_M.getServerId = getServerId
--- Broadcast a message to every session currently held in `messageList`.
-- The message is appended to each session's pending queue, after which the
-- session's semaphore (if one exists) is posted through wakeUp.
-- @param message value to enqueue; the same value is queued for every session
function _M.dispatchToAll(message)
    for sessionId, queue in pairs(messageList) do
        queue[#queue + 1] = message
        _M.wakeUp(sessionId)
    end
end
--- Queue a message for one session held by this module instance.
-- The message is logged (JSON-encoded) at INFO level first. If the session
-- has no queue in `messageList` the message is dropped without further
-- notice; otherwise it is appended and the session is woken up.
-- @param sessionId id of the target session
-- @param message value to enqueue
function _M.dispatchToSession(sessionId, message)
    ngx.log(ngx.INFO, "dispatchToSession=", cjson.encode(message))
    local queue = messageList[sessionId]
    if queue == nil then
        return
    end
    queue[#queue + 1] = message
    _M.wakeUp(sessionId)
end
--- Allocate a new session id and create its empty message queue.
-- The id is composed as
--   SERVER_ID * server_mask + workerId * mask + counter
-- so getServerId / getWorkerId can recover the owning server and worker.
-- The counter wraps around at `mask`; a worker id of 16 or more
-- (server_mask / mask) would spill into the server-id range.
-- @return the new session id (number)
function _M.getSessionId()
    local sessionId = SERVER_ID * server_mask + workerId * mask + incrId
    -- Advance the per-worker counter, wrapping so it never leaves its range.
    incrId = (incrId + 1) % mask
    messageList[sessionId] = {}
    return sessionId
end
--- Return the semaphore associated with a session, creating it on first use.
-- New semaphores start with a count of 0. If creation fails, nothing is
-- stored and nil is returned.
-- @param sessionId id of the session
-- @return the session's semaphore, or nil when it could not be created
function _M.getSemaphore(sessionId)
    local sema = semaMap[sessionId]
    if sema == nil then
        sema = semaphore.new(0)
        semaMap[sessionId] = sema
    end
    return sema
end
--- Post one resource to a session's semaphore, if the session has one.
-- Sessions whose semaphore was never created via getSemaphore are skipped.
-- @param sessionId id of the session to signal
function _M.wakeUp(sessionId)
    local sema = semaMap[sessionId]
    if sema == nil then
        return
    end
    ngx.log(ngx.INFO, "wake up:", sessionId)
    sema:post(1)
end
--- Route a message to the worker that owns the given session.
-- * Session owned by another server: an error is logged and the message is
--   dropped.
-- * Session owned by this worker: the message is queued locally through
--   dispatchToSession.
-- * Session owned by another worker of this server: the message is
--   JSON-encoded as {sessionid=..., message=...} and pushed onto the shared
--   dict list "msgqueue-<workerId>". A failed push is logged instead of
--   being silently ignored.
-- @param sessionId id of the target session
-- @param message value to deliver; must be JSON-encodable for the
--        cross-worker path
function _M.dispatch(sessionId, message)
    local sId = getServerId(sessionId)
    local wId = getWorkerId(sessionId)

    if sId ~= SERVER_ID then
        -- The logged text reads "message dispatch error".
        ngx.log(ngx.ERR,"消息分发错误")
        return
    end

    if wId == workerId then
        _M.dispatchToSession(sessionId, message)
        return
    end

    -- NOTE(review): lua-cjson serialises numbers with 14 significant digits
    -- by default, so session ids of 10^14 and above (server_id of roughly
    -- 364 and up) would be rounded — confirm the configured
    -- encode_number_precision or the server_id range.
    local key = string.format("msgqueue-%d", wId)
    local payload = cjson.encode({sessionid = sessionId, message = message})
    local len, err = smd:lpush(key, payload)
    if not len then
        -- lpush returns nil plus an error string on failure.
        ngx.log(ngx.ERR, "failed to queue message for worker ", wId,
                " (session ", sessionId, "): ", err)
    end
end
---
-- Destroy a session: drop its pending-message queue and its semaphore.
-- Messages still queued for the session are discarded.
-- @param sessionId id of the session to remove
function _M.destory(sessionId)
    messageList[sessionId] = nil
    semaMap[sessionId] = nil
end
-- Correctly spelled alias; the original `destory` name is kept for
-- backward compatibility.
_M.destroy = _M.destory
---
-- Fetch and clear all pending messages of a session.
-- The session's queue is detached and replaced by a fresh empty one, so the
-- messages come back in insertion order without relying on the deprecated
-- `table.foreach` or on table traversal order.
-- @param sessionId id of the session
-- @return array of pending messages (possibly empty); nothing when the
--         session is unknown
function _M.getMessage(sessionId)
    local pending = messageList[sessionId]
    if pending == nil then
        -- Unknown session: return no value, as callers have always seen.
        return
    end
    messageList[sessionId] = {}
    return pending
end
-- Hand the module table to `require`.
return _M