Unifying Real-Time And Batch Inference With BentoML And Spark

Jan 24, 2023 • Written By Sauyon Lee

After talking to many members of the BentoML community, it has become clear that there is a common need to serve the same model both in real-time and for batch inference. While BentoML excels at real-time model serving, teams have to re-implement logic for batch serving. In response to this need, BentoML has developed a new batch inference API that utilizes Apache Spark, making it easy to run models on large datasets. More importantly, this new integration allows teams to use the same Bento for both real-time and batch serving without any additional code changes.


In a nutshell, BentoML introduced the run_in_spark function under the bentoml.batch APIs. Given a Spark session, this function runs inference with any Bento on a Spark DataFrame and returns the results as a Spark DataFrame.

output_df = bentoml.batch.run_in_spark(bento, "classify", input_df, spark)

Without further ado, let’s dive into how to run BentoML on Spark.

Apache Spark

We chose to integrate with Apache Spark as a first step because it is the de facto standard for working with large datasets in batches: it automates the division and parallelization of the operations given to it.

The new BentoML batch processing API requires an Apache Spark cluster running version 3.3.0 or above. A number of popular cloud services provide managed Spark clusters that you can use without having to worry about the underlying infrastructure.

If you don’t already have a Spark cluster available, Google VertexAI provides an easy way to start a serverless Spark cluster with access via a Jupyter notebook. Alternatively, if you just want to test out the new Spark API, you can launch a local Spark cluster, as shown below.
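For example, here is a minimal sketch of starting a local session for testing (the application name is just illustrative):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")  # run Spark locally, using all available cores
    .appName("bentoml-batch-demo")  # illustrative name
    .getOrCreate()
)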

In addition, both BentoML and your service’s dependencies (including model dependencies) must be installed on the Spark cluster. The platform hosting your cluster most likely has its own mechanism for this; if you are using a standalone cluster, install those dependencies on every node you expect to use.

pip install bentoml

Running Batch Inference

The following assumes you have a BentoML service ready and know the basics of BentoML. If you’d like to learn more about BentoML, see the BentoML tutorial. Also, ensure that you have at least BentoML 1.0.13 and Spark version 3.3.0.

For this example, we’ll be using the quickstart bento from that tutorial, but the commands should work for bentos with IO descriptors that support batch inference (at the time of writing, those are bentoml.io.NumpyNdarray, bentoml.io.PandasDataFrame, and bentoml.io.PandasSeries).

Ensure that the Bento APIs you wish to run can accept multiple inputs at once, with the zeroth dimension serving as the batch dimension. For example, batch_classify(np.array([[input_1], [input_2]])) must work and return np.array([[output_1], [output_2]]). The quickstart bento supports this pattern because the iris_classifier model it contains does.
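For reference, a batch-capable service looks roughly like this (a sketch based on the quickstart tutorial’s service.py; the iris_clf model name follows that tutorial):

import numpy as np
import bentoml
from bentoml.io import NumpyNdarray

# Load the classifier saved in the quickstart and wrap it in a runner.
iris_clf_runner = bentoml.sklearn.get("iris_clf:latest").to_runner()

svc = bentoml.Service("iris_classifier", runners=[iris_clf_runner])

@svc.api(input=NumpyNdarray(), output=NumpyNdarray())
def classify(input_series: np.ndarray) -> np.ndarray:
    # The zeroth dimension is the batch dimension: an (N, 4) input
    # produces N predictions, one per row.
    return iris_clf_runner.predict.run(input_series)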

Create The SparkSession

Create a PySpark SparkSession object. This will be used to create a DataFrame from the input data and to run the batch inference job. If you’re running in a notebook with Spark already available (e.g. a VertexAI PySpark notebook or a Databricks notebook), you can skip this step.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

Prepare The Input

Load the input data into a PySpark DataFrame using the spark.read.csv() method, which takes a file path as input and returns a DataFrame containing the data from the file. If you are using multipart input, or your DataFrame requires column names, you must also provide a schema as you load it.

from pyspark.sql.types import StructType, StructField, FloatType

schema = StructType(
    [
        StructField("sepal_length", FloatType(), False),
        StructField("sepal_width", FloatType(), False),
        StructField("petal_length", FloatType(), False),
        StructField("petal_width", FloatType(), False),
    ]
)
dataframe = spark.read.csv("https://docs.bentoml.org/en/latest/_static/examples/batch/input.csv", schema)

Prepare The Bento

Retrieve the Bento you want to use for the batch inference job. You can do this by calling the bentoml.get function and passing the bento’s tag, i.e. its name and version, as a parameter.

import bentoml

bento = bentoml.get("iris_classifier:latest")

Run Batch Inference

Run the batch inference job using the bentoml.batch.run_in_spark() method. This method takes the bento, the name of the API to run, the Spark DataFrame containing the input data, and the Spark session itself as parameters, and it returns a DataFrame containing the results of the batch inference job.

output_df = bentoml.batch.run_in_spark(bento, "classify", input_df, spark)

First, the bento is distributed to the cluster. Note that if the bento has already been distributed, i.e. you have already run a computation with that bento, this step is skipped.

Next, a process function is created, which runs the API method on every Spark batch given to it. The batch size can be controlled by setting spark.sql.execution.arrow.maxRecordsPerBatch. PySpark pickles this process function and dispatches it, along with the relevant data, to the workers.

Finally, the function is evaluated on the given DataFrame. Once all methods that the user defined in the script have been executed, the data is returned to the driver node.
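If the default batch size doesn’t suit your model, you can adjust it on the Spark session before calling run_in_spark; for example (the value here is just illustrative):

# Cap each Arrow record batch handed to the process function at 5,000 rows.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")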

Save The Results

Finally, save the results of the batch inference job to a file using the DataFrame’s write.csv() method. This method saves the contents of the DataFrame to the specified folder.

output_df.write.csv("output")

Upon success, you should see multiple files in the output folder: an empty _SUCCESS file and one or more part-*.csv files containing your output.

output
├── part-00000-44a5cc63-a505-47c3-9d68-e9691d0010fc-c000.csv
└── _SUCCESS
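If you’d prefer a single CSV file, one common approach is to coalesce the DataFrame to a single partition before writing, at the cost of parallelism on the final write (the output path here is illustrative):

# Collapse to one partition so Spark writes a single part-*.csv file.
output_df.coalesce(1).write.csv("output-single", header=True)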

🎉 Congratulations! You've just unlocked the power of BentoML and PySpark by writing a script that uses BentoML's batch inference API to run inference on large datasets with ease. Whether you're running locally or on a distributed cluster, you can use the spark-submit command to execute the script and inspect the results in the output folder.
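For reference, here is roughly what my_script.py could look like end to end, stitching together the steps from this walkthrough:

import bentoml
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType

spark = SparkSession.builder.getOrCreate()

schema = StructType(
    [
        StructField("sepal_length", FloatType(), False),
        StructField("sepal_width", FloatType(), False),
        StructField("petal_length", FloatType(), False),
        StructField("petal_width", FloatType(), False),
    ]
)
input_df = spark.read.csv("https://docs.bentoml.org/en/latest/_static/examples/batch/input.csv", schema)

bento = bentoml.get("iris_classifier:latest")
output_df = bentoml.batch.run_in_spark(bento, "classify", input_df, spark)

output_df.write.csv("output")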

spark-submit my_script.py

With BentoML and PySpark, you now have a powerful solution for running batch inference jobs and scaling your machine learning models for both online and offline inference. More detailed API documentation can be found in the BentoML Spark integration docs. Share your experience with the community and let's learn together!