
8 minute read

February 26, 2023

Tutorial: How to deploy a Stable Diffusion pipeline

How to deploy a pre-trained HuggingFace stable diffusion pipeline on Pipeline Cloud.

Stable diffusion is a text-to-image model used to generate detailed images from text descriptions. The great thing about stable diffusion is that, in contrast to proprietary models such as DALL-E and Midjourney, its computational graph and model weights have been released publicly. This means it has become readily accessible on ML sharing platforms such as HuggingFace (HF), which provides a really simple way to use some of the best models from the open-source ML sphere.

In this guide, we'll build a pipeline-ai pipeline around a HF diffusers stable diffusion model and show you how to deploy it in the cloud seamlessly. The logic followed here can be replicated for almost any of the ~100,000 models available on HF.

Once you have deployed a stable diffusion pipeline you could integrate it into your own apps. For instance, you could build a frontend web application where users can generate their own images, or you could generate your own GIFs, website designs or even convert written books into picture books.

NOTE: This is a walkthrough, so many of the code snippets below are chunks of a larger script. If you're skimming or just want to see code, skip to the conclusion where you'll find the complete script.

Getting started with HuggingFace diffusers

Once you've installed diffusers, it's really simple to initialise a model and start running inference on it. We'll use a stable diffusion model trained using Dreambooth, sd-dreambooth-library/herge-style. It is a text-to-image model, which means it takes an input sentence/prompt like 'Mountain winds and babbling springs and moonlight seas' and outputs an image related to that prompt. It was fine-tuned on Tintin images using Dreambooth, a technique developed by Google for fine-tuning diffusion models by injecting a custom subject into the model. It uses a rare token for the custom subject (in our case `herge_style`) which doesn't carry much meaning in the original model. For instance, "Mountain winds and babbling springs and moonlight seas, herge_style" will generate an image in the model's Tintin-like style.

Using a pipeline from diffusers

HuggingFace makes it very easy to load any pretrained diffusion pipeline and use it for inference by interfacing with the DiffusionPipeline module.
Both HuggingFace and pipeline.ai use the same word 'pipeline' to mean 'a set of processing steps which convert an input to an output'. Later in this guide, we're going to embed this model within a pipeline-ai 'pipeline'.
Getting started with sd-dreambooth-library/herge-style for inference is as simple as:
from diffusers import DiffusionPipeline
from PIL.Image import Image

# Load the HF pipeline
model = DiffusionPipeline.from_pretrained("sd-dreambooth-library/herge-style")

# The input prompt
prompt = "Mountain winds and babbling springs and moonlight seas, herge_style."

# Generate an image from the prompt
output_image: Image = model(prompt).images[0]

# Save the image to a local file (open in binary mode for the JPEG bytes)
with open("image.jpeg", "wb") as f:
    output_image.save(f, format="JPEG")

Running the Python script will take a while but you should eventually see an image saved to a local file. What just happened here? We instantiated a pre-trained HF pipeline, and just by passing a prompt string we made a prediction, which returned a list of PIL images. We then saved the first image to a local file.

Internally, the HF pipeline assembles the model on CPU, downloads the sd-dreambooth-library/herge-style weights, and then loads them into the model. If you have a GPU attached, you can ensure the prediction takes place on your GPU instead, by creating a torch.device and moving the model (tensor) to that device:

...
import torch

# Create a GPU device if it is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the HF pipeline
sd_pipeline = DiffusionPipeline.from_pretrained("sd-dreambooth-library/herge-style").to(
    device
)
...
Here we're taking advantage of the neat .to() method provided by PyTorch to send models, inputs, and other data to specific devices.

Input keyword arguments

In addition to the required prompt input, there are a number of other optional keyword arguments we might want to supply to our model at inference. For instance, the number of images per prompt, the dimensions of the images, the random seed and so on. It is useful to define the shape of this input and also provide default values for these in case they are only partially provided at runtime. For this guide, we will define the following input dict to the model:
import typing as t

# The shape of the input keyword arguments
class InputKwargs(t.TypedDict):
    prompt: str
    num_images_per_prompt: t.Optional[int]
    height: t.Optional[int]
    width: t.Optional[int]
    num_inference_steps: t.Optional[int]
    guidance_scale: t.Optional[float]
    eta: t.Optional[float]
    # seed should not be passed to the model
    seed: t.Optional[int]


DEFAULT_KWARGS: InputKwargs = {
    "prompt": "Mountain winds and babbling springs and moonlight seas, herge_style.",
    "num_images_per_prompt": 1,
    "height": 512,
    "width": 512,
    "num_inference_steps": 50,
    "guidance_scale": 7.5,
    "eta": 0.0,
    "seed": None,
}
We've included a TypedDict characterising the shape of the expected input to the pipeline. As with all typing in Python, this isn't enforced at runtime but makes things clearer for other developers and also gives you handy type hints if your IDE supports it.
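To see how these defaults come together in practice, here is a minimal local sketch (not part of the final pipeline) that merges a partially-specified input with DEFAULT_KWARGS, pops off the seed (which the model doesn't accept as a keyword argument), and calls the HF pipeline we loaded earlier. The variable names user_input and kwargs are illustrative, and model refers to the DiffusionPipeline from the first snippet.

# Partially-specified input; missing keys fall back to the defaults
user_input = {
    "prompt": "Black rock, ship-wreck, volcano, herge_style.",
    "num_inference_steps": 20,
}

# Merge with defaults and drop `seed`, which is not a model keyword argument
kwargs = {**DEFAULT_KWARGS, **user_input}
kwargs.pop("seed")

# `model` is the DiffusionPipeline loaded in the first snippet
images = model(**kwargs).images

We'll follow essentially this merge-and-pop pattern inside the deployed pipeline later on.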

Building a pipeline around the diffusers model

Now that we have a HF model working for local inference, it's time to start laying the ground to offload that work to Pipeline Cloud. In order for our Python code to run in the cloud, it needs to be serialised, sent to the server and executed there, with input keyword arguments fed into our code dynamically at runtime. Luckily, pipeline-ai provides a number of useful tools for achieving just that. All these steps are handled under the hood by building a pipeline-ai pipeline around our Python code. We'll also package any loading, pre-processing, inference and post-processing steps into a single deployable object. There are 2 main steps to this process:

- Create and configure a pipeline blueprint using a context manager.
- Create a pipeline_model which wraps around the HF stable-diffusion model and implements all the methods called within the pipeline blueprint.

We'll begin by configuring the pipeline and then build out the pipeline_model. This will make it easier to understand why the model objects are decorated as they are.

Creating the pipeline

First, make sure you have added pipeline-ai to your virtual environment, using whichever package manager you prefer. Then we'll start by making a pipeline blueprint. This blueprint is essentially a computational graph representing a set of instructions for what should happen when a request is made to your inference endpoint. In the body of your request, you will typically pass some payload, which includes e.g. prompt and num_inference_steps as discussed in the previous section. This payload is then passed as an input variable to the pipeline at runtime and fed into operations, whose outputs are in turn fed into further operations until we arrive at the output of the pipeline itself.

To build the pipeline, you need to create a Pipeline object and use a context manager, as below:
import typing as t

from PIL.Image import Image
from pipeline import Pipeline, Variable


PIPELINE_NAME = "sd-dreambooth"

# The `pipeline-ai` pipeline
with Pipeline(PIPELINE_NAME, min_gpu_vram_mb=3040) as pipeline:
    # Define pipeline inputs
    input_kwargs = Variable(dict, is_input=True)
    pipeline.add_variables(input_kwargs)

    # Create and load model (We'll define it later)
    model = SDDreambooth()
    model.load()

    # Feed inputs to model
    model.set_kwargs(input_kwargs)
    ## We expect a list of `PIL` images as output
    images: t.List[Image] = model.predict()

    # Format the images and output result
    formatted_images: t.List[str] = model.format_images(images)
    pipeline.output(formatted_images)

NOTE: We'll define the SDDreambooth model in the next section.

The pipeline has been broken down into 4 parts. We'll define the core SDDreambooth model and its methods later, but at a high level we can see that:

- At the start, we define the input Variable to the pipeline and add it to the pipeline. This tells the pipeline that it should expect a variable of dict type at runtime. What we would really want instead is input_kwargs = Variable(InputKwargs, is_input=True), but this leads to a type error at runtime as TypedDict is not currently supported as a Variable type. We'll be passing pretty much an InputKwargs dictionary to the model itself, with the exception of the seed key, so we'll need to pop that off before feeding the input to the model.
- We then create an SDDreambooth model instance (defined later) and load the HF model into memory. We'll be able to tell the load method to only run once at start up so that the model isn't loaded unnecessarily for every inference call, but we'll get to that shortly.
- The InputKwargs to the pipeline are then parsed and fed to the model, which runs a forward pass through the network and returns a list of PIL images.
- Finally, a post-processing stage formats the output images into a JSON-serialisable format and outputs the formatted images from the pipeline.
Why is the syntax so strict? If you're unfamiliar with building computational graphs this syntax can seem a bit alien and tricky to parse. The point is to create a deterministic flow from input(s) to output(s) so that Pipeline Cloud servers can find optimisations and handle scaling correctly. In the end you'll achieve better performance.

Creating the core pipeline_model

Next, we want to implement the SDDreambooth model that we instantiated in the pipeline. This will be a wrapper class around the HF model where we will define, for instance, how the HF model should be loaded, how the inputs and outputs to the model should be transformed and, of course, the inference method itself. The wrapper class needs to be decorated by pipeline_model. This allows the Pipeline context manager defined in the last section to treat the wrapper class as a model object. The model can contain pipeline_function decorated functions and allows for persistent logic to be present inside of the wrapper class (for caching etc). As in the previous section, we create the model around the HF diffusers package:
import base64
import io
import os
import random
import typing as t

import numpy as np
import torch
from PIL.Image import Image
from pipeline import (
    pipeline_function,
    pipeline_model,
)


@pipeline_model
class SDDreambooth:
    def __init__(self) -> None:
        self.input_kwargs = None
        self.model = None

    @pipeline_function(run_once=True, on_startup=True)
    def load(self) -> None:
        """
        Load the model into memory. The decorator parameters ensure the
        model is loaded only when needed, i.e. when it is not cached on the GPU.
        """
        from diffusers import DiffusionPipeline

        device = torch.device("cuda")
        self.model = DiffusionPipeline.from_pretrained(
            "sd-dreambooth-library/herge-style"
        )
        self.model.to(device)

    @pipeline_function
    def set_kwargs(self, input_kwargs: InputKwargs) -> InputKwargs:
        """
        Set the model kwargs given the input kwargs.
        These are used in other methods.
        """
        self.input_kwargs = {**DEFAULT_KWARGS, **input_kwargs}
        return self.input_kwargs

    @pipeline_function
    def seed_everything(self) -> int:
        """
        Sets seed for pseudo-random number generators in: pytorch, numpy, python.random.
        `PL_GLOBAL_SEED` ensures the seed is passed to any spawned subprocesses.
        """
        seed = self.input_kwargs.pop("seed") or random.randint(1, 1_000_000)
        os.environ["PL_GLOBAL_SEED"] = str(seed)
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        return seed

    @pipeline_function
    def predict(self) -> t.List[Image]:
        """
        Generates a list of images given the `input_kwargs`.
        """
        # Ensure the input kwargs have been set
        if self.input_kwargs is None:
            raise TypeError(
                "Input kwargs cannot be None. Set them before calling this method."
            )
        seed = self.seed_everything()
        generator = torch.Generator(device=0).manual_seed(seed)

        images = self.model(**self.input_kwargs, generator=generator).images

        return images

    @pipeline_function
    def to_string(self, image: Image) -> str:
        """
        Converts a `PIL` image to a base64 encoded string.
        """
        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")
        img_str = base64.b64encode(buffered.getvalue()).decode()
        return img_str

    @pipeline_function
    def format_images(self, images: t.List[Image]) -> t.List[str]:
        """
        Formats a list of `PIL` images into a list of base64 encoded strings.
        """
        return [self.to_string(image) for image in images]
OK, that may seem like a bit of a mouthful, but most of the code here is actually pretty straightforward. First notice that the methods of the pipeline_model have pipeline_function decorators. These ensure that the actual runtime values of the Variable objects are passed to these methods when we call them from within the pipeline, rather than the bare Variable objects themselves. We've implemented the following methods:

- load handles the instantiation of the model and sending it to a GPU (more on this below).
- set_kwargs combines optional input parameters with default ones. If you wanted to perform some form of validation on the input, here would be a good place to do it. We save the inputs as an instance attribute to share them across methods.
- seed_everything sets the seed for pseudo-random generators, depending on the seed property provided in the input. We pop it off the input as we don't want it passed as a model parameter, but instead as a torch.Generator.manual_seed parameter.
- predict passes our input to the stable diffusion model and generates a list of PIL images.
- format_images converts the generated PIL images into base64-encoded strings, so that they're in a suitable form to be sent across networks (more on this below).

Set the load function to run only on startup

Remember how every request to your pipeline's endpoint will follow the blueprint from top to bottom? If that were to happen now, the model.load() function would be called on every single request. One of the great features of a platform like Pipeline Cloud is that it can cache your models on GPU so that you don't experience cold starts on every request. If we repeatedly called load, we would be spending time on pointless loading. We therefore need to tell the blueprint to only call the load method once when the pipeline loads, and not again for the duration of the pipeline's time in the GPU cache. Fortunately, there's a really easy way to do exactly that, and unlock all the performance benefits that it entails. Just tag the pipeline_function decorator on the load method with the following two arguments:
...
@pipeline_function(run_once=True, on_startup=True)
def load(self) -> None:
    ...

Post-processing

You may have noticed that the images generated by the HF model are native Python PIL objects. However, when running inferences in the cloud, we need to return JSON-serializable objects. So instead of directly outputting the PIL images from the pipeline, we instead transform them into base64 encoded strings. This is handled by the format_images and to_string methods.
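On the client side you can reverse this transformation to recover the images. Here is a minimal sketch, where output_str stands for one of the base64 strings returned by the pipeline (the variable and file names are illustrative):

import base64
import io

from PIL import Image

# `output_str` is one of the base64-encoded JPEG strings returned by the pipeline
image = Image.open(io.BytesIO(base64.b64decode(output_str)))
image.save("output.jpeg")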

Running the pipeline locally

As we've seen, pipeline-ai is a library for building a computational flow. It can also be used locally to handle execution of the pipeline, called a 'run'. So, a great way of debugging your pipeline before uploading it to Pipeline Cloud is to run it locally! Of course, if you don't have a GPU attached then in some cases local runs will be too slow to be practical.
pipeline = Pipeline.get_pipeline(PIPELINE_NAME)

example_input: InputKwargs = dict(
    prompt="Black rock, ship-wreck, volcano, herge_style.",
    num_inference_steps=20,
)
result = pipeline.run(example_input)
First we 'get' the pipeline by using the name which we set when defining the pipeline blueprint. Then, very simply, we call the .run() method on the pipeline object, passing in our input.

Running the pipeline on Pipeline Cloud


We will be interacting with the Pipeline API using the CLI and assume you have authenticated. For more information about how to authenticate using the CLI, see the authentication guide.

Creating the remote Python environment

In order to execute runs in the cloud, we'll need some Python packages that aren't included in the default environment, e.g. a more up-to-date diffusers package. This means that we'll need to create a new custom environment and add all the required packages. The easiest way to achieve this is by using the pipeline-ai CLI. We recommend that you have the latest version of pipeline-ai installed.

To create a new environment, named huggingface say, simply run

pipeline environments create huggingface

in a shell with your local environment (with pipeline-ai) activated. You can check that it was created successfully by fetching it by name:

pipeline environments get -n huggingface

Here you should see a response with an empty list of python_requirements, which are the Python packages in your environment. Then create a local requirements.txt file containing the following lines:

transformers==4.26.1
torch==1.13.1
diffusers==0.13.1
accelerate==0.17.1

and then add all these packages to your custom environment by running:

cat requirements.txt | xargs pipeline environments update -n huggingface add

You should now see these packages in the environment python_requirements. Note that you'll need the ID of your custom environment when uploading the pipeline to PipelineCloud.

Uploading the pipeline

Before we can run the pipeline on Pipeline Cloud, we need to upload it to the servers. Again we 'get' the pipeline, before instantiating a connection to Pipeline Cloud and uploading our pipeline.
from pipeline import PipelineCloud

pipeline = Pipeline.get_pipeline(PIPELINE_NAME)

api = PipelineCloud()
uploaded_pipeline = api.upload_pipeline(pipeline, environment="YOUR_ENVIRONMENT_ID")

print(f"Uploaded pipeline id: {uploaded_pipeline.id}")
Just be sure to replace YOUR_ENVIRONMENT_ID with the ID of the custom environment you created previously, which you can get using the CLI:
pipeline environments get -n huggingface
During this stage, the pipeline-ai library will serialise all your code and post your pipeline to an endpoint for creating pipelines on the main API gateway.

Running the pipeline

And now we run the pipeline, supplying an input dictionary of type InputKwargs:
1run = api.run_pipeline(
2    uploaded_pipeline.id,
3    {
4        "prompt": "Mountain winds and babbling springs and moonlight seas, futuristic, herge_style.",
5        "num_inference_steps": 50
6    },
7)
Internally this performs a POST request to the /v2/runs endpoint on the main API, so if you're building an app in a different language you can call that endpoint directly without the pipeline-ai library. The first time you run the pipeline, it may take up to a couple of minutes because the custom environment and pipeline won't be cached on the servers. Subsequent runs won't be subject to this cold start and should be pretty speedy! Just make sure you move the run_pipeline call into a separate script and don't execute the whole script again, otherwise you'll upload a new pipeline each time.
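For example, a separate run script might look something like this sketch, where the pipeline ID printed during upload is pasted in (the ID string below is a placeholder, not a real value):

from pipeline import PipelineCloud

# Paste the ID printed when you uploaded the pipeline (placeholder shown here)
UPLOADED_PIPELINE_ID = "YOUR_UPLOADED_PIPELINE_ID"

api = PipelineCloud()
run = api.run_pipeline(
    UPLOADED_PIPELINE_ID,
    {
        "prompt": "Black rock, ship-wreck, volcano, herge_style.",
        "num_inference_steps": 50,
    },
)

This way the upload happens once, and every subsequent request reuses the cached pipeline.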

Conclusion

In this guide, we saw how to interface with the HuggingFace DiffusionPipeline to very easily start generating local predictions on a pretrained stable-diffusion pipeline. We then packaged this HuggingFace pipeline into a single deployable pipeline-ai pipeline, getting our Python code in a form ready to be serialised, sent and executed on the Pipeline Cloud servers. After uploading the pipeline to the cloud, we were quickly able to start running the pipeline remotely.

Complete script

import base64
import io
import os
import random
import typing as t

import numpy as np
import torch
from diffusers.utils import logging
from dotenv import load_dotenv
from PIL.Image import Image
from pipeline import (
    Pipeline,
    PipelineCloud,
    Variable,
    pipeline_function,
    pipeline_model,
)

load_dotenv()

PIPELINE_NAME = "sd-dreambooth"


logging.disable_progress_bar()
logging.set_verbosity_error()


# The shape of the input keyword arguments
class InputKwargs(t.TypedDict):
    prompt: str
    num_images_per_prompt: t.Optional[int]
    height: t.Optional[int]
    width: t.Optional[int]
    num_inference_steps: t.Optional[int]
    guidance_scale: t.Optional[float]
    eta: t.Optional[float]
    seed: t.Optional[int]


DEFAULT_KWARGS: InputKwargs = {
    "prompt": "Mountain winds and babbling springs and moonlight seas.",
    "num_images_per_prompt": 1,
    "height": 512,
    "width": 512,
    "num_inference_steps": 50,
    "guidance_scale": 7.5,
    "eta": 0.0,
    "seed": None,
}


@pipeline_model
class SDDreambooth:
    def __init__(self) -> None:
        self.input_kwargs = None
        self.model = None

    @pipeline_function(run_once=True, on_startup=True)
    def load(self) -> None:
        """
        Load the model into memory. The decorator parameters ensure the
        model is loaded only when needed, i.e. it is not cached on the GPU.
        """
        from diffusers import DiffusionPipeline

        device = torch.device("cuda:0")
        self.model = DiffusionPipeline.from_pretrained(
            "sd-dreambooth-library/herge-style"
        )
        self.model.to(device)

    @pipeline_function
    def set_kwargs(self, input_kwargs: InputKwargs) -> InputKwargs:
        """
        Set the model kwargs given the input kwargs.
        These are used in other methods.
        """
        self.input_kwargs = {**DEFAULT_KWARGS, **input_kwargs}
        return self.input_kwargs

    @pipeline_function
    def seed_everything(self) -> int:
        """
        Sets seed for pseudo-random number generators in: pytorch, numpy, python.random.
        `PL_GLOBAL_SEED` ensures the seed is passed to any spawned subprocesses.
        """
        seed = self.input_kwargs.pop("seed") or random.randint(1, 1_000_000)
        os.environ["PL_GLOBAL_SEED"] = str(seed)
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        return seed

    @pipeline_function
    def predict(self) -> t.List[Image]:
        """
        A forward pass through the network given the `input_kwargs`.
        """
        # Ensure the input kwargs have been set
        if self.input_kwargs is None:
            raise TypeError(
                "Input kwargs cannot be None. Set them before calling this method."
            )
        seed = self.seed_everything()
        generator = torch.Generator(device=0).manual_seed(seed)

        images = self.model(**self.input_kwargs, generator=generator).images

        return images

    @pipeline_function
    def to_string(self, image: Image) -> str:
        """
        Converts a `PIL` image to a base64 encoded string.
        """
        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")
        img_str = base64.b64encode(buffered.getvalue()).decode()
        return img_str

    @pipeline_function
    def format_images(self, images: t.List[Image]) -> t.List[str]:
        """
        Formats a list of `PIL` images into a list of base64 encoded strings.
        """
        return [self.to_string(image) for image in images]


with Pipeline(PIPELINE_NAME, min_gpu_vram_mb=3040) as pipeline:
    # Define pipeline inputs
    input_kwargs = Variable(dict, is_input=True)
    pipeline.add_variables(input_kwargs)

    # Create and load model
    model = SDDreambooth()
    model.load()

    # Feed inputs to model
    context: InputKwargs = model.set_kwargs(input_kwargs)
    images: t.List[Image] = model.predict()

    # Format the images and output result
    formatted_images: t.List[str] = model.format_images(images)
    pipeline.output(formatted_images)


pipeline = Pipeline.get_pipeline(PIPELINE_NAME)


api = PipelineCloud()
uploaded_pipeline = api.upload_pipeline(pipeline, environment="YOUR_ENVIRONMENT_ID")
print(f"Uploaded pipeline id: {uploaded_pipeline.id}")

run = api.run_pipeline(
    uploaded_pipeline.id,
    {
        "prompt": "Mountain winds and babbling springs and moonlight seas, futuristic, herge_style.",
        "num_inference_steps": 50,
    },
)

ABOUT PIPELINE.AI

Pipeline AI makes it easy to work with ML models and to deploy AI at scale. The self-serve platform provides a fast pay-as-you-go API to run pretrained or proprietary models in production. If you are looking to deploy a large product and would like to sign up as an Enterprise customer, please get in touch. Follow us on Twitter and LinkedIn.