You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
282 lines
7.5 KiB
Lua
282 lines
7.5 KiB
Lua
local redis_c = require "resty.redis"
|
|
local ngx_now = ngx.now
|
|
local ngx_sleep = ngx.sleep
|
|
|
|
-- Try to load the optional "table.new" preallocating constructor. When the
-- require fails, or does not yield a function, substitute a plain constructor
-- that ignores both size hints.
local ok, new_tab = pcall(require, "table.new")
if not (ok and type(new_tab) == "function") then
    new_tab = function (narr, nrec)
        return {}
    end
end

-- Module table, created with a hint of 155 hash slots.
local _M = new_tab(0, 155)
_M._VERSION = '0.01'
|
|
|
|
|
|
-- Names of the Redis commands exposed as methods on the module. Each name is
-- wrapped by do_command further down in this file. The three plain
-- (un)subscribe commands are deliberately left out of the list (they were
-- commented out in the original table as well).
local commands = {
    -- a - d
    "append", "auth",
    "bgrewriteaof", "bgsave", "bitcount", "bitop", "bitpos",
    "blpop", "brpop", "brpoplpush",
    "client", "config",
    "dbsize", "debug", "decr", "decrby", "del", "discard", "dump",

    -- e - g
    "echo", "eval", "exec", "exists", "expire", "expireat",
    "flushall", "flushdb",
    "get", "getbit", "getrange", "getset",

    -- h - k
    "hdel", "hexists", "hget", "hgetall", "hincrby", "hincrbyfloat",
    "hkeys", "hlen", "hmget", "hmset", "hscan", "hset", "hsetnx", "hvals",
    "incr", "incrby", "incrbyfloat", "info",
    "keys",

    -- l - o
    "lastsave", "lindex", "linsert", "llen", "lpop", "lpush", "lpushx",
    "lrange", "lrem", "lset", "ltrim",
    "mget", "migrate", "monitor", "move", "mset", "msetnx", "multi",
    "object",

    -- p - r
    "persist", "pexpire", "pexpireat", "ping", "psetex", "psubscribe",
    "pttl", "publish",
    -- "punsubscribe",
    "pubsub",
    "quit",
    "randomkey", "rename", "renamenx", "restore",
    "rpop", "rpoplpush", "rpush", "rpushx",

    -- s
    "sadd", "save", "scan", "scard", "script", "sdiff", "sdiffstore",
    "select", "set", "setbit", "setex", "setnx", "setrange", "shutdown",
    "sinter", "sinterstore", "sismember", "slaveof", "slowlog",
    "smembers", "smove", "sort", "spop", "srandmember", "srem", "sscan",
    "strlen",
    -- "subscribe",
    "sunion", "sunionstore", "sync",

    -- t - z
    "time", "ttl", "type",
    -- "unsubscribe",
    "unwatch",
    "watch",
    "zadd", "zcard", "zcount", "zincrby", "zinterstore",
    "zrange", "zrangebyscore", "zrank",
    "zrem", "zremrangebyrank", "zremrangebyscore",
    "zrevrange", "zrevrangebyscore", "zrevrank",
    "zscan", "zscore", "zunionstore",
    "evalsha",
}
|
|
|
|
|
|
-- Metatable handed to every instance built by _M.new: any field missing on
-- the instance is looked up on the module table _M.
local mt = { __index = _M }
|
|
|
|
|
|
--- Tell whether a reply carries no real value.
-- A reply counts as null when it is `nil`, when it is `ngx.null`, or when it
-- is a table whose every value is `ngx.null` (an empty table qualifies too).
-- @param res reply value to inspect
-- @return true when the reply is null in the sense above, false otherwise
local function is_redis_null(res)
    if type(res) ~= "table" then
        return res == ngx.null or res == nil
    end

    for _, item in pairs(res) do
        if item ~= ngx.null then
            return false
        end
    end
    return true
end
|
|
|
|
--- Substitute a default for an empty reply.
-- `originValue` is returned unchanged when it is truthy and is not `ngx.null`;
-- otherwise (`nil`, `false` or `ngx.null`) `defaultValue` is returned. Note
-- that `0` and `""` are truthy in Lua and are therefore returned as they are.
--
-- The original also compared against an undeclared global named `null`; that
-- lookup has been dropped so a stray global of that name cannot alter the
-- result.
-- @param originValue  reply value to test
-- @param defaultValue value returned when the reply is empty
-- @return `originValue` or `defaultValue`
local function defaultReplyValue(originValue, defaultValue)
    if originValue and originValue ~= ngx.null then
        return originValue
    end
    return defaultValue
end
|
|
|
|
|
|
|
|
--- Configure and open a connection for this instance.
-- Applies `self.timeout` to `redis`, then connects it to
-- `self.host`:`self.port`. Change the connect address here as you need.
-- @param self  instance supplying `timeout`, `host` and `port`
-- @param redis connection object exposing `set_timeout` and `connect`
-- @return whatever `redis:connect` returns
function _M.connect_mod(self, redis)
    local wait = self.timeout
    redis:set_timeout(wait)

    local connected, reason = redis:connect(self.host, self.port)
    return connected, reason
end
|
|
|
|
|
|
--- Return a connection to the keepalive pool.
-- Calls `redis:set_keepalive(60000, 1000)`: per the original note the first
-- argument is the maximum idle time (60 seconds) and the second the pool
-- size. The value actually passed for the pool size is 1000.
-- @param redis connection object exposing `set_keepalive`
-- @return whatever `redis:set_keepalive` returns
function _M.set_keepalive_mod(redis)
    local max_idle, pool_size = 60000, 1000
    return redis:set_keepalive(max_idle, pool_size)
end
|
|
|
|
|
|
--- Run `proc` while holding a Redis-based lock stored under `key`.
--
-- The lock value is an expiry timestamp in milliseconds (now + 10 s + 1).
-- Acquisition loop:
--   1. SETNX the key; a reply of exactly 1 means the lock was taken.
--   2. Otherwise read the stored timestamp; if it already lies in the past,
--      GETSET a fresh one and treat the lock as taken when the value that
--      was replaced had expired as well.
--   3. Otherwise sleep for `timeout` seconds and retry.
--
-- Fix over the original: SETNX answers 0 when the key already exists, and 0
-- is truthy in Lua, so the old `if reply then break end` left the loop on the
-- first iteration no matter who held the lock. The reply is now compared
-- with 1 explicitly.
--
-- When the Redis calls fail outright (replies are nil) every timestamp reads
-- as 0, i.e. expired, so `proc` still runs - same best-effort behaviour as
-- before.
--
-- @param self instance whose `setnx`/`get`/`getset`/`del` methods are used
-- @param key  lock key
-- @param proc function invoked as `proc(self)` under `pcall`
-- @return ok, res  the first two results of `pcall(proc, self)` (the
--         original returned nothing, so existing callers are unaffected)
function _M.lockProcess( self, key, proc )
    local timeout = 10  -- seconds: lock lifetime and retry interval

    -- Expiry timestamp (ms) for a lock taken right now.
    local function new_deadline()
        return ngx_now() * 1000 + timeout * 1000 + 1
    end

    -- True when `stamp` is missing, null, non-numeric or already in the past.
    local function has_expired(stamp)
        local deadline = tonumber(defaultReplyValue(stamp, nil)) or 0
        return ngx_now() * 1000 > deadline
    end

    while true do
        if self:setnx(key, new_deadline()) == 1 then
            break
        end

        if has_expired(self:get(key)) then
            -- The holder's lock timed out: install a new deadline, and accept
            -- the lock only if the replaced value had itself expired.
            local previous = self:getset(key, new_deadline())
            if has_expired(previous) then
                break
            end
        end

        ngx_sleep(timeout)
    end

    local ok, res = pcall(proc, self)

    -- Release only while the stored deadline is still in the future.
    local current = tonumber(defaultReplyValue(self:get(key), nil)) or 0
    if ngx_now() * 1000 < current then
        self:del(key)
    end

    return ok, res
end
|
|
|
|
--- Start collecting commands for a pipeline.
-- Installs a fresh, empty request list in `self._reqs`, replacing any list
-- that was already there.
-- @param self instance to switch into pipeline mode
function _M.init_pipeline(self)
    local queue = {}
    self._reqs = queue
end
|
|
|
|
--- Send every queued command in one pipeline and return the replies.
--
-- Changes over the original:
--   * `self._reqs` is now cleared even when the queue is empty. Previously an
--     `init_pipeline` followed by an empty `commit_pipeline` left the instance
--     in pipeline mode, so all later commands were queued and never sent.
--   * When the commit itself fails the connection is closed instead of being
--     abandoned.
--   * Queued request tables are no longer mutated; arguments are unpacked
--     from index 2.
--
-- @param self instance whose queue (`self._reqs`) is flushed
-- @return results, err
--   * `{}, "no pipeline"` when nothing was queued
--   * `nil, err` when the connection object cannot be created
--   * `{}, err` when connecting or committing fails
--   * otherwise the reply table; entries that are null (see is_redis_null)
--     are set to nil, so the table may contain holes and `#results` should
--     not be relied upon
function _M.commit_pipeline( self )
    local reqs = self._reqs
    -- Always leave pipeline mode, whatever happens next.
    self._reqs = nil

    if reqs == nil or #reqs == 0 then
        return {}, "no pipeline"
    end

    local redis, err = redis_c:new()
    if not redis then
        return nil, err
    end

    local ok, err = self:connect_mod(redis)
    if not ok then
        return {}, err
    end

    redis:init_pipeline()
    for _, vals in ipairs(reqs) do
        -- vals[1] is the command name, the remaining entries its arguments.
        local fun = redis[vals[1]]
        fun(redis, unpack(vals, 2))
    end

    local results, err = redis:commit_pipeline()
    if not results or err then
        -- The connection state is unknown after a failed commit; do not pool it.
        redis:close()
        return {}, err
    end

    if is_redis_null(results) then
        results = {}
        ngx.log(ngx.WARN, "is null")
    end

    self.set_keepalive_mod(redis)

    -- Length is taken once, before any entry is cleared.
    for i = 1, #results do
        if is_redis_null(results[i]) then
            results[i] = nil
        end
    end

    return results, err
end
|
|
|
|
|
|
--- Subscribe to `channel` on a dedicated connection.
--
-- Changes over the original: the connection is closed when SUBSCRIBE fails
-- (it used to be abandoned), and the reader keeps its reply in its own locals
-- instead of overwriting this function's `res`/`err`.
--
-- @param self    instance supplying the connection settings
-- @param channel channel name passed to `redis:subscribe`
-- @return reader function on success, or `nil, err` when the connection
--         cannot be created, connected or subscribed
--
-- The returned `reader(do_read)` behaves as follows:
--   * `do_read` is nil or true: reads one reply and returns it, or returns
--     `nil, err` when the read fails;
--   * any other value: unsubscribes from `channel`, hands the connection to
--     `set_keepalive_mod`, and returns nothing.
function _M.subscribe( self, channel )
    local redis, err = redis_c:new()
    if not redis then
        return nil, err
    end

    local ok, err = self:connect_mod(redis)
    if not ok or err then
        return nil, err
    end

    local res, err = redis:subscribe(channel)
    if not res then
        -- Do not leave a half-initialised connection behind.
        redis:close()
        return nil, err
    end

    local function do_read_func ( do_read )
        if do_read == nil or do_read == true then
            local reply, read_err = redis:read_reply()
            if not reply then
                return nil, read_err
            end
            return reply
        end

        redis:unsubscribe(channel)
        self.set_keepalive_mod(redis)
        return
    end

    return do_read_func
end
|
|
|
|
|
|
--- Execute one Redis command, or queue it when a pipeline is open.
--
-- While `self._reqs` is set the command and its arguments are appended to
-- that list and nothing is returned. Otherwise a connection is created,
-- connected through `self:connect_mod`, used for this single command and then
-- handed to `self.set_keepalive_mod`.
--
-- Change over the original: a failed command no longer leaks its connection.
-- lua-resty-redis reports a Redis error reply as `false, err` (the reply was
-- read completely, so the connection can go back to the pool), whereas a
-- transport failure yields `nil, err` (the connection is closed).
--
-- @param self instance supplying the connection settings and `_reqs`
-- @param cmd  command name, used as the method name on the connection
-- @param ...  command arguments
-- @return result, err  `nil, err` on any failure; a null reply (see
--         is_redis_null, which also covers an empty table) is returned as nil
local function do_command(self, cmd, ... )
    if self._reqs then
        table.insert(self._reqs, {cmd, ...})
        return
    end

    local redis, err = redis_c:new()
    if not redis then
        return nil, err
    end

    local ok, err = self:connect_mod(redis)
    if not ok or err then
        return nil, err
    end

    local fun = redis[cmd]
    local result, err = fun(redis, ...)
    if not result or err then
        if result == false then
            -- Redis answered with an error reply; the connection is still usable.
            self.set_keepalive_mod(redis)
        else
            redis:close()
        end
        return nil, err
    end

    if is_redis_null(result) then
        result = nil
    end

    self.set_keepalive_mod(redis)

    return result, err
end
|
|
|
|
|
|
-- Install one wrapper method per entry of `commands` on the module table.
-- This runs once, when the module is loaded. The original repeated the loop
-- inside every `new` call, rebuilding all closures and reassigning them on
-- the shared module table for each instance.
for i = 1, #commands do
    local cmd = commands[i]
    _M[cmd] = function (self, ...)
        return do_command(self, cmd, ...)
    end
end

--- Create a new client instance.
-- @param self module table (the function is meant to be called as `_M:new(opts)`)
-- @param opts optional table of settings:
--   * `timeout`  multiplied by 1000 before being stored; 5000 is stored when absent
--   * `db_index` stored as given, default 0 (no code visible in this file reads it)
--   * `host`     default '127.0.0.1'
--   * `port`     default 6379
-- @return instance table with `timeout`, `db_index`, `port`, `host` and an
--         empty `_reqs` slot, using `mt` as its metatable
function _M.new(self, opts)
    opts = opts or {}

    local timeout = (opts.timeout and opts.timeout * 1000) or 5000
    local db_index = opts.db_index or 0
    local host = opts.host or '127.0.0.1'
    local port = opts.port or 6379

    return setmetatable({
        timeout = timeout,
        db_index = db_index,
        port = port,
        host = host,
        _reqs = nil }, mt)
end
|
|
|
|
|
|
-- Export the module table as the value returned by require.
return _M
|