You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
199 lines
4.5 KiB
Lua
199 lines
4.5 KiB
Lua
local skynet = require "skynet"
|
|
local redis = require "skynet.db.redis"
|
|
local crypt = require "skynet.crypt"
|
|
|
|
local math_floor = math.floor
|
|
local math_ceil = math.ceil
|
|
local math_random = math.random
|
|
|
|
--- Compute the hex-encoded SHA1 digest of a script body.
-- The result is what the file passes to EVALSHA as the script identifier.
-- @param script string script source text
-- @return hex string produced by crypt.hexencode over crypt.sha1(script)
local function hash(script)
    return crypt.hexencode(crypt.sha1(script))
end
|
|
|
|
-- Number of servers that must accept a LOCK/EXTEND for an attempt to count
-- as successful. Assigned in the skynet.init callback from #conf.servers.
local QUORUM
|
|
|
|
-- Server-side Lua scripts, sent to redis through EVAL / EVALSHA.
-- Every script receives KEYS[1] = lock key and ARGV[1] = owner token;
-- LOCK and EXTEND additionally use ARGV[2] as the TTL given to SET ... PX.
-- Each script returns 1 on success and 0 otherwise.
local SCRIPT = {
    -- Create the key with the owner token and TTL only if it does not exist.
    LOCK = [[
        local key = KEYS[1]
        if redis.call("exists", key) == 1 then
            return 0
        end
        redis.call("set", key, ARGV[1], "PX", ARGV[2])
        return 1
    ]],
    -- Delete the key only if it still holds the caller's owner token.
    -- The DEL goes through redis.pcall, so an error from it is not raised.
    UNLOCK = [[
        local key = KEYS[1]
        if redis.call("get", key) == ARGV[1] then
            redis.pcall("del", key)
            return 1
        end
        return 0
    ]],
    -- Rewrite the key with a fresh TTL only if the caller still owns it.
    EXTEND = [[
        local key = KEYS[1]
        if redis.call("get", key) ~= ARGV[1] then
            return 0
        end
        redis.call("set", key, ARGV[1], "PX", ARGV[2])
        return 1
    ]],
}
|
|
|
|
-- SHA1 hex digest of every entry in SCRIPT, keyed by the same script name
-- (LOCK / UNLOCK / EXTEND); used as the EVALSHA argument.
local SCRIPT_HASH = {}
for script_name, script_body in pairs(SCRIPT) do
    SCRIPT_HASH[script_name] = hash(script_body)
end
|
|
|
|
-- Configuration table; loaded from the "redlock_conf" module in skynet.init.
local conf
-- Redis connections, one per entry of conf.servers (filled in skynet.init).
local dbs = {}
-- Active lock sessions keyed by uuid (see CMD.lock / CMD.unlock).
local sessions = {}
|
|
|
|
--- Run one of the lock scripts on a single redis connection.
-- EVALSHA is tried first; when the server reports NOSCRIPT the full script
-- body is sent with EVAL instead. Any error is logged and reported as false.
-- @param db redis connection exposing evalsha/eval
-- @param name script name, a key of SCRIPT / SCRIPT_HASH ("LOCK", "UNLOCK", "EXTEND")
-- @param s session table; s.lockname is the key, s.uuid and s.timeout the ARGV values
-- @return true only when the script executed and returned 1, otherwise false
local function execute_script(db, name, s)
    local ok, ret = pcall(db.evalsha, db, SCRIPT_HASH[name], 1, s.lockname, s.uuid, s.timeout)

    -- The error caught by pcall is not guaranteed to be a string (connection
    -- failures may surface as a non-string error object -- TODO confirm
    -- against the redis driver in use). Calling ret:find on such a value
    -- would itself throw, outside the pcall, so check the type first.
    -- The search is a plain-text one: "NOSCRIPT" is a literal, not a pattern.
    if not ok and type(ret) == "string" and ret:find("NOSCRIPT", 1, true) then
        ok, ret = pcall(db.eval, db, SCRIPT[name], 1, s.lockname, s.uuid, s.timeout)
    end

    if not ok then
        skynet.error("redis execute_script err.", ret, s.lockname, s.uuid, s.timeout)
        return false
    end

    return ret == 1
end
|
|
|
|
--- Run a lock script on one connection, waiting at most conf.request_timeout.
-- The script runs in a forked coroutine; the caller sleeps and is woken
-- early when the script finishes. If the sleep elapses first, the call is
-- reported as failed (the forked request itself is not cancelled).
-- @param db redis connection
-- @param name script name ("LOCK", "UNLOCK" or "EXTEND")
-- @param s session table forwarded to execute_script
-- @return result of execute_script, or false, "timeout" when it did not
--         finish before the wait elapsed
local function execute_script_timeout(db, name, s)
    local waiter = coroutine.running()
    local ok, ret = false, "timeout"

    skynet.fork(function()
        ok, ret = execute_script(db, name, s)
        -- Only wake the caller if it is still waiting for this result.
        if waiter then
            skynet.wakeup(waiter)
            waiter = nil
        end
    end)

    -- Round to a whole number of ticks, like the other timer calls in this
    -- file do: a request_timeout that is not a multiple of 10 would
    -- otherwise hand skynet.sleep a fractional value.
    skynet.sleep(math_ceil(conf.request_timeout / 10))
    -- From here on a late completion must not wake this coroutine.
    waiter = nil
    return ok, ret
end
|
|
|
|
--- Stamp a session with the start and validity-end time of a new attempt.
-- starttime is skynet.now() * 10; endtime is starttime plus s.timeout minus
-- a drift allowance of floor(conf.drift_factor * s.timeout) + 2.
-- @param s session table; s.timeout is read, s.starttime / s.endtime are written
local function calc_time(s)
    local started = skynet.now() * 10
    local allowance = math_floor(conf.drift_factor * s.timeout) + 2
    s.starttime = started
    s.endtime = started + s.timeout - allowance
end
|
|
|
|
--- Build a fresh session record for a lock request.
-- @param lockname key used for the lock on every redis server
-- @param uuid owner token stored as the key's value
-- @param timeout lock TTL passed to the scripts
-- @return session table with attempts = 0 and start/end times already
--         initialised by calc_time
local function make_session(lockname, uuid, timeout)
    local session = {}
    session.lockname = lockname
    session.uuid = uuid
    session.timeout = timeout
    session.attempts = 0
    session.starttime = 0
    session.endtime = 0
    calc_time(session)
    return session
end
|
|
|
|
--- Release a session's lock on every redis connection.
-- endtime is zeroed first; the extend timer scheduled by attempt() treats
-- endtime == 0 as "stop extending".
-- @param s session table
local function unlock(s)
    s.endtime = 0
    for _, connection in ipairs(dbs) do
        execute_script(connection, "UNLOCK", s)
    end
end
|
|
|
|
--- Try to acquire (or extend) a session's lock on a quorum of servers.
-- On success a timer is scheduled that extends the lock after roughly a
-- third of its timeout. On failure the lock is released everywhere and the
-- attempt is retried according to conf.retry_count / retry_delay /
-- retry_jitter. Retries always use LOCK: unlock() has just removed the keys,
-- so there is nothing left to extend.
--
-- The session may be released through CMD.unlock while this function is
-- suspended (waiting on redis or sleeping before a retry). In that case the
-- function must neither re-acquire nor keep extending the lock, and it must
-- not clear a sessions entry that no longer belongs to it.
-- @param s session table created by make_session
-- @param is_extend truthy to run EXTEND instead of LOCK
-- @return true on success; false, "timeout" when all retries failed;
--         false, "session released" when the session was released meanwhile
local function attempt(s, is_extend)
    s.attempts = s.attempts + 1

    local script = is_extend and "EXTEND" or "LOCK"
    local votes = 0
    for _, db in pairs(dbs) do
        if execute_script_timeout(db, script, s) then
            votes = votes + 1
        end
    end

    local now = skynet.now() * 10
    if votes >= QUORUM and s.endtime > now then
        -- Schedule the extension at a third of the timeout, minus the time
        -- already spent acquiring; never a negative delay.
        local ti = s.timeout / 3 - (now - s.starttime)
        ti = math_floor(ti / 10)
        if ti < 0 then
            ti = 0
        end
        skynet.timeout(ti, function()
            -- Stop extending once the session was unlocked or replaced.
            if s.endtime == 0 or sessions[s.uuid] ~= s then
                return
            end
            s.attempts = 0
            calc_time(s)
            attempt(s, true)
        end)
        return true
    end

    -- No quorum (or validity already elapsed): drop any partial locks.
    unlock(s)

    local released = sessions[s.uuid] ~= s
    if not released and (conf.retry_count == -1 or s.attempts <= conf.retry_count) then
        local t = conf.retry_delay + math_floor((math_random() * 2 - 1) * conf.retry_jitter)
        skynet.sleep(math_ceil(t / 10))
        -- Re-check after the sleep: CMD.unlock may have run in the meantime.
        if sessions[s.uuid] == s then
            calc_time(s)
            return attempt(s)
        end
        released = true
    end

    if released then
        return false, "session released"
    end

    -- failed
    sessions[s.uuid] = nil
    return false, "timeout"
end
|
|
|
|
-- Command handlers, looked up by name in the "lua" message dispatcher.
local CMD = {}
|
|
|
|
--- Acquire a lock under a new session.
-- @param lockname key to lock
-- @param uuid owner token; also the key under which the session is stored
-- @param timeout optional lock TTL, defaults to conf.timeout
-- @return false, "session exist" when the uuid already has a session;
--         otherwise whatever attempt() returns
function CMD.lock(lockname, uuid, timeout)
    if sessions[uuid] then
        return false, "session exist"
    end

    local session = make_session(lockname, uuid, timeout or conf.timeout)
    sessions[uuid] = session
    return attempt(session)
end
|
|
|
|
--- Release the lock held by a session and forget the session.
-- @param _ unused (first argument is ignored)
-- @param uuid owner token identifying the session
-- @return true once the session was removed and the unlock scripts were run;
--         false, "session not exist." when no session is registered for uuid
function CMD.unlock(_, uuid)
    local s = sessions[uuid]
    if not s then
        return false, "session not exist."
    end
    sessions[uuid] = nil
    unlock(s)
    -- unlock() returns nothing; report success explicitly so the caller can
    -- tell it apart from the false, err failure result (CMD.lock likewise
    -- returns true on success).
    return true
end
|
|
|
|
-- Service initialisation: load the configuration, open one redis connection
-- per configured server and derive the quorum (more than half the servers).
skynet.init(function()
    conf = require "redlock_conf"
    local servers = conf.servers
    for _, server in ipairs(servers) do
        dbs[#dbs + 1] = redis.connect(server)
    end
    QUORUM = math_floor(#servers / 2) + 1
end)
|
|
|
|
-- Service entry point: route every "lua" message to the CMD handler named
-- by its first argument and send the handler's results back to the caller.
skynet.start(function()
    local function on_lua_message(_, _, cmd, ...)
        -- An unknown command fails the assertion with the command name.
        local handler = assert(CMD[cmd], cmd)
        skynet.retpack(handler(...))
    end

    skynet.dispatch("lua", on_lua_message)
end)
|
|
|