Alink教程(Java版)
Alink教程(Python版)

常见问题汇总

运行报错


  1. Q:Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate enough slots to run the job. Please make sure that the cluster has enough resources.

A: 资源不足,减少申请的资源(worker数和内存)。


  1. Q: Thread 'SortMerger Reading Thread' terminated due to an exception: The record exceeds the maximum size of a sort buffer (current maximum: 72351744 bytes)

A: 由于flink managed memory不够大,并且单条record太大(例如图操作中的热点问题)。可尝试增

memory。


  1. Q:SqlParserException : SQL parse failed. Encountered “comment” at line 1, column 2353.


A:这个报错是因为SQL解析的时候遇到了保留字符串 “comment”,SQL的保留字符串包括:action

add aggregate all alter after and as asc avg avg_row_length auto_increment between bigint bit

binary 等


  1. Q:com.alibaba.alink.common.linalg.SparseVector cannot be cast to java.lang.Number


A:类型转换失败,SparseVector不能转换成数值型,这种情况一般是上下游组件对应的数据类型不匹配导

致,建议检查数据的Schema


  1. Q: No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it.


A:没有连数据导出Op。

  1. Q:"Warning: it seems you're running the script using 'flink run'. In this case, you have to manually add alink jars to pyflink lib path." 请问一下,Alink 在 yarn 集群执行时这个警告需要解决吗?


A:这个警告意思是:使用 flink run来运行 PyAlink 作业时,需要手动把 Alink 的 jar 包拷贝到 PyFlink 的

lib 目录下。如果不拷贝,在少数情况下,作业运行会报错。

  1. Q:在执行 PyAlink 的 udtfs lambda funtion 执行报错:SyntaxError: 'yield' inside list comprehension。f_udtf4 = udtf(lambda *args: [ (yield index, arg) for index, arg in enumerate(args) ], input_types=[DataTypes.DOUBLE(), DataTypes.DOUBLE()], result_types=[DataTypes.INT(), DataTypes.DOUBLE()])

A:在高版本的 Python 中,这种用法被废弃了,请用其他 UDTF 的定义方式。


  1. Q:Table is not an append-only table.

A:目前Alink大部分组件还只支持append-only模式。


  1. Q:Unable to instantiate java compiler.

A:在部署Alink到Flink clt时,需要将Alink的jar包拷贝到Flink clt的lib目录下。

  1. Q: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS

A:这是waring, 不影响运行。安装blas之后就不会报了。


  1. Q:AttributeError: 'NoneType' object has no attribute 'close'

A:前面运行中可能有失败的情况,在 Jupyter Notebook 中需要重启 kernel。


  1. Q:使用 StreamOp 的 print() 功能时,报错 java.io.IOException: Cannot connect to socket server at xxxx:xxx。


A:这个功能要求运行 Flink 作业的机器能连到执行 PyAlink 脚本的机器。如果不能的话,就不能用

streamOp 的 print() 操作。


  1. Q:Not have parameter: pluginVersion

A:参考https://www.yuque.com/pinshu/alink_guide/czg4cx下载对应的插件,并设置pluginVersion参数为下载的版本。


  1. Q:java.lang.OutOfMemoryError: Java heap space

A:运行期间堆内存溢出了,可能原因:

a) 用户内存设置太小,导致作业运行期间出错

b) 用户算法参数设置不合理,类似gbdt 算法树的深度设置太深等

c) 用户的数据有问题,类似对一个浮点型的数据做OneHot,导致模型巨大,其实这种情形应该

是做分桶.

遇到这种问题,首先加大内存试试能够跑过;如果还是出错建议检查算法参数设置是否合理以及数据是否存

在问题

  1. Q:java报 linearSolver找不到

A:需要安装scala插件


  1. Q:PyAlink 包从 1.4.0 以前的版本升级到 1.4.0 以后的版本,作业运行报错: java.lang.NoClassDefFoundError:

org/apache/flink/shaded/curator4/com/google/common/collect/ImmutableList,见

https://github.com/alibaba/Alink/issues/175


A:这是 PyFlink 从旧版本升级到 1.13.0 时会出现的问题,重新安装 PyFlink 即可:

pip uninstall apache-flink apache-flink-libraries
pip install -U apache-flink==1.13.0 apache-flink-libraries==1.13.0。

注意如果系统里有多个 Python 版本的话,你可能需要使用 pip3 代替 pip


  1. Q:alink Python的实例,提示 py4j.protocol.Py4JError: Could not find py4j jar at py4j已经下载的,是哪里没有设置好?

A:跟了下代码的jar遍历,发现他这个jar加载的目录比较奇怪,最终输出是这个路径

C:\Users\janso\AppData\Roaming\Python\Python37\site-packages\py4j\../../../py4j-

java/py4j0.10.8.1.jar2、这边把jar放到C:\Users\janso\AppData\Roaming\Python\Python37\py4j-

java/py4j0.10.8.1.jar这个位置就可以了。


  1. Q:OnlyBatchTableSource and InputFormatTableSource are supported in BatchTableEnvironment?

A:需要使用Alink自带的Source函数(CatalogSourceBatchOp等),或者使用Flink old table planer生成

一个Batch的Flink Table。


数据源与数据导出


  1. Q:调用KafkaSourceStreamOp的时候时报 "Java.lang.ClassNotFoundException:com.alibaba.alink.common.io.kafka.plugin.KafkaSourceSinkInPluginFactory"


A:下载kafka插件,参考文档https://www.yuque.com/pinshu/alink_guide/czg4cx

  1. Q:Hive如何调用

A:使用CatalogSourceBatchOp, CatalogSinkBatchOp

  1. Q:MySql如何调用

A:使用CatalogSourceBatchOp, CatalogSinkBatchOp


  1. Q:alink里面能直接将hadoop里面的数据读出来进行批处理吗?

A:ak source/sink + hdfs file system

  1. Q:咨HadoopFileSystem怎么配置高可用?

A:HadoopFileSystem构造函数上可以指定hadoop的配置路径,使用hadoop配置文件指定多namenode即

可。

  1. Q: Kafka011SourceStreamOp为什么访问不到了?

A:从1.3.1版本起始,去掉了Kafka011SourceStreamOp,引入了Plugin机制,支持多版本,通过

KafkaSourceStreamOp提供服务,参考文档

https://www.yuque.com/pinshu/alink_doc/6388a63ed03cc4ad3a40046c8e081e06

  1. Q: pyalink怎么读取hdfs数据

A:用CSV数据源, 其中路径格式是hdfs:///...

  1. Q: 报错"Not have parameter: pluginVersion"

A:参考https://www.yuque.com/pinshu/alink_guide/czg4cx下载对应的插件,并设置pluginVersion参数为下载的版本。

  1. Q: Mysql流式写入已存在表可以,批式写报错是什么原因?

A:目前批式写入Mysql还不支持Insert,后续会补充。

  1. Q: 代码依赖的kafka、mysql 的包,提交到集群时有没有命令指定包的路径?

A:通过AlinkGlobalConfigure类可以指定。

  1. Q:在使用Kafka时,报错"An error occurred while calling o13.linkFrom.: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: com.alibaba.alink.common.io.kafka.plugin.KafkaSourceSinkInPluginFactory"

A:可能的原因有:没有安装插件,参考https://www.yuque.com/pinshu/alink_guide/czg4cx进行安装。

Python中没有useEnv,调用useLocalEnv或者useRemoteEnv即可。

  1. Q:从kafka拿到数据,预测结果再放到kafka,本地跑alink程序没有问题。带依赖打包,放到flink集群去跑,总提示类似ClassNotFound异常

A:搭flink集群时flink-conf.yml 加上class loader.resolve-order:parent-first 然后把kafka以及alink-core,alink的kafka,flink的kafka相关jar包放到lib后启动,就正常啦。

  1. Q:用Mysql 5.x 插件写出时,中文变成乱码,用Alink再读进来依旧是乱码

A:在高版本不会遇到这个问题。也可以自行修改插件。修改方法:下载源码,在connectors/connector-jdbc/connector-jdbc-mysq 目录下找到 MySqlCatalog.java 这个文件,在 genMySqlJdbcUrl 方法的最后一行加上 stringBuilder.append("?characterEncoding=utf8"); 然后用编译出的jar包替换一下原来的插件里的jar包。


模型

  1. Q:Alink怎样保存模型和加载模型

A:Pipeline的Save和Load方法

//PipelineModel是Pipeline fit出来的 
PipelineModel model = ....; 

//保存模型 
model.save("/Users/yangxu/alink/temp/sentiment_hotel_model.ak"); 

//加载模型 
PipelineModel loadedModel = PipelineModel.load("/temp/sentiment_hotel_model.ak");


  1. Q:PyAlink训练好的 tfidf 模型能保存么?想以后就直接预测,不用重新训练,不然训练太费时间。

A:可以的。Alink 里模型跟数据一样,可以接 sink 组件保存出去,然后接 source 组件读进来。


  1. Q:之前在1.1.0版本中java可以直接将pipeline的model.save()到一个hdfs://路径,但是升级到最新版本后,报错There are not file system matched the hdfs:// , Maybe that set the filesystem of hdfs in file path's constructor will be better.请问需要怎么修改?

A:需要明确传进来一个HadoopFileSystem,来指定文件系统


  1. Q:对于脱离Flink环境的Pipeline模型预测应该怎么做?

A:首先调用PipelineModel中的save方法,将模型保存到一个文件路径下,然后调用LocalPredictor的构造

函数中的文件路径,将模型载入到LocalPredictor中,即可使用LocalPredictor进行单机预测。


  1. Q: 用new PipelineModel 报错:PipelineModel is empty. 要怎么加载模型,进行预测?

A:不要用new ,load是PipelineModel的static方法,直接 PipelineModel model =

PipelineModel.load(filedir)

  1. Q:PipelineModel.load()报NPE错误?

A:在1.4版本中已修复这个问题,更新到1.4版本以上即可。


其他


  1. Q:如何部署在线服务

A:使用LocalPredictor, 参考文档 https://www.yuque.com/pinshu/alink_guide/zo1y6q


  1. Q: TF on Alink no device space left

A: 可能是数据集较大,造成磁盘空间已满。建议加大worker数。


  1. Q: Kmeans使用KmeansParallel初始化,出现array index out of bound -1。用户采用了cosine距离,

cluster数目设置了5000个,debug的时候发现,KmeansParallel产生的中心点个数不到5000个,甚至

为0?

A: 两种解决方式:(1)cluster数目设置的小一点,比如10 (2)距离度量换成Euclid。


  1. Q:fit之后,如何获取预测结果?

A:fit之后,进行transform。具体看pipeline文档。


  1. Q: 输出结果中有 $6$1:0.3 5:0.4, 是什么格式?

A: 这是vector的格式, $6$是指vector维度是6,详情看vector(向量)的介绍


  1. Q:stream任务数据没有输出,只输出列名?

A:任务没有执行,需要加StreamOperator.execute()


  1. Q:有的算法(比如Word2vec )需要dataset 数据集,而从kafka获取的数据是流式数据,怎么转换?

A:DataStream不可以直接转换成DataSet,训练流式数据使用online learning。


  1. Q:pyalink 有web管理端口没有?

A:走的是和flink的web来管理任务,本质上Alink任务是一个Flink任务。


  1. Q:Alink代码中scala这部分代码必须要嘛?

A:是必须的,在intellij中使用代码需要配置对应的scala sdk。


  1. Q:提交到yarn的任务在collect结果的时候,只有在提交机器和jobmaster是同一个节点的时候才collect成功,不在一个节点就会失败。

A:应该是提交机器和jobmaster的网络不通,需要排查对应网络问题。


  1. Q:Alink使用Java哪个版本?

A:目前使用1.8版本,其他版本未测试。


  1. Q:Alink Demo中的实验一直运行?

A:可能存在下载数据网络不通的情况,请查看本机是否可以下载对应数据。


  1. Q:请问 PyAlink 支持 Python 3.8 吗?
    A:支持。如果安装不成功,绝大部分原因是一些依赖(pyarrow、pandas等)在所在的系统上没有预编译好的 whl 包,此时 pip会下载源码在本地进行编译。强烈建议没有这类经验的同学,切换到其他 Python 版 本或者使用 anaconda。
  2. Q:想要卸载 PyAlink 然后重新安装时,发现卸载需要等待特别长的时间。
    A:PyAlink 的 plugin 机制默认会将 plugin 文件存放到 pyalink 安装目录下。如果之前有使用过 python_env 的 plugin,那在 pip uninstall 时就会扫描目录下所有文件,导致需要等待特别长的时间。
    临时解决方法:把 plugin 从 pyalink/lib 目录下移动到同分区(注意:不同分区间移动可能需要更长的时间)的其他路径下,然后卸载重装后再移动回来。长期解决办法:在使用 PyAlink 时,先调用 AlinkGlobalConfiguration.setPluginDir 将 plugin 目录设置到另外的路径下,这样会自己将 plugin 文件下载到其他目录;已经下载的文件可以移动到新路径下。



  1. Q:ak 数据就是 pandas 数据格式,可以用操作 pandas 的函数处理 ak 数据可以么?

A:不能。Alink 所有的 BatchOp 都可以通过 collectToDataFrame 得到 pandas 的 DataFrame,之后可以

使用 pandas 进行操作。但 BatchOp 本身是不支持的。


  1. Q:PyAlink 计算出的数据类型比如 ak 数据,可以转换成 Python 中的 list 或者 np 么?主要我想把数据看

全了,用系统 ak 数据的 print,显示不全,都是省略号。


A:PyAlink 里的所有 BatchOp 都可能通过 collectToDataFrame 得到一个 pandas DataFrame。显示不全

是 pandas DataFrame 自身的显示问题,可以在代码最前面加这两句,然后重新运行:

import pandas as pd pd.set_option('display.max_colwidth', None)


  1. Q:有谁知道怎样获取预测结果 不是打印结果那种?

A:可以直接 sink 出去,比如接个 CsvSinkBatchOp;数据量小的话,也可以直接 collect 回来,在

PyAlink 里可以 collectToDataFrame。


  1. Q:流式作业为什么调用了 print() 后没有输出?

A:对于流式作业来说,需要调 StreamOperator.execute() 才能执行。


  1. Q:请教个问题,tfidf组件输出数据经过collectDataframes是这种稀疏的kv特征,这种稀疏特征可以直接输

入给向量近似最近领训练中lsh组件么?还是中间接个其他组件送到lsh中?

A:可以直接输入。


  1. Q:同时训练 tfidf 和 lsh 两个模型,既用 tfidf 提特征,lsh 对特征进行相似度比较。训练时候组件连接

tfidf_TrainBatch-->lsh_TrainBatch,在训练到lsh会报错,不能这样接是么?tfidf_TrainBatch--

>tfidf_PredictBatch--〉lsh_TrainBatch这种是可以的,就是说必须训练tfidf并用其预测后才能训练

lsh。

A:是的,需要先接预测后才能接 lsh。


  1. Q:如果集群试detach 模式,跑ftrl 的demo 会对任务有啥影响?是会报错还是会导致流的任务起不来?有啥标志性的日志体现出来吗?


A:目前Alink中任务,在detach模式下,可以跑纯流的任务或者纯批的任务。如果要跑批流混合的任务,需

要使用交互模式。


  1. Q:如何在MLEnvironmentFactory获取streamTableEnvironment过程中设置EnvieonmentSettings呢?

A:MLEnvironmentFactory可以registerMLEnvironment,有两种方式

1) 直接new一个

2) setDefault, 这样不用维护id


  1. Q:如果我有一个批处理模型,就不能加载批处理模型,然后再进行流模型得更新么?


A:1. 如果是单独的批模型,可以通过两种方式加载,第一种是直接在内存中,批预测linkFrom批模型和预测数据即可。第二种如果有保存模型多次预测的需求,则可以将批模型link一个sink组件保存到一个可以持久化的地方,然后使用Source组件,从这个持久化的地方读入,然后再linkFrom批模型和预测数据即可。使用流预测单独的批模型,和这两种方式类似,不赘述。

2. 如果是一个时间相关的批模型(比如10分钟产出一个模型),同时流预测中需要在批模型更新时,同步生效,则可以考虑使用模型流。