Ray Serve is a simple, high-throughput model serving library that can wrap around your ML model.
Simple Example of Logistic Regression
In this example, we train an external scikit-learn model to use for making
predictions.
Creating the Ray Serve Model
Let’s look at an actual model wrapped by a class that complies with the
requirements.
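The code below is a minimal sketch of such a wrapper, assuming a scikit-learn `LogisticRegression`; any model with compatible `fit`/`predict` methods works the same way. Note that Ray Serve's API has changed across versions, so in newer releases the route prefix may need to be passed to `serve.run` instead of the decorator.

```python
import json

import pandas as pd
from fastapi import Request, FastAPI
from ray import serve
from sklearn.linear_model import LogisticRegression

app = FastAPI()


async def parse_req(request: Request):
    # MindsDB sends the dataframe as a serialized dictionary under 'df'
    # and, for training, the target column name under 'target'.
    data = await request.json()
    target = data.get('target', None)
    df = pd.DataFrame(json.loads(data['df']))
    return df, target


@serve.deployment(route_prefix="/my_model")
@serve.ingress(app)
class MyModel:
    @app.post("/train")
    async def train(self, request: Request):
        df, target = await parse_req(request)
        self.feature_cols = [col for col in df.columns if col != target]
        self.model = LogisticRegression()
        self.model.fit(df[self.feature_cols], df[target])
        return {'status': 'ok'}

    @app.post("/predict")
    async def predict(self, request: Request):
        # Assumes /train has been called first on this replica.
        df, _ = await parse_req(request)
        predictions = self.model.predict(df[self.feature_cols])
        return {'prediction': [float(x) for x in predictions]}


my_app = MyModel.bind()
```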
After saving the above code into `rayserve.py`, run it using `serve run rayserve:my_app`.
It is important to have the `/train` and `/predict` endpoints.
The `/train` endpoint accepts two parameters to be sent via POST:

- `df` is a serialized dictionary that can be converted into a pandas dataframe.
- `target` is the name of the target column to be predicted.

It returns a JSON object containing the `status` key with the value `ok`.
The `/predict` endpoint requires one parameter to be sent via POST:

- `df` is a serialized dictionary that can be converted into a pandas dataframe.

It returns a dictionary containing the `prediction` and `index` keys, where the predictions are stored. Additional keys can be returned for confidence and confidence intervals.
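For illustration, a hypothetical client call against the wrapper above (running locally on Ray Serve's default port 8000) could look like this:

```python
import pandas as pd
import requests

df = pd.DataFrame({'sqft': [1000, 2000], 'price': [300000, 450000], 'sold': [0, 1]})

# Train: the dataframe travels as a serialized dictionary plus the target name.
r = requests.post('http://127.0.0.1:8000/my_model/train',
                  json={'df': df.to_json(), 'target': 'sold'})
print(r.json())  # expected: {'status': 'ok'}

# Predict: only the dataframe is required.
r = requests.post('http://127.0.0.1:8000/my_model/predict',
                  json={'df': df.drop(columns=['sold']).to_json()})
print(r.json())  # e.g. {'prediction': [0.0, 1.0]}
```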
Bringing the Ray Serve Model to MindsDB
Once you start the RayServe-wrapped model, you can create and train it in
MindsDB.
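A hypothetical statement could look like the following; the integration, table, and column names are placeholders, and the exact `USING` options depend on your MindsDB version, so check the MindsDB documentation for the Ray Serve handler:

```sql
CREATE MODEL mindsdb.rayserve_model
FROM example_db (SELECT * FROM house_sales)
PREDICT sold
USING
    format = 'ray_server',
    url.train = 'http://127.0.0.1:8000/my_model/train',
    url.predict = 'http://127.0.0.1:8000/my_model/predict';
```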
Now, you can fetch predictions using the standard MindsDB syntax. Follow the guide on the `SELECT` statement to learn more.
You can directly pass input data in the `WHERE` clause to get a single prediction.
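For example, assuming the hypothetical `rayserve_model` from above:

```sql
SELECT sold
FROM mindsdb.rayserve_model
WHERE sqft = 1500
  AND price = 400000;
```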
Or you can `JOIN` the model with a data table to get bulk predictions.
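Again with the hypothetical names from above:

```sql
SELECT t.sqft, t.price, m.sold
FROM example_db.house_sales AS t
JOIN mindsdb.rayserve_model AS m;
```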
Limit for POST Requests
Please note that if your model is behind a reverse proxy like nginx, you might have to increase the maximum limit for POST requests in order to receive the training data. MindsDB can send as much data as you'd like; it has been stress-tested with over a billion rows.
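For nginx, the relevant directive is `client_max_body_size`; a minimal sketch, where the size value is an arbitrary example to adjust to your training data volume:

```nginx
http {
    # Allow large POST bodies so the training data reaches Ray Serve.
    client_max_body_size 512M;
}
```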
Example of Keras NLP Model
Here, we consider a natural language processing (NLP) task where we want to train a neural network using Keras to detect whether a tweet is related to a natural disaster, such as a fire or an earthquake. Please download this dataset to follow the example.
Creating the Ray Serve Model
We create a Ray Serve service that wraps around the Kaggle NLP model so that it can be trained and used for making predictions.
```python
import re
import time
import json
import string

import ray
import numpy as np
import pandas as pd
from ray import serve
from tqdm import tqdm
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from fastapi import Request, FastAPI
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Dense, SpatialDropout1D
from tensorflow.keras.initializers import Constant
from tensorflow.keras.optimizers import Adam

app = FastAPI()

# Requires the NLTK 'stopwords' and 'punkt' corpora:
# python -m nltk.downloader stopwords punkt
stop = set(stopwords.words('english'))


async def parse_req(request: Request):
    # MindsDB sends the dataframe as a serialized dictionary under 'df'
    # and, for training, the target column name under 'target'.
    data = await request.json()
    target = data.get('target', None)
    di = json.loads(data['df'])
    df = pd.DataFrame(di)
    return df, target


@serve.deployment(route_prefix="/nlp_kaggle_model")
@serve.ingress(app)
class Model:
    MAX_LEN = 100
    GLOVE_DIM = 50
    EPOCHS = 10

    def __init__(self):
        self.model = None

    @app.post("/train")
    async def train(self, request: Request):
        df, target = await parse_req(request)
        target_arr = df.pop(target).values
        df = self.preprocess_df(df)
        train_corpus = self.create_corpus(df)

        # Load pretrained GloVe vectors (download glove.6B.50d.txt beforehand).
        self.embedding_dict = {}
        with open('./glove.6B.50d.txt', 'r') as f:
            for line in f:
                values = line.split()
                word = values[0]
                vectors = np.asarray(values[1:], 'float32')
                self.embedding_dict[word] = vectors

        self.tokenizer_obj = Tokenizer()
        self.tokenizer_obj.fit_on_texts(train_corpus)
        sequences = self.tokenizer_obj.texts_to_sequences(train_corpus)
        tweet_pad = pad_sequences(sequences, maxlen=self.__class__.MAX_LEN,
                                  truncating='post', padding='post')
        df = tweet_pad[:df.shape[0]]

        # Build the embedding matrix from the GloVe vectors.
        word_index = self.tokenizer_obj.word_index
        num_words = len(word_index) + 1
        embedding_matrix = np.zeros((num_words, self.__class__.GLOVE_DIM))
        for word, i in tqdm(word_index.items()):
            if i >= num_words:
                continue
            emb_vec = self.embedding_dict.get(word)
            if emb_vec is not None:
                embedding_matrix[i] = emb_vec

        # Frozen GloVe embeddings feeding an LSTM binary classifier.
        self.model = Sequential()
        embedding = Embedding(num_words,
                              self.__class__.GLOVE_DIM,
                              embeddings_initializer=Constant(embedding_matrix),
                              input_length=self.__class__.MAX_LEN,
                              trainable=False)
        self.model.add(embedding)
        self.model.add(SpatialDropout1D(0.2))
        self.model.add(LSTM(64, dropout=0.2, recurrent_dropout=0.2))
        self.model.add(Dense(1, activation='sigmoid'))

        optimizer = Adam(learning_rate=1e-5)
        self.model.compile(loss='binary_crossentropy', optimizer=optimizer,
                           metrics=['accuracy'])

        X_train, X_test, y_train, y_test = train_test_split(df, target_arr,
                                                            test_size=0.15)
        self.model.fit(X_train, y_train, batch_size=4,
                       epochs=self.__class__.EPOCHS,
                       validation_data=(X_test, y_test), verbose=2)
        return {'status': 'ok'}

    @app.post("/predict")
    async def predict(self, request: Request):
        # Assumes /train has been called first on this replica.
        df, _ = await parse_req(request)
        df = self.preprocess_df(df)
        test_corpus = self.create_corpus(df)
        sequences = self.tokenizer_obj.texts_to_sequences(test_corpus)
        tweet_pad = pad_sequences(sequences, maxlen=self.__class__.MAX_LEN,
                                  truncating='post', padding='post')
        df = tweet_pad[:df.shape[0]]
        y_pre = self.model.predict(df)
        y_pre = np.round(y_pre).astype(int).flatten().tolist()
        sub = pd.DataFrame({'target': y_pre})
        pred_dict = {'prediction': [float(x) for x in sub['target'].values]}
        return pred_dict

    def preprocess_df(self, df):
        df = df[['text']].copy()
        df['text'] = df['text'].apply(lambda x: self.remove_URL(x))
        df['text'] = df['text'].apply(lambda x: self.remove_html(x))
        df['text'] = df['text'].apply(lambda x: self.remove_emoji(x))
        df['text'] = df['text'].apply(lambda x: self.remove_punct(x))
        return df

    def remove_URL(self, text):
        url = re.compile(r'https?://\S+|www\.\S+')
        return url.sub(r'', text)

    def remove_html(self, text):
        html = re.compile(r'<.*?>')
        return html.sub(r'', text)

    def remove_punct(self, text):
        table = str.maketrans('', '', string.punctuation)
        return text.translate(table)

    def remove_emoji(self, text):
        emoji_pattern = re.compile("["
                                   u"\U0001F600-\U0001F64F"  # emoticons
                                   u"\U0001F300-\U0001F5FF"  # symbols & pictographs
                                   u"\U0001F680-\U0001F6FF"  # transport & map symbols
                                   u"\U0001F1E0-\U0001F1FF"  # flags
                                   u"\U00002702-\U000027B0"
                                   u"\U000024C2-\U0001F251"
                                   "]+", flags=re.UNICODE)
        return emoji_pattern.sub(r'', text)

    def create_corpus(self, df):
        corpus = []
        for tweet in tqdm(df['text']):
            words = [word.lower() for word in word_tokenize(tweet)
                     if word.isalpha() and word not in stop]
            corpus.append(words)
        return corpus


if __name__ == '__main__':
    ray.init()
    serve.start(detached=True)
    Model.deploy()
    while True:
        time.sleep(1)
```
Now, we need access to the training data. For that, we create a table called `nlp_kaggle_train` to load the dataset that the original model uses. The `nlp_kaggle_train` table contains the following columns: `id`, `keyword`, `location`, `text`, and `target`.
Please note that the specifics of the schema/table and how to ingest the CSV
data vary depending on your database.
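As a hypothetical example for a Postgres-like database:

```sql
CREATE TABLE nlp_kaggle_train (
    id INT,
    keyword VARCHAR(255),
    location VARCHAR(255),
    text VARCHAR(5000),
    target INT
);
```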
Bringing the Ray Serve Model to MindsDB
Now, we can create and train this custom model in MindsDB.
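A hypothetical statement, mirroring the logistic regression example (the integration name and `USING` options are placeholders to check against your MindsDB version):

```sql
CREATE MODEL mindsdb.nlp_kaggle_rayserve_model
FROM example_db (SELECT text, target FROM nlp_kaggle_train)
PREDICT target
USING
    format = 'ray_server',
    url.train = 'http://127.0.0.1:8000/nlp_kaggle_model/train',
    url.predict = 'http://127.0.0.1:8000/nlp_kaggle_model/predict';
```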
The training process takes some time, considering that this model is a neural
network rather than a simple logistic regression.
You can check the model status using this query:
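Depending on your MindsDB version, it is a query along these lines (older versions expose `mindsdb.predictors` instead of `mindsdb.models`):

```sql
SELECT status
FROM mindsdb.models
WHERE name = 'nlp_kaggle_rayserve_model';
```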
Once the status of the predictor has a value of `trained`, you can fetch predictions using the standard MindsDB syntax. Follow the guide on the `SELECT` statement to learn more.
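For example, with a disaster-related tweet (the model name follows the hypothetical statement above):

```sql
SELECT target
FROM mindsdb.nlp_kaggle_rayserve_model
WHERE text = 'Forest fire near La Ronge Sask. Canada';
```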
The expected output of the query above is `1`.
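And with a tweet that has nothing to do with a disaster:

```sql
SELECT target
FROM mindsdb.nlp_kaggle_rayserve_model
WHERE text = 'What a lovely sunny day for a walk in the park';
```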
The expected output of the query above is `0`.
Wrong Results?
If your results do not match this example, try training the model for more epochs.