DBSCAN Prediction (DbscanPredictBatchOp)

Java class name: com.alibaba.alink.operator.batch.clustering.DbscanPredictBatchOp

Python class name: DbscanPredictBatchOp

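Description

DBSCAN (Density-Based Spatial Clustering of Applications with Noise) is a representative density-based clustering algorithm. Unlike partitioning and hierarchical clustering methods, it defines a cluster as a maximal set of density-connected points. This lets it turn every region of sufficiently high density into a cluster and discover clusters of arbitrary shape in spatial data that contains noise.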
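To make the definition concrete, here is a minimal single-machine sketch of the textbook DBSCAN procedure. It is not Alink's distributed implementation. It assumes Euclidean distance, counts a point as its own neighbor when comparing against minPoints, and reuses -2147483648 as the noise label because that is the value the results on this page show for noise. The function name `dbscan` and the "BORDER" type name are illustrative assumptions.

```python
import math

NOISE = -2147483648  # noise label shown in the results on this page


def dbscan(points, epsilon, min_points):
    """Textbook DBSCAN. Returns (labels, types), both aligned with `points`."""
    n = len(points)
    # Epsilon-neighborhood of every point (a point is its own neighbor).
    neighbors = [
        [j for j in range(n) if math.dist(points[i], points[j]) <= epsilon]
        for i in range(n)
    ]
    is_core = [len(nb) >= min_points for nb in neighbors]
    labels = [None] * n
    cluster_id = 0
    for i in range(n):
        if labels[i] is not None or not is_core[i]:
            continue
        # Start a new cluster at an unlabeled core point and expand it:
        # every neighbor joins the cluster, but only core points keep expanding.
        labels[i] = cluster_id
        work = [i]
        while work:
            p = work.pop()
            for q in neighbors[p]:
                if labels[q] is None:
                    labels[q] = cluster_id
                    if is_core[q]:
                        work.append(q)
        cluster_id += 1
    types = []
    for i in range(n):
        if labels[i] is None:
            labels[i] = NOISE
            types.append("NOISE")
        else:
            # "BORDER" is the textbook term; Alink's own name is not shown on this page.
            types.append("CORE" if is_core[i] else "BORDER")
    return labels, types


# Same data and parameters as the code examples (epsilon=0.5, minPoints=3).
data = [
    ("id_1", "2.0,3.0"), ("id_2", "2.1,3.1"), ("id_3", "200.1,300.1"),
    ("id_4", "200.2,300.2"), ("id_5", "200.3,300.3"), ("id_6", "200.4,300.4"),
    ("id_7", "200.5,300.5"), ("id_8", "200.6,300.6"), ("id_9", "2.1,3.1"),
    ("id_10", "2.1,3.1"), ("id_11", "2.1,3.1"), ("id_12", "2.1,3.1"),
    ("id_16", "300.,3.2"),
]
points = [tuple(float(x) for x in vec.split(",")) for _, vec in data]
labels, types = dbscan(points, epsilon=0.5, min_points=3)
for (point_id, _), label, point_type in zip(data, labels, types):
    print(point_id, point_type, label)
```

On this data the sketch yields the same assignment as the training results shown later: two clusters of core points and id_16 as noise.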

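This operator is the prediction component of DBSCAN. It takes two inputs: (1) a DBSCAN model, which can be obtained from the DBSCAN training operator DbscanBatchOp (its side output 0, as in the code examples); (2) the data to predict. It has one output: the prediction results.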
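This page does not spell out how a trained model labels new points. A common rule, assumed in the sketch below rather than taken from Alink's source, is: find the nearest core point within epsilon and copy its cluster ID; if there is none, label the point as noise. The `predict` function and the model format (a list of core points with their cluster IDs) are hypothetical.

```python
import math

NOISE = -2147483648  # noise label shown in the results on this page


def predict(model, point, epsilon):
    """Return the cluster of the nearest core point within epsilon, else NOISE.

    `model` is a hypothetical list of (core_point, cluster_id) pairs.
    """
    best_id, best_dist = NOISE, float("inf")
    for core, cluster_id in model:
        d = math.dist(core, point)
        if d <= epsilon and d < best_dist:
            best_id, best_dist = cluster_id, d
    return best_id


# Hypothetical model: a few core points from the two clusters in the example data.
model = [((2.0, 3.0), 0), ((2.1, 3.1), 0), ((200.1, 300.1), 1), ((200.6, 300.6), 1)]
queries = [(2.05, 3.05), (200.7, 300.7), (300.0, 3.2)]
results = [predict(model, q, epsilon=0.5) for q in queries]
print(results)
```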

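Distance measures

Value | Description
EUCLIDEAN | Euclidean distance
COSINE | Cosine distance (based on the angle between vectors)
CITYBLOCK | City-block (Manhattan) distance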
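The three measures follow their standard definitions, sketched below for dense vectors. For COSINE, the sketch assumes the common convention "1 - cosine similarity"; Alink's exact normalization is not stated on this page.

```python
import math


def euclidean(a, b):
    # Square root of the sum of squared coordinate differences.
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def cityblock(a, b):
    # Sum of absolute coordinate differences (Manhattan / L1 distance).
    return sum(abs(x - y) for x, y in zip(a, b))


def cosine(a, b):
    # Assumed convention: 1 - cosine similarity (0 for vectors with the same direction).
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


a, b = (2.0, 3.0), (5.0, 7.0)
print(euclidean(a, b), cityblock(a, b), cosine(a, b))
```

Note that cosine distance ignores vector length, so it suits data where direction matters more than magnitude, while the other two measures are sensitive to scale.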

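Parameters

Name | Description | Type | Required? | Value range | Default
predictionCol | Name of the prediction result column | String | | |
modelFilePath | File path of the model | String | | | null
predictionDetailCol | Name of the prediction detail column | String | | |
reservedCols | Names of the input columns kept in the output | String[] | | | null
numThreads | Number of threads used by the operator | Integer | | | 1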
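The batch prediction results further down show that, with reservedCols left at its default (null), every input column (id, vec) is kept and the label is appended under predictionCol. The hypothetical helper below sketches that output layout. Treating a non-null reservedCols as "keep only these input columns, in input order" and appending predictionDetailCol last are assumptions, not documented behavior.

```python
def output_columns(input_cols, prediction_col, reserved_cols=None, prediction_detail_col=None):
    """Hypothetical sketch of a prediction operator's output column list."""
    # reserved_cols=None keeps every input column; otherwise keep only the listed ones.
    if reserved_cols is None:
        kept = list(input_cols)
    else:
        kept = [c for c in input_cols if c in reserved_cols]
    cols = kept + [prediction_col]
    if prediction_detail_col is not None:
        cols.append(prediction_detail_col)
    return cols


print(output_columns(["id", "vec"], "pred"))
print(output_columns(["id", "vec"], "pred", reserved_cols=["id"]))
```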

Code examples

Python code

from pyalink.alink import *

import pandas as pd

useLocalEnv(1)

df = pd.DataFrame([
        ["id_1", "2.0,3.0"],
        ["id_2", "2.1,3.1"],
        ["id_3", "200.1,300.1"],
        ["id_4", "200.2,300.2"],
        ["id_5", "200.3,300.3"],
        ["id_6", "200.4,300.4"],
        ["id_7", "200.5,300.5"],
        ["id_8", "200.6,300.6"],
        ["id_9", "2.1,3.1"],
        ["id_10", "2.1,3.1"],
        ["id_11", "2.1,3.1"],
        ["id_12", "2.1,3.1"],
        ["id_16", "300.,3.2"]
])

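# Batch source for training and batch prediction; stream source for stream prediction
inOp1 = BatchOperator.fromDataframe(df, schemaStr='id string, vec string')
inOp2 = StreamOperator.fromDataframe(df, schemaStr='id string, vec string')

# Train DBSCAN; the main output labels each training point (id, type, pred)
dbscan = DbscanBatchOp()\
    .setIdCol("id")\
    .setVectorCol("vec")\
    .setMinPoints(3)\
    .setEpsilon(0.5)\
    .setPredictionCol("pred")\
    .linkFrom(inOp1)

dbscan.print()

# Batch prediction: side output 0 of the training operator is the model,
# the second input is the data to predict
predict = DbscanPredictBatchOp()\
    .setPredictionCol("pred")\
    .linkFrom(dbscan.getSideOutput(0), inOp1)

predict.print()

# Stream prediction: the model is passed to the constructor,
# the stream is linked as the data input
predict = DbscanPredictStreamOp(dbscan.getSideOutput(0))\
    .setPredictionCol("pred")\
    .linkFrom(inOp2)

predict.print()

# The stream job only runs when execute() is called
StreamOperator.execute()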

Java code

import org.apache.flink.types.Row;

import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.clustering.DbscanPredictStreamOp;
import com.alibaba.alink.operator.batch.clustering.DbscanPredictBatchOp;
import com.alibaba.alink.operator.batch.clustering.DbscanBatchOp;
import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;

import org.junit.Test;

import java.util.Arrays;
import java.util.List;

public class DbscanPredictBatchOpTest {

	@Test
	public void testDbscanPredictBatchop() throws Exception {
		List <Row> dataPoints = Arrays.asList(
			Row.of("id_1", "2.0,3.0"),
			Row.of("id_2", "2.1,3.1"),
			Row.of("id_3", "200.1,300.1"),
			Row.of("id_4", "200.2,300.2"),
			Row.of("id_5", "200.3,300.3"),
			Row.of("id_6", "200.4,300.4"),
			Row.of("id_7", "200.5,300.5"),
			Row.of("id_8", "200.6,300.6"),
			Row.of("id_9", "2.1,3.1"),
			Row.of("id_10", "2.1,3.1"),
			Row.of("id_11", "2.1,3.1"),
			Row.of("id_12", "2.1,3.1"),
			Row.of("id_16", "300.,3.2"));

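		// Batch source for training and batch prediction; stream source for stream prediction
		MemSourceBatchOp inOp1 = new MemSourceBatchOp(dataPoints, "id string, vec string");
		MemSourceStreamOp inOp2 = new MemSourceStreamOp(dataPoints, "id string, vec string");

		// Train DBSCAN; the main output labels each training point (id, type, pred)
		DbscanBatchOp dbscanBatchOp = new DbscanBatchOp()
			.setIdCol("id")
			.setVectorCol("vec")
			.setMinPoints(3)
			.setEpsilon(0.5)
			.setPredictionCol("pred")
			.linkFrom(inOp1);
		dbscanBatchOp.print();

		// Batch prediction: side output 0 of the training operator is the model,
		// the second input is the data to predict
		DbscanPredictBatchOp dbscanPredictBatchOp = new DbscanPredictBatchOp()
			.setPredictionCol("pred")
			.linkFrom(dbscanBatchOp.getSideOutput(0), inOp1);
		dbscanPredictBatchOp.print();

		// Stream prediction: the model is passed to the constructor
		DbscanPredictStreamOp dbscanPredictStreamOp = new DbscanPredictStreamOp(dbscanBatchOp.getSideOutput(0))
			.setPredictionCol("pred")
			.linkFrom(inOp2);
		dbscanPredictStreamOp.print();
		// The stream job only runs when execute() is called
		StreamOperator.execute();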
	}
}

Results

Training results

id type pred
id_4 CORE 1
id_8 CORE 1
id_2 CORE 0
id_6 CORE 1
id_16 NOISE -2147483648
id_7 CORE 1
id_12 CORE 0
id_5 CORE 1
id_1 CORE 0
id_3 CORE 1
id_9 CORE 0
id_10 CORE 0
id_11 CORE 0
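The NOISE row is labeled -2147483648, which equals Java's Integer.MIN_VALUE (-2^31). It marks a point that belongs to no cluster and should not be read as a cluster ID. When post-processing printed results, noise can be separated out as in this sketch; the rows are copied from the table above and the variable names are illustrative.

```python
NOISE = -(2 ** 31)  # Java Integer.MIN_VALUE, the label shown for noise points

train_rows = [
    ("id_4", "CORE", 1), ("id_8", "CORE", 1), ("id_2", "CORE", 0),
    ("id_6", "CORE", 1), ("id_16", "NOISE", -2147483648), ("id_7", "CORE", 1),
    ("id_12", "CORE", 0), ("id_5", "CORE", 1), ("id_1", "CORE", 0),
    ("id_3", "CORE", 1), ("id_9", "CORE", 0), ("id_10", "CORE", 0),
    ("id_11", "CORE", 0),
]

# Group point IDs by cluster label, skipping noise points.
clusters = {}
for point_id, point_type, label in train_rows:
    if label == NOISE:
        continue
    clusters.setdefault(label, []).append(point_id)

noise_ids = [point_id for point_id, _, label in train_rows if label == NOISE]
print(sorted(clusters), noise_ids)
```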

Batch prediction results

id vec pred
id_1 2.0,3.0 0
id_2 2.1,3.1 0
id_3 200.1,300.1 1
id_4 200.2,300.2 1
id_5 200.3,300.3 1
id_6 200.4,300.4 1
id_7 200.5,300.5 1
id_8 200.6,300.6 1
id_9 2.1,3.1 0
id_10 2.1,3.1 0
id_11 2.1,3.1 0
id_12 2.1,3.1 0
id_16 300.,3.2 -2147483648

Stream prediction results

id vec pred
id_11 2.1,3.1 0
id_1 2.0,3.0 0
id_16 300.,3.2 -2147483648
id_12 2.1,3.1 0
id_6 200.4,300.4 1
id_3 200.1,300.1 1
id_7 200.5,300.5 1
id_9 2.1,3.1 0
id_2 2.1,3.1 0
id_10 2.1,3.1 0
id_4 200.2,300.2 1
id_5 200.3,300.3 1
id_8 200.6,300.6 1
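The stream output lists rows in a different order from the input, but the labels match the batch results. Because row order is not preserved in the stream output, comparing by ID is the reliable check, as in this sketch over the id and pred columns copied from the two tables above.

```python
batch = {
    "id_1": 0, "id_2": 0, "id_3": 1, "id_4": 1, "id_5": 1, "id_6": 1, "id_7": 1,
    "id_8": 1, "id_9": 0, "id_10": 0, "id_11": 0, "id_12": 0, "id_16": -2147483648,
}
stream_rows = [
    ("id_11", 0), ("id_1", 0), ("id_16", -2147483648), ("id_12", 0), ("id_6", 1),
    ("id_3", 1), ("id_7", 1), ("id_9", 0), ("id_2", 0), ("id_10", 0),
    ("id_4", 1), ("id_5", 1), ("id_8", 1),
]

# Key the stream rows by ID so that arrival order no longer matters.
stream = dict(stream_rows)
mismatches = sorted(k for k in batch if batch[k] != stream.get(k))
print(mismatches)
```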

Notes

Resource estimation

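  • How should the memory of each worker be estimated?
    • Multiply the model size by 30. For example, if the model is 1 GB, each worker can be given 30 GB of memory.
  • How should the number of workers be set?
    • In general, as the number of workers grows, a distributed job first speeds up and then slows down because of communication overhead. If you observe that the job becomes slower after adding workers, stop increasing the number of workers.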
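Both rules of thumb can be written down as a small sketch. The functions are hypothetical helpers, and the timings in the usage line are made-up numbers for illustration, not measurements.

```python
def worker_memory_gb(model_size_gb, factor=30):
    # Rule of thumb from the notes: about 30x the model size per worker.
    return model_size_gb * factor


def pick_worker_count(timings):
    """Return the largest worker count reached before adding workers stopped helping.

    `timings` maps worker count -> measured run time in seconds (e.g. from trial runs).
    """
    counts = sorted(timings)
    best = counts[0]
    for prev, cur in zip(counts, counts[1:]):
        if timings[cur] >= timings[prev]:
            break  # slower (or no faster) than before: stop adding workers
        best = cur
    return best


print(worker_memory_gb(1))
print(pick_worker_count({1: 120.0, 2: 70.0, 4: 45.0, 8: 50.0}))  # made-up timings
```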