
6 minutes read

March 29, 2023

Tutorial: How to integrate a Stable Diffusion pipeline with an external data store

How to upload your Stable Diffusion inference images to an external MinIO data store.

For those already familiar with PipelineCloud, it's clear how powerful a platform it can be for running complex ML tasks without expensive hardware or the hassle of managing ML-ops infrastructure. But you can also execute arbitrary Python on the PipelineCloud servers, which gives you significant flexibility and control over your ML projects. For instance, you could connect to a third-party data store and save your inference results there.

In this guide, we'll show you how to save inference images to an external data store each time a prediction is made. As our inference model, we'll be using a HuggingFace pretrained stable-diffusion pipeline and building on deploy a stable diffusion pipeline. We'll be updating some of the code laid out in that guide, so make sure you've gone through that example first.

Assumes prior knowledge. This guide assumes you are already familiar with deploying HuggingFace diffusers models on PipelineCloud and will build on deploy a stable diffusion pipeline.

As our data store, we'll be connecting to a public MinIO server. MinIO is an open-source object storage server that allows you to store and access large amounts of unstructured data. It is compatible with the Amazon S3 API, which means that you can use the same tools and applications that you would use with S3 to interact with MinIO. In practice, all you'll have to do to connect to an Amazon S3 bucket instead is change some environment variables!

NOTE: This is a walkthrough, so many of the code snippets below are chunks of a larger script. If you're skimming or just want to see code, skip to the conclusion, where you'll find the complete script.

Background

In deploy a stable diffusion pipeline, we saw how to package a pretrained stable-diffusion HuggingFace pipeline into a deployable unit of code and upload that code to PipelineCloud. This enabled us to perform remote inferences in the cloud by making a HTTP POST request to an endpoint on the pipeline-ai API gateway. The deployable unit of code is a pipeline-ai pipeline. This is where you define the set of instructions that should be followed when an inference call is made to the endpoint. For instance, in the stable diffusion pipeline, the instructions involved creating a model instance, loading the HF model into memory, parsing the input to the pipeline and then passing it to the model to generate a list of images.
Now say you want the generated images to be uploaded to an external storage bucket. There are a number of ways you could achieve this. You could implement it client-side, where your client application makes a POST request to api.pipeline.ai/v2/runs, waits for the response, and then handles the logic for uploading the result (a rough sketch of this variant is shown below). Alternatively, you could set up a CRON job that periodically queries your pipeline's runs and uploads any missing run results. In this guide, however, we'll show you how to handle the uploads server-side, within the execution of the pipeline itself. To achieve this, we'll just need to add an additional step at the end of the pipeline which handles that logic.

So that's where we're heading. First, though, we'll implement a simple storage client class to interface with MinIO and handle the uploads. Seeing as we'll be using the Python minio SDK for this, we'll also need to set up a custom Python environment on PipelineCloud, since the SDK isn't available in any of the public environments.
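For comparison only, the client-side variant mentioned above might look roughly like this. This is a sketch using the pipeline-ai Python client rather than raw HTTP, and it assumes the returned run object exposes its outputs via a result_preview attribute, which is an assumption that may not hold for your version of the library:

import base64

from pipeline import PipelineCloud

api = PipelineCloud()

# Trigger a remote run and wait for it to finish
run = api.run_pipeline(
    "YOUR_PIPELINE_ID",
    {"prompt": "Mountain winds and babbling springs and moonlight seas."},
)

# `result_preview` is an assumption about where the run outputs live;
# check the run schema for your version of `pipeline-ai`.
for i, encoded_image in enumerate(run.result_preview):
    with open(f"client-side-{i}.jpeg", "wb") as f:
        f.write(base64.b64decode(encoded_image))
    # ...then upload the decoded file to your data store of choice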

Uploading to MinIO

The MinIO server playground

The MinIO Server Playground is a web-based user interface that allows you to quickly and easily test the functionality of the MinIO server without having to set up your own instance. It provides a sandbox environment where you can experiment with MinIO features, such as bucket creation, object uploading, and metadata management. Navigate to https://play.min.io/ and log in using the access key Q3AM3UQ867SPQQA43P2F and secret key zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG. You should see a bunch of random buckets there.

We'll be interacting with this public server instead of creating our own instance or setting up our own bucket on Amazon S3. In a production setting, you'll obviously want to set something like that up. But once you've created your Amazon S3 bucket, say, everything should work the same; you'll just need to update your environment variables to point to the right bucket. In your project directory, create a .env file with the following environment variables:
S3_ENDPOINT=play.min.io
S3_ACCESS_KEY=Q3AM3UQ867SPQQA43P2F
S3_SECRET_KEY=zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG
S3_BUCKET=stable-diffusion-pipeline-files
We'll be saving our images to a bucket called stable-diffusion-pipeline-files. You can see what images other folks following this guide have been generating by checking whether the bucket exists on https://play.min.io/, although buckets are purged pretty regularly, so there may not be anything there.
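Before going any further, you can sanity-check that your credentials and endpoint are wired up correctly by listing the playground's buckets with the minio SDK. This is just a quick connectivity check, assuming the .env file above sits in your working directory:

import os

from dotenv import load_dotenv
from minio import Minio

load_dotenv()

# Connect to the playground server using the credentials from .env
client = Minio(
    os.getenv("S3_ENDPOINT"),
    access_key=os.getenv("S3_ACCESS_KEY"),
    secret_key=os.getenv("S3_SECRET_KEY"),
)

# Print a few of the buckets currently on the server
for bucket in client.list_buckets()[:5]:
    print(bucket.name, bucket.creation_date)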

Implementing a Client wrapper

Let's build out a simple StorageClient class which wraps around the MinIO Client. We'll implement two methods for uploading data to a data store: one for creating a new bucket and another for uploading objects to the bucket. Since we'll be interacting with the MinIO API using the minio Python SDK, make sure you have added it to your local environment using pip, poetry or whichever package manager you prefer, e.g. poetry add minio. We'll also be loading the .env environment variables automatically using the dotenv package (python-dotenv), so you'll want to install that too. This saves us from having to export variables each time we open a new shell.
import io
import os

from dotenv import load_dotenv
from minio import Minio

load_dotenv()


class StorageClient:
    """
    A wrapper class for uploading files to an S3 bucket.
    Default storage client is the Minio Client. Inject another client if you'd like,
    but ensure the client interface matches the methods called here.
    """

    def __init__(
        self,
        Client=Minio,
        endpoint=os.getenv("S3_ENDPOINT"),
        access_key=os.getenv("S3_ACCESS_KEY"),
        secret_key=os.getenv("S3_SECRET_KEY"),
    ):
        self.client = Client(endpoint, access_key, secret_key)

    def _create_bucket(self, bucket: str):
        """Create a new bucket for storing images, if it does not already exist"""
        if not self.client.bucket_exists(bucket):
            self.client.make_bucket(bucket)

    def upload(
        self,
        object_name: str,
        data: bytes,
        bucket=os.getenv("S3_BUCKET"),
    ):
        """Upload a file to an S3 bucket"""
        length = len(data)
        data_stream = io.BytesIO(data)
        self._create_bucket(bucket)
        self.client.put_object(bucket, object_name, data_stream, length)
The code here should hopefully be pretty straightforward. When we create a new StorageClient instance, we inject the Client and associated credentials into the constructor. That way the StorageClient is less coupled to the specific Minio client, so we could swap it out for another client which implements the methods put_object, make_bucket and bucket_exists. We then have a "private" method _create_bucket, which creates a new storage bucket if it doesn't already exist. Finally, we have our core upload method. The object_name parameter allows you to distinguish the uploaded file from other files in the bucket. We also need to provide the file data, which we pass in as bytes and convert into a buffered stream inside the method. You can try out the above script, for instance by adding the following lines of code:
if __name__ == "__main__":
    from datetime import datetime

    now = datetime.utcnow()
    data = b"Some binary data: \x00\x01"
    client = StorageClient()
    client.upload(f"pipeline-test_{now}", data, bucket="pipeline-test")

If all went well you should now see a new file saved to the "pipeline-test" bucket on the MinIO playground server.
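You can also confirm the upload programmatically rather than through the web UI. As a rough sketch (the bucket name matches the test above), list the bucket's contents and print a temporary download link for each object. Note that we reach through to the underlying Minio client here, since our wrapper doesn't expose listing or presigning:

from datetime import timedelta

client = StorageClient()

# List everything currently in the test bucket and generate a short-lived
# presigned URL for each object so it can be inspected in a browser
for obj in client.client.list_objects("pipeline-test"):
    url = client.client.presigned_get_object(
        "pipeline-test", obj.object_name, expires=timedelta(hours=1)
    )
    print(obj.object_name, url)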

This is pretty much the bulk of what we'll need to save inference images of our stable-diffusion pipeline to the data store. The only extra thing we'll need is to embed logic for calling the StorageClient.upload method in the workflow of the stable diffusion pipeline. But we'll get to that later, once we've set up the custom environment.

Creating the remote Python environment

For the pipeline that we'll be developing, we'll need some Python packages that aren't included in the default environment, e.g. minio. This means that we'll need to create a new custom environment and add all the required packages. The easiest way to achieve this is by using the pipeline-ai CLI. We recommend that you have the latest version of pipeline-ai installed.

First login with the pipeline CLI. We will be interacting with the Pipeline API using the CLI and assume you have authenticated. For more information about how to authenticate using the CLI, see the authentication guide.

To create a new environment named, say, sd-minio, simply run

pipeline environments create sd-minio

in a shell with your local environment (with pipeline-ai) activated. You can check that it was created successfully by fetching it by name:

pipeline environments get -n sd-minio

Here you should see a response with an empty list of python_requirements, which are the Python packages in your environment. Then create a local requirements.txt file containing the following lines:

transformers==4.26.1
torch==1.13.1
diffusers==0.13.1
accelerate==0.17.1
minio==7.1.14

and then add all these packages to your custom environment by running:

cat requirements.txt | xargs pipeline environments update -n sd-minio add

You should now see these packages listed in the environment's python_requirements. Note that you'll need the ID of your custom environment when uploading the pipeline to PipelineCloud.

Integrating MinIO uploads

Now that we've set up our MinIO client and custom environment, we're ready to integrate uploads to MinIO into the stable diffusion pipeline:
import typing as t

from PIL.Image import Image
from pipeline import Pipeline, Variable


PIPELINE_NAME = "sd-dreambooth"

# The `pipeline-ai` pipeline
with Pipeline(PIPELINE_NAME, min_gpu_vram_mb=3040) as pipeline:
    # Define pipeline inputs
    input_kwargs = Variable(dict, is_input=True)
    pipeline.add_variables(input_kwargs)

    # Create and load model
    model = SDDreambooth()
    model.load()

    # Feed inputs to model
    context: InputKwargs = model.set_kwargs(input_kwargs)
    images: t.List[Image] = model.predict()

    # Format the images and output result
    formatted_images: t.List[str] = model.format_images(images)
    pipeline.output(formatted_images)

    # Upload images to MinIO
    save_to_store(formatted_images)
Only one change has been made to the original pipeline: we have added a function call, save_to_store, at the end of the pipeline, which is responsible for uploading the output of the pipeline, formatted_images, to MinIO.

Since we are calling save_to_store within a Pipeline context manager and want the runtime values to be passed to the function, we need to decorate it with the pipeline_function decorator:
@pipeline_function
def save_to_store(images: t.List[str]) -> None:
    import base64
    from datetime import datetime

    client = StorageClient()
    for image in images:
        now = datetime.utcnow()
        image_bytes = base64.b64decode(image)
        client.upload(f"dreambooth-{now}.jpeg", image_bytes)
Here we simply instantiate the client and upload each of the generated images. Notice that we base64 decode each image before uploading, in order to be able to preview them on the MinIO playground server.
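If you want to check the decode-and-upload round trip locally before deploying, you can mirror what save_to_store does with a dummy image. A minimal sketch, assuming the StorageClient and .env from earlier and a local Pillow install:

import base64
import io
from datetime import datetime

from PIL import Image as PILImage

# Create a small solid-colour JPEG and encode it the same way the
# pipeline's `format_images` step does (base64-encoded JPEG bytes)
image = PILImage.new("RGB", (64, 64), color=(120, 30, 200))
buffered = io.BytesIO()
image.save(buffered, format="JPEG")
encoded = base64.b64encode(buffered.getvalue()).decode()

# Decode and upload it, mirroring what `save_to_store` does per image
client = StorageClient()
client.upload(f"dreambooth-test-{datetime.utcnow()}.jpeg", base64.b64decode(encoded))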

Running the pipeline on Pipeline Cloud

Before we can run the pipeline on Pipeline Cloud, we need to upload it to the servers. Assuming you have authenticated using the CLI, we 'get' the pipeline, before instantiating a connection to Pipeline Cloud and uploading our pipeline:
from pipeline import PipelineCloud

pipeline = Pipeline.get_pipeline(PIPELINE_NAME)
api = PipelineCloud()
uploaded_pipeline = api.upload_pipeline(
    pipeline, environment="YOUR_ENVIRONMENT_ID"
)
print(f"Uploaded pipeline id: {uploaded_pipeline.id}")
Just be sure to replace YOUR_ENVIRONMENT_ID with the ID of the custom environment you created previously, which you can get using the CLI:
pipeline environments get -n sd-minio

During this stage, the pipeline-ai library will serialize all your code and post your pipeline to an endpoint for creating pipelines in our main API gateway.

And now we run the pipeline, supplying an input dictionary of type InputKwargs:

run = api.run_pipeline(
    uploaded_pipeline.id,
    {
        "prompt": "Mountain winds and babbling springs and moonlight seas, futuristic, herge_style.",
        "num_inference_steps": 100,
    },
)

After running this you should be able to preview your inference images on the server! The first time you run the pipeline, it will take about a minute because the pipeline won't be cached on our servers. Subsequent runs won't be subject to this cold start and should be pretty speedy! Just make sure you move the run_pipeline call into another script and don't execute the whole script again, because you'd be uploading a new pipeline each time.
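For example, a separate run script might look something like the following, where the placeholder is the pipeline ID printed by the upload step:

from pipeline import PipelineCloud

api = PipelineCloud()

# Reuse the pipeline ID printed when you uploaded the pipeline
run = api.run_pipeline(
    "YOUR_PIPELINE_ID",
    {
        "prompt": "Mountain winds and babbling springs and moonlight seas, futuristic, herge_style.",
        "num_inference_steps": 100,
    },
)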

Conclusion

In this guide, we saw how to connect up our Python application with MinIO using the minio package and start uploading data to a data store. We created a storage client wrapper class around the MinIO Client which handles uploading files to a bucket on the server. We then updated the stable diffusion pipeline by adding a new pipeline_function call which uploads the inference images after each prediction.

Complete Script

import base64
import io
import os
import random
import typing as t

import numpy as np
import torch
from diffusers.utils import logging
from dotenv import load_dotenv
from minio import Minio
from PIL.Image import Image
from pipeline import (
    Pipeline,
    PipelineCloud,
    Variable,
    pipeline_function,
    pipeline_model,
)

load_dotenv()

PIPELINE_NAME = "sd-dreambooth"


class StorageClient:
    """
    A wrapper class for uploading files to an S3 bucket.
    Default storage client is the Minio Client. Inject another client if you'd like,
    but ensure the client interface matches the methods called here.
    """

    def __init__(
        self,
        Client=Minio,
        endpoint=os.getenv("S3_ENDPOINT"),
        access_key=os.getenv("S3_ACCESS_KEY"),
        secret_key=os.getenv("S3_SECRET_KEY"),
    ):
        self.client = Client(endpoint, access_key, secret_key)

    def _create_bucket(self, bucket: str):
        """Create a new bucket for storing images, if it does not already exist"""
        if not self.client.bucket_exists(bucket):
            self.client.make_bucket(bucket)

    def upload(
        self,
        object_name: str,
        data: bytes,
        bucket=os.getenv("S3_BUCKET"),
    ):
        """Upload a file to an S3 bucket"""
        length = len(data)
        data_stream = io.BytesIO(data)
        self._create_bucket(bucket)
        self.client.put_object(bucket, object_name, data_stream, length)


logging.disable_progress_bar()
logging.set_verbosity_error()


# The shape of the input keyword arguments
class InputKwargs(t.TypedDict):
    prompt: str
    num_images_per_prompt: t.Optional[int]
    height: t.Optional[int]
    width: t.Optional[int]
    num_inference_steps: t.Optional[int]
    guidance_scale: t.Optional[float]
    eta: t.Optional[float]
    seed: t.Optional[int]


DEFAULT_KWARGS: InputKwargs = {
    "prompt": "Mountain winds and babbling springs and moonlight seas.",
    "num_images_per_prompt": 1,
    "height": 512,
    "width": 512,
    "num_inference_steps": 50,
    "guidance_scale": 7.5,
    "eta": 0.0,
    "seed": None,
}


@pipeline_model
class SDDreambooth:
    def __init__(self) -> None:
        self.input_kwargs = None
        self.model = None

    @pipeline_function(run_once=True, on_startup=True)
    def load(self) -> None:
        """
        Load the model into memory. The decorator parameters ensure the
        model is loaded only when needed, i.e. it is not cached on the GPU.
        """
        from diffusers import DiffusionPipeline

        device = torch.device("cuda:0")
        self.model = DiffusionPipeline.from_pretrained(
            "sd-dreambooth-library/herge-style"
        )
        self.model.to(device)

    @pipeline_function
    def set_kwargs(self, input_kwargs: InputKwargs) -> InputKwargs:
        """
        Set the model kwargs given the input kwargs.
        These are used in other methods.
        """
        self.input_kwargs = {**DEFAULT_KWARGS, **input_kwargs}
        return self.input_kwargs

    @pipeline_function
    def seed_everything(self) -> int:
        """
        Sets seed for pseudo-random number generators in: pytorch, numpy, python.random.
        `PL_GLOBAL_SEED` ensures the seed is passed to any spawned subprocesses.
        """
        seed = self.input_kwargs.pop("seed") or random.randint(1, 1_000_000)
        os.environ["PL_GLOBAL_SEED"] = str(seed)
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        return seed

    @pipeline_function
    def predict(self) -> t.List[Image]:
        """
        A forward pass through the network given the `input_kwargs`.
        """
        # Ensure the input kwargs have been set
        if self.input_kwargs is None:
            raise TypeError(
                "Input kwargs cannot be None. Set them before calling this method."
            )
        seed = self.seed_everything()
        generator = torch.Generator(device=0).manual_seed(seed)

        images = self.model(**self.input_kwargs, generator=generator).images

        return images

    @pipeline_function
    def to_string(self, image: Image) -> str:
        """
        Converts a `PIL` image to a base64 encoded string.
        """
        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")
        img_str = base64.b64encode(buffered.getvalue()).decode()
        return img_str

    @pipeline_function
    def format_images(self, images: t.List[Image]) -> t.List[str]:
        """
        Formats a list of `PIL` images into a list of base64 encoded strings.
        """
        return [self.to_string(image) for image in images]


@pipeline_function
def save_to_store(images: t.List[str]) -> None:
    import base64
    from datetime import datetime

    client = StorageClient()
    for image in images:
        now = datetime.utcnow()
        image_bytes = base64.b64decode(image)
        client.upload(f"dreambooth-{now}.jpeg", image_bytes)


with Pipeline(PIPELINE_NAME, min_gpu_vram_mb=3040) as pipeline:
    # Define pipeline inputs
    input_kwargs = Variable(dict, is_input=True)
    pipeline.add_variables(input_kwargs)

    # Create and load model
    model = SDDreambooth()
    model.load()

    # Feed inputs to model
    context: InputKwargs = model.set_kwargs(input_kwargs)
    images: t.List[Image] = model.predict()

    # Format the images and output result
    formatted_images: t.List[str] = model.format_images(images)
    pipeline.output(formatted_images)

    # Upload images to MinIO
    save_to_store(formatted_images)


sd_pipeline = Pipeline.get_pipeline(PIPELINE_NAME)
api = PipelineCloud()
uploaded_pipeline = api.upload_pipeline(
    sd_pipeline, environment="YOUR_ENVIRONMENT_ID"
)
print(f"Uploaded pipeline id: {uploaded_pipeline.id}")

run = api.run_pipeline(
    uploaded_pipeline.id,
    {
        "prompt": "Mountain winds and babbling springs and moonlight seas, futuristic, herge_style.",
        "num_inference_steps": 100,
    },
)
