zoukankan      html  css  js  c++  java
  • Nginx一致性哈希模块的Lua实现

    Nginx一致性哈希模块的Lua重新实现

    技术背景:

    最近在工作中使用了nginx+redis 的架构,redis在后台做分布式存储,每个redis都存放不同的数据,这些数据都是某门户网站通过Hadoop分析出来的用户行为日志,key是uid,value是user profile,每小时更新量在500-800万条记录,而这些记录一旦生成,我需要在5分钟左右的时间完成所有导入过程。

    首先,我在nginx中使用了第三方模块HttpUpstreamConsistent来做负载均衡策略,针对不同用户(uid)选取不同的backend redis:

       upstream somestream {
          consistent_hash $arg_uid;
          server 10.50.1.3:11211;
          server 10.50.1.4:11211;
          server 10.50.1.5:11211;
        }

    现在问题来了,由于Hadoop系统处理日志的速度非常快,如果把每条记录都通过Nginx来写入Redis中,这样的速度是无法接受的,而且会影响Nginx对正常请求的服务能力。所以,需要将这些数据以离线的方式导入redis集群中,这样就要重新实现HttpUpstreamConsistent模块了,才能保证读写的哈希策略一致。

    下面的源码演示了如何将HttpUpstreamConsistent模块翻译成Lua的过程,(使用了CRC32作散列,依赖库的路径已列在Reference中)。

    #!/usr/bin/lua
    
    -- chenqi@2014/04/02
    --[Reference]
    --https://github.com/yaoweibin/ngx_http_consistent_hash
    --https://github.com/davidm/lua-digest-crc32lua
    
    local CRC = require('CRC32')
    
    local M = {}
    
    local CONSISTENT_BUCKETS = 1024
    local VIRTUAL_NODE = 160
    
    local HASH_PEERS = {}
    local CONTINUUM = {}
    local BUCKETS = {}
    
    local function hash_fn(key)
        return CRC.crc32(key)
    end
    
    -- in-place quicksort
    function quicksort(array,compareFunc)  
        quick(array,1,#array,compareFunc)  
    end  
    
    function quick(array,left,right,compareFunc)  
        if(left < right ) then  
            local index = partion(array,left,right,compareFunc)  
            quick(array,left,index-1,compareFunc)  
            quick(array,index+1,right,compareFunc)  
        end  
    end  
      
    function partion(array,left,right,compareFunc)  
        local key = array[left] 
        local index = left  
        array[index],array[right] = array[right],array[index]
        local i = left  
        while i< right do  
            if compareFunc( key,array[i]) then  
                array[index],array[i] = array[i],array[index]
                index = index + 1  
            end  
            i = i + 1  
        end  
        array[right],array[index] = array[index],array[right]
        return index;  
    end  
    
    -- binary search
    local function chash_find(point)
        local mid, lo, hi = 1, 1, #CONTINUUM
        while 1 do
            if point <= CONTINUUM[lo][2] or point > CONTINUUM[hi][2] then
                return CONTINUUM[lo]
            end
    
            -- test middle point
            mid = lo + math.floor((hi-lo)/2)
    
            -- perfect match
            if point <= CONTINUUM[mid][2] and point > (mid > 1 and CONTINUUM[mid-1][2] or 0) then
                return CONTINUUM[mid]
            end
    
            -- too low, go up
            if CONTINUUM[mid][2] < point then
                lo = mid + 1
            else
                hi = mid - 1
            end
        end
    end
    
    local function chash_init()
        local n = #HASH_PEERS
        if n == 0 then
            print("There is no backend servers")
            return
        end
    
        local C = {}
        for i,peer in ipairs(HASH_PEERS) do
            for k=1, math.floor(VIRTUAL_NODE * peer[1]) do
                local hash_data = peer[2] .. "-" .. (k - 1)
                table.insert(C, {peer[2], hash_fn(hash_data)})
            end
        end
    
        quicksort(C, function(a,b) return a[2] > b[2] end)
        CONTINUUM = C
    
    --[[
        for i=1,#C do
            print(CONTINUUM[i][1],CONTINUUM[i][2])
        end
    --]]
    
        local step = math.floor(0xFFFFFFFF / CONSISTENT_BUCKETS)
    
        BUCKETS = {}
        for i=1, CONSISTENT_BUCKETS do
            table.insert(BUCKETS, i, chash_find(math.floor(step * (i - 1))))
            -- print(BUCKETS[i][1],BUCKETS[i][2])
        end
    
    end
    M.init = chash_init
    
    local function chash_get_upstream_crc32(point)
        return BUCKETS[(point % CONSISTENT_BUCKETS)+1][1]
    end
    M.get_upstream_crc32 = chash_get_upstream_crc32
    
    local function chash_get_upstream(key)
        local point = math.floor(hash_fn(key)) 
        return chash_get_upstream_crc32(point)
    end
    M.get_upstream = chash_get_upstream
    
    local function chash_add_upstream(upstream, weigth)
        weight = weight or 1
        table.insert(HASH_PEERS, {weight, upstream})
    end
    M.add_upstream = chash_add_upstream
    
    return M

    API调用方式:

    local redis_login= {
        "10.50.1.3:11211",
        "10.50.1.4:11211",
        "10.50.1.5:11211",
    }
    
    for k, backend in ipairs(redis_login) do
        chash_login.add_upstream(backend)
    end
    chash_login.init()
    
    uid="309473941"
    chash_login.chash_get_upstream(uid)

    返回一个backend地址,将该uid对应的数据写入对应的redis中即可,稍后可以使用Nginx读到。

    PS:关于redis的mass insertion问题,最高效的方式是批量写入文件(文件格式遵循redis协议),然后使用 redis-cli --pipe 直接导入。

  • 相关阅读:
    net core 使用 rabbitmq
    asp.net core WebApi 返回 HttpResponseMessage
    asp.net core 2.1 WebApi 快速入门
    JQuery EasyUI combobox动态添加option
    php截取字符去掉最后一个字符
    JQuery EasyUI Combobox的onChange事件
    对于不返回任何键列信息的 selectcommand 不支持 updatecommand 的动态 sql 生成
    Access2007 操作或事件已被禁用模式阻止解决办法
    Easyui 中 Tabsr的常用方法
    Win 7 IE11不能下载文件,右键另存为也不行
  • 原文地址:https://www.cnblogs.com/chenny7/p/3640990.html
Copyright © 2011-2022 走看看