local skynet = require "skynet"
require "skynet.manager"
local redis = require "skynet.db.redis"
local settings = require "settings"
local cjson = require "cjson"
local mongohelper = require "mongohelper"

-- First argument this chunk was loaded with; it is appended to the queue key
-- names below (presumably the worker index given at service launch -- confirm).
local index = ...

local CMD = {}
local db = nil   -- redis connection; stays nil until a connect attempt succeeds
local lasttime   -- os.time() of the last explicit ping, nil before the first one

-- Redis-side script: pop the head of KEYS[1] and append it to the tail of
-- KEYS[2] in one step. Returns the moved value, or nil when KEYS[1] is empty.
local MOVE_HEAD_SCRIPT = [[
    local v = redis.call('lpop', KEYS[1])
    if not v then
        return nil
    end
    redis.call('rpush', KEYS[2], v)
    return v
]]

--- Hand one decoded queue message to mongohelper.exec.
-- @param data decoded message; the fields cmd, table_name, cname, data and
--        conds are read from it
-- @return whatever mongohelper.exec returns (the consumer loop treats a falsy
--         value as failure)
local function do_func(data)
    return mongohelper.exec(data.cmd, data.table_name, data.cname, data.data, data.conds)
end

--- Move one element from the head of `queuebakup` to the tail of `queue`.
-- The list names are passed to EVAL as real keys (KEYS[1] / KEYS[2]) instead of
-- being smuggled through ARGV behind placeholder key names.
-- @param queue      name of the list that receives the element
-- @param queuebakup name of the list the element is taken from
-- @return the moved element, or nil when `queuebakup` is empty
local function lpoprpush(queue, queuebakup)
    return db:eval(MOVE_HEAD_SCRIPT, 2, queuebakup, queue)
end

--- Drain `queuebakup` back into `queue`, one element at a time.
-- @param queue      name of the list that receives the elements
-- @param queuebakup name of the list being emptied
local function copyback(queue, queuebakup)
    local moved = lpoprpush(queue, queuebakup)
    while moved do
        moved = lpoprpush(queue, queuebakup)
    end
end

--- Try to open the redis connection and store it in `db`.
-- @param cnf connection settings handed to redis.connect
-- @return true on success, false (after logging) on failure
local function connect(cnf)
    local ok, conn = pcall(redis.connect, cnf)
    if ok then
        db = conn
        return true
    end
    ERROR("---redis connect error---", inspect(cnf))
    ERROR("---redis connect error---", tostring(conn))
    return false
end

--- Start consuming the queue for this worker.
-- Connects to redis, moves anything left in the backup list back onto the main
-- list, then forks a loop that pops messages (BRPOPLPUSH main -> backup), runs
-- them through do_func and removes them from the backup list on success.
-- If the initial connect fails, the loop keeps retrying instead of crashing on
-- a nil connection.
-- @param cnf connection settings handed to redis.connect
function CMD.start(cnf)
    local connected = connect(cnf)

    local k1 = settings.mq_key .. index
    local k2 = settings.mq_key_backup .. index

    -- Only touch redis when the connection actually exists.
    if connected then
        copyback(k1, k2)
    end

    skynet.fork(function()
        while true do
            if db then
                -- Blocks for up to 10 (redis timeout argument) waiting for a message.
                local res = db:brpoplpush(k1, k2, 10)
                if res then
                    local ok, ret = pcall(cjson.decode, res)
                    if ok then
                        -- INFO("brpoplpush ret = ", DUMP(ret))
                        local r = do_func(ret)
                        if not r then
                            -- The message stays in the backup list; the loop
                            -- ends here, exactly as before, but no longer silently.
                            ERROR("---mq exec failed, consumer stopped---", res)
                            break
                        end
                        db:lrem(k2, -1, res)
                    else
                        -- Undecodable payload: left in the backup list as
                        -- before, but reported so it can be inspected.
                        ERROR("---mq decode error---", res)
                    end
                end
                -- Ping when more than 10 seconds have passed since the last one.
                if not lasttime or lasttime < os.time() - 10 then
                    lasttime = os.time()
                    db:ping()
                end
            else
                -- No connection yet: wait, then retry and recover the backup list.
                skynet.sleep(1000)
                if connect(cnf) then
                    copyback(k1, k2)
                end
            end
        end
    end)
end

skynet.start(function()
    -- Route every "lua" message to the CMD handler named by its first argument
    -- and send the handler's results back to the caller.
    skynet.dispatch("lua", function(_, _, cmd, ...)
        local f = assert(CMD[cmd], tostring(cmd) .. " not found")
        skynet.retpack(f(...))
    end)
end)