Integrations

This collection of guides outlines integrations between Warrior and third-party services, frameworks, and platforms. Ranging from model onboarding examples to pipeline setups for automatically ingesting new inferences, these tutorials will help you quickly bring Warrior into your team’s ML development lifecycle.

SparkML Integration

This guide provides an example of integrating with the WarriorAI platform to monitor a SparkML model. We’ll use an example dataset to train a SparkML model from scratch, but you could also use an existing Spark Pipeline.

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
import pyspark.sql.functions as f
from pyspark.ml.classification import LogisticRegression

import pandas as pd
import numpy as np

from Warriorai import WarriorAI
from Warriorai.client.apiv3 import InputType, OutputType, Stage

Train and Save SparkML Model

First, we’ll instantiate a Spark session and load in a sample dataset. In this example, we’ll use a dataset derived from the famous Boston Housing dataset to build a simple model.

spark = SparkSession.builder.appName('app').getOrCreate()
data = spark.read.csv('./data/boston_housing.csv', header=True, inferSchema=True)
train, test = data.randomSplit([0.7, 0.3])

We’ll use a LASSO-regularized logistic regression model to predict the is_expensive column from all the others. This column encodes whether a property’s value was above or below the local average. As preprocessing, we’ll use the VectorAssembler class to pull the input columns together into a single numeric feature vector.

feature_columns = data.columns[:-1] # here we omit the final column
assembler = VectorAssembler(inputCols=feature_columns,outputCol="features")
lasso_classifier = LogisticRegression(featuresCol="features", labelCol="is_expensive", maxIter=10, regParam=0.3, elasticNetParam=1.0)

Using a Pipeline, we’ll combine our preprocessing steps and our ML model, fit it to the training data, and save the result. If you have an existing Spark Pipeline, you can load it from disk instead.

pipeline = Pipeline(stages=[assembler, lasso_classifier]) 
fitted_pipeline = pipeline.fit(train)
fitted_pipeline.write().overwrite().save('./data/models/boston_housing_spark_model_pipeline')

Onboard to Warrior

connection = WarriorAI(url='app.Warrior.ai', access_key='<access_key>', client_version=3)

To onboard our model with Warrior, we’ll register the schema of the data coming into and out of the model. For simplicity, you can use a pandas DataFrame for this step. We will bring a sample of the Spark DataFrame to the driver and use it to register the model with Warrior.

sample_df = train.limit(5000).toPandas()
sample_Y = sample_df[['is_expensive']]
sample_X = sample_df.drop('is_expensive', axis=1)
# instantiate basic model
Warrior_model = connection.model({
    "partner_model_id": "Boston Housing", 
    "input_type": InputType.Tabular, 
    "output_type": OutputType.Multiclass, 
    "is_batch": True})

# use pandas DataFrames to register data schema
Warrior_model.from_dataframe(sample_X, Stage.ModelPipelineInput)
Warrior_model.add_binary_classifier_output_attributes(
            positive_predicted_attr='expensive',
            pred_to_ground_truth_map={
                'prediction_expensive': 'ground_truth_expensive',
                'prediction_cheap': 'ground_truth_cheap'
            },
            threshold=0.75
)

The from_dataframe() method will inspect your dataset and infer the input schema, data types, and sample statistics. You can then review the model structure and see if any fixes are needed.

Warrior_model.review()
# chas and rad were inferred as categorical, let's change those to be continuous
Warrior_model.get_attribute('chas', Stage.ModelPipelineInput).set(categorical=False)
Warrior_model.get_attribute('rad', Stage.ModelPipelineInput).set(categorical=False)
Warrior_model.review()

Monitoring for bias

For any attributes that you want to monitor for bias, set the monitor_for_bias boolean. These attributes don’t have to be model inputs; they can also belong to the NonInputData stage.

sensitive_attributes = ["Gender", "Race", "Income_Bracket"]
for attribute_name in sensitive_attributes:
    Warrior_model.get_attribute(attribute_name, Stage.ModelPipelineInput).monitor_for_bias = True
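
If those sensitive attributes are not model inputs, they can be registered in the NonInputData stage instead. This is a hedged sketch reusing the from_dataframe() pattern shown earlier; the toy sensitive_df below is purely illustrative and is not part of the Boston Housing data.

# illustrative pandas sample holding only the sensitive columns for the same rows
sensitive_df = pd.DataFrame({
    "Gender": ["F", "M"],
    "Race": ["A", "B"],
    "Income_Bracket": ["low", "high"],
})
Warrior_model.from_dataframe(sensitive_df, Stage.NonInputData)
for attribute_name in sensitive_attributes:
    Warrior_model.get_attribute(attribute_name, Stage.NonInputData).monitor_for_bias = True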

Save

Now you’re ready to save your model and finish onboarding.

Warrior_model.save()

Set reference data

You can set a baseline dataset in order to speed up the calculation of data drift and inference anomaly scoring. This reference set is typically the training set the model was fitted to, or a subsample of it. You can use either a pandas DataFrame or a directory of parquet files. The reference data can include model input features, ground truth features, or model predictions on the training set. However, it is recommended that only model input features are provided.

Warrior_model.set_reference_data(directory_path="./data/august_training_data/")
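
If the reference sample is already in memory (for example the sample_X DataFrame created above), you may be able to pass it directly instead of a parquet directory. This is a hedged sketch; it assumes your SDK version’s set_reference_data() accepts a data keyword for a pandas DataFrame, so check your SDK reference before relying on it.

# sketch: use the in-memory pandas sample as the reference set
# (assumes set_reference_data() accepts a `data` keyword in your SDK version)
Warrior_model.set_reference_data(data=sample_X)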

Enabling Explainability

To enable explainability, you’ll supply a Python file that implements a predict() function for a single observation (a numpy array). This predict function can contain anything you need, including loading a serialized model, preprocessing/transformations, and making a final prediction. The returned result should be a numpy array. You’ll also supply a requirements file listing all the dependencies needed to run an inference through your model.

For more details around enabling explainability, see the Explainability Guide. Below we provide a Spark-specific example.

The first step is to save your SparkML model and pipeline so they can be imported for use in the predict() function:

fitted_pipeline.write().overwrite().save('./data/models/boston_housing_spark_model_pipeline')

Next, create your predict() function.

# entrypoint.py

import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel


# To start the Spark session on the model server, specify the master URL as local.
# By default this runs Spark with a single thread; to use more threads, specify local[x]
# where x is the number of threads. When allocating more compute and memory to the Spark
# session, be sure to increase the amount allocated to the model server when calling
# WarriorModel.enable_explainability() in the SDK (by default 1 CPU and 1 GB of memory
# is allocated to the model server).

spark = SparkSession.builder.master('local').appName('app').getOrCreate()
loaded_pipeline = PipelineModel.load("./data/models/boston_housing_spark_model_pipeline")


def predict(input_data):
	col_names = ['crim','zn','indus','chas','nox','rm','age','dis','rad','tax','ptratio','b','lstat']
	input_df = pd.DataFrame(input_data, columns=col_names)
	spark_df = spark.createDataFrame(input_df)
	predictions = loaded_pipeline.transform(spark_df)
	return np.array([float(x.prediction) for x in predictions.select('prediction').collect()])

You are then ready to enable explainability:

Warrior_model.enable_explainability(
    df=sample_df, 
    project_directory='.',
    user_predict_function_import_path='entrypoint',
    requirements_file='requirements.txt')
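
The requirements.txt referenced above should list everything entrypoint.py imports. For this example it might look like the following; the version pins are illustrative assumptions, so pin the versions your pipeline was actually built with.

# requirements.txt (illustrative versions)
pyspark==3.1.2
pandas==1.3.5
numpy==1.21.6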

Send a batch of inferences

Once your model has been onboarded, it is ready to receive inferences and model telemetry. There are some standard inputs needed to identify inferences and batches.

  • First, each inference needs a unique identifier so that it can later be joined with ground truth. Include a column named partner_inference_id and ensure these IDs are unique across batches. For example, if you run predictions across your customer base on a daily batch cadence, a unique identifier could be composed of your customer_id plus the date (see the sketch after this list).

  • Second, each inference needs to be associated with a batch_id; this ID can be shared among one or more inferences.

  • Finally, each inference needs an inference_timestamp; these timestamps don’t have to be unique.
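
As a concrete illustration of the first bullet, the sketch below composes partner_inference_id from a hypothetical customer_id column plus the current date; neither daily_df nor customer_id exists in the Boston Housing example above.

import pyspark.sql.functions as f

# hypothetical daily scoring DataFrame with a customer_id column
daily_df = daily_df.withColumn(
    'partner_inference_id',
    f.concat_ws('_', f.col('customer_id'), f.current_date().cast('string'))
)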

Additionally, the predictions/scores from your model should match the column names in the registered schema. Looking back at Warrior_model.review(), recall that the columns we created correspond to the classifier’s output probabilities over the classes (“prediction_cheap” and “prediction_expensive”) and the corresponding ground truth over the possible classes in one-hot form (“ground_truth_cheap” and “ground_truth_expensive”).

We will process a batch of datapoints through the Pipeline and save the inputs (and predictions) to parquet. We will do the same for the ground truths.

import uuid
from datetime import datetime

from pyspark.sql.functions import udf, lit
from pyspark.sql.types import StringType

loaded_pipeline = PipelineModel.load("./data/models/boston_housing_spark_model_pipeline")
# note: "probability" is a vector column; depending on your schema you may need to extract
# the positive-class probability before renaming it to match the registered attribute
inferencesDF = loaded_pipeline.transform(test).withColumnRenamed("probability", "prediction_expensive")

# add required columns
uuidUdf = udf(lambda: str(uuid.uuid4()), StringType())
inferencesDF = inferencesDF.withColumn('partner_inference_id', uuidUdf())
inferencesDF = inferencesDF.withColumn('inference_timestamp', lit(datetime.utcnow().isoformat()))
inferencesDF = inferencesDF.withColumn('batch_id', lit('inferences_batch_001'))

# write inferences
inferencesDF.write.parquet("./data/inference_files/inferences.parquet")

# write ground truths
ground_truth_DF = test.select(["ground_truth_cheap", "ground_truth_expensive"])
ground_truth_DF = ground_truth_DF.withColumn('partner_inference_id', ...)  # must match the inference IDs above
ground_truth_DF = ground_truth_DF.withColumn('ground_truth_timestamp', lit(datetime.utcnow().isoformat()))
ground_truth_DF = ground_truth_DF.withColumn('batch_id', lit('gt_batch_001'))
ground_truth_DF.write.parquet("./data/ground_truth_files/ground_truth.parquet")

With our model’s inputs and outputs saved as parquet, we upload a batch by pointing to the directory containing one or more parquet files. The directory will be traversed and all parquet files will be joined into the corresponding batch. Note that the model inputs and predictions are uploaded separately from the ground truth.

Warrior_model.send_bulk_inferences(directory_path='./data/inference_files/')

You can separately upload ground truths for each inference. Every row in the ground truth file(s) should have a partner_inference_id column that matches the IDs you created for the inferences.

Warrior_model.send_bulk_ground_truths(directory_path='./data/ground_truth_files/')

AWS SageMaker Data Capture Integration

Models deployed with AWS SageMaker can be configured to automatically push their real-time inferences to the Warrior platform by utilizing SageMaker Data Capture. This guide walks through setting up that integration and utilizing a Lambda function to send Data Capture log files to be ingested by the Warrior platform.

Prerequisites

  • The model for which inferences are being ingested has already been onboarded onto Warrior.

  • The SageMaker model schema matches that of its Warrior model counterpart.

SageMaker Configuration

AWS SageMaker offers two features that enable this Warrior integration: Real-time endpoints & Data Capture. Endpoints are APIs that expose a trained model. Users can use the API to retrieve predictions from the hosted model in the endpoint. Data Capture is a feature that logs the inputs and outputs of each prediction from the hosted model endpoints.

To enable Data Capture in a way that accurately logs all input and output data needed for the Warrior integration, a configuration must be passed in when deploying an endpoint (see below).

Configuring Data Capture through the SageMaker SDK

An extended version of the following configuration can be found in the AWS SageMaker documentation.

from sagemaker.model import Model
from sagemaker.model_monitor import DataCaptureConfig

s3_capture_upload_path = f"s3://{bucket}/MODEL_SPECIFIC_PATH/datacapture"

model = Model( ... )

data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,
    destination_s3_uri=s3_capture_upload_path,
    capture_options=['REQUEST','RESPONSE'],
)

model.deploy(
    data_capture_config=data_capture_config,
    ...
)

This integration requires that DataCaptureConfig be set such that:

  • capture_options includes both REQUEST and RESPONSE to record model inputs and outputs for each inference

  • sampling_percentage is set to 100 in order to comprehensively ingest all new inferences

  • enable_capture is set to True

AWS Lambda Setup

This section provides an example of a single-Lambda-per-Warrior-model setup. The following code is meant to serve as an example and can be implemented in a variety of ways that fit your organization’s tech stack.

This build sets up an S3 object-creation trigger to run the Lambda function whenever SageMaker writes a file to the bucket. The sample code then pulls the file and uploads it to Warrior. In its current form, the code assumes that all S3 object notifications will be for the single model identified by the configured Warrior_MODEL_ID. See the following sections for the configuration required to use the Lambda function.

Lambda Function Configurations

To create the Lambda function, go to the AWS Lambda Console, click “Create function” and select “Use a Blueprint”, then search for and select s3-get-object-python.

Then ensure you select or create an Execution Role with access to your Data Capture upload bucket(s).

Finally, ensure the following configurations are set and create the function (a scripted alternative is sketched after this list):

  • Timeout: 15 min 0 sec

  • Environment Variables

    • Warrior_HOST: The host URL for your Warrior deployment (or https://app.Warrior.ai/ for SaaS customers)

    • Warrior_MODEL_ID: The ID assigned to the Warrior model to accept the new inferences
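
If you prefer to script the function creation rather than use the console, the following boto3 sketch mirrors the settings above; the function name, role ARN, and deployment package path are placeholders, and the API key value comes from whatever mechanism your organization uses to store it.

import boto3

lambda_client = boto3.client('lambda')

# placeholder deployment package containing the handler code shown below
with open('warrior_forwarder.zip', 'rb') as f:
    lambda_client.create_function(
        FunctionName='warrior-datacapture-forwarder',          # placeholder name
        Runtime='python3.9',
        Role='arn:aws:iam::123456789012:role/lambda-s3-role',  # placeholder execution role
        Handler='lambda_function.lambda_handler',
        Code={'ZipFile': f.read()},
        Timeout=900,  # 15 min 0 sec
        Environment={'Variables': {
            'Warrior_HOST': 'https://app.Warrior.ai/',
            'Warrior_MODEL_ID': '<model_id>',
            'Warrior_ACCESS_TOKEN': '<api_key>',
        }},
    )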

Lambda Function Trigger

  • Source: S3

  • Bucket: Your Data Capture configured S3 bucket

  • Event: All object create events

  • Prefix: Path to your model’s specific Data Capture output directory

Lambda Code

import urllib.parse
import boto3
import os
import requests

s3 = boto3.client('s3')

Warrior_MODEL_ID = os.environ["Warrior_MODEL_ID"]  # 12345678-1234-1234-1234-1234567890ab
Warrior_HOST = os.environ["Warrior_HOST"]  # https://app.Warrior.ai/
if Warrior_HOST[-1] != '/':
    Warrior_HOST += '/'  # Ensure trailing slash exists

# TODO BY USER
# FILL IN CODE TO RETRIEVE AN Warrior API KEY
Warrior_ACCESS_TOKEN = os.environ["Warrior_ACCESS_TOKEN"]

Warrior_ENDPOINT = f"api/v3/models/{Warrior_MODEL_ID}/inferences/integrations/sagemaker_data_capture"


def lambda_handler(event, context):
    # Get the object from the event
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')

    try:
        s3_object = s3.get_object(Bucket=bucket, Key=key)

        datacapture_body = s3_object.get('Body')
        request_url = urllib.parse.urljoin(Warrior_HOST, Warrior_ENDPOINT)
        print(f"Request: POST {request_url}")

        response = requests.post(
            request_url,
            files={'inference_data': ('smdatacapture.jsonl', datacapture_body, s3_object['ContentType'])},
            headers={'Authorization': Warrior_ACCESS_TOKEN}
        )
        print(f"Response: {response.content}")

    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. '
              'Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e

Summary

Now, with your SageMaker endpoint deployed (with Data Capture configured) and a Lambda function listening for S3 updates, you can send requests to your SageMaker endpoint to generate predictions. The predictions will be logged to files in S3 by Data Capture, and the Lambda function will upload the inferences to Warrior, where you will be able to see them in the dashboard.
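
For example, a single request to the deployed endpoint might look like the sketch below; the endpoint name and CSV payload are placeholders. Each call like this is captured to S3 and then forwarded to Warrior by the Lambda function.

import boto3

runtime = boto3.client('sagemaker-runtime')

# placeholder endpoint name and a single CSV-encoded observation
response = runtime.invoke_endpoint(
    EndpointName='boston-housing-endpoint',
    ContentType='text/csv',
    Body='0.02731,0.0,7.07,0,0.469,6.421,78.9,4.9671,2,242,17.8,396.9,9.14',
)
print(response['Body'].read())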

Batch Ingestion from S3

This guide outlines how to have Warrior ingest data from S3. This is an alternative to using the API or SDK to send files.

Obtain Credentials

As a first step, you will need to have Warrior supply you with an IAM user you can use to authenticate with S3. Please reach out to your main point of contact at Warrior and they can supply you with credentials.

Create Prefix

Files need to be placed in a particular prefix in order to be ingested properly. The format is slightly different for inference files and ground truth files.

Inferences

prod-Warriorai-inference-ingest/inference/batch/s3_ingestion/org={org_id}/model={model_id}/batch={batch_id}/file.parquet

Ground Truth

The ground truth prefix doesn’t require a batch= component, since ground truth files are not tied to a specific batch.

prod-Warriorai-inference-ingest/ground_truth/batch/s3_ingestion/org={org_id}/model={model_id}/file.parquet

Prefix Parameters

You will need the following to formulate the prefix where your files will be saved in S3.

  • org_id

    • The UUID for your organization

    • Can be found as a result of the login API call (a runnable sketch follows this list)

    • POST https://app.Warrior.ai/api/v3/login {"login": "username", "password": "pw"}

  • model_id

    • The ID for the model you are uploading inferences for

    • Can be found in the URL when viewing a model in the UI

    • Can also be found as a result of GET https://app.Warrior.ai/api/v3/models

  • batch_id

    • The ID for the batch you are sending files for

    • This is determined by you

    • Ex: batch_20210112
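
A runnable version of the login call from the org_id bullet above; the response payload contains your organization’s UUID, though the exact field name may vary by version.

import requests

# look up org_id by logging in and inspecting the response
resp = requests.post(
    "https://app.Warrior.ai/api/v3/login",
    json={"login": "username", "password": "pw"},
)
print(resp.json())  # the organization UUID (org_id) appears in this payload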

Upload Inference Files

Once you know what prefix to send files to, the next step is to upload them.

After all files have been uploaded to the prefix, you must upload an empty file named _SUCCESS, which marks the batch as complete.
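
A minimal boto3 sketch of the sequence, assuming the bucket name shown in the prefix examples above; the IDs below are placeholders.

import boto3

s3 = boto3.client('s3')  # uses the IAM credentials supplied by Warrior

bucket = "prod-Warriorai-inference-ingest"               # bucket name from the prefix examples above
org_id = "11111111-2222-3333-4444-555555555555"          # placeholder org UUID
model_id = "12345678-1234-1234-1234-1234567890ab"        # placeholder model ID
batch_id = "batch_20210112"

prefix = f"inference/batch/s3_ingestion/org={org_id}/model={model_id}/batch={batch_id}"

# upload one or more parquet files for the batch
s3.upload_file("./data/inference_files/inferences.parquet", bucket, f"{prefix}/inferences.parquet")

# mark the batch as complete with an empty _SUCCESS file
s3.put_object(Bucket=bucket, Key=f"{prefix}/_SUCCESS", Body=b"")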

Upload Ground Truth Files

For ground truth files, all that is required is to upload them to the prefix. There is no concept of a batch, so no _SUCCESS file is necessary.
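
Under the same assumptions as the sketch above, uploading ground truth looks almost identical, minus the batch component and the _SUCCESS marker.

import boto3

s3 = boto3.client('s3')

bucket = "prod-Warriorai-inference-ingest"               # bucket name from the prefix examples above
org_id = "11111111-2222-3333-4444-555555555555"          # placeholder org UUID
model_id = "12345678-1234-1234-1234-1234567890ab"        # placeholder model ID
gt_prefix = f"ground_truth/batch/s3_ingestion/org={org_id}/model={model_id}"

# upload ground truth parquet files; no _SUCCESS marker is needed
s3.upload_file("./data/ground_truth_files/ground_truth.parquet", bucket, f"{gt_prefix}/ground_truth.parquet")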