Alink教程(Python版)

第11章 构造新特征

本章包括下面各节:
11.1 数据探索
11.2 思路
11.2.1 用户和品牌的各种特征
11.2.2 二分类模型训练
11.3 计算训练集
11.3.1 原始数据划分
11.3.2 计算特征
11.3.3 计算标签
11.4 正负样本配比
11.5 决策树
11.6 集成学习
11.6.1 Bootstrap aggregating
11.6.2 Boosting
11.6.3 随机森林与GBDT
11.7 使用随机森林算法
11.8 使用GBDT算法

详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Python)》,这里为本章对应的示例代码。

from pyalink.alink import *
useLocalEnv(1)

from utils import *
import os
import pandas as pd

DATA_DIR = ROOT_DIR + "tmall" + os.sep

ORIGIN_FILE = "action_log.csv";

FEATURE_LABEL_FILE = "feature_label.ak";
TRAIN_FILE = "train.ak";
TEST_FILE = "test.ak";
TRAIN_SAMPLE_FILE = "train_sample.ak";

LABEL_COL_NAME = "label";
PREDICTION_COL_NAME = "pred";
PRED_DETAIL_COL_NAME = "predInfo";

def getSource(): 
    return CsvSourceBatchOp()\
            .setFilePath(DATA_DIR + ORIGIN_FILE)\
            .setSchemaStr("user_id long, brand_id long, type int, ts timestamp")\
            .setFieldDelimiter("\t");
#c_1
source = getSource();

source.lazyPrint(10, "origin file");

source.lazyPrintStatistics("stat of origin file");

source.link(
    CorrelationBatchOp()\
        .setSelectedCols(["user_id", "brand_id", "type"])\
        .lazyPrintCorrelation()
);

source.select("min(ts) AS min_ts, max(ts) AS max_ts").lazyPrint(-1);

source.groupBy("type", "type, COUNT(*) AS cnt").lazyPrint(-1);

BatchOperator.execute()
#c_3
source = getSource();

t1 = source.filter("ts < CAST('2014-07-16 00:00:00' AS TIMESTAMP)");
t2 = source.filter("ts >= CAST('2014-07-16 00:00:00' AS TIMESTAMP)");

t1.lazyPrint(10, "t1").lazyPrintStatistics("t1");

t2.lazyPrint(10, "t2").lazyPrintStatistics("t2");

BatchOperator.execute()

clausePreProc = "user_id, brand_id, type, ts, past_days,"\
+ "case when type=0 then 1 else 0 end AS is_click,"\
+ "case when type=1 then 1 else 0 end AS is_buy,"\
+ "case when type=2 then 1 else 0 end AS is_collect,"\
+ "case when type=3 then 1 else 0 end AS is_cart,"\
+ "case when type=0 and past_days<=30 then 1 else 0 end AS is_click_1m,"\
+ "case when type=1 and past_days<=30 then 1 else 0 end AS is_buy_1m,"\
+ "case when type=2 and past_days<=30 then 1 else 0 end AS is_collect_1m,"\
+ "case when type=3 and past_days<=30 then 1 else 0 end AS is_cart_1m,"\
+ "case when type=0 and past_days<=60 then 1 else 0 end AS is_click_2m,"\
+ "case when type=1 and past_days<=60 then 1 else 0 end AS is_buy_2m,"\
+ "case when type=2 and past_days<=60 then 1 else 0 end AS is_collect_2m,"\
+ "case when type=3 and past_days<=60 then 1 else 0 end AS is_cart_2m,"\
+ "case when type=0 and past_days<=90 then 1 else 0 end AS is_click_3m,"\
+ "case when type=1 and past_days<=90 then 1 else 0 end AS is_buy_3m,"\
+ "case when type=2 and past_days<=90 then 1 else 0 end AS is_collect_3m,"\
+ "case when type=3 and past_days<=90 then 1 else 0 end AS is_cart_3m,"\
+ "case when type=0 and past_days>30 and past_days<=60 then 1 else 0 end AS is_click_m2nd,"\
+ "case when type=1 and past_days>30 and past_days<=60 then 1 else 0 end AS is_buy_m2nd,"\
+ "case when type=2 and past_days>30 and past_days<=60 then 1 else 0 end AS is_collect_m2nd,"\
+ "case when type=3 and past_days>30 and past_days<=60 then 1 else 0 end AS is_cart_m2nd,"\
+ "case when type=0 and past_days>60 and past_days<=90 then 1 else 0 end AS is_click_m3th,"\
+ "case when type=1 and past_days>60 and past_days<=90 then 1 else 0 end AS is_buy_m3th,"\
+ "case when type=2 and past_days>60 and past_days<=90 then 1 else 0 end AS is_collect_m3th,"\
+ "case when type=3 and past_days>60 and past_days<=90 then 1 else 0 end AS is_cart_m3th,"\
+ "case when type=0 and past_days<=3 then 1 else 0 end AS is_click_3d,"\
+ "case when type=1 and past_days<=3 then 1 else 0 end AS is_buy_3d,"\
+ "case when type=2 and past_days<=3 then 1 else 0 end AS is_collect_3d,"\
+ "case when type=3 and past_days<=3 then 1 else 0 end AS is_cart_3d,"\
+ "case when type=0 and past_days>3 and past_days<=6 then 1 else 0 end AS is_click_3d2nd,"\
+ "case when type=1 and past_days>3 and past_days<=6 then 1 else 0 end AS is_buy_3d2nd,"\
+ "case when type=2 and past_days>3 and past_days<=6 then 1 else 0 end AS is_collect_3d2nd,"\
+ "case when type=3 and past_days>3 and past_days<=6 then 1 else 0 end AS is_cart_3d2nd,"\
+ "case when type=0 and past_days>6 and past_days<=9 then 1 else 0 end AS is_click_3d3th,"\
+ "case when type=1 and past_days>6 and past_days<=9 then 1 else 0 end AS is_buy_3d3th,"\
+ "case when type=2 and past_days>6 and past_days<=9 then 1 else 0 end AS is_collect_3d3th,"\
+ "case when type=3 and past_days>6 and past_days<=9 then 1 else 0 end AS is_cart_3d3th,"\
+ "case when type=0 and past_days<=7 then 1 else 0 end AS is_click_1w,"\
+ "case when type=1 and past_days<=7 then 1 else 0 end AS is_buy_1w,"\
+ "case when type=2 and past_days<=7 then 1 else 0 end AS is_collect_1w,"\
+ "case when type=3 and past_days<=7 then 1 else 0 end AS is_cart_1w,"\
+ "case when type=0 and past_days>7 and past_days<=14 then 1 else 0 end AS is_click_w2nd,"\
+ "case when type=1 and past_days>7 and past_days<=14 then 1 else 0 end AS is_buy_w2nd,"\
+ "case when type=2 and past_days>7 and past_days<=14 then 1 else 0 end AS is_collect_w2nd,"\
+ "case when type=3 and past_days>7 and past_days<=14 then 1 else 0 end AS is_cart_w2nd,"\
+ "case when type=0 and past_days>14 and past_days<=21 then 1 else 0 end AS is_click_w3th,"\
+ "case when type=1 and past_days>14 and past_days<=21 then 1 else 0 end AS is_buy_w3th,"\
+ "case when type=2 and past_days>14 and past_days<=21 then 1 else 0 end AS is_collect_w3th,"\
+ "case when type=3 and past_days>14 and past_days<=21 then 1 else 0 end AS is_cart_w3th";

t1_preproc = t1\
    .select("user_id, brand_id, type, ts, "
            + "TIMESTAMPDIFF(DAY, ts, TIMESTAMP '2014-07-16 00:00:00') AS past_days")\
    .select(clausePreProc);

clauseUserBrand = "user_id, brand_id, SUM(is_click) as cnt_click, SUM(is_buy) as cnt_buy, "\
+ "SUM(is_collect) as cnt_collect, SUM(is_cart) as cnt_cart, "\
+ "SUM(is_click_1m) as cnt_click_1m, SUM(is_buy_1m) as cnt_buy_1m, "\
+ "SUM(is_collect_1m) as cnt_collect_1m, SUM(is_cart_1m) as cnt_cart_1m, "\
+ "SUM(is_click_2m) as cnt_click_2m, SUM(is_buy_2m) as cnt_buy_2m, "\
+ "SUM(is_collect_2m) as cnt_collect_2m, SUM(is_cart_2m) as cnt_cart_2m, "\
+ "SUM(is_click_3m) as cnt_click_3m, SUM(is_buy_3m) as cnt_buy_3m, "\
+ "SUM(is_collect_3m) as cnt_collect_3m, SUM(is_cart_3m) as cnt_cart_3m, "\
+ "SUM(is_click_m2nd) as cnt_click_m2nd, SUM(is_buy_m2nd) as cnt_buy_m2nd, "\
+ "SUM(is_collect_m2nd) as cnt_collect_m2nd, SUM(is_cart_m2nd) as cnt_cart_m2nd, "\
+ "SUM(is_click_m3th) as cnt_click_m3th, SUM(is_buy_m3th) as cnt_buy_m3th, "\
+ "SUM(is_collect_m3th) as cnt_collect_m3th, SUM(is_cart_m3th) as cnt_cart_m3th, "\
+ "SUM(is_click_3d) as cnt_click_3d, SUM(is_buy_3d) as cnt_buy_3d, "\
+ "SUM(is_collect_3d) as cnt_collect_3d, SUM(is_cart_3d) as cnt_cart_3d, "\
+ "SUM(is_click_3d2nd) as cnt_click_3d2nd, SUM(is_buy_3d2nd) as cnt_buy_3d2nd, "\
+ "SUM(is_collect_3d2nd) as cnt_collect_3d2nd, SUM(is_cart_3d2nd) as cnt_cart_3d2nd, "\
+ "SUM(is_click_3d3th) as cnt_click_3d3th, SUM(is_buy_3d3th) as cnt_buy_3d3th, "\
+ "SUM(is_collect_3d3th) as cnt_collect_3d3th, SUM(is_cart_3d3th) as cnt_cart_3d3th, "\
+ "SUM(is_click_1w) as cnt_click_1w, SUM(is_buy_1w) as cnt_buy_1w, "\
+ "SUM(is_collect_1w) as cnt_collect_1w, SUM(is_cart_1w) as cnt_cart_1w, "\
+ "SUM(is_click_w2nd) as cnt_click_w2nd, SUM(is_buy_w2nd) as cnt_buy_w2nd, "\
+ "SUM(is_collect_w2nd) as cnt_collect_w2nd, SUM(is_cart_w2nd) as cnt_cart_w2nd, "\
+ "SUM(is_click_w3th) as cnt_click_w3th, SUM(is_buy_w3th) as cnt_buy_w3th, "\
+ "SUM(is_collect_w3th) as cnt_collect_w3th, SUM(is_cart_w3th) as cnt_cart_w3th";

t1_userbrand = t1_preproc.groupBy("user_id, brand_id", clauseUserBrand);

clauseUserBrand_Rate = "user_id,brand_id,"\
+ "cnt_click,cnt_buy,cnt_collect,cnt_cart,"\
+ "cnt_click_1m,cnt_buy_1m,cnt_collect_1m,cnt_cart_1m,"\
+ "cnt_click_2m,cnt_buy_2m,cnt_collect_2m,cnt_cart_2m,"\
+ "cnt_click_3m,cnt_buy_3m,cnt_collect_3m,cnt_cart_3m,"\
+ "cnt_click_m2nd,cnt_buy_m2nd,cnt_collect_m2nd,cnt_cart_m2nd,"\
+ "cnt_click_m3th,cnt_buy_m3th,cnt_collect_m3th,cnt_cart_m3th,"\
+ "cnt_click_3d,cnt_buy_3d,cnt_collect_3d,cnt_cart_3d,"\
+ "cnt_click_3d2nd,cnt_buy_3d2nd,cnt_collect_3d2nd,cnt_cart_3d2nd,"\
+ "cnt_click_3d3th,cnt_buy_3d3th,cnt_collect_3d3th,cnt_cart_3d3th,"\
+ "cnt_click_1w,cnt_buy_1w,cnt_collect_1w,cnt_cart_1w,"\
+ "cnt_click_w2nd,cnt_buy_w2nd,cnt_collect_w2nd,cnt_cart_w2nd,"\
+ "cnt_click_w3th,cnt_buy_w3th,cnt_collect_w3th,cnt_cart_w3th,"\
+ "case when cnt_buy>cnt_click then 1.0 when cnt_buy=0 then 0.0 else cnt_buy*1.0/cnt_click end AS "\
+ "rt_click2buy,"\
+ "case when cnt_buy>cnt_collect then 1.0 when cnt_buy=0 then 0.0 else cnt_buy*1.0/cnt_collect end AS "\
+ "rt_collect2buy,"\
+ "case when cnt_buy>cnt_cart then 1.0 when cnt_buy=0 then 0.0 else cnt_buy*1.0/cnt_cart end AS "\
+ "rt_cart2buy,"\
+ "case when cnt_buy_3d>cnt_click_3d then 1.0 when cnt_buy_3d=0 then 0.0 else cnt_buy_3d*1.0/cnt_click_3d"\
+ " end AS rt_click2buy_3d,"\
+ "case when cnt_buy_3d>cnt_collect_3d then 1.0 when cnt_buy_3d=0 then 0.0 else "\
+ "cnt_buy_3d*1.0/cnt_collect_3d end AS rt_collect2buy_3d,"\
+ "case when cnt_buy_3d>cnt_cart_3d then 1.0 when cnt_buy_3d=0 then 0.0 else cnt_buy_3d*1.0/cnt_cart_3d "\
+ "end AS rt_cart2buy_3d,"\
+ "case when cnt_buy_1w>cnt_click_1w then 1.0 when cnt_buy_1w=0 then 0.0 else cnt_buy_1w*1.0/cnt_click_1w"\
+ " end AS rt_click2buy_1w,"\
+ "case when cnt_buy_1w>cnt_collect_1w then 1.0 when cnt_buy_1w=0 then 0.0 else "\
+ "cnt_buy_1w*1.0/cnt_collect_1w end AS rt_collect2buy_1w,"\
+ "case when cnt_buy_1w>cnt_cart_1w then 1.0 when cnt_buy_1w=0 then 0.0 else cnt_buy_1w*1.0/cnt_cart_1w "\
+ "end AS rt_cart2buy_1w,"\
+ "case when cnt_buy_1m>cnt_click_1m then 1.0 when cnt_buy_1m=0 then 0.0 else cnt_buy_1m*1.0/cnt_click_1m"\
+ " end AS rt_click2buy_1m,"\
+ "case when cnt_buy_1m>cnt_collect_1m then 1.0 when cnt_buy_1m=0 then 0.0 else "\
+ "cnt_buy_1m*1.0/cnt_collect_1m end AS rt_collect2buy_1m,"\
+ "case when cnt_buy_1m>cnt_cart_1m then 1.0 when cnt_buy_1m=0 then 0.0 else cnt_buy_1m*1.0/cnt_cart_1m "\
+ "end AS rt_cart2buy_1m,"\
+ "case when cnt_click_3d=0 then 0.0 when cnt_click_3d>=120.0*cnt_click_3d2nd then 120.0 else "\
+ "cnt_click_3d*1.0/cnt_click_3d2nd end AS rt_click_3d,"\
+ "case when cnt_buy_3d=0 then 0.0 when cnt_buy_3d>=10.0*cnt_buy_3d2nd then 10.0 else "\
+ "cnt_buy_3d*1.0/cnt_buy_3d2nd end AS rt_buy_3d,"\
+ "case when cnt_collect_3d=0 then 0.0 when cnt_collect_3d>=10.0*cnt_collect_3d2nd then 10.0 else "\
+ "cnt_collect_3d*1.0/cnt_collect_3d2nd end AS rt_collect_3d,"\
+ "case when cnt_cart_3d=0 then 0.0 when cnt_cart_3d>=20.0*cnt_cart_3d2nd then 20.0 else "\
+ "cnt_cart_3d*1.0/cnt_cart_3d2nd end AS rt_cart_3d,"\
+ "case when cnt_click_1w=0 then 0.0 when cnt_click_1w>=300.0*cnt_click_w2nd then 300.0 else "\
+ "cnt_click_1w*1.0/cnt_click_w2nd end AS rt_click_1w,"\
+ "case when cnt_buy_1w=0 then 0.0 when cnt_buy_1w>=15.0*cnt_buy_w2nd then 15.0 else "\
+ "cnt_buy_1w*1.0/cnt_buy_w2nd end AS rt_buy_1w,"\
+ "case when cnt_collect_1w=0 then 0.0 when cnt_collect_1w>=15.0*cnt_collect_w2nd then 15.0 else "\
+ "cnt_collect_1w*1.0/cnt_collect_w2nd end AS rt_collect_1w,"\
+ "case when cnt_cart_1w=0 then 0.0 when cnt_cart_1w>=40.0*cnt_cart_w2nd then 40.0 else "\
+ "cnt_cart_1w*1.0/cnt_cart_w2nd end AS rt_cart_1w,"\
+ "case when cnt_click_1m=0 then 0.0 when cnt_click_1m>=500.0*cnt_click_m2nd then 500.0 else "\
+ "cnt_click_1m*1.0/cnt_click_m2nd end AS rt_click_1m,"\
+ "case when cnt_buy_1m=0 then 0.0 when cnt_buy_1m>=30.0*cnt_buy_m2nd then 30.0 else "\
+ "cnt_buy_1m*1.0/cnt_buy_m2nd end AS rt_buy_1m,"\
+ "case when cnt_collect_1m=0 then 0.0 when cnt_collect_1m>=30.0*cnt_collect_m2nd then 30.0 else "\
+ "cnt_collect_1m*1.0/cnt_collect_m2nd end AS rt_collect_1m,"\
+ "case when cnt_cart_1m=0 then 0.0 when cnt_cart_1m>=50.0*cnt_cart_m2nd then 50.0 else "\
+ "cnt_cart_1m*1.0/cnt_cart_m2nd end AS rt_cart_1m";

t1_userbrand = t1_userbrand.select(clauseUserBrand_Rate);

clauseUser = "user_id, "\
+ "SUM(is_click) as user_cnt_click, SUM(is_buy) as user_cnt_buy, "\
+ "SUM(is_collect) as user_cnt_collect, SUM(is_cart) as user_cnt_cart, "\
+ "SUM(is_click_1m) as user_cnt_click_1m, SUM(is_buy_1m) as user_cnt_buy_1m, "\
+ "SUM(is_collect_1m) as user_cnt_collect_1m, SUM(is_cart_1m) as user_cnt_cart_1m, "\
+ "SUM(is_click_2m) as user_cnt_click_2m, SUM(is_buy_2m) as user_cnt_buy_2m, "\
+ "SUM(is_collect_2m) as user_cnt_collect_2m, SUM(is_cart_2m) as user_cnt_cart_2m, "\
+ "SUM(is_click_3m) as user_cnt_click_3m, SUM(is_buy_3m) as user_cnt_buy_3m, "\
+ "SUM(is_collect_3m) as user_cnt_collect_3m, SUM(is_cart_3m) as user_cnt_cart_3m, "\
+ "SUM(is_click_m2nd) as user_cnt_click_m2nd, SUM(is_buy_m2nd) as user_cnt_buy_m2nd, "\
+ "SUM(is_collect_m2nd) as user_cnt_collect_m2nd, SUM(is_cart_m2nd) as user_cnt_cart_m2nd, "\
+ "SUM(is_click_m3th) as user_cnt_click_m3th, SUM(is_buy_m3th) as user_cnt_buy_m3th, "\
+ "SUM(is_collect_m3th) as user_cnt_collect_m3th, SUM(is_cart_m3th) as user_cnt_cart_m3th, "\
+ "SUM(is_click_3d) as user_cnt_click_3d, SUM(is_buy_3d) as user_cnt_buy_3d, "\
+ "SUM(is_collect_3d) as user_cnt_collect_3d, SUM(is_cart_3d) as user_cnt_cart_3d, "\
+ "SUM(is_click_3d2nd) as user_cnt_click_3d2nd, SUM(is_buy_3d2nd) as user_cnt_buy_3d2nd, "\
+ "SUM(is_collect_3d2nd) as user_cnt_collect_3d2nd, SUM(is_cart_3d2nd) as user_cnt_cart_3d2nd, "\
+ "SUM(is_click_3d3th) as user_cnt_click_3d3th, SUM(is_buy_3d3th) as user_cnt_buy_3d3th, "\
+ "SUM(is_collect_3d3th) as user_cnt_collect_3d3th, SUM(is_cart_3d3th) as user_cnt_cart_3d3th, "\
+ "SUM(is_click_1w) as user_cnt_click_1w, SUM(is_buy_1w) as user_cnt_buy_1w, "\
+ "SUM(is_collect_1w) as user_cnt_collect_1w, SUM(is_cart_1w) as user_cnt_cart_1w, "\
+ "SUM(is_click_w2nd) as user_cnt_click_w2nd, SUM(is_buy_w2nd) as user_cnt_buy_w2nd, "\
+ "SUM(is_collect_w2nd) as user_cnt_collect_w2nd, SUM(is_cart_w2nd) as user_cnt_cart_w2nd, "\
+ "SUM(is_click_w3th) as user_cnt_click_w3th, SUM(is_buy_w3th) as user_cnt_buy_w3th, "\
+ "SUM(is_collect_w3th) as user_cnt_collect_w3th, SUM(is_cart_w3th) as user_cnt_cart_w3th";

t1_user = t1_preproc.groupBy("user_id", clauseUser);

clauseUser_Rate = "user_id AS user_id4join,"\
+ "user_cnt_click,user_cnt_buy,user_cnt_collect,user_cnt_cart,"\
+ "user_cnt_click_1m,user_cnt_buy_1m,user_cnt_collect_1m,user_cnt_cart_1m,"\
+ "user_cnt_click_2m,user_cnt_buy_2m,user_cnt_collect_2m,user_cnt_cart_2m,"\
+ "user_cnt_click_3m,user_cnt_buy_3m,user_cnt_collect_3m,user_cnt_cart_3m,"\
+ "user_cnt_click_m2nd,user_cnt_buy_m2nd,user_cnt_collect_m2nd,user_cnt_cart_m2nd,"\
+ "user_cnt_click_m3th,user_cnt_buy_m3th,user_cnt_collect_m3th,user_cnt_cart_m3th,"\
+ "user_cnt_click_3d,user_cnt_buy_3d,user_cnt_collect_3d,user_cnt_cart_3d,"\
+ "user_cnt_click_3d2nd,user_cnt_buy_3d2nd,user_cnt_collect_3d2nd,user_cnt_cart_3d2nd,"\
+ "user_cnt_click_3d3th,user_cnt_buy_3d3th,user_cnt_collect_3d3th,user_cnt_cart_3d3th,"\
+ "user_cnt_click_1w,user_cnt_buy_1w,user_cnt_collect_1w,user_cnt_cart_1w,"\
+ "user_cnt_click_w2nd,user_cnt_buy_w2nd,user_cnt_collect_w2nd,user_cnt_cart_w2nd,"\
+ "user_cnt_click_w3th,user_cnt_buy_w3th,user_cnt_collect_w3th,user_cnt_cart_w3th,"\
+ "case when user_cnt_buy>user_cnt_click then 1.0 when user_cnt_buy=0 then 0.0 else "\
+ "user_cnt_buy*1.0/user_cnt_click end AS user_rt_click2buy,"\
+ "case when user_cnt_buy>user_cnt_collect then 1.0 when user_cnt_buy=0 then 0.0 else "\
+ "user_cnt_buy*1.0/user_cnt_collect end AS user_rt_collect2buy,"\
+ "case when user_cnt_buy>user_cnt_cart then 1.0 when user_cnt_buy=0 then 0.0 else "\
+ "user_cnt_buy*1.0/user_cnt_cart end AS user_rt_cart2buy,"\
+ "case when user_cnt_buy_3d>user_cnt_click_3d then 1.0 when user_cnt_buy_3d=0 then 0.0 else "\
+ "user_cnt_buy_3d*1.0/user_cnt_click_3d end AS user_rt_click2buy_3d,"\
+ "case when user_cnt_buy_3d>user_cnt_collect_3d then 1.0 when user_cnt_buy_3d=0 then 0.0 else "\
+ "user_cnt_buy_3d*1.0/user_cnt_collect_3d end AS user_rt_collect2buy_3d,"\
+ "case when user_cnt_buy_3d>user_cnt_cart_3d then 1.0 when user_cnt_buy_3d=0 then 0.0 else "\
+ "user_cnt_buy_3d*1.0/user_cnt_cart_3d end AS user_rt_cart2buy_3d,"\
+ "case when user_cnt_buy_1w>user_cnt_click_1w then 1.0 when user_cnt_buy_1w=0 then 0.0 else "\
+ "user_cnt_buy_1w*1.0/user_cnt_click_1w end AS user_rt_click2buy_1w,"\
+ "case when user_cnt_buy_1w>user_cnt_collect_1w then 1.0 when user_cnt_buy_1w=0 then 0.0 else "\
+ "user_cnt_buy_1w*1.0/user_cnt_collect_1w end AS user_rt_collect2buy_1w,"\
+ "case when user_cnt_buy_1w>user_cnt_cart_1w then 1.0 when user_cnt_buy_1w=0 then 0.0 else "\
+ "user_cnt_buy_1w*1.0/user_cnt_cart_1w end AS user_rt_cart2buy_1w,"\
+ "case when user_cnt_buy_1m>user_cnt_click_1m then 1.0 when user_cnt_buy_1m=0 then 0.0 else "\
+ "user_cnt_buy_1m*1.0/user_cnt_click_1m end AS user_rt_click2buy_1m,"\
+ "case when user_cnt_buy_1m>user_cnt_collect_1m then 1.0 when user_cnt_buy_1m=0 then 0.0 else "\
+ "user_cnt_buy_1m*1.0/user_cnt_collect_1m end AS user_rt_collect2buy_1m,"\
+ "case when user_cnt_buy_1m>user_cnt_cart_1m then 1.0 when user_cnt_buy_1m=0 then 0.0 else "\
+ "user_cnt_buy_1m*1.0/user_cnt_cart_1m end AS user_rt_cart2buy_1m";

t1_user = t1_user.select(clauseUser_Rate);

clauseBrand = "brand_id, "\
+ "SUM(is_click) as brand_cnt_click, SUM(is_buy) as brand_cnt_buy, "\
+ "SUM(is_collect) as brand_cnt_collect, SUM(is_cart) as brand_cnt_cart, "\
+ "SUM(is_click_1m) as brand_cnt_click_1m, SUM(is_buy_1m) as brand_cnt_buy_1m, "\
+ "SUM(is_collect_1m) as brand_cnt_collect_1m, SUM(is_cart_1m) as brand_cnt_cart_1m, "\
+ "SUM(is_click_2m) as brand_cnt_click_2m, SUM(is_buy_2m) as brand_cnt_buy_2m, "\
+ "SUM(is_collect_2m) as brand_cnt_collect_2m, SUM(is_cart_2m) as brand_cnt_cart_2m, "\
+ "SUM(is_click_3m) as brand_cnt_click_3m, SUM(is_buy_3m) as brand_cnt_buy_3m, "\
+ "SUM(is_collect_3m) as brand_cnt_collect_3m, SUM(is_cart_3m) as brand_cnt_cart_3m, "\
+ "SUM(is_click_m2nd) as brand_cnt_click_m2nd, SUM(is_buy_m2nd) as brand_cnt_buy_m2nd, "\
+ "SUM(is_collect_m2nd) as brand_cnt_collect_m2nd, SUM(is_cart_m2nd) as brand_cnt_cart_m2nd, "\
+ "SUM(is_click_m3th) as brand_cnt_click_m3th, SUM(is_buy_m3th) as brand_cnt_buy_m3th, "\
+ "SUM(is_collect_m3th) as brand_cnt_collect_m3th, SUM(is_cart_m3th) as brand_cnt_cart_m3th, "\
+ "SUM(is_click_3d) as brand_cnt_click_3d, SUM(is_buy_3d) as brand_cnt_buy_3d, "\
+ "SUM(is_collect_3d) as brand_cnt_collect_3d, SUM(is_cart_3d) as brand_cnt_cart_3d, "\
+ "SUM(is_click_3d2nd) as brand_cnt_click_3d2nd, SUM(is_buy_3d2nd) as brand_cnt_buy_3d2nd, "\
+ "SUM(is_collect_3d2nd) as brand_cnt_collect_3d2nd, SUM(is_cart_3d2nd) as brand_cnt_cart_3d2nd, "\
+ "SUM(is_click_3d3th) as brand_cnt_click_3d3th, SUM(is_buy_3d3th) as brand_cnt_buy_3d3th, "\
+ "SUM(is_collect_3d3th) as brand_cnt_collect_3d3th, SUM(is_cart_3d3th) as brand_cnt_cart_3d3th, "\
+ "SUM(is_click_1w) as brand_cnt_click_1w, SUM(is_buy_1w) as brand_cnt_buy_1w, "\
+ "SUM(is_collect_1w) as brand_cnt_collect_1w, SUM(is_cart_1w) as brand_cnt_cart_1w, "\
+ "SUM(is_click_w2nd) as brand_cnt_click_w2nd, SUM(is_buy_w2nd) as brand_cnt_buy_w2nd, "\
+ "SUM(is_collect_w2nd) as brand_cnt_collect_w2nd, SUM(is_cart_w2nd) as brand_cnt_cart_w2nd, "\
+ "SUM(is_click_w3th) as brand_cnt_click_w3th, SUM(is_buy_w3th) as brand_cnt_buy_w3th, "\
+ "SUM(is_collect_w3th) as brand_cnt_collect_w3th, SUM(is_cart_w3th) as brand_cnt_cart_w3th";

t1_brand = t1_preproc.groupBy("brand_id", clauseBrand);

clauseBrand_Rate = "brand_id AS brand_id4join,"\
+ "brand_cnt_click,brand_cnt_buy,brand_cnt_collect,brand_cnt_cart,"\
+ "brand_cnt_click_1m,brand_cnt_buy_1m,brand_cnt_collect_1m,brand_cnt_cart_1m,"\
+ "brand_cnt_click_2m,brand_cnt_buy_2m,brand_cnt_collect_2m,brand_cnt_cart_2m,"\
+ "brand_cnt_click_3m,brand_cnt_buy_3m,brand_cnt_collect_3m,brand_cnt_cart_3m,"\
+ "brand_cnt_click_m2nd,brand_cnt_buy_m2nd,brand_cnt_collect_m2nd,brand_cnt_cart_m2nd,"\
+ "brand_cnt_click_m3th,brand_cnt_buy_m3th,brand_cnt_collect_m3th,brand_cnt_cart_m3th,"\
+ "brand_cnt_click_3d,brand_cnt_buy_3d,brand_cnt_collect_3d,brand_cnt_cart_3d,"\
+ "brand_cnt_click_3d2nd,brand_cnt_buy_3d2nd,brand_cnt_collect_3d2nd,brand_cnt_cart_3d2nd,"\
+ "brand_cnt_click_3d3th,brand_cnt_buy_3d3th,brand_cnt_collect_3d3th,brand_cnt_cart_3d3th,"\
+ "brand_cnt_click_1w,brand_cnt_buy_1w,brand_cnt_collect_1w,brand_cnt_cart_1w,"\
+ "brand_cnt_click_w2nd,brand_cnt_buy_w2nd,brand_cnt_collect_w2nd,brand_cnt_cart_w2nd,"\
+ "brand_cnt_click_w3th,brand_cnt_buy_w3th,brand_cnt_collect_w3th,brand_cnt_cart_w3th,"\
+ "case when brand_cnt_buy>brand_cnt_click then 1.0 when brand_cnt_buy=0 then 0.0 else "\
+ "brand_cnt_buy*1.0/brand_cnt_click end AS brand_rt_click2buy,"\
+ "case when brand_cnt_buy>brand_cnt_collect then 1.0 when brand_cnt_buy=0 then 0.0 else "\
+ "brand_cnt_buy*1.0/brand_cnt_collect end AS brand_rt_collect2buy,"\
+ "case when brand_cnt_buy>brand_cnt_cart then 1.0 when brand_cnt_buy=0 then 0.0 else "\
+ "brand_cnt_buy*1.0/brand_cnt_cart end AS brand_rt_cart2buy,"\
+ "case when brand_cnt_buy_3d>brand_cnt_click_3d then 1.0 when brand_cnt_buy_3d=0 then 0.0 else "\
+ "brand_cnt_buy_3d*1.0/brand_cnt_click_3d end AS brand_rt_click2buy_3d,"\
+ "case when brand_cnt_buy_3d>brand_cnt_collect_3d then 1.0 when brand_cnt_buy_3d=0 then 0.0 else "\
+ "brand_cnt_buy_3d*1.0/brand_cnt_collect_3d end AS brand_rt_collect2buy_3d,"\
+ "case when brand_cnt_buy_3d>brand_cnt_cart_3d then 1.0 when brand_cnt_buy_3d=0 then 0.0 else "\
+ "brand_cnt_buy_3d*1.0/brand_cnt_cart_3d end AS brand_rt_cart2buy_3d,"\
+ "case when brand_cnt_buy_1w>brand_cnt_click_1w then 1.0 when brand_cnt_buy_1w=0 then 0.0 else "\
+ "brand_cnt_buy_1w*1.0/brand_cnt_click_1w end AS brand_rt_click2buy_1w,"\
+ "case when brand_cnt_buy_1w>brand_cnt_collect_1w then 1.0 when brand_cnt_buy_1w=0 then 0.0 else "\
+ "brand_cnt_buy_1w*1.0/brand_cnt_collect_1w end AS brand_rt_collect2buy_1w,"\
+ "case when brand_cnt_buy_1w>brand_cnt_cart_1w then 1.0 when brand_cnt_buy_1w=0 then 0.0 else "\
+ "brand_cnt_buy_1w*1.0/brand_cnt_cart_1w end AS brand_cart2buy_1w,"\
+ "case when brand_cnt_buy_1m>brand_cnt_click_1m then 1.0 when brand_cnt_buy_1m=0 then 0.0 else "\
+ "brand_cnt_buy_1m*1.0/brand_cnt_click_1m end AS brand_rt_click2buy_1m,"\
+ "case when brand_cnt_buy_1m>brand_cnt_collect_1m then 1.0 when brand_cnt_buy_1m=0 then 0.0 else "\
+ "brand_cnt_buy_1m*1.0/brand_cnt_collect_1m end AS brand_rt_collect2buy_1m,"\
+ "case when brand_cnt_buy_1m>brand_cnt_cart_1m then 1.0 when brand_cnt_buy_1m=0 then 0.0 else "\
+ "brand_cnt_buy_1m*1.0/brand_cnt_cart_1m end AS brand_rt_cart2buy_1m";

t1_brand = t1_brand.select(clauseBrand_Rate);

t1_join = JoinBatchOp()\
    .setSelectClause("*")\
    .setJoinPredicate("user_id=user_id4join")\
    .linkFrom(t1_userbrand, t1_user);

t1_join = JoinBatchOp()\
    .setSelectClause("*")\
    .setJoinPredicate("brand_id=brand_id4join")\
    .linkFrom(t1_join, t1_brand);

t2_label = t2\
    .filter("type=1")\
    .select("user_id AS user_id4label, brand_id AS brand_id4label, 1 as label")\
    .distinct();

feature_label = LeftOuterJoinBatchOp()\
    .setSelectClause("*")\
    .setJoinPredicate("user_id = user_id4label AND brand_id = brand_id4label")\
    .linkFrom(t1_join, t2_label);

imputer = Imputer()\
    .setStrategy("value")\
    .setFillValue("0")\
    .setSelectedCols(["label"]);

feature_label = imputer.fit(feature_label).transform(feature_label);

print(feature_label.getColNames());

featureColNames = feature_label.getColNames()
for col_name in ["user_id", "brand_id","user_id4join", "brand_id4join",
                 "user_id4label", "brand_id4label",LABEL_COL_NAME] :
    featureColNames.remove(col_name)

str = ''
for t in featureColNames :
    str = str + "CAST(" + t + " AS DOUBLE) AS " + t + ", "
str = str + LABEL_COL_NAME

feature_label\
    .select(str)\
    .link(
        AkSinkBatchOp()\
            .setFilePath(DATA_DIR + FEATURE_LABEL_FILE)\
            .setOverwriteSink(True)
    );

BatchOperator.execute()
#c_4_1
all_data = AkSourceBatchOp().setFilePath(DATA_DIR + FEATURE_LABEL_FILE);

all_data\
    .lazyPrintStatistics()\
    .groupBy("label", "label, COUNT(*) AS cnt")\
    .print();

splitTrainTestIfNotExist(all_data, DATA_DIR + TRAIN_FILE, DATA_DIR + TEST_FILE, 0.8);

train_data = AkSourceBatchOp().setFilePath(DATA_DIR + TRAIN_FILE);
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + TEST_FILE);

featureColNames = train_data.getColNames()
featureColNames.remove(LABEL_COL_NAME)

LogisticRegression()\
    .setFeatureCols(featureColNames)\
    .setLabelCol(LABEL_COL_NAME)\
    .setPredictionCol(PREDICTION_COL_NAME)\
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
    .fit(train_data)\
    .transform(test_data)\
    .link(
        EvalBinaryClassBatchOp()\
            .setLabelCol(LABEL_COL_NAME)\
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
            .lazyPrintMetrics("LogisticRegression")
    );
BatchOperator.execute();

if not(os.path.exists(DATA_DIR + TRAIN_SAMPLE_FILE)) :
    train_data\
        .link(
            StratifiedSampleBatchOp()\
                .setStrataRatios("0:0.05,1:1.0")\
                .setStrataCol(LABEL_COL_NAME)
        )\
        .link(
            AkSinkBatchOp()\
                .setFilePath(DATA_DIR + TRAIN_SAMPLE_FILE)
        );
    BatchOperator.execute();


train_sample = AkSourceBatchOp().setFilePath(DATA_DIR + TRAIN_SAMPLE_FILE);

LogisticRegression()\
    .setFeatureCols(featureColNames)\
    .setLabelCol(LABEL_COL_NAME)\
    .setPredictionCol(PREDICTION_COL_NAME)\
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
    .fit(train_sample)\
    .transform(test_data)\
    .link(
        EvalBinaryClassBatchOp()\
            .setLabelCol(LABEL_COL_NAME)\
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
            .lazyPrintMetrics("LogisticRegression with Stratified Sample")
    );
BatchOperator.execute();
#c_4_2
train_sample = AkSourceBatchOp().setFilePath(DATA_DIR + TRAIN_SAMPLE_FILE);
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + TEST_FILE);

featureColNames = test_data.getColNames()
featureColNames.remove(LABEL_COL_NAME)

for treeType in ['GINI', 'INFOGAIN', 'INFOGAINRATIO'] :
    DecisionTreeClassifier()\
        .setTreeType(treeType)\
        .setFeatureCols(featureColNames)\
        .setLabelCol(LABEL_COL_NAME)\
        .setPredictionCol(PREDICTION_COL_NAME)\
        .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
        .fit(train_sample)\
        .transform(test_data)\
        .link(
            EvalBinaryClassBatchOp()\
                .setPositiveLabelValueString("1")\
                .setLabelCol(LABEL_COL_NAME)\
                .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
                .lazyPrintMetrics(treeType)
        );

BatchOperator.execute()

RandomForestClassifier()\
    .setNumTrees(20)\
    .setMaxDepth(4)\
    .setMaxBins(512)\
    .setFeatureCols(featureColNames)\
    .setLabelCol(LABEL_COL_NAME)\
    .setPredictionCol(PREDICTION_COL_NAME)\
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
    .fit(train_sample)\
    .transform(test_data)\
    .link(
        EvalBinaryClassBatchOp()\
            .setLabelCol(LABEL_COL_NAME)\
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
            .lazyPrintMetrics("RandomForest with Stratified Sample")
    );

BatchOperator.execute();
#c_4_3
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + TEST_FILE);
train_sample = AkSourceBatchOp().setFilePath(DATA_DIR + TRAIN_SAMPLE_FILE);

featureColNames = test_data.getColNames()
featureColNames.remove(LABEL_COL_NAME)

GbdtClassifier()\
    .setNumTrees(100)\
    .setMaxDepth(5)\
    .setMaxBins(256)\
    .setFeatureCols(featureColNames)\
    .setLabelCol(LABEL_COL_NAME)\
    .setPredictionCol(PREDICTION_COL_NAME)\
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
    .fit(train_sample)\
    .transform(test_data)\
    .link(
        EvalBinaryClassBatchOp()\
            .setLabelCol(LABEL_COL_NAME)\
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
            .lazyPrintMetrics("GBDT with Stratified Sample")
    );

BatchOperator.execute();