Java 类名:com.alibaba.alink.operator.batch.clustering.DbscanBatchOp
Python 类名:DbscanBatchOp
DBSCAN,Density-Based Spatial Clustering of Applications with Noise,是一个比较有代表性的基于密度的聚类算法。与划分和层次聚类方法不同,它将簇定义为密度相连的点的最大集合,能够把具有足够高密度的区域划分为簇,并可在噪声的空间数据库中发现任意形状的聚类。
本算法为DBSCAN对应的训练组件,输入为训练数据,输出有两个:(1)每个数据点的ID,节点类型以及聚类中心,(2)可以用来预测新数据的DBSCAN模型。
参数名称 | 参数描述 | 说明 |
---|---|---|
EUCLIDEAN | 欧式距离 | |
COSINE | 夹角余弦距离 | |
CITYBLOCK | 街区距离 |
名称 | 中文名称 | 描述 | 类型 | 是否必须? | 取值范围 | 默认值 |
---|---|---|---|---|---|---|
epsilon | 邻域距离阈值 | 邻域距离阈值 | Double | ✓ | ||
idCol | ID列名 | ID列对应的列名 | String | ✓ | ||
minPoints | 邻域中样本个数的阈值 | 邻域中样本个数的阈值 | Integer | ✓ | ||
predictionCol | 预测结果列名 | 预测结果列名 | String | ✓ | ||
vectorCol | 向量列名 | 向量列对应的列名 | String | ✓ | 所选列类型为 [DENSE_VECTOR, SPARSE_VECTOR, STRING, VECTOR] | |
distanceType | 距离度量方式 | 距离类型 | String | “EUCLIDEAN”, “COSINE”, “CITYBLOCK” | “EUCLIDEAN” |
from pyalink.alink import * import pandas as pd useLocalEnv(1) data = pd.DataFrame([ ["id_1", "2.0,3.0"], ["id_2", "2.1,3.1"], ["id_3", "200.1,300.1"], ["id_4", "200.2,300.2"], ["id_5", "200.3,300.3"], ["id_6", "200.4,300.4"], ["id_7", "200.5,300.5"], ["id_8", "200.6,300.6"], ["id_9", "2.1,3.1"], ["id_10", "2.1,3.1"], ["id_11", "2.1,3.1"], ["id_12", "2.1,3.1"], ["id_16", "300.,3.2"] ]) inOp1 = BatchOperator.fromDataframe(data, schemaStr='id string, vec string') inOp2 = StreamOperator.fromDataframe(data, schemaStr='id string, vec string') dbscan = DbscanBatchOp()\ .setIdCol("id")\ .setVectorCol("vec")\ .setMinPoints(3)\ .setEpsilon(0.5)\ .setPredictionCol("pred")\ .linkFrom(inOp1) dbscan.print() predict = DbscanPredictBatchOp()\ .setPredictionCol("pred")\ .linkFrom(dbscan.getSideOutput(0), inOp1) predict.print() predict = DbscanPredictStreamOp(dbscan.getSideOutput(0))\ .setPredictionCol("pred")\ .linkFrom(inOp2) predict.print() StreamOperator.execute()
import org.apache.flink.types.Row; import com.alibaba.alink.operator.batch.source.MemSourceBatchOp; import com.alibaba.alink.operator.stream.StreamOperator; import com.alibaba.alink.operator.stream.clustering.DbscanPredictStreamOp; import com.alibaba.alink.operator.batch.clustering.DbscanPredictBatchOp; import com.alibaba.alink.operator.batch.clustering.DbscanBatchOp; import com.alibaba.alink.operator.stream.source.MemSourceStreamOp; import org.junit.Test; import java.util.Arrays; import java.util.List; public class DbscanBatchOpTest { @Test public void testDbscanBatchop() throws Exception { List <Row> dataPoints = Arrays.asList( Row.of("id_1", "2.0,3.0"), Row.of("id_2", "2.1,3.1"), Row.of("id_3", "200.1,300.1"), Row.of("id_4", "200.2,300.2"), Row.of("id_5", "200.3,300.3"), Row.of("id_6", "200.4,300.4"), Row.of("id_7", "200.5,300.5"), Row.of("id_8", "200.6,300.6"), Row.of("id_9", "2.1,3.1"), Row.of("id_10", "2.1,3.1"), Row.of("id_11", "2.1,3.1"), Row.of("id_12", "2.1,3.1"), Row.of("id_16", "300.,3.2")); MemSourceBatchOp inOp1 = new MemSourceBatchOp(dataPoints, "id string, vec string"); MemSourceStreamOp inOp2 = new MemSourceStreamOp(dataPoints, "id string, vec string"); DbscanBatchOp dbscanBatchOp = new DbscanBatchOp() .setIdCol("id") .setVectorCol("vec") .setMinPoints(3) .setEpsilon(0.5) .setPredictionCol("pred") .linkFrom(inOp1); dbscanBatchOp.print(); DbscanPredictBatchOp dbscanPredictBatchOp = new DbscanPredictBatchOp() .setPredictionCol("pred") .linkFrom(dbscanBatchOp.getSideOutput(0), inOp1); dbscanPredictBatchOp.print(); DbscanPredictStreamOp dbscanPredictStreamOp = new DbscanPredictStreamOp(dbscanBatchOp.getSideOutput(0)) .setPredictionCol("pred") .linkFrom(inOp2); dbscanPredictStreamOp.print(); StreamOperator.execute(); } }
id | type | pred |
---|---|---|
id_4 | CORE | 1 |
id_8 | CORE | 1 |
id_2 | CORE | 0 |
id_6 | CORE | 1 |
id_16 | NOISE | -2147483648 |
id_7 | CORE | 1 |
id_12 | CORE | 0 |
id_5 | CORE | 1 |
id_1 | CORE | 0 |
id_3 | CORE | 1 |
id_9 | CORE | 0 |
id_10 | CORE | 0 |
id_11 | CORE | 0 |
id | vec | pred |
---|---|---|
id_1 | 2.0,3.0 | 0 |
id_2 | 2.1,3.1 | 0 |
id_3 | 200.1,300.1 | 1 |
id_4 | 200.2,300.2 | 1 |
id_5 | 200.3,300.3 | 1 |
id_6 | 200.4,300.4 | 1 |
id_7 | 200.5,300.5 | 1 |
id_8 | 200.6,300.6 | 1 |
id_9 | 2.1,3.1 | 0 |
id_10 | 2.1,3.1 | 0 |
id_11 | 2.1,3.1 | 0 |
id_12 | 2.1,3.1 | 0 |
id_16 | 300.,3.2 | -2147483648 |
id | vec | pred |
---|---|---|
id_11 | 2.1,3.1 | 0 |
id_1 | 2.0,3.0 | 0 |
id_16 | 300.,3.2 | -2147483648 |
id_12 | 2.1,3.1 | 0 |
id_6 | 200.4,300.4 | 1 |
id_3 | 200.1,300.1 | 1 |
id_7 | 200.5,300.5 | 1 |
id_9 | 2.1,3.1 | 0 |
id_2 | 2.1,3.1 | 0 |
id_10 | 2.1,3.1 | 0 |
id_4 | 200.2,300.2 | 1 |
id_5 | 200.3,300.3 | 1 |
id_8 | 200.6,300.6 | 1 |
DBSCAN中常用的两个参数为:临域中样本个数的阈值(minPoints) 和 临域距离阈值(epsilon):
- 当观测到cluster数目过多,想要减少cluster数目时,建议调大minPoints,调小epsilon(建议优先调节minPoints)。
- 当观测到cluster数目过少,想要增加cluster数目时,建议调小minPoints,调大epsilon(建议优先调节minPoints)。