Java 类名:com.alibaba.alink.operator.stream.sql.WindowGroupByStreamOp
Python 类名:WindowGroupByStreamOp
对流式数据按窗口做聚合计算。用户可指定窗口的类型、长度,然后通过sql聚合函数对窗口内的数据进行聚合运算,每个窗口输出一条计算结果。若有其它附加的key列,则在每个<窗口,key>内进行聚合运算,输出一条结果。
名称 | 中文名称 | 描述 | 类型 | 是否必须? | 取值范围 | 默认值 |
---|---|---|---|---|---|---|
selectClause | select语句 | select语句 | String | ✓ | ||
sessionGap | session间隔长度 | session间隔长度 | Integer | ✓ | ||
slidingLength | 滑动窗口滑动长度 | 滑动窗口滑动长度 | Integer | ✓ | ||
windowLength | 窗口长度 | 窗口长度 | Integer | ✓ | ||
groupByClause | groupby语句 | groupby语句 | String | null | ||
intervalUnit | 时间长度单位 | 时间长度单位 | String | “SECOND”, “MINUTE”, “HOUR”, “DAY”, “MONTH” | “SECOND” | |
windowType | 窗口类型 | 窗口类型 | String | “TUMBLE”, “HOP”, “SESSION” | “TUMBLE” |
SQL语法 | 描述 |
---|---|
COUNT(value [, value]* ) | 返回值不为null 的输入行数。 |
COUNT(*) | 返回输入行数。 |
AVG(value) | 返回所有输入值的数值的平均值(算术平均值)。 |
SUM(value) | 返回所有输入值的数字总和。 |
MAX(value) | 返回的最大值值在所有的输入值。 |
MIN(value) | 返回的最小值的值在所有的输入值。 |
STDDEV_POP(value) | 返回所有输入值的数字字段的总体标准偏差。 |
STDDEV_SAMP(value) | 返回所有输入值的数字字段的样本标准偏差。 |
VAR_POP(value) | 返回所有输入值中数字字段的总体方差(总体标准差的平方)。 |
VAR_SAMP(value) | 返回所有输入值的数值字段的样本方差(样本标准差的平方)。 |
CONCAT_AGG(value, sep) | sep是分隔符,用指定的spearator做分隔符,连接value中的值。 |
不支持 count distinct
参考:https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html
<!–
from pyalink.alink import * import pandas as pd useLocalEnv(1) df = pd.DataFrame([ ['Ohio', 2000, 1.5], ['Ohio', 2001, 1.7], ['Ohio', 2002, 3.6], ['Nevada', 2001, 2.4], ['Nevada', 2002, 2.9], ['Nevada', 2003, 3.2] ]) stream_data = StreamOperator.fromDataframe(df, schemaStr='f1 string, f2 bigint, f3 double') op = WindowGroupByStreamOp() \ .setGroupByClause("f1") \ .setSelectClause("sum(f2) as f2, f1").setWindowLength(1) stream_data = stream_data.link(op) stream_data.print() StreamOperator.execute()
import org.apache.flink.types.Row; import com.alibaba.alink.operator.stream.StreamOperator; import com.alibaba.alink.operator.stream.source.MemSourceStreamOp; import com.alibaba.alink.operator.stream.sql.WindowGroupByStreamOp; import org.junit.Test; import java.util.Arrays; import java.util.List; public class WindowGroupByStreamOpTest { @Test public void testWindowGroupByStreamOp() throws Exception { List <Row> df = Arrays.asList( Row.of("Ohio", 2000, 1.5), Row.of("Ohio", 2001, 1.7), Row.of("Ohio", 2002, 3.6), Row.of("Nevada", 2001, 2.4), Row.of("Nevada", 2002, 2.9), Row.of("Nevada", 2003, 3.2) ); StreamOperator <?> stream_data = new MemSourceStreamOp(df, "f1 string, f2 int, f3 double"); StreamOperator <?> op = new WindowGroupByStreamOp() .setGroupByClause("f1") .setSelectClause("sum(f2) as f2, f1") .setWindowLength(1); stream_data = stream_data.link(op); stream_data.print(); StreamOperator.execute(); } }
f2 | f1 | window_start | window_end |
---|