From bdeceafb976cf087c5755b3279f72f1d5923ba81 Mon Sep 17 00:00:00 2001 From: yanxurui <617080352@qq.com> Date: Tue, 23 May 2017 00:49:38 +0800 Subject: [PATCH 1/4] tests: set with exptime first then get --- t/sanity.t | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/t/sanity.t b/t/sanity.t index 2ba2ea5..9916a72 100644 --- a/t/sanity.t +++ b/t/sanity.t @@ -830,17 +830,17 @@ dog: 32 (flags: 526) return end - local res, flags, err = memc:get("dog") - if err then - ngx.say("failed to get dog: ", err) + local ok, err = memc:set("dog", 32, 1, 526) + if not ok then + ngx.say("failed to set dog: ", err) return end ngx.location.capture("/sleep"); - local ok, err = memc:set("dog", 32, 1, 526) - if not ok then - ngx.say("failed to set dog: ", err) + local res, flags, err = memc:get("dog") + if err then + ngx.say("failed to get dog: ", err) return end @@ -863,6 +863,7 @@ GET /t dog not found --- no_error_log [error] +--- only From 2b636bf4fb0fe4abf68816df799b8d6870c7f5ac Mon Sep 17 00:00:00 2001 From: yanxurui <617080352@qq.com> Date: Sat, 27 May 2017 21:17:48 +0800 Subject: [PATCH 2/4] feature: support cluster by consistent hashing --- lib/resty/hashmemcached.lua | 230 ++++++++++++++++++++++++++++++++++++ lib/resty/memcached.lua | 142 ++++++++-------------- t/sanity.t | 2 +- 3 files changed, 277 insertions(+), 97 deletions(-) create mode 100644 lib/resty/hashmemcached.lua diff --git a/lib/resty/hashmemcached.lua b/lib/resty/hashmemcached.lua new file mode 100644 index 0000000..65d3a79 --- /dev/null +++ b/lib/resty/hashmemcached.lua @@ -0,0 +1,230 @@ +-- Dependency +local memcached = require "resty.memcached" +local resty_chash = require "resty.chash" + +local _M = { + _VERSION = '0.01' +} + +-- Global private methods +local function Set (list) + local set = {} + for _, l in ipairs(list) do set[l] = true end + return set +end + +-- Global private variables: +local log = ngx.log +local ERR = ngx.ERR +local INFO = ngx.INFO + +local key_methods = Set{ + 'get', 'gets', + 'set', 'add', 'replace', 'append', 'prepend', + 'cas', + 'delete', + 'incr', 'decr', + 'touch' +} + +-- Class implemented by closure +function _M.new(cluster, shm, opts) + local self = {} + + -- Private variables: + local client, err = memcached:new(opts) + if not client then + return client, err + end + + local max_fails = 3 + local fail_timeout = 10 + if opts then + if opts.max_fails then + max_fails = opts.max_fails + end + if opts.fail_timeout then + fail_timeout = opts.fail_timeout + end + end + + local dict = ngx.shared[shm or 'hashmemcached'] + local serv_id -- current node (string in format 'ip:port') + local chash_up + local servers = {} + + -- Private methods: + local function init_chash() + local nodes = {} + for i, node in ipairs(cluster) do + local ip, port, weight = unpack(node) + local id = ip..':'..port + servers[id] = {ip, port} + nodes[id] = weight or 1 + end + local keys = dict:get_keys() + for i, k in ipairs(keys) do + local v = dict:get(k) + -- 0 means this node is down + if v == 0 then + nodes[k] = nil + end + end + chash_up = resty_chash:new(nodes) + end + + local function down() + -- if a node fails max_fails times in fail_timeout seconds + -- then this node is considered unavailable in the duration of fail_timeout + local fails = dict:get(serv_id) + if fails then + if fails > 0 then + fails = fails + 1 + -- two requests may increase the same value + fails = dict:incr(serv_id, 1) + log(INFO, "hashmemcached: ", serv_id, " failed ", fails, " times") + else + -- in case this node is already marked down by another client + chash_up:delete(serv_id) + end + else + fails = 1 + log(INFO, "hashmemcached: ", serv_id, " failed the first time") + dict:set(serv_id, 1, fail_timeout) + end + if fails >= max_fails then + dict:set(serv_id, 0, fail_timeout) + log(ERR, "hashmemcached: ", serv_id, " is turned down after ", fails, " failure(s)") + chash_up:delete(serv_id) + end + serv_id = nil + end + + local function connect(id) + -- is connected already + if serv_id then + if serv_id == id then return 1 end + -- ignore error + client:set_keepalive() + end + serv_id = id + server = servers[id] + local ok, err = client:connect(unpack(server)) + if not ok then + down() + end + return ok, err + end + + local function call(method) + return function(self, key, ...) + if type(key) == "table" then + -- get or gets multi keys + local servs = {} + -- use empty table as default value + setmetatable(servs, { __index = function(t, k) t[k]={};return t[k] end }) + for i, k in pairs(key) do + local id = chash_up:find(k) + if not id then + assert(next(servs)==nil) -- servs must be empty + return nil, 'no available memcached server' + end + table.insert(servs[id], k) + end + + local results = {} + for id, keys in pairs(servs) do + local ok, err = connect(id) + if ok then + local data, err = client[method](client, keys, ...) + if data then + -- data is a table + for k, v in pairs(data) do + -- v is a table too + -- merge result + results[k] = v + end + else + for i, k in ipairs(keys) do + results[k] = {nil, err} + end + if client.failed then + down() + end + end + else + for i, k in ipairs(keys) do + results[k] = {nil, err} + end + end + end + return results + else + -- single key + local id = chash_up:find(key) + local res1, res2, res3, res4 + local ok, err + if id then + ok, err = connect(id) + if ok then + -- at most 4 return values + res1, res2, res3, res4 = client[method](client, key, ...) + if client.failed then + down() + end + end + else + err = 'no available memcached server' + end + if method == 'get' then + return res1, res2, err or res3 + elseif method == 'gets' then + return res1, res2, res3, err or res4 + else + return res1, err or res2 + end + end + end + end + + + -- Public methods: + function self.which_server() + return serv_id + end + + -- override flush_all + function self.flush_all(self, time) + local ok, err + local results = {} + for id, serv in pairs(servers) do + ok, err = connect(id) + if ok then + results[id] = {client:flush_all(time)} + else + results[id] = {nil, err} + end + end + return results + end + + -- Apply some private methods + init_chash() + + local mt = {} + mt.__index = function(t, k) + -- intercept these methods + if key_methods[k] then + return call(k) + else + if k=='connect' or k=='close' or k=='quit' or k=='set_keepalive' then + serv_id = nil + end + return client[k] + end + end + setmetatable(self, mt) + return self +end + +return _M \ No newline at end of file diff --git a/lib/resty/memcached.lua b/lib/resty/memcached.lua index 3390e68..8ee0be0 100644 --- a/lib/resty/memcached.lua +++ b/lib/resty/memcached.lua @@ -62,6 +62,7 @@ end function _M.connect(self, ...) + self.failed = nil local sock = self.sock if not sock then return nil, "not initialized" @@ -71,6 +72,18 @@ function _M.connect(self, ...) end +-- a socket error happend the socket is closed then +local function fail(self, ...) + err = select(select('#', ...), ...) + ngx.log(ngx.ERR, "failed: ", err) + if err == "timeout" then + self.sock:close() + end + self.failed = true + return ... +end + + local function _multi_get(self, keys) local sock = self.sock if not sock then @@ -107,10 +120,7 @@ local function _multi_get(self, keys) while true do local line, err = sock:receive() if not line then - if err == "timeout" then - sock:close() - end - return nil, err + return fail(self, nil, err) end if line == 'END' then @@ -126,20 +136,14 @@ local function _multi_get(self, keys) local data, err = sock:receive(len) if not data then - if err == "timeout" then - sock:close() - end - return nil, err + return fail(self, nil, err) end results[unescape_key(key)] = {data, flags} data, err = sock:receive(2) -- discard the trailing CRLF if not data then - if err == "timeout" then - sock:close() - end - return nil, err + return fail(self, nil, err) end end @@ -164,10 +168,7 @@ function _M.get(self, key) local line, err = sock:receive() if not line then - if err == "timeout" then - sock:close() - end - return nil, nil, err + return fail(self, nil, nil, err) end if line == 'END' then @@ -183,18 +184,12 @@ function _M.get(self, key) local data, err = sock:receive(len) if not data then - if err == "timeout" then - sock:close() - end - return nil, nil, err + return fail(self, nil, nil, err) end line, err = sock:receive(7) -- discard the trailing "\r\nEND\r\n" if not line then - if err == "timeout" then - sock:close() - end - return nil, nil, err + return fail(self, nil, nil, err) end return data, flags @@ -227,7 +222,7 @@ local function _multi_gets(self, keys) local bytes, err = sock:send(cmd) if not bytes then - return nil, err + return fail(self, nil, err) end local unescape_key = self.unescape_key @@ -236,10 +231,7 @@ local function _multi_gets(self, keys) while true do local line, err = sock:receive() if not line then - if err == "timeout" then - sock:close() - end - return nil, err + return fail(self, nil, err) end if line == 'END' then @@ -257,20 +249,14 @@ local function _multi_gets(self, keys) local data, err = sock:receive(len) if not data then - if err == "timeout" then - sock:close() - end - return nil, err + return fail(self, nil, err) end results[unescape_key(key)] = {data, flags, cas_uniq} data, err = sock:receive(2) -- discard the trailing CRLF if not data then - if err == "timeout" then - sock:close() - end - return nil, err + return fail(self, nil, err) end end @@ -290,15 +276,12 @@ function _M.gets(self, key) local bytes, err = sock:send("gets " .. self.escape_key(key) .. "\r\n") if not bytes then - return nil, nil, err + return fail(self, nil, nil, err) end local line, err = sock:receive() if not line then - if err == "timeout" then - sock:close() - end - return nil, nil, nil, err + return fail(self, nil, nil, nil, err) end if line == 'END' then @@ -314,18 +297,12 @@ function _M.gets(self, key) local data, err = sock:receive(len) if not data then - if err == "timeout" then - sock:close() - end - return nil, nil, nil, err + return fail(self, nil, nil, nil, err) end line, err = sock:receive(7) -- discard the trailing "\r\nEND\r\n" if not line then - if err == "timeout" then - sock:close() - end - return nil, nil, nil, err + return fail(self, nil, nil, nil, err) end return data, flags, cas_uniq @@ -372,15 +349,12 @@ local function _store(self, cmd, key, value, exptime, flags) .. "\r\n" local bytes, err = sock:send(req) if not bytes then - return nil, err + return fail(self, nil, err) end local data, err = sock:receive() if not data then - if err == "timeout" then - sock:close() - end - return nil, err + return fail(self, nil, err) end if data == "STORED" then @@ -439,15 +413,12 @@ function _M.cas(self, key, value, cas_uniq, exptime, flags) local bytes, err = sock:send(req) if not bytes then - return nil, err + return fail(self, nil, err) end local line, err = sock:receive() if not line then - if err == "timeout" then - sock:close() - end - return nil, err + return fail(self, nil, err) end -- print("response: [", line, "]") @@ -472,15 +443,12 @@ function _M.delete(self, key) local bytes, err = sock:send(req) if not bytes then - return nil, err + return fail(self, nil, err) end local res, err = sock:receive() if not res then - if err == "timeout" then - sock:close() - end - return nil, err + return fail(self, nil, err) end if res ~= 'DELETED' then @@ -526,15 +494,12 @@ function _M.flush_all(self, time) local bytes, err = sock:send(req) if not bytes then - return nil, err + return fail(self, nil, err) end local res, err = sock:receive() if not res then - if err == "timeout" then - sock:close() - end - return nil, err + return fail(self, nil, err) end if res ~= 'OK' then @@ -555,15 +520,12 @@ local function _incr_decr(self, cmd, key, value) local bytes, err = sock:send(req) if not bytes then - return nil, err + return fail(self, nil, err) end local line, err = sock:receive() if not line then - if err == "timeout" then - sock:close() - end - return nil, err + return fail(self, nil, err) end if not match(line, '^%d+$') then @@ -599,7 +561,7 @@ function _M.stats(self, args) local bytes, err = sock:send(req) if not bytes then - return nil, err + return fail(self, nil, err) end local lines = {} @@ -607,10 +569,7 @@ function _M.stats(self, args) while true do local line, err = sock:receive() if not line then - if err == "timeout" then - sock:close() - end - return nil, err + return fail(self, nil, err) end if line == 'END' then @@ -638,15 +597,12 @@ function _M.version(self) local bytes, err = sock:send("version\r\n") if not bytes then - return nil, err + return fail(self, nil, err) end local line, err = sock:receive() if not line then - if err == "timeout" then - sock:close() - end - return nil, err + return fail(self, nil, err) end local ver = match(line, "^VERSION (.+)$") @@ -666,7 +622,7 @@ function _M.quit(self) local bytes, err = sock:send("quit\r\n") if not bytes then - return nil, err + return fail(self, nil, err) end return 1 @@ -681,15 +637,12 @@ function _M.verbosity(self, level) local bytes, err = sock:send("verbosity " .. level .. "\r\n") if not bytes then - return nil, err + return fail(self, nil, err) end local line, err = sock:receive() if not line then - if err == "timeout" then - sock:close() - end - return nil, err + return fail(self, nil, err) end if line ~= 'OK' then @@ -709,15 +662,12 @@ function _M.touch(self, key, exptime) local bytes, err = sock:send("touch " .. self.escape_key(key) .. " " .. exptime .. "\r\n") if not bytes then - return nil, err + return fail(self, nil, err) end local line, err = sock:receive() if not line then - if err == "timeout" then - sock:close() - end - return nil, err + return fail(self, nil, err) end -- moxi server from couchbase returned stored after touching diff --git a/t/sanity.t b/t/sanity.t index 9916a72..8788d2e 100644 --- a/t/sanity.t +++ b/t/sanity.t @@ -1441,7 +1441,7 @@ successfully set verbosity to level 2 local ok, err = memc:set("cat", "hello\\nworld\\n") if not ok then - ngx.say("failed to set dog: ", err) + ngx.say("failed to set cat: ", err) return end From c41ee6599909b098890bd2b03b3b3cca9f9c0670 Mon Sep 17 00:00:00 2001 From: yanxurui <617080352@qq.com> Date: Tue, 28 Nov 2017 18:53:36 +0800 Subject: [PATCH 3/4] tests: hashmemcacahe --- .travis.yml | 4 + t/hash.t | 866 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 870 insertions(+) create mode 100644 t/hash.t diff --git a/.travis.yml b/.travis.yml index ade20f4..ab3894f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -45,6 +45,7 @@ install: - git clone https://github.com/openresty/echo-nginx-module.git ../echo-nginx-module - git clone https://github.com/openresty/no-pool-nginx.git ../no-pool-nginx - git clone -b v2.1-agentzh https://github.com/openresty/luajit2.git + - git clone https://github.com/openresty/lua-resty-balancer.git ../lua-resty-balancer - git clone https://github.com/openresty/mockeagain.git script: @@ -58,6 +59,9 @@ script: - make -j$JOBS > build.log 2>&1 || (cat build.log && exit 1) - sudo make PATH=$PATH install_sw > build.log 2>&1 || (cat build.log && exit 1) - cd ../mockeagain/ && make CC=$CC -j$JOBS && cd .. + - make -C ../lua-resty-balancer + - cp -r ../lua-resty-balancer/lib/resty ../lua-resty-balancer/librestychash.so lib + - memcached -p 11212 -d -m 1024 - export PATH=$PWD/work/nginx/sbin:$PWD/nginx-devel-utils:$PATH - export LD_PRELOAD=$PWD/mockeagain/mockeagain.so - export LD_LIBRARY_PATH=$PWD/mockeagain:$LD_LIBRARY_PATH diff --git a/t/hash.t b/t/hash.t new file mode 100644 index 0000000..8e42590 --- /dev/null +++ b/t/hash.t @@ -0,0 +1,866 @@ +# vim:set ft= ts=4 sw=4 et: + +use Test::Nginx::Socket::Lua; +use Cwd qw(cwd); + +repeat_each(2); + +plan tests => repeat_each() * (3 * blocks() + 6); + +my $pwd = cwd(); + +our $HttpConfig = qq{ + lua_package_path "$pwd/lib/?.lua;;"; + lua_package_cpath "$pwd/lib/?.so;;"; + lua_shared_dict hashmemcached 1m; +}; + +$ENV{TEST_NGINX_MEMCACHED_PORT} ||= 11211; +$ENV{TEST_NGINX_MEMCACHED_BACKUP_PORT} ||= 11212; + +no_long_string(); + +run_tests(); + +__DATA__ + +=== TEST 1: basic (set, get, flush_all) +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local hashmemcached = require "resty.hashmemcached" + local hmemc, err = hashmemcached.new({ + {'127.0.0.1', $TEST_NGINX_MEMCACHED_PORT, 2}, + {'127.0.0.1', $TEST_NGINX_MEMCACHED_BACKUP_PORT, 1} + }, 'hashmemcached') + + -- always flush_all at first + local result = hmemc:flush_all() + local serv, res, ok, err + for serv, res in pairs(result) do + ok, err = unpack(res) + if not ok then + ngx.say("failed to flush ", serv, ", ", err) + return + end + end + + ngx.say("set") + + keys = {'dog', 'puppy', 'cat', 'kitten'} + values = {32, "I am a little dog", 64, "I am a \nlittle cat\n"} + local i, key + for i, key in ipairs(keys) do + local ok, err = hmemc:set(key, values[i]) + if not ok then + ngx.say("failed to set ", key, ": ", err) + else + ngx.say(key, " is stored in ", hmemc:which_server()) + end + end + + ngx.say("\nget") + + for i, key in ipairs(keys) do + local res, flags, err = hmemc:get(key) + if err then + ngx.say("failed to get ", key, ": ", err) + elseif not res then + ngx.say(key, " not found") + else + ngx.say(key, ": ", res, " (flags: ", flags, ")") + end + end + + local result = hmemc:flush_all() + local serv, res, ok, err + for serv, res in pairs(result) do + ok, err = unpack(res) + if not ok then + ngx.say("failed to flush ", serv, ", ", err) + return + end + end + + ngx.say("\nget after flush") + + -- get (404) + for i, key in ipairs(keys) do + local res, flags, err = hmemc:get(key) + if err then + ngx.say("failed to get ", key, ": ", err) + elseif not res then + ngx.say(key, " not found") + else + ngx.say(key, ": ", res, " (flags: ", flags, ")") + end + end + + hmemc:close() + } + } +--- request +GET /t +--- response_body +set +dog is stored in 127.0.0.1:11211 +puppy is stored in 127.0.0.1:11212 +cat is stored in 127.0.0.1:11212 +kitten is stored in 127.0.0.1:11211 + +get +dog: 32 (flags: 0) +puppy: I am a little dog (flags: 0) +cat: 64 (flags: 0) +kitten: I am a +little cat + (flags: 0) + +get after flush +dog not found +puppy not found +cat not found +kitten not found +--- no_error_log +[error] + + +=== TEST 2: multi get, gets +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local hashmemcached = require "resty.hashmemcached" + local hmemc, err = hashmemcached.new({ + {'127.0.0.1', $TEST_NGINX_MEMCACHED_PORT, 2}, + {'127.0.0.1', $TEST_NGINX_MEMCACHED_BACKUP_PORT, 1} + }, 'hashmemcached') + + local result = hmemc:flush_all() + local serv, res, ok, err + for serv, res in pairs(result) do + ok, err = unpack(res) + if not ok then + ngx.say("failed to flush ", serv, ", ", err) + return + end + end + + keys = {'dog', 'puppy', 'cat', 'kitten'} + values = {32, "I am a little dog", 64, "I am a \nlittle cat\n"} + local i, key + for i, key in ipairs(keys) do + local ok, err = hmemc:set(key, values[i]) + if not ok then + ngx.say("failed to set ", key, ": ", err) + end + end + + ngx.say("get") + + keys[#keys+1] = 'blah' + local results = hmemc:get(keys) + for i, key in ipairs(keys) do + ngx.print(key, ": ") + if results[key] then + ngx.say(table.concat(results[key], ", ")) + else + ngx.say("not found") + end + end + + ngx.say("\ngets") + + local results = hmemc:gets(keys) + for i, key in ipairs(keys) do + ngx.print(key, ": ") + if results[key] then + ngx.say(table.concat(results[key], ", ")) + else + ngx.say("not found") + end + end + + hmemc:close() + } + } +--- request +GET /t +--- response_body_like chop +^get +dog: 32, 0 +puppy: I am a little dog, 0 +cat: 64, 0 +kitten: I am a +little cat +, 0 +blah: not found + +gets +dog: 32, 0, \d+ +puppy: I am a little dog, 0, \d+ +cat: 64, 0, \d+ +kitten: I am a +little cat +, 0, \d+ +blah: not found$ +--- no_error_log +[error] + + +=== TEST 3: add, set, incr, decr, replace, append, prepend, delete +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local hashmemcached = require "resty.hashmemcached" + local hmemc, err = hashmemcached.new({ + {'127.0.0.1', $TEST_NGINX_MEMCACHED_PORT, 2}, + {'127.0.0.1', $TEST_NGINX_MEMCACHED_BACKUP_PORT, 1} + }, 'hashmemcached') + + local result = hmemc:flush_all() + local serv, res, ok, err + for serv, res in pairs(result) do + ok, err = unpack(res) + if not ok then + ngx.say("failed to flush ", serv, ", ", err) + return + end + end + -- add + local ok, err = hmemc:add("dog", 32) + if not ok then + ngx.say("failed to add dog: ", err) + end + + local res, flags, err = hmemc:get("dog") + if err then + ngx.say("failed to get dog: ", err) + return + end + if not res then + ngx.say("dog not found") + return + end + ngx.say("dog: ", res) + + -- set + local ok, err = hmemc:set("dog", 33) + if not ok then + ngx.say("failed to set dog: ", err) + return + end + + local res, flags, err = hmemc:get("dog") + if err then + ngx.say("failed to get dog: ", err) + return + end + if not res then + ngx.say("dog not found") + return + end + ngx.say("dog: ", res) + + -- incr + local value, err = hmemc:incr("dog", 2) + if not value then + ngx.say("failed to incr dog: ", err) + return + end + + ngx.say("dog: ", value) + + --decr + local value, err = hmemc:decr("dog", 3) + if not value then + ngx.say("failed to decr dog: ", err) + return + end + + ngx.say("dog: ", value) + -- replace + local ok, err = hmemc:replace("dog", 56) + if not ok then + ngx.say("failed to replace dog: ", err) + return + end + + local res, flags, err = hmemc:get("dog") + if err then + ngx.say("failed to get dog: ", err) + return + end + if not res then + ngx.say("dog not found") + return + end + ngx.say("dog: ", res) + + -- append + local ok, err = hmemc:append("dog", 78) + if not ok then + ngx.say("failed to append to dog: ", err) + end + + local res, flags, err = hmemc:get("dog") + if err then + ngx.say("failed to get dog: ", err) + return + end + if not res then + ngx.say("dog not found") + return + end + ngx.say("dog: ", res) + + -- prepend + local ok, err = hmemc:prepend("dog", 34) + if not ok then + ngx.say("failed to prepend to dog: ", err) + end + + local res, flags, err = hmemc:get("dog") + if err then + ngx.say("failed to get dog: ", err) + return + end + if not res then + ngx.say("dog not found") + return + end + ngx.say("dog: ", res) + + -- delete + local ok, err = hmemc:delete("dog") + if not ok then + ngx.say("failed to delete dog: ", err) + end + + local res, flags, err = hmemc:get("dog") + if err then + ngx.say("failed to get dog: ", err) + return + end + if not res then + ngx.say("dog not found") + return + end + ngx.say("dog: ", res) + + hmemc:close() + } + } +--- request +GET /t +--- response_body +dog: 32 +dog: 33 +dog: 35 +dog: 32 +dog: 56 +dog: 5678 +dog: 345678 +dog not found +--- no_error_log +[error] + + +=== TEST 4: incr, decr, replace, append, prepend, delete on a nonexistent key +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local hashmemcached = require "resty.hashmemcached" + local hmemc, err = hashmemcached.new({ + {'127.0.0.1', $TEST_NGINX_MEMCACHED_PORT, 2}, + {'127.0.0.1', $TEST_NGINX_MEMCACHED_BACKUP_PORT, 1} + }, 'hashmemcached') + + local result = hmemc:flush_all() + local serv, res, ok, err + for serv, res in pairs(result) do + ok, err = unpack(res) + if not ok then + ngx.say("failed to flush all: ", err) + return + end + end + + -- incr + local value, err = hmemc:incr("dog", 2) + if not value then + ngx.say("failed to incr dog: ", err) + end + + --decr + local value, err = hmemc:decr("dog", 3) + if not value then + ngx.say("failed to decr dog: ", err) + end + + -- replace + local ok, err = hmemc:replace("dog", 56) + if not ok then + ngx.say("failed to replace dog: ", err) + end + + -- append + local ok, err = hmemc:append("dog", 78) + if not ok then + ngx.say("failed to append to dog: ", err) + end + + -- prepend + local ok, err = hmemc:prepend("dog", 34) + if not ok then + ngx.say("failed to prepend to dog: ", err) + end + + -- delete + local ok, err = hmemc:delete("dog") + if not ok then + ngx.say("failed to delete dog: ", err) + end + + hmemc:close() + } + } +--- request +GET /t +--- response_body +failed to incr dog: NOT_FOUND +failed to decr dog: NOT_FOUND +failed to replace dog: NOT_STORED +failed to append to dog: NOT_STORED +failed to prepend to dog: NOT_STORED +failed to delete dog: NOT_FOUND +--- no_error_log +[error] + + +=== TEST 5: set, gets, cas, cas +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local hashmemcached = require "resty.hashmemcached" + local hmemc, err = hashmemcached.new({ + {'127.0.0.1', $TEST_NGINX_MEMCACHED_PORT, 2}, + {'127.0.0.1', $TEST_NGINX_MEMCACHED_BACKUP_PORT, 1} + }, 'hashmemcached') + + local result = hmemc:flush_all() + local serv, res, ok, err + for serv, res in pairs(result) do + ok, err = unpack(res) + if not ok then + ngx.say("failed to flush ", serv, ", ", err) + return + end + end + + -- set + local ok, err = hmemc:set("dog", 32) + if not ok then + ngx.say("failed to set dog: ", err) + return + end + + -- gets + local res, flags, cas_uniq, err = hmemc:gets("dog") + if err then + ngx.say("failed to get dog: ", err) + return + end + if not res then + ngx.say("dog not found") + return + end + ngx.say("dog: ", res, " (flags: ", flags, ", cas_uniq: ", cas_uniq, ")") + + -- cas + local ok, err = hmemc:cas("dog", "hello world", cas_uniq, 0, 78) + if not ok then + ngx.say("failed to cas: ", err) + return + end + ngx.say("cas succeeded") + + -- cas again + local ok, err = hmemc:cas("dog", 56, cas_uniq, 0, 56) + if not ok then + ngx.say("failed to cas: ", err) + return + end + ngx.say("second cas succeeded") + + hmemc:close() + } + } +--- request +GET /t +--- response_body_like chop +^dog: 32 \(flags: 0, cas_uniq: \d+\) +cas succeeded +failed to cas: EXISTS$ +--- no_error_log +[error] + + + +=== TEST 6: one of the servers is down +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local hashmemcached = require "resty.hashmemcached" + + ngx.shared['hashmemcached']:flush_all() + + local hmemc, err = hashmemcached.new({ + {'127.0.0.1', $TEST_NGINX_MEMCACHED_PORT}, + {'127.0.0.1', 1921} + }, 'hashmemcached') + + local result = hmemc:flush_all() + local serv, res, ok, err + for serv, res in pairs(result) do + ok, err = unpack(res) + if not ok then + ngx.say("failed to flush ", serv, ", ", err) + end + end + + -- bird is hashed to 127.0.0.1:1921 + -- set + local ok, err = hmemc:set("bird", 32) + if not ok then + ngx.say("failed to set bird: ", err) + end + + -- set again + local ok, err = hmemc:set("bird", 33) + if not ok then + ngx.say("failed to set bird: ", err) + end + + -- set finally successfully + local ok, err = hmemc:set("bird", 34) + if not ok then + ngx.say("failed to set bird: ", err) + return + end + + -- get + local res, flags, err = hmemc:get("bird") + if err then + ngx.say("failed to get bird: ", err) + return + end + if not res then + ngx.say("bird not found") + return + end + ngx.say("bird: ", res) + + hmemc:close() + } + } +--- request +GET /t +--- response_body +failed to flush 127.0.0.1:1921, connection refused +failed to set bird: connection refused +failed to set bird: connection refused +bird: 34 +--- error_log +127.0.0.1:1921 failed the first time +127.0.0.1:1921 failed 2 times +127.0.0.1:1921 is turned down after 3 failure(s) + + +=== TEST 7: one of the servers is marked down previously +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local hashmemcached = require "resty.hashmemcached" + + ngx.shared['hashmemcached']:flush_all() + + local hmemc, err = hashmemcached.new({ + {'127.0.0.1', $TEST_NGINX_MEMCACHED_PORT}, + {'127.0.0.1', 1921} + }, 'hashmemcached') + + local result = hmemc:flush_all() + local serv, res, ok, err + for serv, res in pairs(result) do + ok, err = unpack(res) + if not ok then + ngx.say("failed to flush ", serv, ", ", err) + end + end + + -- set 3 times + for i = 1, 3 do + local ok, err = hmemc:set("bird", i) + if not ok then + ngx.say("failed to set bird ", i, " time(s): ", err) + end + end + + hmemc:close() + + -- 127.0.0.1:1921 is turned down now + + -- a new client + local hmemc2, err = hashmemcached.new({ + {'127.0.0.1', $TEST_NGINX_MEMCACHED_PORT}, + {'127.0.0.1', 1921} + }, "hashmemcached") + + -- get + local res, flags, err = hmemc2:get("bird") + if err then + ngx.say("failed to get bird: ", err) + return + end + if not res then + ngx.say("bird not found") + return + end + ngx.say("bird: ", res) + + hmemc2:close() + } + } +--- request +GET /t +--- response_body +failed to flush 127.0.0.1:1921, connection refused +failed to set bird 1 time(s): connection refused +failed to set bird 2 time(s): connection refused +bird: 3 +--- error_log +127.0.0.1:1921 is turned down after 3 failure(s) + + + +=== TEST 8: fail_timeout, max_fails +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local hashmemcached = require "resty.hashmemcached" + + ngx.shared['hashmemcached']:flush_all() + + local hmemc, err = hashmemcached.new({ + {'127.0.0.1', $TEST_NGINX_MEMCACHED_PORT}, + {'127.0.0.1', 1921} + }, 'hashmemcached', {fail_timeout=0.5, max_fails=1}) + + hmemc:flush_all() + + -- 127.0.0.1:1921 is marked down because max_fails=1 + + hmemc:close() + + ngx.sleep(0.6) + -- 127.0.0.1:1921 is removed from blacklist now + + -- must use a new client here + local hmemc2, err = hashmemcached.new({ + {'127.0.0.1', $TEST_NGINX_MEMCACHED_PORT}, + {'127.0.0.1', 1921} + }, "hashmemcached", {fail_timeout=0.5, max_fails=4}) + + local res, flags, err = hmemc2:get("bird") + if err then + ngx.say("failed to get bird: ", err) + elseif not res then + ngx.say("bird not found") + else + ngx.say("bird: ", res, " (flags: ", flags, ")") + end + + ngx.say('sleep...') + ngx.sleep(0.7) + -- 127.0.0.1:1921's bad record is evicted now + + for i = 1,5 do + local res, flags, err = hmemc2:get("bird") + if err then + ngx.say("failed to get ", "bird: ", err) + elseif not res then + ngx.say("bird not found") + else + ngx.say("bird: ", res, " (flags: ", flags, ")") + end + end + + hmemc2:close() + } + } +--- request +GET /t +--- response_body +failed to get bird: connection refused +sleep... +failed to get bird: connection refused +failed to get bird: connection refused +failed to get bird: connection refused +failed to get bird: connection refused +bird not found +--- error_log +127.0.0.1:1921 is turned down after 1 failure(s) +127.0.0.1:1921 is turned down after 4 failure(s) + + +=== TEST 9: mock memcached emulating read timeout +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local hashmemcached = require "resty.hashmemcached" + + ngx.shared['hashmemcached']:flush_all() + + local hmemc, err = hashmemcached.new({ + {'127.0.0.1', $TEST_NGINX_MEMCACHED_PORT} + }) + + hmemc:flush_all() + + hmemc:close() + + local hmemc2, err = hashmemcached.new({ + {'127.0.0.1', $TEST_NGINX_MEMCACHED_PORT}, + {'127.0.0.1', 1921} + }, nil, {max_fails=1}) + + hmemc2:set_timeout(100) -- 0.1 sec + + -- get twice + for i = 1, 2 do + local data, flags, err = hmemc2:get("bird") + if err then + ngx.say("failed to get bird: ", err) + elseif not data then + ngx.say("bird not found") + else + ngx.say("bird: ", data, " (flags: ", flags, ")") + end + end + + hmemc2:close() + } + } +--- request +GET /t +--- tcp_listen: 1921 +--- tcp_query_len: 10 +--- tcp_query eval +"get bird\r\n" +--- tcp_reply eval +"VALUE bird 0 11\r\nI am a bird\r\nEND\r\n" +--- tcp_reply_delay: 150ms +--- response_body +failed to get bird: timeout +bird not found +--- error_log +127.0.0.1:1921 is turned down after 1 failure(s) + + +=== TEST 10: mock memcached emulating remote closed +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local hashmemcached = require "resty.hashmemcached" + + ngx.shared['hashmemcached']:flush_all() + + local hmemc, err = hashmemcached.new({ + {'127.0.0.1', $TEST_NGINX_MEMCACHED_PORT} + }) + + hmemc:flush_all() + + local ok, err = hmemc:set('bird', 'a good bird') + if not ok then + ngx.say("failed to set bird: ", err) + return + end + + hmemc:close() + + + local hmemc2, err = hashmemcached.new({ + {'127.0.0.1', $TEST_NGINX_MEMCACHED_PORT}, + {'127.0.0.1', 1921} + }, nil, {max_fails=1}) + + -- get twice + for i = 1, 2 do + local data, flags, err = hmemc2:get("bird") + if err then + ngx.say("failed to get bird: ", err) + elseif not data then + ngx.say("bird not found") + else + ngx.say("bird: ", data, " (flags: ", flags, ")") + end + end + + hmemc2:close() + } + } +--- request +GET /t +--- tcp_listen: 1921 +--- tcp_query_len: 10 +--- tcp_query eval +"get bird\r\n" +--- tcp_shutdown: 1 +--- response_body +failed to get bird: closed +bird: a good bird (flags: 0) +--- error_log +127.0.0.1:1921 is turned down after 1 failure(s) + + +=== TEST 11: no available memcaced server in the cluster +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local hashmemcached = require "resty.hashmemcached" + local hmemc, err = hashmemcached.new({}, 'hashmemcached') + + -- always flush_all at first + local result = hmemc:flush_all() + local serv, res, ok, err + for serv, res in pairs(result) do + ok, err = unpack(res) + if not ok then + ngx.say("failed to flush ", serv, ", ", err) + return + end + end + + local ok, err = hmemc:set('dog', 32) + if not ok then + ngx.say("failed to set dog: ", err) + return + end + } + } +--- request +GET /t +--- response_body +failed to set dog: no available memcached server From 3c1caeeafca8bcf04bb99da579eece51a72f02e4 Mon Sep 17 00:00:00 2001 From: yanxurui <617080352@qq.com> Date: Tue, 28 Nov 2017 18:54:36 +0800 Subject: [PATCH 4/4] doc: support memcache cluster --- README.markdown | 104 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/README.markdown b/README.markdown index f27381b..7f29c38 100644 --- a/README.markdown +++ b/README.markdown @@ -34,6 +34,7 @@ Table of Contents * [version](#version) * [quit](#quit) * [verbosity](#verbosity) +* [Cluster Support](#cluster-support) * [Automatic Error Logging](#automatic-error-logging) * [Limitations](#limitations) * [TODO](#todo) @@ -359,6 +360,7 @@ In case of success, returns `1`. In case of errors, returns `nil` with a string get --- `syntax: value, flags, err = memc:get(key)` + `syntax: results, err = memc:get(keys)` Get a single entry or multiple entries in the memcached server via a single key or a table of keys. @@ -501,6 +503,108 @@ Returns `1` in case of success and `nil` other wise. In case of failures, anothe [Back to TOC](#table-of-contents) +Cluster Support +=============== + +A new module is imported to support memcache cluster based on [consistent hash](https://github.com/openresty/lua-resty-balancer). hashmemcached is a proxy class of memcached implemented by closure. Please notice that this module won't reconnect to a good server when it finds current one is down. + +example +------- + +```lua +local hashmemcached = require "resty.hashmemcached" +local hmemc, err = hashmemcached.new({ + {'127.0.0.1', 11211, 2}, + {'127.0.0.1', 11212, 1} +}, 'hashmemcached') + +-- always flush_all at first +local result = hmemc:flush_all() +local serv, res, ok, err +for serv, res in pairs(result) do + ok, err = unpack(res) + if not ok then + ngx.say("failed to flush ", serv, ", ", err) + return + end +end + +ngx.say("set") + +keys = {'dog', 'puppy', 'cat', 'kitten'} +values = {32, "I am a little dog", 64, "I am a \nlittle cat\n"} +local i, key +for i, key in ipairs(keys) do + local ok, err = hmemc:set(key, values[i]) + if not ok then + ngx.say("failed to set ", key, ": ", err) + else + ngx.say(key, " is stored in ", hmemc:which_server()) + end +end + +ngx.say("\nget") + +for i, key in ipairs(keys) do + local res, flags, err = hmemc:get(key) + if err then + ngx.say("failed to get ", key, ": ", err) + elseif not res then + ngx.say(key, " not found") + else + ngx.say(key, ": ", res, " (flags: ", flags, ")") + end +end + +hmemc:set_keepalive(10000, 100) +``` + +methods +------- +below are methods whose behavior changed. + +new +--- +`syntax: hmemc, err = hashmemcached.new(cluster, shm, opts?)` + +Creates a hashed memcached object based on the cluster paramter. + +* cluster + an array of tables containing all memcaches' infomation in the form of `{{ip, port, weight}, ...}`. Here is an example: + ```lua + { + {'127.0.0.1', 11211, 2}, + {'127.0.0.1', 11212} + } + ``` + `weight` is optional, defaut value is 1 +* shm + the name of the shared memory zone used to share infomation between worker processes. By defaut, it's `hashmemcached` +* opts + besides `key_transform`, it accepts the following options: + * max_fails + * fail_timeout + + if a node fails `max_fails` times in `fail_timeout` seconds then this node is considered unavailable in the duration of `fail_timeout`. This policy of remapping imitates the nginx http upstream's server directive. + +which_server +------------ +`syntax: results = hashmemcached:which_server()` + +This method is added for debugging purpose. Return currently connnected server's id which is a string of `ip:port`. + +flush_all +--------- +`syntax: server_id = hashmemcached:flush_all()` + +call flush_all method of every memcache instance. +`results` is a table of `{server_id:res}` and res is a table of `{1}` or `{nil, err}` + +get(s) +----------- +They can accept multiple keys and these keys can be mapped to multiple memcaches. +The return value is the same as the original version except that it will always return a table whose value is a table of the **data** or `{nil, err}` or `nil` which indicates not found. + Automatic Error Logging =======================