Bloom Filter (Python)

Bloom Filter (Python)

2020, Jul 20    

Code

Test Code

n = 2_00_00_000 # number of items to add
p = 0.000001 # false positive probability
bloomf = BloomFilter(n, p)

for item in ['abound', 'abounds', 'abundance', 'abundant']:
    bloomf.add(item)

print(bloomf.check('abound')) # False
print(bloomf.check('abounds')) # True

Implementation

from bitarray import bitarray
import mmh3
import math

class BloomFilter:
    def __init__(self, items_count: int, fp_prob: float) -> None:
        """
        :param items_count: int
            Number of items expected to be stored in bloom filter
        :param fp_prob: float
            False Positive probability in decimal
        """
        # False possible probability in decimal
        self.fp_prob = fp_prob
        # Size of bit array to use
        self.size = self.get_size(items_count, fp_prob)
        # number of hash functions to use
        self.hash_count = self.get_hash_count(self.size, items_count)
        # Bit array of given size
        self.bit_array = bitarray(self.size)
        self.bit_array.setall(0)

    def add(self, item):
        """
        Add an item in the filter
        :param item:
        :return:
        """
        for i in range(self.hash_count):
            digest = mmh3.hash(item, i) % self.size
            self.bit_array[digest] = True

    def check(self, item):
        """
        Check for existence of an item in filter
        :param item:
        :return:
        """
        for i in range(self.hash_count):
            digest = mmh3.hash(item, i) % self.size
            if not self.bit_array[digest]:
                return False
        return True

    @classmethod
    def get_size(cls, n: int, p: float):
        """
        Return the size of bit array(m) to used using
        following formula
        m = -(n * lg(p)) / (lg(2)^2)
        :param n : int
            number of items expected to be stored in filter
        :param p : float
            False Positive probability in decimal
        """
        m = - (n * math.log(p)) / (math.log(2) ** 2)
        return int(m)

    @classmethod
    def get_hash_count(cls, m: int, n: int):
        """
        Return the hash function(k) to be used using
        following formula
        k = (m/n) * lg(2)
        :param m:int
            size of bit array
        :param n:int
            number of items expected to be stored in filter
        :return: int
        """
        k = (m / n) * math.log(2)
        return int(k)