A: 资源不足,减少申请的资源(worker数和内存)。
A: 由于flink managed memory不够大,并且单条record太大(例如图操作中的热点问题)。可尝试增
memory。
A:这个报错是因为SQL解析的时候遇到了保留字符串 “comment”,SQL的保留字符串包括:action
add aggregate all alter after and as asc avg avg_row_length auto_increment between bigint bit
binary 等
A:类型转换失败,SparseVector不能转换成数值型,这种情况一般是上下游组件对应的数据类型不匹配导
致,建议检查数据的Schema
A:没有连数据导出Op。
A:这个警告意思是:使用 flink run来运行 PyAlink 作业时,需要手动把 Alink 的 jar 包拷贝到 PyFlink 的
lib 目录下。如果不拷贝,在少数情况下,作业运行会报错。
A:在高版本的 Python 中,这种用法被废弃了,请用其他 UDTF 的定义方式。
A:目前Alink大部分组件还只支持append-only模式。
A:在部署Alink到Flink clt时,需要将Alink的jar包拷贝到Flink clt的lib目录下。
A:这是waring, 不影响运行。安装blas之后就不会报了。
A:前面运行中可能有失败的情况,在 Jupyter Notebook 中需要重启 kernel。
A:这个功能要求运行 Flink 作业的机器能连到执行 PyAlink 脚本的机器。如果不能的话,就不能用
streamOp 的 print() 操作。
A:参考https://www.yuque.com/pinshu/alink_guide/czg4cx下载对应的插件,并设置pluginVersion参数为下载的版本。
A:运行期间堆内存溢出了,可能原因:
a) 用户内存设置太小,导致作业运行期间出错
b) 用户算法参数设置不合理,类似gbdt 算法树的深度设置太深等
c) 用户的数据有问题,类似对一个浮点型的数据做OneHot,导致模型巨大,其实这种情形应该
是做分桶.
遇到这种问题,首先加大内存试试能够跑过;如果还是出错建议检查算法参数设置是否合理以及数据是否存
在问题
A:需要安装scala插件
org/apache/flink/shaded/curator4/com/google/common/collect/ImmutableList,见
https://github.com/alibaba/Alink/issues/175。
A:这是 PyFlink 从旧版本升级到 1.13.0 时会出现的问题,重新安装 PyFlink 即可:
pip uninstall apache-flink apache-flink-libraries pip install -U apache-flink==1.13.0 apache-flink-libraries==1.13.0。
注意如果系统里有多个 Python 版本的话,你可能需要使用 pip3
代替 pip
。
A:跟了下代码的jar遍历,发现他这个jar加载的目录比较奇怪,最终输出是这个路径
C:\Users\janso\AppData\Roaming\Python\Python37\site-packages\py4j\../../../py4j-
java/py4j0.10.8.1.jar2、这边把jar放到C:\Users\janso\AppData\Roaming\Python\Python37\py4j-
java/py4j0.10.8.1.jar这个位置就可以了。
A:需要使用Alink自带的Source函数(CatalogSourceBatchOp等),或者使用Flink old table planer生成
一个Batch的Flink Table。
A:下载kafka插件,参考文档https://www.yuque.com/pinshu/alink_guide/czg4cx
A:使用CatalogSourceBatchOp, CatalogSinkBatchOp
A:使用CatalogSourceBatchOp, CatalogSinkBatchOp
A:ak source/sink + hdfs file system
A:HadoopFileSystem构造函数上可以指定hadoop的配置路径,使用hadoop配置文件指定多namenode即
可。
A:从1.3.1版本起始,去掉了Kafka011SourceStreamOp,引入了Plugin机制,支持多版本,通过
KafkaSourceStreamOp提供服务,参考文档
https://www.yuque.com/pinshu/alink_doc/6388a63ed03cc4ad3a40046c8e081e06
A:用CSV数据源, 其中路径格式是hdfs:///...
A:参考https://www.yuque.com/pinshu/alink_guide/czg4cx下载对应的插件,并设置pluginVersion参数为下载的版本。
A:目前批式写入Mysql还不支持Insert,后续会补充。
A:通过AlinkGlobalConfigure类可以指定。
A:可能的原因有:没有安装插件,参考https://www.yuque.com/pinshu/alink_guide/czg4cx进行安装。
Python中没有useEnv,调用useLocalEnv或者useRemoteEnv即可。
A:搭flink集群时flink-conf.yml 加上class loader.resolve-order:parent-first 然后把kafka以及alink-core,alink的kafka,flink的kafka相关jar包放到lib后启动,就正常啦。
A:在高版本不会遇到这个问题。也可以自行修改插件。修改方法:下载源码,在connectors/connector-jdbc/connector-jdbc-mysq 目录下找到 MySqlCatalog.java 这个文件,在 genMySqlJdbcUrl 方法的最后一行加上 stringBuilder.append("?characterEncoding=utf8"); 然后用编译出的jar包替换一下原来的插件里的jar包。
A:Pipeline的Save和Load方法
//PipelineModel是Pipeline fit出来的 PipelineModel model = ....; //保存模型 model.save("/Users/yangxu/alink/temp/sentiment_hotel_model.ak"); //加载模型 PipelineModel loadedModel = PipelineModel.load("/temp/sentiment_hotel_model.ak");
A:可以的。Alink 里模型跟数据一样,可以接 sink 组件保存出去,然后接 source 组件读进来。
A:需要明确传进来一个HadoopFileSystem,来指定文件系统
A:首先调用PipelineModel中的save方法,将模型保存到一个文件路径下,然后调用LocalPredictor的构造
函数中的文件路径,将模型载入到LocalPredictor中,即可使用LocalPredictor进行单机预测。
A:不要用new ,load是PipelineModel的static方法,直接 PipelineModel model =
PipelineModel.load(filedir)
A:在1.4版本中已修复这个问题,更新到1.4版本以上即可。
A:使用LocalPredictor, 参考文档 https://www.yuque.com/pinshu/alink_guide/zo1y6q
A: 可能是数据集较大,造成磁盘空间已满。建议加大worker数。
cluster数目设置了5000个,debug的时候发现,KmeansParallel产生的中心点个数不到5000个,甚至
为0?
A: 两种解决方式:(1)cluster数目设置的小一点,比如10 (2)距离度量换成Euclid。
A:fit之后,进行transform。具体看pipeline文档。
A: 这是vector的格式, $6$是指vector维度是6,详情看vector(向量)的介绍
A:任务没有执行,需要加StreamOperator.execute()
A:DataStream不可以直接转换成DataSet,训练流式数据使用online learning。
A:走的是和flink的web来管理任务,本质上Alink任务是一个Flink任务。
A:是必须的,在intellij中使用代码需要配置对应的scala sdk。
A:应该是提交机器和jobmaster的网络不通,需要排查对应网络问题。
A:目前使用1.8版本,其他版本未测试。
A:可能存在下载数据网络不通的情况,请查看本机是否可以下载对应数据。
A:不能。Alink 所有的 BatchOp 都可以通过 collectToDataFrame 得到 pandas 的 DataFrame,之后可以
使用 pandas 进行操作。但 BatchOp 本身是不支持的。
全了,用系统 ak 数据的 print,显示不全,都是省略号。
A:PyAlink 里的所有 BatchOp 都可能通过 collectToDataFrame 得到一个 pandas DataFrame。显示不全
是 pandas DataFrame 自身的显示问题,可以在代码最前面加这两句,然后重新运行:
import pandas as pd pd.set_option('display.max_colwidth', None)
A:可以直接 sink 出去,比如接个 CsvSinkBatchOp;数据量小的话,也可以直接 collect 回来,在
PyAlink 里可以 collectToDataFrame。
A:对于流式作业来说,需要调 StreamOperator.execute() 才能执行。
入给向量近似最近领训练中lsh组件么?还是中间接个其他组件送到lsh中?
A:可以直接输入。
tfidf_TrainBatch-->lsh_TrainBatch,在训练到lsh会报错,不能这样接是么?tfidf_TrainBatch--
>tfidf_PredictBatch--〉lsh_TrainBatch这种是可以的,就是说必须训练tfidf并用其预测后才能训练
lsh。
A:是的,需要先接预测后才能接 lsh。
A:目前Alink中任务,在detach模式下,可以跑纯流的任务或者纯批的任务。如果要跑批流混合的任务,需
要使用交互模式。
A:MLEnvironmentFactory可以registerMLEnvironment,有两种方式
1) 直接new一个
2) setDefault, 这样不用维护id
A:1. 如果是单独的批模型,可以通过两种方式加载,第一种是直接在内存中,批预测linkFrom批模型和预测数据即可。第二种如果有保存模型多次预测的需求,则可以将批模型link一个sink组件保存到一个可以持久化的地方,然后使用Source组件,从这个持久化的地方读入,然后再linkFrom批模型和预测数据即可。使用流预测单独的批模型,和这两种方式类似,不赘述。
2. 如果是一个时间相关的批模型(比如10分钟产出一个模型),同时流预测中需要在批模型更新时,同步生效,则可以考虑使用模型流。