
March 29, 2023
Tutorial: How to integrate a Stable Diffusion pipeline with an external data store
How to upload your Stable Diffusion inference images to an external MinIO data store.
If you've used PipelineCloud, it's clear how powerful a platform it can be for running complex ML tasks without the need for expensive hardware or the hassle of managing ML-ops infrastructure. But you also have the ability to execute arbitrary Python on the PipelineCloud servers, giving you significant flexibility and control over your ML projects. For instance, you could connect to a third-party data store and save your inference results there. In this guide, we'll show you how to save inference images to an external data store each time a prediction is made. As our inference model, we'll be using a HuggingFace pretrained stable-diffusion pipeline and building on the deploy a stable diffusion pipeline guide. We'll be updating some of the code laid out in that guide, so make sure you've gone through that example first.

Assumes prior knowledge. This guide assumes you are already familiar with deploying HuggingFace diffusers models on PipelineCloud and will build on deploy a stable diffusion pipeline.

As our data store, we'll be connecting to a public MinIO server. MinIO is an open-source object storage server that allows you to store and access large amounts of unstructured data. It is compatible with the Amazon S3 API, which means that you can use the same tools and applications that you would use with S3 to interact with MinIO. In practice, all you'll have to do to connect to an Amazon S3 bucket instead is change some environment variables!

NOTE: This is a walkthrough, so many of the code snippets below are chunks of a larger script. If you're skimming or just want to see code, skip to the conclusion where you'll find the complete script.

Background
In deploy a stable diffusion pipeline, we saw how to package a pretrained stable-diffusion HuggingFace pipeline into a deployable unit of code and upload that code to PipelineCloud. This enabled us to perform remote inferences in the cloud by making an HTTP POST request to an endpoint on the pipeline-ai API gateway. The deployable unit of code is a pipeline-ai pipeline. This is where you define the set of instructions that should be followed when an inference call is made to the endpoint. For instance, in the stable diffusion pipeline, the instructions involved creating a model instance, loading the HF model into memory, parsing the input to the pipeline and then passing it to the model to generate a list of images.
One option for getting your inference results into a data store is to handle everything client-side: a script makes a POST request to api.pipeline.ai/v2/runs, waits for the response, and then handles the logic for uploading the result. Alternatively, you could set up some CRON job that periodically queries your pipeline's runs, then uploads any missing run results. In this guide, however, we'll show you how to handle the uploads server-side, within the execution of the pipeline itself. To achieve this, we'll just need to add an additional step at the end of the pipeline which will handle that logic.

So that's where we're heading. First, though, we'll implement a simple storage client class to interface with MinIO and handle the uploads. Seeing as we'll be using the Python minio SDK for this, we'll also need to set up a custom Python environment on PipelineCloud, as it isn't available in any of the public environments.

Uploading to MinIO
The MinIO server playground
The MinIO Server Playground is a web-based user interface that allows you to quickly and easily test the functionality of the MinIO server without having to set up your own instance. It provides a sandbox environment where you can experiment with MinIO features, such as bucket creation, object uploading, and metadata management. Navigate to https://play.min.io/ and log in using the access key Q3AM3UQ867SPQQA43P2F and secret key zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG. You should see a bunch of random buckets there.

We'll be interacting with this public server instead of creating our own instance or setting up our own bucket on Amazon S3. In a production setting, though, you'll obviously want to set something like that up. But once you've created your Amazon S3 bucket, say, everything should work the same; you'll just need to update your environment variables to point to the right bucket.
In your project directory, create a .env file with the following environment variables:

S3_ENDPOINT=play.min.io
S3_ACCESS_KEY=Q3AM3UQ867SPQQA43P2F
S3_SECRET_KEY=zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG
S3_BUCKET=stable-diffusion-pipeline-files
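If you do switch to your own Amazon S3 bucket later, these same four variables are all that should need to change. As a rough sketch (the credentials and bucket name below are placeholders, not values from this guide), the file might look like:

# Hypothetical values for an Amazon S3 bucket
S3_ENDPOINT=s3.amazonaws.com
S3_ACCESS_KEY=YOUR_AWS_ACCESS_KEY_ID
S3_SECRET_KEY=YOUR_AWS_SECRET_ACCESS_KEY
S3_BUCKET=your-own-bucket-name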
We'll be uploading to a bucket named stable-diffusion-pipeline-files. You can see what images other folks following this guide have been generating by checking whether the bucket exists on https://play.min.io/, although buckets are purged pretty regularly so there may not be anything there.

Implementing a Client wrapper
Let's build out a simple StorageClient class which wraps around the Minio Client. We'll implement two methods for uploading data to a data store: one for creating a new bucket and another for uploading objects to the bucket. Since we'll be interacting with the MinIO API using the minio Python SDK, make sure you have added it to your local environment, using pip, poetry or whichever package manager you prefer, e.g. poetry add minio. We'll also be loading the .env environment variables automatically using the dotenv package (python-dotenv on PyPI), so you'll want to install that too. This saves us from having to export variables each time we open a new shell.

import io
import os

from dotenv import load_dotenv
from minio import Minio

load_dotenv()


class StorageClient:
    """
    A wrapper class for uploading files to an S3 bucket.
    Default storage client is the Minio Client. Inject another client if you'd like,
    but ensure the client interface matches the methods called here.
    """

    def __init__(
        self,
        Client=Minio,
        endpoint=os.getenv("S3_ENDPOINT"),
        access_key=os.getenv("S3_ACCESS_KEY"),
        secret_key=os.getenv("S3_SECRET_KEY"),
    ):
        self.client = Client(endpoint, access_key, secret_key)

    def _create_bucket(self, bucket: str):
        """Create a new bucket for storing images, if it does not already exist"""
        if not self.client.bucket_exists(bucket):
            self.client.make_bucket(bucket)

    def upload(
        self,
        object_name: str,
        data: bytes,
        bucket=os.getenv("S3_BUCKET"),
    ):
        """Upload a file to an S3 bucket"""
        length = len(data)
        data_stream = io.BytesIO(data)
        self._create_bucket(bucket)
        self.client.put_object(bucket, object_name, data_stream, length)
When we create a StorageClient instance, we inject the Client and associated credentials into the constructor. That way the StorageClient is less coupled to the specific Minio client, so we could swap it out for another client which implements the methods put_object, make_bucket and bucket_exists. We then have a "private" method _create_bucket, which creates a new storage bucket if it doesn't already exist. Finally, we have our core upload method. The object_name parameter allows you to distinguish the uploaded file from other files in the bucket. We also need to provide the file data, which we pass in as bytes and which is converted into a buffered stream inside the method.
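To illustrate this design, here's a hypothetical stub client you could inject instead of Minio, for example when testing locally without any network access (this class isn't part of the guide's script, and it assumes the StorageClient defined above):

class InMemoryClient:
    """A stand-in for the Minio client that keeps objects in a plain dict."""

    def __init__(self, endpoint, access_key, secret_key):
        # The credentials are accepted but ignored
        self.buckets = {}

    def bucket_exists(self, bucket):
        return bucket in self.buckets

    def make_bucket(self, bucket):
        self.buckets[bucket] = {}

    def put_object(self, bucket, object_name, data_stream, length):
        # Store the raw bytes read from the stream
        self.buckets[bucket][object_name] = data_stream.read(length)


# Inject the stub instead of Minio; no network calls are made
client = StorageClient(Client=InMemoryClient)
client.upload("test-object", b"hello", bucket="test-bucket")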
You can try out the above script, for instance by adding the following lines of code:

if __name__ == "__main__":
    from datetime import datetime
    now = datetime.utcnow()
    data = b"Some binary data: \x00\x01"
    client = StorageClient()
    client.upload(f"pipeline-test_{now}", data, bucket="pipeline-test")
If all went well you should now see a new file saved to the "pipeline-test" bucket on the MinIO playground server.
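You can also check programmatically rather than through the web UI. Here's a minimal sketch (not part of the guide's script) that uses the minio client's list_objects method to list the contents of the bucket:

from minio import Minio

client = Minio(
    "play.min.io",
    access_key="Q3AM3UQ867SPQQA43P2F",
    secret_key="zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG",
)
# Print the name and size of every object in the test bucket
for obj in client.list_objects("pipeline-test", recursive=True):
    print(obj.object_name, obj.size)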
This is pretty much the bulk of what we'll need to save inference images of our stable-diffusion pipeline to the data store. The only extra thing we'll need is to embed logic for calling the StorageClient.upload
method in the workflow of the stable diffusion pipeline. But we'll get to that later, once we've set up the custom environment.
Creating the remote Python environment
For the pipeline that we'll be developing, we'll need some Python packages that aren't included in the default environment, e.g. minio. This means that we'll need to create a new custom environment and add all the required packages. The easiest way to achieve this is by using the pipeline-ai CLI. We recommend that you have the latest version of pipeline-ai installed.
To create a new environment named sd-minio, say, simply run

pipeline environments create sd-minio
in a shell with your local environment (with pipeline-ai
) activated. You can check that it was created successfully by fetching it by name:
pipeline environments get -n sd-minio
Here you should see a response with an empty list of python_requirements
, which are the Python packages in your environment. Then create a local requirements.txt
file containing the following lines:
transformers==4.26.1
torch==1.13.1
diffusers==0.13.1
accelerate==0.17.1
minio==7.1.14
and then add all these packages to your custom environment by running:
cat requirements.txt | xargs pipeline environments update -n sd-minio add
You should now see these packages in the environment python_requirements
. Note that you'll need the ID of your custom environment when uploading the pipeline to PipelineCloud
.
Integrating MinIO uploads
Now that we've set up our MinIO client and custom environment, we're ready to integrate uploads to MinIO into the stable diffusion pipeline:

import typing as t

from PIL.Image import Image
from pipeline import Pipeline, Variable


PIPELINE_NAME = "sd-dreambooth"

# The `pipeline-ai` pipeline
with Pipeline(PIPELINE_NAME, min_gpu_vram_mb=3040) as pipeline:
    # Define pipeline inputs
    input_kwargs = Variable(dict, is_input=True)
    pipeline.add_variables(input_kwargs)

    # Create and load model
    model = SDDreambooth()
    model.load()

    # Feed inputs to model
    context: InputKwargs = model.set_kwargs(input_kwargs)
    images: t.List[Image] = model.predict()

    # Format the images and output result
    formatted_images: t.List[str] = model.format_images(images)
    pipeline.output(formatted_images)

    # Upload images to MinIO
    save_to_store(formatted_images)
The only addition here is the call to save_to_store at the end of the pipeline, which will be responsible for uploading the output of the pipeline, formatted_images, to MinIO. Since we are calling save_to_store within a Pipeline context manager and want the runtime values to be passed to the function, we need to decorate it with the pipeline_function decorator:

@pipeline_function
def save_to_store(images: t.List[str]) -> None:
    import base64
    from datetime import datetime

    client = StorageClient()
    for image in images:
        now = datetime.utcnow()
        image_bytes = base64.b64decode(image)
        client.upload(f"dreambooth-{now}.jpeg", image_bytes)
Note that we base64-decode each image before uploading, in order to be able to preview the images on the MinIO playground server.

Running the pipeline on Pipeline Cloud
Before we can run the pipeline on Pipeline Cloud, we need to upload it to the servers. Assuming you have authenticated using the CLI, we 'get' the pipeline before instantiating a connection to Pipeline Cloud and uploading our pipeline:

from pipeline import PipelineCloud

pipeline = Pipeline.get_pipeline(PIPELINE_NAME)
api = PipelineCloud()
uploaded_pipeline = api.upload_pipeline(
    pipeline, environment="YOUR_ENVIRONMENT_ID"
)
print(f"Uploaded pipeline id: {uploaded_pipeline.id}")
Make sure to replace YOUR_ENVIRONMENT_ID with the ID of the custom environment you created previously, which you can get using the CLI:

pipeline environments get -n sd-minio
During this stage, the pipeline-ai
library will serialize all your code and post your pipeline to an endpoint for creating pipelines in our main API gateway.
And now we run the pipeline, supplying an input dictionary of type InputKwargs:

run = api.run_pipeline(
    uploaded_pipeline.id,
    {
        "prompt": "Mountain winds and babbling springs and moonlight seas, futuristic, herge_style.",
        "num_inference_steps": 100
    },
)
After running this you should be able to preview your inference images on the server! The first time you run the pipeline, it will take about a minute because the pipeline won't be cached on our servers. Subsequent runs won't be subject to this cold start though and should be pretty speedy! Just make sure you move the run_pipeline
call into another script and don't execute the whole script again, because you'll be uploading a new pipeline each time.
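For example, a separate run script might look something like this (a minimal sketch; the pipeline ID is a placeholder for the id printed when you uploaded your pipeline):

# run_sd.py -- run an already-uploaded pipeline without re-uploading it
from pipeline import PipelineCloud

api = PipelineCloud()
run = api.run_pipeline(
    "YOUR_UPLOADED_PIPELINE_ID",  # replace with the id from the upload step
    {
        "prompt": "Mountain winds and babbling springs and moonlight seas, futuristic, herge_style.",
        "num_inference_steps": 100,
    },
)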
Conclusion
In this guide, we saw how to connect up our Python application with MinIO using the minio
package and start uploading data to a data store. We created a storage client wrapper class around the MinIO Client
which handles uploading files to a bucket on the server. We then updated the stable diffusion pipeline by adding a new pipeline_function
call which uploads the inference images after each prediction.
Complete Script
import base64
import io
import os
import random
import typing as t

import numpy as np
import torch
from diffusers.utils import logging
from dotenv import load_dotenv
from minio import Minio
from PIL.Image import Image
from pipeline import (
    Pipeline,
    PipelineCloud,
    Variable,
    pipeline_function,
    pipeline_model,
)

load_dotenv()

PIPELINE_NAME = "sd-dreambooth"


class StorageClient:
    """
    A wrapper class for uploading files to an S3 bucket.
    Default storage client is the Minio Client. Inject another client if you'd like,
    but ensure the client interface matches the methods called here.
    """

    def __init__(
        self,
        Client=Minio,
        endpoint=os.getenv("S3_ENDPOINT"),
        access_key=os.getenv("S3_ACCESS_KEY"),
        secret_key=os.getenv("S3_SECRET_KEY"),
    ):
        self.client = Client(endpoint, access_key, secret_key)

    def _create_bucket(self, bucket: str):
        """Create a new bucket for storing images, if it does not already exist"""
        if not self.client.bucket_exists(bucket):
            self.client.make_bucket(bucket)

    def upload(
        self,
        object_name: str,
        data: bytes,
        bucket=os.getenv("S3_BUCKET"),
    ):
        """Upload a file to an S3 bucket"""
        length = len(data)
        data_stream = io.BytesIO(data)
        self._create_bucket(bucket)
        self.client.put_object(bucket, object_name, data_stream, length)


logging.disable_progress_bar()
logging.set_verbosity_error()


# The shape of the input keyword arguments
class InputKwargs(t.TypedDict):
    prompt: str
    num_images_per_prompt: t.Optional[int]
    height: t.Optional[int]
    width: t.Optional[int]
    num_inference_steps: t.Optional[int]
    guidance_scale: t.Optional[float]
    eta: t.Optional[float]
    seed: t.Optional[int]


DEFAULT_KWARGS: InputKwargs = {
    "prompt": "Mountain winds and babbling springs and moonlight seas.",
    "num_images_per_prompt": 1,
    "height": 512,
    "width": 512,
    "num_inference_steps": 50,
    "guidance_scale": 7.5,
    "eta": 0.0,
    "seed": None,
}


@pipeline_model
class SDDreambooth:
    def __init__(self) -> None:
        self.input_kwargs = None
        self.model = None

    @pipeline_function(run_once=True, on_startup=True)
    def load(self) -> None:
        """
        Load the model into memory. The decorator parameters ensure the
        model is loaded only when needed, i.e. it is not cached on the GPU.
        """
        from diffusers import DiffusionPipeline

        device = torch.device("cuda:0")
        self.model = DiffusionPipeline.from_pretrained(
            "sd-dreambooth-library/herge-style"
        )
        self.model.to(device)

    @pipeline_function
    def set_kwargs(self, input_kwargs: InputKwargs) -> InputKwargs:
        """
        Set the model kwargs given the input kwargs.
        These are used in other methods.
        """
        self.input_kwargs = {**DEFAULT_KWARGS, **input_kwargs}
        return self.input_kwargs

    @pipeline_function
    def seed_everything(self) -> int:
        """
        Sets seed for pseudo-random number generators in: pytorch, numpy, python.random.
        `PL_GLOBAL_SEED` ensures the seed is passed to any spawned subprocesses.
        """
        seed = self.input_kwargs.pop("seed") or random.randint(1, 1_000_000)
        os.environ["PL_GLOBAL_SEED"] = str(seed)
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        return seed

    @pipeline_function
    def predict(self) -> t.List[Image]:
        """
        A forward pass through the network given the `input_kwargs`.
        """
        # Ensure the input kwargs have been set
        if self.input_kwargs is None:
            raise TypeError(
                "Input kwargs cannot be None. Set them before calling this method."
            )
        seed = self.seed_everything()
        generator = torch.Generator(device=0).manual_seed(seed)

        images = self.model(**self.input_kwargs, generator=generator).images

        return images

    @pipeline_function
    def to_string(self, image: Image) -> str:
        """
        Converts a `PIL` image to a base64 encoded string.
        """
        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")
        img_str = base64.b64encode(buffered.getvalue()).decode()
        return img_str

    @pipeline_function
    def format_images(self, images: t.List[Image]) -> t.List[str]:
        """
        Formats a list of `PIL` images into a list of base64 encoded strings.
        """
        return [self.to_string(image) for image in images]


@pipeline_function
def save_to_store(images: t.List[str]) -> None:
    import base64
    from datetime import datetime

    client = StorageClient()
    for image in images:
        now = datetime.utcnow()
        image_bytes = base64.b64decode(image)
        client.upload(f"dreambooth-{now}.jpeg", image_bytes)


with Pipeline(PIPELINE_NAME, min_gpu_vram_mb=3040) as pipeline:
    # Define pipeline inputs
    input_kwargs = Variable(dict, is_input=True)
    pipeline.add_variables(input_kwargs)

    # Create and load model
    model = SDDreambooth()
    model.load()

    # Feed inputs to model
    context: InputKwargs = model.set_kwargs(input_kwargs)
    images: t.List[Image] = model.predict()

    # Format the images and output result
    formatted_images: t.List[str] = model.format_images(images)
    pipeline.output(formatted_images)

    # Upload images to MinIO
    save_to_store(formatted_images)


sd_pipeline = Pipeline.get_pipeline(PIPELINE_NAME)
api = PipelineCloud()
uploaded_pipeline = api.upload_pipeline(
    sd_pipeline, environment="YOUR_ENVIRONMENT_ID"
)
print(f"Uploaded pipeline id: {uploaded_pipeline.id}")
run = api.run_pipeline(
    uploaded_pipeline.id,
    {
        "prompt": "Mountain winds and babbling springs and moonlight seas, futuristic, herge_style.",
        "num_inference_steps": 100,
    },
)