Issue
I am training a Calibrated Classifier on Google Cloud Scheduler every day which takes about 5 mins to run. My python script receives latest data (from that day) and concatenate it to the original data and then the model gets trained and saves the pickled files on Cloud Storage. The issue I am facing now is, if it takes more than 5 mins (which it will at some point), it gives an upstream request timeout error.
I imagine, that it because of the more time the model is taking to train and I can think of one solution where I train the model only on the new data and update the weights of the original model in the pickled file. However, I am not sure if its possible.
Below is my function that runs on the scheduler:
def train_model():
users, tasks, tags, task_tags, task_user, boards = connect_postgres() ##loading the data from a postgres function
storage_client = storage.Client()
bucket = storage_client.get_bucket('my-bucket')
blob = bucket.blob('original_data.pkl')
pickle_in0 = blob.download_as_string()
data = pickle.loads(pickle_in0)
tasks = tasks.rename(columns={'id': 'task_id', 'name': 'task_name'})
# Joining tasks and task_user_assigns tables
tasks = tasks[tasks.task_name.isnull() == False]
task_user = task_user[['id', 'task_id', 'user_id']].rename(columns={'id': 'task_user_id'})
task_data = tasks.merge(task_user, on='task_id', how='left')
# Joining users with the task_data
users = users[['id', 'email']].rename(columns={'id': 'user_id'})
users_tasks = task_data.merge(users, on='user_id', how='left')
users_tasks = users_tasks[users_tasks.user_id.isnull() == False].reset_index(drop=True)
# Joining boards table to user_tasks
boards = boards[['id', 'name']].rename(columns={'id': 'board_id', 'name': 'board_name'})
users_board = users_tasks.merge(boards, on='board_id', how='left').reset_index(drop=True)
# Data Cleaning
translator = Translator() # This is to translate if the tasks are not in English
users_board["task_trans"] = users_board["task_name"].map(lambda x: translator.translate(x, dest="en").text)
users_board['task_trans'] = users_board['task_trans'].apply(lambda x: remove_emoji(x)) #This calls a function to remove Emoticons from text
users_board['task_trans'] = users_board['task_trans'].apply(lambda x: remove_punct(x)) #This calls a function to remove punctuations from text
users_board = users_board[['task_id', 'email', 'board_id', 'user_id', 'task_trans']]
data1 = pd.concat([data, users_board], axis=0)
df1 = data1.copy
X = df1.task_trans #all the observations
y = df1.user_id #all the lables
print(y.nunique())
#FROM HERE ON, THE TRAINING SCRIPT BEGINS
count_vect = CountVectorizer()
X_train_counts = count_vect.fit_transform(X)
tf_transformer = TfidfTransformer().fit(X_train_counts)
X_train_transformed = tf_transformer.transform(X_train_counts)
print('model 1 done')
labels = LabelEncoder()
y_train_labels_fit = labels.fit(y)
y_train_lables_trf = labels.transform(y)
linear_svc = LinearSVC()
clf = linear_svc.fit(X_train_transformed, y_train_lables_trf)
print('model 2 done')
calibrated_svc = CalibratedClassifierCV(base_estimator=linear_svc, cv="prefit")
calibrated_svc.fit(X_train_transformed, y_train_lables_trf)
print('model 3 done')
# SAVING THE MODELS ON GOOGLE CLOUD STORAGE
# storage_client = storage.Client()
fs = gcsfs.GCSFileSystem(project='my-project')
filename = '~path/svc.sav'
pickle.dump(calibrated_svc, fs.open(filename, 'wb'))
filename = '~path/count_vectorizer.sav'
pickle.dump(count_vect, fs.open(filename, 'wb'))
filename = '~path/tfidf_vectorizer.sav'
pickle.dump(tf_transformer, fs.open(filename, 'wb'))
blob = bucket.blob('data.pkl')
pickle_out = pickle.dumps(df1)
blob.upload_from_string(pickle_out)
return "success"
Any idea how to achieve that? Or any other strategy that I can follow to solve this problem?
Solution
I couldn't find a way to update the weights of a pickle file and eventually settled with increasing the timeout parameter in cloud run to more than the training time and it fixed the issue for the time being.
Answered By - vaisxn
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.