zoukankan      html  css  js  c++  java
  • Nginx一致性哈希模块的Lua实现

    Nginx一致性哈希模块的Lua重新实现

    技术背景:

    最近在工作中使用了nginx+redis 的架构,redis在后台做分布式存储,每个redis都存放不同的数据,这些数据都是某门户网站通过Hadoop分析出来的用户行为日志,key是uid,value是user profile,每小时更新量在500-800万条记录,而这些记录一旦生成,我需要在5分钟左右的时间完成所有导入过程。

    首先,我在nginx中使用了第三方模块HttpUpstreamConsistent来做负载均衡策略,针对不同用户(uid)选取不同的backend redis:

       upstream somestream {
          consistent_hash $arg_uid;
          server 10.50.1.3:11211;
          server 10.50.1.4:11211;
          server 10.50.1.5:11211;
        }

    现在问题来了,由于Hadoop系统处理日志的速度非常快,如果把每条记录都通过Nginx来写入Redis中,这样的速度是无法接受的,而且会影响Nginx对正常请求的服务能力。所以,需要将这些数据以离线的方式导入redis集群中,这样就要重新实现HttpUpstreamConsistent模块了,才能保证读写的哈希策略一致。

    下面的源码演示了如何将HttpUpstreamConsistent模块翻译成Lua的过程,(使用了CRC32作散列,依赖库的路径已列在Reference中)。

    #!/usr/bin/lua
    
    -- chenqi@2014/04/02
    --[Reference]
    --https://github.com/yaoweibin/ngx_http_consistent_hash
    --https://github.com/davidm/lua-digest-crc32lua
    
    local CRC = require('CRC32')
    
    local M = {}
    
    local CONSISTENT_BUCKETS = 1024
    local VIRTUAL_NODE = 160
    
    local HASH_PEERS = {}
    local CONTINUUM = {}
    local BUCKETS = {}
    
    local function hash_fn(key)
        return CRC.crc32(key)
    end
    
    -- in-place quicksort
    function quicksort(array,compareFunc)  
        quick(array,1,#array,compareFunc)  
    end  
    
    function quick(array,left,right,compareFunc)  
        if(left < right ) then  
            local index = partion(array,left,right,compareFunc)  
            quick(array,left,index-1,compareFunc)  
            quick(array,index+1,right,compareFunc)  
        end  
    end  
      
    function partion(array,left,right,compareFunc)  
        local key = array[left] 
        local index = left  
        array[index],array[right] = array[right],array[index]
        local i = left  
        while i< right do  
            if compareFunc( key,array[i]) then  
                array[index],array[i] = array[i],array[index]
                index = index + 1  
            end  
            i = i + 1  
        end  
        array[right],array[index] = array[index],array[right]
        return index;  
    end  
    
    -- binary search
    local function chash_find(point)
        local mid, lo, hi = 1, 1, #CONTINUUM
        while 1 do
            if point <= CONTINUUM[lo][2] or point > CONTINUUM[hi][2] then
                return CONTINUUM[lo]
            end
    
            -- test middle point
            mid = lo + math.floor((hi-lo)/2)
    
            -- perfect match
            if point <= CONTINUUM[mid][2] and point > (mid > 1 and CONTINUUM[mid-1][2] or 0) then
                return CONTINUUM[mid]
            end
    
            -- too low, go up
            if CONTINUUM[mid][2] < point then
                lo = mid + 1
            else
                hi = mid - 1
            end
        end
    end
    
    local function chash_init()
        local n = #HASH_PEERS
        if n == 0 then
            print("There is no backend servers")
            return
        end
    
        local C = {}
        for i,peer in ipairs(HASH_PEERS) do
            for k=1, math.floor(VIRTUAL_NODE * peer[1]) do
                local hash_data = peer[2] .. "-" .. (k - 1)
                table.insert(C, {peer[2], hash_fn(hash_data)})
            end
        end
    
        quicksort(C, function(a,b) return a[2] > b[2] end)
        CONTINUUM = C
    
    --[[
        for i=1,#C do
            print(CONTINUUM[i][1],CONTINUUM[i][2])
        end
    --]]
    
        local step = math.floor(0xFFFFFFFF / CONSISTENT_BUCKETS)
    
        BUCKETS = {}
        for i=1, CONSISTENT_BUCKETS do
            table.insert(BUCKETS, i, chash_find(math.floor(step * (i - 1))))
            -- print(BUCKETS[i][1],BUCKETS[i][2])
        end
    
    end
    M.init = chash_init
    
    local function chash_get_upstream_crc32(point)
        return BUCKETS[(point % CONSISTENT_BUCKETS)+1][1]
    end
    M.get_upstream_crc32 = chash_get_upstream_crc32
    
    local function chash_get_upstream(key)
        local point = math.floor(hash_fn(key)) 
        return chash_get_upstream_crc32(point)
    end
    M.get_upstream = chash_get_upstream
    
    local function chash_add_upstream(upstream, weigth)
        weight = weight or 1
        table.insert(HASH_PEERS, {weight, upstream})
    end
    M.add_upstream = chash_add_upstream
    
    return M

    API调用方式:

    local redis_login= {
        "10.50.1.3:11211",
        "10.50.1.4:11211",
        "10.50.1.5:11211",
    }
    
    for k, backend in ipairs(redis_login) do
        chash_login.add_upstream(backend)
    end
    chash_login.init()
    
    uid="309473941"
    chash_login.chash_get_upstream(uid)

    返回一个backend地址,将该uid对应的数据写入对应的redis中即可,稍后可以使用Nginx读到。

    PS:关于redis的mass insertion问题,最高效的方式是批量写入文件(文件格式遵循redis协议),然后使用 redis-cli --pipe 直接导入。

  • 相关阅读:
    python-configparser模块,xml.etree模块
    Ubuntu16.04环境下Vim 配置 for HTML,CSS,JAVASCRIPT(1)
    Windows 命令行及Git操作
    Ubuntu16.04 无任务栏问题
    ubuntu16.04安装中文输入法
    本地Web服务器搭建
    爬虫(1)
    Python(四):数字连珠2
    python学习(四)五数连珠
    Openjudge 百练第4109题
  • 原文地址:https://www.cnblogs.com/chenny7/p/3640990.html
Copyright © 2011-2022 走看看