0%

DBSCAN算法

本文简单介绍DBSCAN算法的原理及实现。

DBSCAN算法原理

基本概念

DBSCAN(Density-Based Spatial Clustering of Applications with Noise)是一种基于密度的空间聚类算法。该算法将具有足够密度的区域划分为簇,并在具有噪声的空间数据库中发现任意形状的簇,它将簇定义为密度相连的点的最大集合。

假设样本集为\(D=(x_1, x_2, ..., x_m)\),DBSCAN算法有如下相关定义:

  • \(\epsilon\)-邻域:对于\(x_j∈D\),其\(\epsilon\)-邻域包含样本集\(D\)中与\(x_j\)的距离不大于\(\epsilon\)的子样本集,即\(N_{\epsilon}(x_j)= \{x_i∈D|distance(x_i,x_j) \leqslant \epsilon \},\) 这个子样本集的个数记为\(|N_{\epsilon}(x_j)|\)
  • 核心点:对于任一样本\(x_j∈D\),如果其ϵϵ-邻域对应的\(N_{\epsilon}(x_j)\)至少包含\(MinPts\)个样本,即如果\(|N_{\epsilon}(x_j)|≥MinPts\),则\(x_j\)是核心对象。
  • 密度直达:如果\(x_i\)位于\(x_j\)\(\epsilon\)-邻域中,且\(x_j\)是核心对象,则称\(x_i\)\(x_j\)密度直达。
  • 密度可达:对于\(x_i\)\(x_j\),如果存在样本样本序列\(p_1,p_2,...,p_T\),满足\(p_1=x_i,pT=x_j\), 且\(p_{t+1}\)\(p_t\)密度直达,则称\(x_j\)\(x_i\)密度可达。
  • 密度相连:对于\(x_i\)\(x_j\),如果存在核心对象样本\(x_k\),使\(x_i\)\(x_j\)均由\(x_k\)密度可达,则称\(x_i\)\(x_j\)密度相连。

注意:

  • 密度直达是非对称的,若\(x_i\)\(x_j\)密度直达,只有当\(x_i\)也为核心对象时,才有\(x_j\)\(x_i\)密度直达
  • 密度可达是密度直达的传递闭包,是非对称的
  • 密度相连是对称的

算法原理

总体思路

  1. 遍历每一个未处理的点
    1. 如果为核心点,找出所有从该点密度可达的点,形成一个簇
    2. 否则,标记后遍历下一个点

详细算法

算法1

输入:样本集\(D=(x_1, x_2, ..., x_m)\),参数\((\epsilon, MinPts)\)

输出:核心点集合\(\Omega\)

  1. 初始化核心对象集合\(\Omega = \emptyset\)
  2. 对于所有点\(x_j\):
    1. 找到样本\(x_j\)\(\epsilon\)-邻域子样本集\(N_{\epsilon}(x_j)\)
    2. 如果子样本集合个数满足\(|N_{\epsilon}(x_j)|≥MinPts\),将样本\(x_j\)加入核心对象样本集合\(\Omega = \Omega \cup \{ x_j \}\)

算法2

输入:样本集\(D=(x_1, x_2, ..., x_m)\),参数\((\epsilon, MinPts)\)

输出:簇划分结果\(C=\{ C_1, C_2, ..., C_k \}\)

  1. 执行算法1,找到所有核心点\(\Omega\)
  2. 初始化簇类别\(k=0\), 未访问样本集合\(\Gamma = D\), 簇划分结果\(C=\emptyset\)
  3. 如果核心点集合\(\Omega\)不为空,则继续执行,否则算法结束
  4. 在核心对象集合\(\Omega\)中,随机选择一个核心对象\(o\),初始化当前簇核心对象队列\(Ω_{cur}=\{o\}\), 初始化类别序号\(k=k+1\),初始化当前簇样本集合\(C_k=\{o\}\), 更新未访问样本集合\(Γ=Γ−{o}\)
  5. Repeat
  6. 如果当前簇核心对象集合\(Ω_{cur}\ne∅\),在\(Ω_{cur}\)中取出一个核心对象\(o′\),通过邻域距离阈值\(ϵ\)找出所有的\(ϵ\)-邻域子样本集\(N_ϵ(o′)\),令\(Δ=N_ϵ(o′)∩Γ\), 更新当前簇样本集合\(C_k=C_k∪Δ\), 更新未访问样本集合\(Γ=Γ−Δ\), 更新\(Ω_{cur}=Ω_{cur}∪(Δ∩Ω)−o′\),更新核心对象集合\(Ω=Ω−C_k\)
  7. 如果当前簇核心对象队列\(Ω_{cur}=∅\),则当前聚类簇\(C_k\)生成完毕, 更新簇划分\(C=C \cup C_k\) ,转入步骤3。

DBSCAN算法实现

Sklearn DBSCAN

尝试使用sklearn中的DBSCAN算法。

1
2
3
4
5
6
# all import
from sklearn import datasets
import matplotlib.pyplot as plt
from sklearn.cluster import DBSCAN
import numpy as np
import time
1
2
3
4
# generate data
n_samples = 1500
noisy_circles = datasets.make_circles(n_samples=n_samples, factor=.5,noise=.05)
X, y = noisy_circles
1
2
3
4
5
6
7
8
9
# plot
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
# plt.figure(1)
plt.scatter(X[:, 0], X[:, 1])

plt.subplot(1, 2, 2)
# plt.figure(2)
plt.scatter(X[:, 0], X[:, 1], c=y)

上图是sklearn生成的数据,右图是根据label直接绘制的结果。

下面使用sklearn中的DBSCAN类对数据X进行聚类。

1
2
3
4
5
6
# run sklearn DBSCAN to predict
y_pred = DBSCAN(eps=0.1).fit_predict(X)

# plot sklearn DBSCAN result
plt.figure(figsize=(5, 5))
plt.scatter(X[:, 0], X[:, 1], c=y_pred)

My DBSCAN

此部分代码为笔者依据上述伪代码实现,未经过优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# my DBSCAN algorithm
from sklearn.metrics.pairwise import pairwise_distances

class MyDBSCAN:
def __init__(self, X, eps=0.5, minPts=5):
"""
@param:
X: input dataset, (n, m)
eps:
minPts:
"""
self.X = X
self.n = len(X)
self.eps = eps
self.minPts = minPts

def get_core_points(self):
"""
@return:
core_points: the index of core points
"""
core_points = []
for i in range(self.n):
neighbors = self.get_neighbors(i)
if len(neighbors) >= self.minPts:
core_points.append(i)
return core_points


def get_neighbors(self, point_idx):
"""
@param:
point_idx: index of point
@return:
neighbors: neighbors of point
"""
neighbors = []
for i in range(self.n):
if self.dis_mat[point_idx][i] < self.eps:
neighbors.append(i)
return neighbors

def compute_dis_mat(self):
"""
@param:
X: input dataset, (n, m)
@return:
self.dis_mat: (n, n)
"""
self.dis_mat = pairwise_distances(X)
return

def random_choose(self, points):
"""
random choose a point
"""
# return points[0]
return points[np.random.randint(0, len(points))]

def fit_predict(self):
"""
@param:
X: input dataset, (n, m)
@return:
y_pred: predicted result, (n,)
"""
# computer distance matrix
self.compute_dis_mat()

core_points = set(self.get_core_points())
waiting_points = set([i for i in range(len(self.X))])
cluster = -1 * np.ones(self.n, dtype=np.int32)
k = 0

while len(core_points) > 0:
o = self.random_choose(list(core_points))
cur_core_points = set()
cur_core_points.add(o)
cur_cluster = set()
cur_cluster.add(o)
waiting_points.remove(o)
while True:
if len(cur_core_points) > 0:
point = list(cur_core_points)[0]
neighbors = set(self.get_neighbors(point))
delta = neighbors & waiting_points
cur_cluster = cur_cluster | delta
waiting_points = waiting_points - delta
cur_core_points = cur_core_points | (delta & core_points)
cur_core_points.remove(point)
core_points = core_points - cur_cluster
else:
cluster[list(cur_cluster)] = k
k += 1
break
return cluster

将其用到sklearn生成的数据集上测试效果:

1
2
3
4
n_samples = 500
noisy_circles = datasets.make_circles(n_samples=n_samples, factor=.5,
noise=.05)
X, y = noisy_circles
1
2
3
4
5
6
7
8
9
start = time.time()
y_my_pred = MyDBSCAN(X=X, eps=0.15).fit_predict()
end1 = time.time()
y_pred = DBSCAN(eps=0.15).fit_predict(X)
end2 = time.time()

print('{:15} {:5} {:3}'.format('Algorithm', 'Time', 'NMI'))
print('{:15} {:5.2f} {:3}'.format('My DBSCAN ', end1 - start, metrics.normalized_mutual_info_score(y, y_my_pred)))
print('{:15} {:5.2f} {:3}'.format('Sklearn DBSCAN', end2 - end1, metrics.normalized_mutual_info_score(y, y_pred)))
1
2
3
Algorithm		Time  NMI
My DBSCAN 0.11 1.0
Sklearn DBSCAN 0.00 1.0
1
2
3
4
5
6
7
8
9
10
11
12
13
# plot sklearn DBSCAN result
plt.figure(figsize=(15, 5))
plt.subplot(1, 3, 1)
plt.scatter(X[:, 0], X[:, 1], c=y)
plt.title('origin data')

plt.subplot(1, 3, 2)
plt.scatter(X[:, 0], X[:, 1], c=y_pred)
plt.title('sklearn DBSCAN')

plt.subplot(1, 3, 3)
plt.scatter(X[:, 0], X[:, 1], c=y_my_pred)
plt.title('My DBSCAN')

修改eps后的测试结果(minPts=5):

eps result
0.1
0.12
0.15

可以观察到在sklearn生成的该类数据集上My DBSCAN与sklearn DBSCAN的聚类结果非常接近,但My DBSCAN的运行时间较长。

Reference

A Density-Based Algorithm for Discovering Clusters in Large Spatial Databases with Noise (aaai.org)

DBSCAN密度聚类算法 - 刘建平Pinard - 博客园

用scikit-learn学习DBSCAN聚类 - 刘建平Pinard - 博客园

DBSCAN - 维基百科

详解DBSCAN聚类 - 知乎

Visualizing DBSCAN Clustering