import numpy as np
class DBSCAN:
    """Density-based clustering (DBSCAN) using Euclidean distance.

    A sample is a *core* sample when at least ``min_pts`` other samples lie
    within ``epsilon`` of it; the sample itself is not counted. A cluster is
    grown from a core sample by repeatedly absorbing the neighborhoods of
    every core sample reached so far. Samples that end up in no cluster are
    noise.

    Attributes:
        epsilon: Neighborhood radius (the comparison is inclusive).
        min_pts: Minimum number of neighbors that makes a sample a core sample.
        core: ``None`` before ``fit``; afterwards the indices of all core
            samples in ascending order.
        labels_: ``None`` before ``fit``; afterwards an integer array holding
            one cluster id per sample (``0, 1, ...``), with ``-1`` for noise.
    """

    def __init__(self, epsilon=1, min_pts=3):
        self.epsilon = epsilon
        self.min_pts = min_pts
        self.core = None
        self.labels_ = None

    def _dist(self, X, i, j):
        """Return the Euclidean distance between samples ``i`` and ``j`` of ``X``."""
        return np.sqrt(np.sum((X[i] - X[j]) ** 2))

    def _neighbor(self, X, i):
        """Return the indices of all samples within ``epsilon`` of sample ``i``.

        Sample ``i`` itself is excluded; indices are returned in ascending order.
        """
        return [
            j
            for j in range(X.shape[0])
            if j != i and self._dist(X, i, j) <= self.epsilon
        ]

    def fit(self, X):
        """Cluster the samples of ``X`` and store the result on the estimator.

        Args:
            X: Array-like whose first axis indexes the samples.

        Returns:
            The fitted estimator itself, with ``core`` and ``labels_`` set.

        Note:
            The seed of each cluster is drawn with ``np.random.choice``, so
            the numbering of the clusters depends on NumPy's global random
            state. A non-core sample reachable from several clusters keeps
            the label of the cluster that was built last.
        """
        X = np.asarray(X)
        n_samples = X.shape[0]

        # Compute every neighborhood once; core detection and cluster
        # expansion below both reuse them.
        neighborhoods = [self._neighbor(X, i) for i in range(n_samples)]

        # Find the core samples.
        self.core = [
            i
            for i, neig in enumerate(neighborhoods)
            if len(neig) >= self.min_pts
        ]

        # -1 marks noise so that it cannot be confused with cluster 0.
        self.labels_ = np.full(n_samples, -1, dtype=int)

        # Build the clusters. Work on a local copy so that ``self.core``
        # still lists the core samples once fitting is done.
        remaining = set(self.core)
        k = 0
        while remaining:
            root = int(np.random.choice(sorted(remaining)))
            members = {root}
            pending = [root]
            while pending:  # collect every sample reachable from ``root``
                q = pending.pop()
                if len(neighborhoods[q]) < self.min_pts:
                    # Non-core samples join the cluster but do not extend it.
                    continue
                for j in neighborhoods[q]:
                    if j not in members:
                        members.add(j)
                        pending.append(j)

            # Store the cluster.
            remaining -= members
            self.labels_[list(members)] = k
            k += 1
        return self
if __name__ == "__main__":
    # Demo: cluster the first 100 iris samples and show two scatter plots of
    # the first two feature columns, one colored by the dataset's own target
    # values and one colored by the labels DBSCAN assigned.
    import matplotlib.pyplot as plt
    import seaborn as sns
    from sklearn.datasets import load_iris

    sns.set()

    dataset = load_iris()
    features, targets = dataset.data[:100], dataset.target[:100]

    clusterer = DBSCAN(epsilon=1, min_pts=3)
    clusterer.fit(features)

    # Reference coloring first, then the clustering result.
    for hue_values in (targets, clusterer.labels_):
        sns.relplot(x=features[:, 0], y=features[:, 1], hue=hue_values)
        plt.show()
# Comments: 0