zoukankan      html  css  js  c++  java
  • 基于Google Guava之BloomFilter的Redis的重构

    对另一篇博客代码的补充。

    原博是谁不知道,参考博文:https://segmentfault.com/a/1190000012620152

    不再基于jedis,改用redisTemplate。跑了几次,发现确实可以动态扩容。原博牛逼!!!

    RedisBloomFilter.java

    package com.ylzinfo.ehc.server.bloomFilter.redis;
    
    import com.google.common.base.Predicate;
    import com.google.common.hash.Funnel;
    import org.checkerframework.checker.nullness.qual.Nullable;
    
    import java.io.Serializable;
    
    import static com.google.common.base.Preconditions.checkArgument;
    import static com.google.common.base.Preconditions.checkNotNull;
    
    /**
     * Bloom filter whose bit array is stored in Redis, modelled on Guava's {@code BloomFilter}.
     *
     * <p>Sizing follows Guava's formulas ({@link #optimalNumOfBits}, {@link #optimalNumOfHashFunctions});
     * the bits are read and written through {@link RedisBitmaps}, and the hashing scheme is supplied
     * by a {@link Strategy} (by default {@link RedisBloomFilterStrategies#MURMUR128_MITZ_64}).
     *
     * <p>NOTE(review): this class declares {@link Serializable}, but {@link RedisBitmaps} does not
     * implement it, so Java-serializing an instance will fail. Confirm before relying on serialization.
     *
     * @param <T> type of the objects inserted into / tested against the filter
     * @Auther: syh
     * @Date: 2020/7/10
     * @Description: Bloom filter based on Redis and Guava
     */
    public class RedisBloomFilter<T> implements Predicate<T>, Serializable {

        private final RedisBitmaps bits;
        private final int numHashFunctions;
        private final Funnel<? super T> funnel;
        private final RedisBloomFilter.Strategy strategy;

        private RedisBloomFilter(
                RedisBitmaps bits, int numHashFunctions, Funnel<? super T> funnel, RedisBloomFilter.Strategy strategy) {
            checkArgument(numHashFunctions > 0, "numHashFunctions (%s) must be > 0", numHashFunctions);
            checkArgument(
                    numHashFunctions <= 255, "numHashFunctions (%s) must be <= 255", numHashFunctions);
            this.bits = checkNotNull(bits);
            this.numHashFunctions = numHashFunctions;
            this.funnel = checkNotNull(funnel);
            this.strategy = checkNotNull(strategy);
        }

        /**
         * Convenience overload of {@link #create(Funnel, long, double)} for {@code int} sizes.
         *
         * <p>Returns the parameterized type (previously raw) so callers keep compile-time type safety.
         *
         * @param funnel             converts {@code T} into bytes for hashing
         * @param expectedInsertions expected number of insertions; 0 is treated as 1
         * @param fpp                desired false-positive probability, strictly between 0 and 1
         * @return a new filter backed by Redis
         * @throws IllegalArgumentException if an argument is out of range
         */
        public static <T> RedisBloomFilter<T> create(Funnel<? super T> funnel, int expectedInsertions, double fpp) {
            return create(funnel, (long) expectedInsertions, fpp);
        }

        /**
         * Creates a filter using the default {@link RedisBloomFilterStrategies#MURMUR128_MITZ_64} strategy.
         *
         * @param funnel             converts {@code T} into bytes for hashing
         * @param expectedInsertions expected number of insertions; 0 is treated as 1
         * @param fpp                desired false-positive probability, strictly between 0 and 1
         * @return a new filter backed by Redis
         * @throws IllegalArgumentException if an argument is out of range
         */
        public static <T> RedisBloomFilter<T> create(
                Funnel<? super T> funnel, long expectedInsertions, double fpp) {
            return create(funnel, expectedInsertions, fpp, RedisBloomFilterStrategies.MURMUR128_MITZ_64);
        }

        /**
         * Validates the arguments, derives the bit count and hash-function count, and builds the filter.
         *
         * @throws NullPointerException     if {@code funnel} or {@code strategy} is null
         * @throws IllegalArgumentException if an argument is out of range or the bitmap cannot be created
         */
        static <T> RedisBloomFilter<T> create(
                Funnel<? super T> funnel, long expectedInsertions, double fpp, RedisBloomFilter.Strategy strategy) {
            checkNotNull(funnel);
            checkArgument(
                    expectedInsertions >= 0, "Expected insertions (%s) must be >= 0", expectedInsertions);
            checkArgument(fpp > 0.0, "False positive probability (%s) must be > 0.0", fpp);
            checkArgument(fpp < 1.0, "False positive probability (%s) must be < 1.0", fpp);
            checkNotNull(strategy);

            // The sizing formulas divide by n, so an empty filter is sized as if for one element.
            if (expectedInsertions == 0) {
                expectedInsertions = 1;
            }

            long numBits = optimalNumOfBits(expectedInsertions, fpp);
            int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);
            try {
                return new RedisBloomFilter<T>(new RedisBitmaps(numBits), numHashFunctions, funnel, strategy);
            } catch (IllegalArgumentException e) {
                throw new IllegalArgumentException("Could not create BloomFilter of " + numBits + " bits", e);
            }
        }

        /** Guava {@link Predicate} view of the filter; equivalent to {@link #mightContain}. */
        @Override
        public boolean apply(@Nullable T input) {
            return mightContain(input);
        }

        /**
         * Records {@code object} in the filter.
         *
         * @return whatever the strategy reports about whether any stored bit changed
         */
        public boolean put(T object) {
            return strategy.put(object, funnel, numHashFunctions, bits);
        }

        /**
         * Tests membership through the strategy. As with any Bloom filter, {@code true} may be a
         * false positive.
         */
        public boolean mightContain(T object) {
            return strategy.mightContain(object, funnel, numHashFunctions, bits);
        }

        /**
         * Number of bits for {@code n} insertions at false-positive rate {@code p}:
         * {@code m = -n * ln(p) / (ln 2)^2} (same formula as Guava).
         */
        static long optimalNumOfBits(long n, double p) {
            if (p == 0) {
                p = Double.MIN_VALUE;
            }
            return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
        }

        /** Number of hash functions {@code k = max(1, round(m / n * ln 2))} (same formula as Guava). */
        static int optimalNumOfHashFunctions(long n, long m) {
            return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
        }

        /** Maps an object to bit offsets and reads/writes them in a {@link RedisBitmaps}. */
        interface Strategy extends Serializable {
            /** Sets the bits for {@code object}; returns whether any bit changed. */
            <T> boolean put(T object, Funnel<? super T> funnel, int numHashFunctions, RedisBitmaps bits);

            /** Returns whether all bits for {@code object} are set. */
            <T> boolean mightContain(T object, Funnel<? super T> funnel, int numHashFunctions, RedisBitmaps bits);

            int ordinal();
        }
    }
    

      

    RedisBloomFilterStrategies.java

    package com.ylzinfo.ehc.server.bloomFilter.redis;
    
    import com.google.common.hash.Funnel;
    import com.google.common.hash.Hashing;
    import com.google.common.primitives.Longs;
    
    /**
     * Hashing strategies for {@link RedisBloomFilter}.
     *
     * @Auther: syh
     * @Date: 2020/7/10
     * @Description: Hashing strategies for the Redis bloom filter
     */
    public enum RedisBloomFilterStrategies implements RedisBloomFilter.Strategy {

        /**
         * Same scheme as Guava's MURMUR128_MITZ_64: the 128-bit Murmur3 hash is split into two
         * 64-bit halves {@code h1}, {@code h2}, and the i-th offset is {@code h1 + i * h2} with the
         * sign bit masked off, reduced modulo the bitmap size.
         */
        MURMUR128_MITZ_64() {
            @Override
            public <T> boolean put(T object, Funnel<? super T> funnel, int numHashFunctions, RedisBitmaps bits) {
                boolean bitsChanged = bits.set(bitOffsets(object, funnel, numHashFunctions, bits.bitSize()));
                // Auto-expansion: RedisBitmaps decides here whether to start a new bitmap.
                bits.ensureCapacityInternal();
                return bitsChanged;
            }

            @Override
            public <T> boolean mightContain(T object, Funnel<? super T> funnel, int numHashFunctions, RedisBitmaps bits) {
                return bits.get(bitOffsets(object, funnel, numHashFunctions, bits.bitSize()));
            }

            /**
             * Computes the {@code numHashFunctions} bit offsets for {@code object}, each in
             * {@code [0, bitSize)}. Shared by {@link #put} and {@link #mightContain} so both
             * always address the same bits.
             */
            private <T> long[] bitOffsets(T object, Funnel<? super T> funnel, int numHashFunctions, long bitSize) {
                byte[] bytes = Hashing.murmur3_128().hashObject(object, funnel).asBytes();
                long hash1 = lowerEight(bytes);
                long hash2 = upperEight(bytes);

                long[] offsets = new long[numHashFunctions];
                long combinedHash = hash1;
                for (int i = 0; i < numHashFunctions; i++) {
                    // Masking the sign bit keeps the modulo result non-negative.
                    offsets[i] = (combinedHash & Long.MAX_VALUE) % bitSize;
                    combinedHash += hash2;
                }
                return offsets;
            }

            /** Little-endian long from bytes 0..7 of the hash. */
            private /* static */ long lowerEight(byte[] bytes) {
                return Longs.fromBytes(
                        bytes[7], bytes[6], bytes[5], bytes[4], bytes[3], bytes[2], bytes[1], bytes[0]);
            }

            /** Little-endian long from bytes 8..15 of the hash. */
            private /* static */ long upperEight(byte[] bytes) {
                return Longs.fromBytes(
                        bytes[15], bytes[14], bytes[13], bytes[12], bytes[11], bytes[10], bytes[9], bytes[8]);
            }
        }
    }
    

      

    RedisBitmaps.java

    package com.ylzinfo.ehc.server.bloomFilter.redis;
    
    import com.google.common.math.LongMath;
    import com.google.common.primitives.Longs;
    
    import java.math.RoundingMode;
    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.LongStream;
    
    /**
     * Bloom filter bit array stored as a chain of Redis bitmaps ("generations").
     *
     * <p>Generation {@code i} lives under key {@code bloomfilter-i}; the string key {@code cursor}
     * holds the index of the generation currently written to (absent means 0). When more than half
     * of the current bitmap's bits are set, {@link #grow()} starts a fresh generation. Lookups
     * succeed if any generation has all requested bits set.
     *
     * <p>NOTE(review): key names are fixed, so every instance using the same Redis database shares
     * the same bitmaps and cursor.
     *
     * @Auther: syh
     * @Date: 2020/7/10
     * @Description:
     */
    public class RedisBitmaps {

        private static final String BASE_KEY = "bloomfilter";
        private static final String CURSOR = "cursor";

        /** Length of every generation's bitmap in bits; always a multiple of 64. */
        private final long bitSize;

        RedisBitmaps(long bits) {
            // Round up to a whole number of 64-bit words.
            this.bitSize = LongMath.divide(bits, 64, RoundingMode.CEILING) * Long.SIZE;

            if (bitCount() == 0) {
                allocate(currentKey());
            }
        }

        /**
         * Returns {@code true} if, in at least one generation, every offset is set.
         *
         * <p>The cursor is read once per call. All offsets of a generation are sent in a single
         * pipeline, so each generation costs one round trip.
         */
        boolean get(long[] offsets) {
            long lastGeneration = cursor();
            for (long generation = 0; generation <= lastGeneration; generation++) {
                byte[] key = keyBytes(genkey(generation));
                List<Object> replies = RedisExecutor.newExecutor()
                        .executePipelined(conn -> {
                            for (long offset : offsets) {
                                conn.getBit(key, offset);
                            }
                            return null;
                        });
                // A bit counts as set unless Redis explicitly answered false for it.
                if (!replies.contains(Boolean.FALSE)) {
                    return true;
                }
            }
            return false;
        }

        /** Reads one bit of the current generation; a missing reply counts as unset. */
        boolean get(final long offset) {
            byte[] key = keyBytes(currentKey());
            Boolean rst = RedisExecutor.newExecutor()
                    .execute(conn -> conn.getBit(key, offset));
            return Boolean.TRUE.equals(rst);
        }

        /**
         * Sets all {@code offsets} in the current generation.
         *
         * @return {@code false} without writing when several generations exist and one of them
         *         already holds every offset; otherwise whether any bit changed
         */
        boolean set(long[] offsets) {
            if (cursor() > 0 && get(offsets)) {
                return false;
            }
            boolean bitsChanged = false;
            for (long offset : offsets) {
                bitsChanged |= set(offset);
            }
            return bitsChanged;
        }

        /**
         * Sets one bit in the current generation.
         *
         * @return {@code true} unless Redis reported that the bit was already set
         */
        boolean set(long offset) {
            byte[] key = keyBytes(currentKey());
            // SETBIT replies with the bit's previous value, so no separate GETBIT is needed and
            // the check-and-set happens atomically on the server.
            Boolean previous = RedisExecutor.newExecutor()
                    .execute(conn -> conn.setBit(key, offset, true));
            return !Boolean.TRUE.equals(previous);
        }

        /** Number of set bits in the current generation (0 if Redis gives no reply). */
        long bitCount() {
            byte[] key = keyBytes(currentKey());
            Long rst = RedisExecutor.newExecutor()
                    .execute(conn -> conn.bitCount(key));
            return rst == null ? 0 : rst;
        }

        long bitSize() {
            return this.bitSize;
        }

        private String currentKey() {
            return genkey(cursor());
        }

        private String genkey(long cursor) {
            return BASE_KEY + "-" + cursor;
        }

        /**
         * Reads the current generation index; 0 when the cursor key is absent.
         *
         * @throws IllegalStateException if the stored cursor is not a number
         */
        private long cursor() {
            Object raw = RedisExecutor.newExecutor()
                    .execute(conn -> conn.get(keyBytes(CURSOR)));
            if (raw == null) {
                return 0;
            }
            // GET replies with the value's raw bytes; decode before parsing.
            String text = raw instanceof byte[]
                    ? new String((byte[]) raw, java.nio.charset.StandardCharsets.UTF_8)
                    : raw.toString();
            Long parsed = Longs.tryParse(text.trim());
            if (parsed == null) {
                throw new IllegalStateException("Invalid bloom filter cursor value: " + text);
            }
            return parsed;
        }

        /** Starts a new generation once more than half of the current bitmap's bits are set. */
        void ensureCapacityInternal() {
            if (bitCount() * 2 > bitSize())
                grow();
        }

        /** Advances the cursor and pre-allocates the new generation's bitmap. */
        void grow() {
            Long next = RedisExecutor.newExecutor()
                    .execute(conn -> conn.incr(keyBytes(CURSOR)));
            // Never unbox a missing INCR reply; fall back to re-reading the cursor.
            long generation = next != null ? next : cursor();
            allocate(genkey(generation));
        }

        /** Deletes every generation, resets the cursor to 0 and re-allocates generation 0. */
        void reset() {
            byte[][] keys = LongStream.rangeClosed(0, cursor())
                    .mapToObj(generation -> keyBytes(genkey(generation)))
                    .toArray(byte[][]::new);

            RedisExecutor.newExecutor()
                    .execute(conn -> conn.del(keys));
            RedisExecutor.newExecutor()
                    .execute(conn -> conn.set(keyBytes(CURSOR), "0".getBytes()));
            allocate(genkey(0));
        }

        /** Writes a 0 to the last bit of {@code key} so Redis grows the string to the full bitmap length. */
        private void allocate(String key) {
            byte[] raw = keyBytes(key);
            RedisExecutor.newExecutor()
                    .execute(conn -> conn.setBit(raw, bitSize - 1, false));
        }

        /** Encodes a Redis key name; keys here are ASCII, so UTF-8 matches the platform default. */
        private static byte[] keyBytes(String key) {
            return key.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        }
    }
    

      

    RedisExecutor.java

    package com.ylzinfo.ehc.server.bloomFilter.redis;
    
    import com.ylzinfo.ehc.core.gateway.SpringContextUtil;
    import org.springframework.data.redis.connection.RedisConnection;
    import org.springframework.data.redis.core.RedisCallback;
    import org.springframework.data.redis.core.RedisTemplate;
    
    import java.util.List;
    
    /**
     * Thin wrapper around the Spring bean {@code redisTemplate} for issuing raw
     * {@link RedisConnection} commands, either one call at a time or as a pipeline.
     *
     * @Auther: syh
     * @Date: 2020/7/10
     * @Description:
     */
    public class RedisExecutor<T> {

        private final RedisTemplate redisTemplate;

        /** Creates an executor bound to the Spring bean named {@code redisTemplate}. */
        public static <T> RedisExecutor<T> newExecutor() {
            return new RedisExecutor<>();
        }

        public RedisExecutor() {
            redisTemplate = SpringContextUtil.getBean("redisTemplate");
        }

        /**
         * Runs the commands issued by {@code executor} and returns the callback's result
         * (typically the reply of the command it issued).
         *
         * <p>No pipeline is opened here. While a connection is pipelined, every command returns
         * {@code null} and replies only arrive at {@code closePipeline()}, so a pipeline here made
         * every read look empty. Acquiring and releasing the connection is left to
         * {@link RedisTemplate#execute(RedisCallback)}, so the callback does not close it.
         *
         * @return the callback result, unchecked-cast to the caller's expected type
         */
        @SuppressWarnings("unchecked")
        public <T> T execute(PipelineExecutor executor) {
            return (T) redisTemplate.execute((RedisCallback) conn -> executor.exec(conn));
        }

        /**
         * Sends every command issued by {@code executor} on a single pipeline and returns their
         * replies in issue order.
         *
         * <p>{@link RedisTemplate#executePipelined(RedisCallback)} opens and closes the pipeline and
         * the connection itself, and requires the callback to return {@code null}; the replies are
         * deserialized with the template's serializers.
         */
        @SuppressWarnings("unchecked")
        public <T> List<T> executePipelined(PipelineExecutor executor) {
            return redisTemplate.executePipelined((RedisCallback) conn -> {
                executor.exec(conn);
                return null;
            });
        }

        /**
         * Callback that issues commands on a raw connection. Used by both {@link #execute} and
         * {@link #executePipelined}.
         */
        @FunctionalInterface
        public interface PipelineExecutor<T> {
            T exec(RedisConnection conn);
        }
    }
    

      

    测试controller:参数num表示待校验的数据;第一次请求,以及每次grow=true的请求,都会再向过滤器插入下一批1000条数据(连续整数)。

    package com.ylzinfo.ehc.server.bloomFilter.redis;
    
    import com.google.common.hash.Funnels;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.servlet.http.HttpServletRequest;
    import java.nio.charset.Charset;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Demo endpoint for {@link RedisBloomFilter}.
     *
     * <p>{@code bloom/filter/test?num=...&grow=...}: the first request, and every request with
     * {@code grow=true}, inserts the next 1000 consecutive integers (as strings) into the filter;
     * the response says whether {@code num} might be contained.
     *
     * @Auther: syh
     * @Date: 2020/7/10
     * @Description:
     */
    @RestController
    public class RedisTest {
        RedisBloomFilter<String> bloomFilter = RedisBloomFilter.create(
                Funnels.stringFunnel(Charset.forName("utf-8")), 1000, 0.1);
        /** True until the first batch has been inserted. */
        private boolean instance = true;
        /** Next integer value to insert. */
        private final AtomicInteger incr = new AtomicInteger(0);

        @RequestMapping("bloom/filter/test")
        public Object test(HttpServletRequest request) {
            String num = request.getParameter("num");
            String grow = request.getParameter("grow");

            insertBatchIfNeeded("true".equals(grow));

            // Hashing a null String through the string funnel throws NullPointerException;
            // a missing parameter is reported as "not contained" instead.
            if (num == null) {
                return false;
            }
            return bloomFilter.mightContain(num);
        }

        /**
         * Inserts the next 1000 values on the first call, or whenever {@code grow} is set.
         * Synchronized because Spring controllers are singletons by default, so concurrent
         * requests would otherwise race on the first-call flag.
         */
        private synchronized void insertBatchIfNeeded(boolean grow) {
            if (instance || grow) {
                for (int i = 0; i < 1000; i++) {
                    bloomFilter.put(String.valueOf(incr.getAndIncrement()));
                }
                instance = false;
            }
        }
    }
    

      

    num=1,grow=false时,返回true(命中目标)

    num=1000,grow=false时,返回false(首次请求只插入了0~999,1000尚未插入,所以未命中)
    num=1000,grow=true时,返回true(本次请求又插入了1000~1999,所以命中)

  • 相关阅读:
    requestAnimationFrame替代setTimeout和setInterval
    回流和重绘
    11.24
    11.23
    成员访问.,需计算的成员访问[],new,函数调用(),可选链(?.)——宰相级别20级
    圆括号()——最高级别21级
    运算符优先级
    求幂(**)
    加号(+)
    垃圾回收
  • 原文地址:https://www.cnblogs.com/braska/p/13280791.html
Copyright © 2011-2022 走看看