Using streaming data and ensemble classifier for incremental learning

Imagine you have a predictive analytics problem and data is accumulating real-time. You want to train a model for decision making, but not sure if you are capturing changing trends in the data. This is a concept sometimes referred to as trend adaptation or concept drift. Alternatively, you may have a fairly big data set but do not have computational power/memory to fit all the data into your model and you believe a representative sampling approach might be biased.

How are you going to use streaming data to continuously train a model to reduce bias? Some models (such as decision trees) would need to see entire data set in once, so they don't provide a productive solution. Fortunately, more flexible and adapdative machine learning algorithms exist, including Naive Bayes and Gradient Descent. These fairly simple algorithms can be extremely useful when dealing with streaming data, especially when the data can be reduced into few dimensions. They generalize well.

In this example, I would like to demonstrate this approach using a binary classification problem, where the entire data set contains 200 million training samples. You can find more information about this data set and feature engineering pipeline here.

We will follow the following approach:

  1. Train first tier classifiers using a small (~1 million examples) training set.
  2. Establish the out-of the-box-performance of these classifiers.
  3. Pick a few best performing 1st tier classifiers, get their predictions. This will reduce the dimension of the data significantly.
  4. Ensemble a second tier classifier using the predictions of first tier classifiers.
  5. Stream entire training set in small batches through steps 4 and 5, in order to perform incremental (online) learning, and monitor performance of two distinct ensemble classifiers.

Training first tier classifiers

Logistic Regression Classifier

In [14]:
# Load sample training set
import pickle
from sklearn.metrics import roc_auc_score

with open("X_train_balanced_trans_pl2.pkl","rb") as f:
    X_train_balanced_trans_pl2 = pickle.load(f) 

with open("y_train_balanced.pkl", "rb") as f:
    y_train_balanced = pickle.load(f) 
In [ ]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
import datetime

start = datetime.datetime.now()

logbal = LogisticRegression(verbose=10, n_jobs=3, C= 3.0589,penalty= "l1")

logbal.fit(X_train_balanced_trans_pl2, y_train_balanced)

end2 = datetime.datetime.now()
process_time = start - end2
print("Trained model, it took: " + str((process_time.seconds)/60) + " minutes.")

start = datetime.datetime.now()

diskname = ".."

# Test performance using the validation sets
# Load validation sets previously prepared


# Features:
with open("X_val1_trans_pl2.pkl","rb") as f:
    X_val1_trans_pl2 = pickle.load(f)
with open("X_val2_trans_pl2.pkl","rb") as f:
    X_val2_trans_pl2 = pickle.load(f) 
    
# Target labels:    
with open("y_val1.pkl","rb") as f:
    y_val1= pickle.load(f)
with open("y_val2.pkl","rb") as f:
    y_val2= pickle.load(f)
    
# Make predictions and calculate average valdation roc score 
# calculate out-of-the-box roc_score using validation set 1
probs = logbal.predict_proba(X_val1_trans_pl2)
probs = probs[:,1]
print("Val1 ROC score: " +str(roc_auc_score(y_val1,probs)))
       
# calculate out-of-the-box roc_score using validation set 2
probs = logbal.predict_proba(X_val2_trans_pl2)
probs = probs[:,1]
print("Val2 ROC score: " +str(roc_auc_score(y_val2,probs))) 


# Save the final classifier
with open((diskname + str("log_final.pkl")), "wb") as f:
    pickle.dump(logbal,f)
    
print("Saved final logistic regression classifier.")    

Support Vector Machines Classifier

In [9]:
import datetime
start = datetime.datetime.now()

from sklearn.svm import LinearSVC
from sklearn.calibration import CalibratedClassifierCV
# Note that we can't get probabilities directly from this LinearSVC function
# We need to wrap into Calibrated Classifier 
# (see: https://stackoverflow.com/questions/35212213/sklearn-how-to-get-decision-probabilities-for-linearsvc-classifier)

lsvcbal = LinearSVC(verbose=10, C = 0.0564)

cal_lsvcbal = CalibratedClassifierCV(base_estimator = lsvcbal,
                                  cv = 3, # Also performs cross-validation
                                  method= "sigmoid") # We use sigmoid function to get probabilities

cal_lsvcbal.fit(X_train_balanced_trans_pl2,y_train_balanced)

end = datetime.datetime.now()
process_time = end - start
print("It took: " + str(process_time.seconds/60) + " minutes.")


# Make predictions and calculate average valdation roc score 
# calculate out-of-the-box roc_score using validation set 1
probs = cal_lsvcbal.predict_proba(X_val1_trans_pl2)
probs = probs[:,1]
print("Val1 ROC score: " +str(roc_auc_score(y_val1,probs)))
       
# calculate out-of-the-box roc_score using validation set 2
probs = cal_lsvcbal.predict_proba(X_val2_trans_pl2)
probs = probs[:,1]
print("Val2 ROC score: " +str(roc_auc_score(y_val2,probs)))


# Save the final classifier
with open((diskname + str("svm_final.pkl")), "wb") as f:
    pickle.dump(cal_lsvcbal,f)
    
print("Saved final SVM regression classifier.") 
[LibLinear][LibLinear][LibLinear]It took: 0.4 minutes.
Val1 ROC score: 0.952513357043
Val2 ROC score: 0.952253716778
Saved final SVM regression classifier.

Random Forest Classifier

In [19]:
#let's try to get a small random sample from the class-balanced set to train the RF algorithm
import numpy as np
import random
random.seed(112)
rows = random.sample(list(range(0,X_train_balanced_trans_pl2.shape[0])),50000)
X_train_balanced_trans_pl2_sample = X_train_balanced_trans_pl2[rows,]
y_train_balanced_sample = y_train_balanced.iloc[rows]

print("Sampled a smaller training set.")

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score
import datetime

start = datetime.datetime.now()

rf = RandomForestClassifier(n_estimators=1320,
                            min_samples_leaf= 2,
                            min_samples_split= 9,
                            max_features= 306,
                            max_depth= 297,n_jobs=3)

rf.fit(X_train_balanced_trans_pl2_sample, y_train_balanced_sample)

end2 = datetime.datetime.now()
process_time = start - end2
print("Trained model, it took: " + str((process_time.seconds)/60) + " minutes.")

start = datetime.datetime.now()

diskname = ".."

# Test performance using the validation sets
# Load validation sets previously prepared


# Features:
with open("X_val1_trans_pl2.pkl","rb") as f:
    X_val1_trans_pl2 = pickle.load(f)
with open("X_val2_trans_pl2.pkl","rb") as f:
    X_val2_trans_pl2 = pickle.load(f) 
    
# Target labels:    
with open("y_val1.pkl","rb") as f:
    y_val1= pickle.load(f)
with open("y_val2.pkl","rb") as f:
    y_val2= pickle.load(f)
    
# Make predictions and calculate average valdation roc score 
# calculate out-of-the-box roc_score using validation set 1
probs = rf.predict_proba(X_val1_trans_pl2)
probs = probs[:,1]
print("Val1 ROC score: " +str(roc_auc_score(y_val1,probs)))
       
# calculate out-of-the-box roc_score using validation set 2
probs = rf.predict_proba(X_val2_trans_pl2)
probs = probs[:,1]
print("Val2 ROC score: " +str(roc_auc_score(y_val2,probs))) 


# Save the final classifier
with open((diskname + str("rf_final.pkl")), "wb") as f:
    pickle.dump(rf,f)
    
print("Saved final random forest  classifier.")
Sampled a smaller training set.
Trained model, it took: 1436.2166666666667 minutes.
Val1 ROC score: 0.947543785832
Val2 ROC score: 0.945366327648
Saved final random forest  classifier.

Adaboost classifier

In [36]:
from sklearn.ensemble import AdaBoostClassifier
In [53]:
#let's try to get a small random sample from the class-balanced set to train the RF algorithm
import numpy as np
import random
random.seed(112)
rows = random.sample(list(range(0,X_train_balanced_trans_pl2.shape[0])),100000)
X_train_balanced_trans_pl2_sample = X_train_balanced_trans_pl2[rows,]
y_train_balanced_sample = y_train_balanced.iloc[rows]

print("Sampled a smaller training set.")

from sklearn.ensemble import AdaBoostClassifier
from sklearn.metrics import roc_auc_score
import datetime

start = datetime.datetime.now()

ab = AdaBoostClassifier(n_estimators=600, learning_rate= 0.2)

ab.fit(X_train_balanced_trans_pl2_sample, y_train_balanced_sample)

end2 = datetime.datetime.now()
process_time = start - end2
print("Trained model, it took: " + str((process_time.seconds)/60) + " minutes.")

start = datetime.datetime.now()

diskname = ".."

# Test performance using the validation sets
# Load validation sets previously prepared


# Features:
with open("X_val1_trans_pl2.pkl","rb") as f:
    X_val1_trans_pl2 = pickle.load(f)
with open("X_val2_trans_pl2.pkl","rb") as f:
    X_val2_trans_pl2 = pickle.load(f) 
    
# Target labels:    
with open("y_val1.pkl","rb") as f:
    y_val1= pickle.load(f)
with open("y_val2.pkl","rb") as f:
    y_val2= pickle.load(f)
    
# Make predictions and calculate average valdation roc score 
# calculate out-of-the-box roc_score using validation set 1
probs = ab.predict_proba(X_val1_trans_pl2)
probs = probs[:,1]
print("Val1 ROC score: " +str(roc_auc_score(y_val1,probs)))
       
# calculate out-of-the-box roc_score using validation set 2
probs = ab.predict_proba(X_val2_trans_pl2)
probs = probs[:,1]
print("Val2 ROC score: " +str(roc_auc_score(y_val2,probs))) 


# Save the final classifier
with open((diskname + str("ab_final.pkl")), "wb") as f:
    pickle.dump(ab,f)
    
print("Saved ada boost  classifier.")
Sampled a smaller training set.
Trained model, it took: 1435.4333333333334 minutes.
Val1 ROC score: 0.945943193181
Val2 ROC score: 0.946552900812
Saved ada boost  classifier.

Online (incremental) learning to train Ensemble Model using streaming data

In [65]:
# Define a function to stream and process mini-batches of training data
# We will avoid the first 3 million rows from the training set since this includes the validation set 
import pandas as pd
import datetime
import numpy as np
from sklearn.metrics import roc_auc_score

# Label text features
Text_features = ["app","device","os","channel"]

##############################################################
# Define utility function to parse and process text features
##############################################################
# Note we avoid lambda functions since they don't pickle when we want to save the pipeline later   
def column_text_processer_nolambda(df,text_columns = Text_features):
    import pandas as pd
    import numpy as np
    """"A function that will merge/join all text in a given row to make it ready for tokenization. 
    - This function should take care of converting missing values to empty strings. 
    - It should also convert the text to lowercase.
    df= pandas dataframe
    text_columns = names of the text features in df
    """ 
    # Select only non-text columns that are in the df
    text_data = df[text_columns]
    
    # Fill the missing values in text_data using empty strings
    text_data.fillna("",inplace=True)
    
    # Concatenate feature name to each category encoding for each row
    # E.g: encoding 3 at device column will read as device3 to make each encoding unique for a given feature
    for col_index in list(text_data.columns):
        text_data[col_index] = col_index + text_data[col_index].astype(str)
    
    # Join all the strings in a given row to make a vector
    # text_vector = text_data.apply(lambda x: " ".join(x), axis = 1)
    text_vector = []
    for index,rows in text_data.iterrows():
        text_item = " ".join(rows).lower()
        text_vector.append(text_item)

    # return text_vector as pd.Series object to enter the tokenization pipeline
    return pd.Series(text_vector)

#######################################################################
# Define custom processing functions to add the log_total_clicks and 
# log_total_click_time features, and remove the unwanted base features
#######################################################################
def column_time_processer(X_train):
    import pandas as pd
    import numpy as np

    # Convert click_time to datetime64 dtype 
    X_train.click_time = pd.to_datetime(X_train.click_time)

    # Calculate the log_total_clicks for each ip and add as a new feature to temp_data
    temp_data = pd.DataFrame(np.log(X_train.groupby(["ip"]).size()),
                                    columns = ["log_total_clicks"]).reset_index()


    # Calculate the log_total_click_time for each ip and add as a new feature to temp_data
    # First define a function to process selected ip group 
    def get_log_total_click_time(group):
        diff = (max(group.click_time) - min(group.click_time)).seconds
        return np.log(diff+1)

    # Then apply this function to each ip group and extract the total click time per ip group
    log_time_frame = pd.DataFrame(X_train.groupby(["ip"]).apply(get_log_total_click_time),
                                  columns=["log_total_click_time"]).reset_index()

    # Then add this new feature to the temp_data
    temp_data = pd.merge(temp_data,log_time_frame, how = "left",on = "ip")

    # Combine temp_data with X_train to maintain X_train key order
    temp_data = pd.merge(X_train,temp_data,how = "left",on = "ip")

    # Drop features that are not needed
    temp_data = temp_data[["log_total_clicks","log_total_click_time"]]

    # Return only the numeric features as a tensor to integrate into the numeric feature branch of the pipeline
    return temp_data


#############################################################################
# We need to wrap these custom utility functions using FunctionTransformer
from sklearn.preprocessing import FunctionTransformer
# FunctionTransformer wrapper of utility functions to parse text and numeric features
# Note how we avoid putting any arguments into column_text_processer or column_time_processer
#############################################################################
get_numeric_data = FunctionTransformer(func = column_time_processer, validate=False) 
get_text_data = FunctionTransformer(func = column_text_processer_nolambda,validate=False) 

#############################################################################
# Create the token pattern: TOKENS_ALPHANUMERIC
# #Note this regex will match either a whitespace or a punctuation to tokenize 
# the string vector on these preferences, in our case we only have white spaces in our text  
#############################################################################
TOKENS_ALPHANUMERIC = '[A-Za-z0-9]+(?=\\s+)'

# Re-load the userclick_pipeline2 to work with
import pickle
with open("userclick_pipeline2.pkl","rb") as f:
    userclick_pipeline2 = pickle.load(f)


# This is just to initially get the column names
training_set = pd.read_csv("train.csv",
                           nrows=2,
                           dtype = "str")
column_names = training_set.columns

train_file = "train.csv"

def stream_training_data(filename,batchsize,skiprows = 3000000):
    import warnings
    warnings.filterwarnings('ignore')
    batch = pd.read_csv(filename,skiprows = skiprows,names = list(column_names),nrows= batchsize,dtype = "str")
    print("Read the new batch.")
    # Seperate features and labels
    X_train = batch.drop(["is_attributed","attributed_time"], axis = 1)
    y_train = pd.to_numeric(batch.is_attributed)
    # Process features using the pipeline
    X_train = userclick_pipeline2.transform(X_train)
    print("Processed the new batch.")
    # Return features and labels ready for training
    return (X_train, y_train)
In [71]:
# Write a function that return secondary features
diskname = ".."
import pandas as pd
# Load established classifiers
with open((diskname + str("log_final.pkl")), "rb") as f:
    clf1 = pickle.load(f)
with open((diskname + str("svm_final.pkl")), "rb") as f:
    clf2 = pickle.load(f)
with open((diskname + str("ab_final.pkl")), "rb") as f:
    clf3 = pickle.load(f)    
print("Loaded classifiers.") 
    
def secondary_features(X_train):   
    # Collect prediction probabilities as new features    
    sec_features = pd.DataFrame()
    sec_features["f1"] = clf1.predict_proba(X_train)[:,1]
    sec_features["f2"] = clf2.predict_proba(X_train)[:,1]
    sec_features["f3"] = clf3.predict_proba(X_train)[:,1]
    print(">>> From secondary_features: Collected features.")
    # Return new features as array
    return sec_features.values

# Prepare two validation sets: load and transform them into secondary features
# Features:
with open("X_val1_trans_pl2.pkl","rb") as f:
    X_val1 = secondary_features(pickle.load(f)) 
# labels:    
with open("y_val1.pkl","rb") as f:
    y_val1= pickle.load(f)

print("Loaded and transformed Val 1.")    
    
# Features:
with open("X_val2_trans_pl2.pkl","rb") as f:
    X_val2 = secondary_features(pickle.load(f)) 
# labels:    
with open("y_val2.pkl","rb") as f:
    y_val2= pickle.load(f)

print("Loaded and transformed Val 2.")  
Loaded classifiers.
In [ ]:
# Write a main loop to train classifiers and accumulate performance metrics
from sklearn.linear_model import SGDClassifier
from sklearn.naive_bayes import MultinomialNB  

# Instantiate classifiers
mnbi = MultinomialNB(alpha = 0.01)
sgdc = SGDClassifier(loss="log")

all_classes = np.array([0, 1])
skiprows = 3000000
batchsize = 10000
mnbi_vscore = []
sgdc_vcsore = []
mnbi_vscore_max = 0
sgdc_vcsore_max = 0

start = datetime.datetime.now()

for i in range(1,10000):
    start_batch = datetime.datetime.now()
    print("Starting a new batch.")
    
    ##################
    # Data Streaming
    ##################
    
    # Stream a small batch of training data to convert into secondary features
    X_train, y_train = stream_training_data(filename= train_file,batchsize= batchsize, skiprows= skiprows)
    print("Streamed new batch of training data.")
    X_train_ensemble = secondary_features(X_train)
    print("Processed new batch of training data.")
    skiprows += batchsize
    
    #######################
    # Training Classifiers
    #######################
    
    # Train mnbi classifier using partial_fit and secondary features
    mnbi.partial_fit(X_train_ensemble, y_train,all_classes)
    print("Trained mnbi classifier.")
    
    # Train sgdc classifier using partial_fit and secondary features
    sgdc.partial_fit(X_train_ensemble, y_train,all_classes)
    print("Trained sgdc classifier.")
    
    ###############################
    # Model validation and Update
    ###############################
    
    
    ###############
    clf = mnbi
    # Make predictions and calculate average valdation roc score 
    # calculate out-of-the-box roc_score using validation set 1
    probs = clf.predict_proba(X_val1)
    probs = probs[:,1]
    val1_roc = roc_auc_score(y_val1,probs)
    
    # calculate out-of-the-box roc_score using validation set 2
    probs = clf.predict_proba(X_val2)
    probs = probs[:,1]
    val2_roc = roc_auc_score(y_val2,probs)
    
    # Accumulate the mean validation score to be maximized 
    mnbi_vscore.append(np.array([val1_roc,val2_roc]).mean())
    print("mnbi completed batch " + str(i) + " with validation score: " + str(np.array([val1_roc,val2_roc]).mean()))
    
    # Save the interim model if validation score has increased
    if np.array([val1_roc,val2_roc]).mean() > mnbi_vscore_max:
        mnbi_vscore_max = np.array([val1_roc,val2_roc]).mean()
        with open((diskname + str("ensmb_mnbi_final.pkl")), "wb") as f:
            pickle.dump(mnbi,f)
        print("**** mnbi_vscore_max exceeded, model updated.****")
    
    ################
    clf = sgdc
    # Make predictions and calculate average valdation roc score 
    # calculate out-of-the-box roc_score using validation set 1
    probs = clf.predict_proba(X_val1)
    probs = probs[:,1]
    val1_roc = roc_auc_score(y_val1,probs)
    
    # calculate out-of-the-box roc_score using validation set 2
    probs = clf.predict_proba(X_val2)
    probs = probs[:,1]
    val2_roc = roc_auc_score(y_val2,probs)
    
    # Accumulate the mean validation score to be maximized 
    sgdc_vcsore.append(np.array([val1_roc,val2_roc]).mean())
    print("sgdc completed batch " + str(i) + " with validation score: " + str(np.array([val1_roc,val2_roc]).mean()))
    
    # Save the interim model if validation score has increased
    if np.array([val1_roc,val2_roc]).mean() > sgdc_vcsore_max:
        sgdc_vcsore_max = np.array([val1_roc,val2_roc]).mean()
        with open((diskname + str("ensmb_sgdc_final.pkl")), "wb") as f:
            pickle.dump(sgdc,f)
        print("**** sgdc_vscore_max exceeded, model updated. ****")
    
    
    end_batch = datetime.datetime.now()
    process_time = end_batch - start_batch
    process_todate = end_batch - start
    print("Batch completed in: " +str(process_time.seconds/60) + " minutes." )
    print("So far it took: " +str(process_todate.seconds/60) + " minutes." )
    print("So far model have seen " + str(i * batchsize) + " training samples.")
    print("--" * 40)
    
In [88]:
import matplotlib.pyplot as plt
training_samples = np.array(range(1,1068)) * 10000    
In [116]:
plt.scatter(x = training_samples[:500], y = mnbi_vscore[:500], label = "MNBayes", s = 1)
plt.scatter(x = training_samples[:500], y = sgdc_vcsore[:500], label = "SGDC",s = 1)
plt.xlabel("# of training samples")
plt.ylabel("ROC score")
plt.title("Classifier performance with streaming data")
plt.ylim(ymin = 0.952, ymax = 0.9528)
plt.legend()
plt.show()

As we can notice, SGD classifier perform better in this data set. What you can also notice is that the performance of Naive Bayes classifier increased until the model seen about 30 million training examples, but after that the perfomance seems to be decaying. Therefore, when you choose to perform online training, it is important to train multiple classifiers and follow their performance for quite some time.

Naive Bayes is an expensive algorithm, but you can change the training batch size to boost computational performance in certain cases. However, remember that reducing bias this way does not gurantee the best predictive performance. As the daat set grows, your model will experience new trends and attributes in the data it has previously never seen. Since these models are linear in their nature, their capacity to fully capture some of these relationships will be limited. You may add polynomial interaction terms in your limited number of features to capture some of these changes.

As a general principle, incremental learning of a Naive Bayes or similar model can be most beneficial when most of the relatioships between features and the outcome is linear or first/second degree polynomial. For more complex relationships, we may choose a Neural network, which we can also train using streaming data. I hope to follow up and illustrate this approach in a future write up.