
MindsDB and Ray Serve

Simple example - Logistic regression

Ray Serve is a simple, high-throughput serving library that can wrap your own ML models. In this example, we will train and predict with an external scikit-learn model. First, let's look at the actual model, wrapped inside a class that complies with the endpoint requirements described below:

import ray
from fastapi import Request, FastAPI
from ray import serve
import time
import pandas as pd
import json
from sklearn.linear_model import LogisticRegression


app = FastAPI()
ray.init()
serve.start(detached=True)


# Helper to deserialize the JSON payload into a dataframe and (optionally) the target column name
async def parse_req(request: Request):
    data = await request.json()
    target = data.get('target', None)
    di = json.loads(data['df'])
    df = pd.DataFrame(di)
    return df, target


@serve.deployment(route_prefix="/my_model")
@serve.ingress(app)
class MyModel:
    @app.post("/train")
    async def train(self, request: Request):
        df, target = await parse_req(request)
        # Every column except the target is treated as a feature; the selection
        # is stored so that predict() uses the same columns
        feature_cols = list(set(list(df.columns)) - set([target]))
        self.feature_cols = feature_cols
        X = df.loc[:, self.feature_cols]
        Y = list(df[target])
        self.model = LogisticRegression()
        self.model.fit(X, Y)
        return {'status': 'ok'}

    @app.post("/predict")
    async def predict(self, request: Request):
        df, _ = await parse_req(request)
        X = df.loc[:, self.feature_cols]
        predictions = self.model.predict(X)
        pred_dict = {'prediction': [float(x) for x in predictions]}
        return pred_dict


MyModel.deploy()

# Keep the process alive so the Ray Serve deployment stays up
while True:
    time.sleep(1)

The important bits here are the train and predict endpoints.

The train endpoint accepts two parameters in the JSON sent via POST:

- df -- a serialized dictionary that can be converted into a pandas dataframe
- target -- the name of the target column

The predict endpoint needs only one parameter:

- df -- a serialized dictionary that can be converted into a pandas dataframe

The train endpoint must return a JSON response with the key status set to ok. The predict endpoint must return a dictionary containing the prediction key, which stores the predictions. Additional keys can be returned for confidence and confidence intervals.
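
To make this contract concrete, here is a minimal sketch of a client calling both endpoints directly, the same way MindsDB does; the column names are placeholders borrowed from the home_rentals example below:

import requests
import pandas as pd

df = pd.DataFrame({
    'initial_price': [2500, 3000],
    'rental_price': [2600, 3100],
    'number_of_rooms': [1, 2],
})

# Train: the dataframe is serialized to JSON and sent along with the target name
resp = requests.post('http://127.0.0.1:8000/my_model/train',
                     json={'df': df.to_json(), 'target': 'number_of_rooms'})
print(resp.json())  # {'status': 'ok'}

# Predict: only the (serialized) dataframe is needed; the target column is dropped
resp = requests.post('http://127.0.0.1:8000/my_model/predict',
                     json={'df': df.drop(columns=['number_of_rooms']).to_json()})
print(resp.json())  # {'prediction': [...]}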

Once you start this Ray Serve-wrapped model, you can train it using a query like this one:

CREATE PREDICTOR mindsdb.byom_ray_serve
FROM mydb (
    SELECT number_of_rooms, initial_price, rental_price 
    FROM test_data.home_rentals
) 
PREDICT number_of_rooms
USING
    url.train = 'http://127.0.0.1:8000/my_model/train',
    url.predict = 'http://127.0.0.1:8000/my_model/predict',
    dtype_dict={"number_of_rooms": "categorical", "initial_price": "integer", "rental_price": "integer"},
    format='ray_server';

And you can query predictions as usual, either by conditioning on a subset of input columns:

SELECT *
FROM byom_ray_serve
WHERE initial_price=3000
AND rental_price=3000;

Or by using a JOIN to get batch predictions:

SELECT tb.number_of_rooms, t.rental_price
FROM mydb.test_data.home_rentals AS t
JOIN mindsdb.byom_ray_serve AS tb
WHERE t.rental_price > 5300;

Please note that if your model is behind a reverse proxy (e.g., nginx), you might have to increase the maximum allowed size of POST requests in order to receive the training data. MindsDB itself can send as much data as you'd like and has been stress-tested with over a billion rows.

Example - Keras NLP model

For this example, we will consider a natural language processing (NLP) task where we want to train a neural network with Keras to detect if a tweet is related to a natural disaster (fires, earthquakes, etc.). Please download this dataset to follow the example.

The code for the model here is a bit more complex than in the previous example, but the same rules apply: we create a Ray Serve-based service that wraps a Kaggle NLP model, which can be trained and then used for predictions:

import re
import time
import json
import string

import ray
from ray import serve

import numpy as np
import pandas as pd
from tqdm import tqdm
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from fastapi import Request, FastAPI
from sklearn.model_selection import train_test_split

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Dense, SpatialDropout1D
from tensorflow.keras.initializers import Constant
from tensorflow.keras.optimizers import Adam

app = FastAPI()
stop = set(stopwords.words('english'))  # requires NLTK data: nltk.download('stopwords'), nltk.download('punkt')


async def parse_req(request: Request):
    data = await request.json()
    target = data.get('target', None)
    di = json.loads(data['df'])
    df = pd.DataFrame(di)
    return df, target


@serve.deployment(route_prefix="/nlp_kaggle_model")
@serve.ingress(app)
class Model:
    MAX_LEN = 100
    GLOVE_DIM = 50
    EPOCHS = 10

    def __init__(self):
        self.model = None

    @app.post("/train")
    async def train(self, request: Request):
        df, target = await parse_req(request)

        target_arr = df.pop(target).values
        df = self.preprocess_df(df)
        train_corpus = self.create_corpus(df)

        # Load the pre-trained GloVe embeddings (glove.6B.50d.txt, available from
        # https://nlp.stanford.edu/projects/glove/) into a word -> vector lookup
        self.embedding_dict = {}
        with open('./glove.6B.50d.txt', 'r') as f:
            for line in f:
                values = line.split()
                word = values[0]
                vectors = np.asarray(values[1:], 'float32')
                self.embedding_dict[word] = vectors

        self.tokenizer_obj = Tokenizer()
        self.tokenizer_obj.fit_on_texts(train_corpus)

        sequences = self.tokenizer_obj.texts_to_sequences(train_corpus)
        tweet_pad = pad_sequences(sequences, maxlen=self.__class__.MAX_LEN, truncating='post', padding='post')
        df = tweet_pad[:df.shape[0]]

        word_index = self.tokenizer_obj.word_index
        num_words = len(word_index) + 1
        embedding_matrix = np.zeros((num_words, self.__class__.GLOVE_DIM))

        for word, i in tqdm(word_index.items()):
            if i > num_words:
                continue

            emb_vec = self.embedding_dict.get(word)
            if emb_vec is not None:
                embedding_matrix[i] = emb_vec

        self.model = Sequential()
        embedding = Embedding(num_words,
                              self.__class__.GLOVE_DIM,
                              embeddings_initializer=Constant(embedding_matrix),
                              input_length=self.__class__.MAX_LEN,
                              trainable=False)
        self.model.add(embedding)
        self.model.add(SpatialDropout1D(0.2))
        self.model.add(LSTM(64, dropout=0.2, recurrent_dropout=0.2))
        self.model.add(Dense(1, activation='sigmoid'))

        optimizer = Adam(learning_rate=1e-5)
        self.model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])

        X_train, X_test, y_train, y_test = train_test_split(df, target_arr, test_size=0.15)
        self.model.fit(X_train, y_train, batch_size=4, epochs=self.__class__.EPOCHS, validation_data=(X_test, y_test), verbose=2)

        return {'status': 'ok'}

    @app.post("/predict")
    async def predict(self, request: Request):
        df, _ = await parse_req(request)

        df = self.preprocess_df(df)
        test_corpus = self.create_corpus(df)

        sequences = self.tokenizer_obj.texts_to_sequences(test_corpus)
        tweet_pad = pad_sequences(sequences, maxlen=self.__class__.MAX_LEN, truncating='post', padding='post')
        df = tweet_pad[:df.shape[0]]

        y_pre = self.model.predict(df)
        y_pre = np.round(y_pre).astype(int).flatten().tolist()
        sub = pd.DataFrame({'target': y_pre})

        pred_dict = {'prediction': [float(x) for x in sub['target'].values]}
        return pred_dict

    def preprocess_df(self, df):
        df = df[['text']].copy()  # keep only the text column; copy to avoid pandas chained-assignment warnings
        df['text'] = df['text'].apply(lambda x: self.remove_URL(x))
        df['text'] = df['text'].apply(lambda x: self.remove_html(x))
        df['text'] = df['text'].apply(lambda x: self.remove_emoji(x))
        df['text'] = df['text'].apply(lambda x: self.remove_punct(x))
        return df

    def remove_URL(self, text):
        url = re.compile(r'https?://\S+|www\.\S+')
        return url.sub(r'', text)

    def remove_html(self, text):
        html = re.compile(r'<.*?>')
        return html.sub(r'', text)

    def remove_punct(self, text):
        table = str.maketrans('', '', string.punctuation)
        return text.translate(table)

    def remove_emoji(self, text):
        emoji_pattern = re.compile("["
                                   u"\U0001F600-\U0001F64F"  # emoticons
                                   u"\U0001F300-\U0001F5FF"  # symbols & pictographs
                                   u"\U0001F680-\U0001F6FF"  # transport & map symbols
                                   u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
                                   u"\U00002702-\U000027B0"
                                   u"\U000024C2-\U0001F251"
                                   "]+", flags=re.UNICODE)
        return emoji_pattern.sub(r'', text)

    def create_corpus(self, df):
        corpus = []
        for tweet in tqdm(df['text']):
            words = [word.lower() for word in word_tokenize(tweet) if word.isalpha() and word not in stop]
            corpus.append(words)
        return corpus


if __name__ == '__main__':

    ray.init()
    serve.start(detached=True)

    Model.deploy()

    while True:
        time.sleep(1)

We need access to the training data, so we'll create a table called nlp_kaggle_train and ingest into it the dataset that the original model uses, with the following schema:

id INT,
keyword VARCHAR(255),
location VARCHAR(255),
text VARCHAR(5000),
target INT

Note: the specifics of the schema and how to ingest the CSV will vary depending on your database.
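
For instance, a minimal sketch of the ingestion using pandas and SQLAlchemy against a MySQL-compatible database might look like this (the connection string and file name are assumptions, adjust them to your setup):

import pandas as pd
from sqlalchemy import create_engine

# Hypothetical connection string; replace user, password, host and database
engine = create_engine('mysql+pymysql://user:password@127.0.0.1:3306/test')

# train.csv is the training file from the Kaggle disaster tweets dataset
df = pd.read_csv('train.csv')
df.to_sql('nlp_kaggle_train', engine, index=False, if_exists='replace')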

Next, we can register and train the above custom model using the following query:

CREATE PREDICTOR mindsdb.byom_ray_serve_nlp
FROM maria (
    SELECT text, target
    FROM test.nlp_kaggle_train
) PREDICT target
USING
    url.train = 'http://127.0.0.1:8000/nlp_kaggle_model/train',
    url.predict = 'http://127.0.0.1:8000/nlp_kaggle_model/predict',
    dtype_dict={"text": "rich_text", "target": "integer"},
    format='ray_server';

Training will take a while, given that this model is a neural network rather than a simple logistic regression. You can check its status, much like you would for a "normal" MindsDB predictor, with the following query:

SELECT *
FROM mindsdb.predictors
WHERE name = 'byom_ray_serve_nlp';

Once the predictor's status becomes trained, we can query it for predictions as usual:

SELECT *
FROM mindsdb.byom_ray_serve_nlp
WHERE text='The tsunami is coming, seek high ground';

This would, hopefully, output 1. Alternatively, we can try the following tweet, which should output 0:

SELECT *
FROM mindsdb.byom_ray_serve_nlp
WHERE text='This is lovely dear friend';

If your results do not match this example, it could help to train the model for more epochs.