
February 26, 2023
Tutorial: How to deploy a Stable Diffusion pipeline
How to deploy a pre-trained HuggingFace Stable Diffusion pipeline on Pipeline Cloud.
Getting started with HuggingFace diffusers
Once you've installed diffusers, it's simple to initialise a model and start running inference on it. We'll use sd-dreambooth-library/herge-style, a Stable Diffusion model fine-tuned on Tintin images using Dreambooth. It is a text-to-image model, which means it takes an input prompt such as 'Mountain winds and babbling springs and moonlight seas' and outputs an image related to that prompt. Dreambooth is a technique developed by Google for fine-tuning diffusion models by injecting a custom subject into the model. It uses a rare word for the custom subject (in our case `herge_style`) which doesn't have much meaning in the original model. For instance, the prompt "Mountain winds and babbling springs and moonlight seas, herge_style" will generate an image in the fine-tuned Tintin style.
Using a pipeline from diffusers
HuggingFace makes it very easy to load any pretrained diffusion pipeline and to use it in inference, by interfacing with the DiffusionPipeline module. Both HuggingFace and pipeline.ai use the same word 'pipeline' to mean 'a set of processing steps which convert an input to an output'. Later in this guide, we're going to embed this model within a pipeline-ai 'pipeline'.
Getting started using sd-dreambooth-library/herge-style for inference is as simple as:

from diffusers import DiffusionPipeline
from PIL.Image import Image

# Load the HF pipeline
model = DiffusionPipeline.from_pretrained("sd-dreambooth-library/herge-style")

# The input prompt
prompt = "Mountain winds and babbling springs and moonlight seas, herge_style."

# Generate an image from the prompt
output_image: Image = model(prompt).images[0]

# Save the image to a local file (binary mode, since JPEG data is bytes)
with open("image.jpeg", "wb") as f:
    output_image.save(f, format="JPEG")

Running the Python script will take a while, but you should eventually see an image saved to a local file. What just happened here? We instantiated a pre-trained HF pipeline and, just by passing a prompt string, made a prediction, which returned a list of PIL images. We then saved the first image to a local file.
Internally, the HF pipeline assembles the model on CPU, downloads the sd-dreambooth-library/herge-style weights, and then loads them into the model. If you have a GPU attached, you can ensure the prediction takes place on your GPU instead by creating a torch.device and moving the model to that device:

...
import torch

# Create a GPU device if it is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the HF pipeline
sd_pipeline = DiffusionPipeline.from_pretrained("sd-dreambooth-library/herge-style").to(
    device
)
...

We use the .to() method provided by PyTorch to send models, inputs, and other data to specific devices.
Input keyword arguments
In addition to the required prompt input, there are a number of other optional keyword arguments we might want to supply to our model at inference, for instance the number of images per prompt, the dimensions of the images, the random seed and so on. It is useful to define the shape of this input and to provide default values in case the arguments are only partially provided at runtime. For this guide, we will define the following input dict to the model:

import typing as t

# The shape of the input keyword arguments
class InputKwargs(t.TypedDict):
    prompt: str
    num_images_per_prompt: t.Optional[int]
    height: t.Optional[int]
    width: t.Optional[int]
    num_inference_steps: t.Optional[int]
    guidance_scale: t.Optional[float]
    eta: t.Optional[float]
    # seed should not be passed to the model
    seed: t.Optional[int]


DEFAULT_KWARGS: InputKwargs = {
    "prompt": "Mountain winds and babbling springs and moonlight seas, herge_style.",
    "num_images_per_prompt": 1,
    "height": 512,
    "width": 512,
    "num_inference_steps": 50,
    "guidance_scale": 7.5,
    "eta": 0.0,
    "seed": None,
}

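As a rough illustration of how partially supplied inputs can be combined with defaults, the sketch below merges a request payload into a defaults dictionary and rejects unknown keys. The helper name merge_with_defaults and the trimmed-down EXAMPLE_DEFAULTS are assumptions made for this example only; they are not part of diffusers or pipeline-ai.

```python
import typing as t

# Trimmed-down defaults for illustration only (hypothetical subset).
EXAMPLE_DEFAULTS: t.Dict[str, t.Any] = {
    "prompt": "Mountain winds and babbling springs and moonlight seas, herge_style.",
    "num_images_per_prompt": 1,
    "num_inference_steps": 50,
    "seed": None,
}


def merge_with_defaults(
    payload: t.Mapping[str, t.Any],
    defaults: t.Mapping[str, t.Any] = EXAMPLE_DEFAULTS,
) -> t.Dict[str, t.Any]:
    """Return a new dict in which payload values override the defaults."""
    unknown = set(payload) - set(defaults)
    if unknown:
        raise KeyError(f"Unexpected input keys: {sorted(unknown)}")
    # Later entries win, so values from the payload replace the defaults.
    return {**defaults, **payload}


merged = merge_with_defaults({"prompt": "Black rock, herge_style.", "seed": 42})
print(merged["num_inference_steps"])  # value taken from the defaults
print(merged["seed"])  # value taken from the payload
```

Rejecting unknown keys early is a design choice for this sketch: it surfaces typos such as "steps" before they reach the model call.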
InputKwargs is a TypedDict characterising the shape of the expected input to the pipeline. As with all typing in Python, this isn't enforced at runtime, but it makes things clearer for other developers and gives you handy type hints if your IDE supports them.
Building a pipeline around the diffusers model
Now that we have a HF model working for local inference, it's time to start laying the ground to offload that work to PipelineCloud. In order for our Python code to run in the cloud, it needs to be serialised, sent to the server and executed in the cloud, where input keyword arguments should be fed into our code dynamically at runtime. Luckily, pipeline-ai provides a number of useful tools for achieving just that. All these steps will be handled under the hood by building a pipeline-ai pipeline around our Python code. We'll also package any loading, pre-processing, inference and post-processing steps into a single deployable object.
There are 2 main steps to this process:
- Create and configure a pipeline blueprint using a context manager.
- Create a pipeline_model which wraps around the HF stable-diffusion model and implements all the methods called within the pipeline blueprint.
We'll begin by configuring the pipeline and then build out the pipeline_model. This will make it easier to understand why the model objects are decorated as they are.
Creating the pipeline
First, make sure you have added pipeline-ai to your virtual environment, using whichever package manager you prefer. Then we'll start by making a pipeline blueprint. This blueprint is essentially a computational graph representing a set of instructions for what should happen when a request is made to your inference endpoint. In the body of your request, you will typically be passing some payload, which includes e.g. prompt and num_inference_steps, as discussed in the previous section. This payload should then be passed as an input variable to the pipeline at runtime and fed into operations, whose outputs are in turn fed into further operations until we arrive at the output of the pipeline itself.
To build the pipeline, you need to create a Pipeline object and use a context manager, as below:

import typing as t

from PIL.Image import Image
from pipeline import Pipeline, Variable


PIPELINE_NAME = "sd-dreambooth"

# The `pipeline-ai` pipeline
with Pipeline(PIPELINE_NAME, min_gpu_vram_mb=3040) as pipeline:
    # Define pipeline inputs
    input_kwargs = Variable(dict, is_input=True)
    pipeline.add_variables(input_kwargs)

    # Create and load model (We'll define it later)
    model = SDDreambooth()
    model.load()

    # Feed inputs to model
    model.set_kwargs(input_kwargs)
    # We expect a list of `PIL` images as output
    images: t.List[Image] = model.predict()

    # Format the images and output result
    formatted_images: t.List[str] = model.format_images(images)
    pipeline.output(formatted_images)

NOTE: We'll define the SDDreambooth model in the next section.
The pipeline has been broken down into 4 parts. We'll define the core SDDreambooth model and its methods later, but on a high level we can see that:
- At the start, we define the input Variable to the pipeline and add it to the pipeline. This tells the pipeline that it should expect a variable of dict type at runtime. In fact, what we would really want instead is input_kwargs = Variable(InputKwargs, is_input=True), but this leads to a type error at runtime as TypedDict is not currently supported as a Variable type. We'll be passing pretty much an InputKwargs dictionary to the model itself, with the exception of the seed key, so we'll need to pop that off before feeding the input to the model.
- We then create an SDDreambooth model instance (defined later) and load the HF model into memory. We'll be able to tell the load method to run only once at startup so that the model isn't loaded unnecessarily for every inference call, but we'll get to that shortly.
- The InputKwargs to the pipeline are then parsed and fed to the model, which runs a forward pass through the network and returns a list of PIL images.
- A post-processing stage formats the output images into a JSON-serializable format and outputs these formatted images from the pipeline.
Why is the syntax so strict?
If you're unfamiliar with building computational graphs, this syntax can be a bit alien and tricky to parse. The point is to create a deterministic flow from input/s to output/s so that Pipeline Cloud servers can find optimisations and handle scaling correctly. In the end you'll achieve better performance.
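To make the idea of a deferred computational graph more concrete, here is a minimal plain-Python sketch. It is not how pipeline-ai is implemented; the Graph, Placeholder and graph_function names are hypothetical and exist only for this illustration. Inside the with block, decorated functions record their calls instead of executing them, and the recorded steps are replayed once a real input is supplied.

```python
import functools
import typing as t


class Placeholder:
    """Stands in for a value that is only known when the graph is run."""


class Graph:
    """Records function calls made inside a `with` block, to replay later."""

    current: t.Optional["Graph"] = None

    def __init__(self) -> None:
        self.input = Placeholder()
        self.steps: t.List[t.Tuple[t.Callable, tuple, Placeholder]] = []
        self.result: t.Optional[Placeholder] = None

    def __enter__(self) -> "Graph":
        Graph.current = self
        return self

    def __exit__(self, *exc_info) -> None:
        Graph.current = None

    def output(self, node: Placeholder) -> None:
        self.result = node

    def run(self, value: t.Any) -> t.Any:
        # Map each placeholder to its real value, one recorded step at a time.
        values = {id(self.input): value}
        for func, args, out in self.steps:
            real_args = [values[id(arg)] for arg in args]
            values[id(out)] = func(*real_args)
        return values[id(self.result)]


def graph_function(func: t.Callable) -> t.Callable:
    """Inside a Graph, record the call instead of executing it."""

    @functools.wraps(func)
    def wrapper(*args):
        graph = Graph.current
        if graph is None:
            return func(*args)  # normal call outside of a graph
        out = Placeholder()
        graph.steps.append((func, args, out))
        return out

    return wrapper


@graph_function
def shout(text: str) -> str:
    return text.upper()


@graph_function
def exclaim(text: str) -> str:
    return text + "!"


with Graph() as graph:
    upper = shout(graph.input)
    graph.output(exclaim(upper))

print(graph.run("herge_style"))  # HERGE_STYLE!
```

Nothing inside the with block touches real data, which is why only decorated calls on placeholders are allowed there: ordinary Python logic on a placeholder would run once at build time rather than on every request.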
Creating the core pipeline_model
Next, we want to implement the SDDreambooth model that we instantiated in the pipeline. This will be a wrapper class around the HF model where we define, for instance, how the HF model should be loaded, how the inputs and outputs to the model should be transformed and, of course, the inference method itself. The wrapper class needs to be decorated by pipeline_model. This allows the Pipeline context manager defined in the last section to treat the wrapper class as a model object. The model can contain pipeline_function decorated functions and allows for persistent logic to be present inside of the wrapper class (for caching etc.).
As in the previous section, we create the model around the HF diffusers package:

import base64
import io
import os
import random
import typing as t

import numpy as np
import torch
from PIL.Image import Image
from pipeline import (
    pipeline_function,
    pipeline_model,
)

# InputKwargs and DEFAULT_KWARGS are defined in the
# "Input keyword arguments" section above.


@pipeline_model
class SDDreambooth:
    def __init__(self) -> None:
        self.input_kwargs = None
        self.model = None

    @pipeline_function(run_once=True, on_startup=True)
    def load(self) -> None:
        """
        Load the model into memory. The decorator parameters ensure the
        model is loaded only when needed, i.e. when it is not cached on the GPU.
        """
        from diffusers import DiffusionPipeline

        device = torch.device("cuda")
        self.model = DiffusionPipeline.from_pretrained(
            "sd-dreambooth-library/herge-style"
        )
        self.model.to(device)

    @pipeline_function
    def set_kwargs(self, input_kwargs: InputKwargs) -> InputKwargs:
        """
        Set the model kwargs given the input kwargs.
        These are used in other methods.
        """
        self.input_kwargs = {**DEFAULT_KWARGS, **input_kwargs}
        return self.input_kwargs

    @pipeline_function
    def seed_everything(self) -> int:
        """
        Sets seed for pseudo-random number generators in: pytorch, numpy, python.random.
        `PL_GLOBAL_SEED` ensures the seed is passed to any spawned subprocesses.
        """
        seed = self.input_kwargs.pop("seed") or random.randint(1, 1_000_000)
        os.environ["PL_GLOBAL_SEED"] = str(seed)
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        return seed

    @pipeline_function
    def predict(self) -> t.List[Image]:
        """
        Generates a list of images given the `input_kwargs`.
        """
        # Ensure the input kwargs have been set
        if self.input_kwargs is None:
            raise TypeError(
                "Input kwargs cannot be None. Set them before calling this method."
            )
        seed = self.seed_everything()
        generator = torch.Generator(device=0).manual_seed(seed)

        images = self.model(**self.input_kwargs, generator=generator).images

        return images

    @pipeline_function
    def to_string(self, image: Image) -> str:
        """
        Converts a `PIL` image to a base64 encoded string.
        """
        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")
        img_str = base64.b64encode(buffered.getvalue()).decode()
        return img_str

    @pipeline_function
    def format_images(self, images: t.List[Image]) -> t.List[str]:
        """
        Formats a list of `PIL` images into a list of base64 encoded strings.
        """
        return [self.to_string(image) for image in images]

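The seed handling in seed_everything can be tried out in isolation using only the standard library. The sketch below is a simplified stand-in, not the method above: the pop_seed helper is a hypothetical name, and random.Random takes the place of torch.Generator. It shows the two ideas involved, namely removing the seed from the kwargs so it is not forwarded to the model, and getting identical random draws from identical seeds.

```python
import random
import typing as t


def pop_seed(kwargs: t.Dict[str, t.Any]) -> int:
    """Remove "seed" from kwargs and return it, drawing one if it is missing."""
    seed = kwargs.pop("seed", None)
    if seed is None:
        seed = random.randint(1, 1_000_000)
    return seed


kwargs = {"prompt": "Black rock, herge_style.", "seed": 1234}
seed = pop_seed(kwargs)

# Two generators created from the same seed produce the same sequence.
first = random.Random(seed).random()
second = random.Random(seed).random()

print(first == second)  # True
print("seed" in kwargs)  # False
```

One deliberate difference in this sketch: it checks for None explicitly, so a seed of 0 is kept, whereas an `or` fallback would treat 0 as missing and draw a random seed instead.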
All the methods of the pipeline_model have pipeline_function decorators. These ensure that the actual runtime values of the Variable objects will be passed to these methods when we call them from within the pipeline, rather than the bare Variable objects themselves.
We've implemented the following methods:
- load handles the instantiation of the model and sending it to a GPU (more on this below).
- set_kwargs combines optional input parameters with default ones. If you wanted to perform some form of validation on the input, this would be a good place to do it. We save the inputs as an instance attribute to share them across methods.
- seed_everything sets the seed for pseudo-random generators, depending on the seed property provided in the input. We pop it off the input as we don't want it passed as a model parameter, but instead as a torch.Generator.manual_seed parameter.
- predict passes our input to the stable diffusion model and generates a list of PIL images.
- format_images converts the generated PIL images into base64-encoded strings, so that they're in a suitable form to be sent across networks (more on this below).
Set the load function to run only on startup
Remember how every request to your pipeline's endpoint will follow the blueprint from top to bottom? If that were to happen now, the model.load() function would be called on every single request. One of the great features of a platform like Pipeline Cloud is that it can cache your models on GPU so that you don't have to experience cold starts on every request. If we repeatedly called load, we would be wasting time on pointless loading.
Thus we need to tell the blueprint to call the load method only once when the pipeline loads, and not again for the duration of the pipeline's time within GPU cache. Fortunately, there's a really easy way to do exactly that, and unlock all the performance benefits that it entails. Just tag the pipeline_function decorator on the load method with the following two arguments:

...
@pipeline_function(run_once=True, on_startup=True)
def load(self) -> None:
    ...

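As a plain-Python analogy for what "run once" means here, the sketch below skips repeated calls to an expensive method on the same object. It is not pipeline-ai's implementation of run_once or on_startup; the run_once decorator and FakeModel class are hypothetical names used only to illustrate the behaviour.

```python
import functools
import typing as t


def run_once(func: t.Callable[..., None]) -> t.Callable[..., None]:
    """Call the wrapped method the first time only, per instance."""
    flag = f"_{func.__name__}_has_run"

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs) -> None:
        if getattr(self, flag, False):
            return  # already ran for this instance, so skip the work
        func(self, *args, **kwargs)
        setattr(self, flag, True)

    return wrapper


class FakeModel:
    """Stand-in for a model wrapper; counts how often weights are loaded."""

    def __init__(self) -> None:
        self.load_calls = 0

    @run_once
    def load(self) -> None:
        self.load_calls += 1  # imagine an expensive weight download here


model = FakeModel()
for _ in range(3):  # three simulated requests
    model.load()

print(model.load_calls)  # 1
```

The flag is stored on the instance rather than in the decorator, so a second model object would still load its own weights once.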
Post-processing
You may have noticed that the images generated by the HF model are native Python PIL objects. However, when running inference in the cloud, we need to return JSON-serializable objects. So instead of directly outputting the PIL images from the pipeline, we transform them into base64-encoded strings. This is handled by the format_images and to_string methods.
Running the pipeline locally
As we've seen, pipeline-ai is a library for building a computational flow. It can also be used locally to handle execution of the pipeline, called a 'run'. So, a great way of debugging your pipeline before uploading it to Pipeline Cloud is to run it locally! Of course, if you don't have a GPU attached then in some cases local runs will be too slow to be practical.

pipeline = Pipeline.get_pipeline(PIPELINE_NAME)

example_input: InputKwargs = dict(
    prompt="Black rock, ship-wreck, volcano, herge_style.",
    num_inference_steps=20,
)
result = pipeline.run(example_input)

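Because the pipeline outputs base64-encoded strings rather than PIL images, whoever consumes a run has to decode them before saving or displaying anything. The sketch below shows that round trip with the standard library only. It assumes the list of strings has already been extracted from the run result (the exact structure of that result is not covered here), and it uses placeholder bytes in place of real JPEG data; encode_bytes and decode_images are hypothetical helper names.

```python
import base64
import json
import typing as t


def encode_bytes(raw: bytes) -> str:
    """Encode raw bytes as a base64 string."""
    return base64.b64encode(raw).decode()


def decode_images(encoded: t.List[str]) -> t.List[bytes]:
    """Decode base64 strings back to raw image bytes."""
    return [base64.b64decode(item) for item in encoded]


# Placeholder bytes standing in for real JPEG data (assumption for the demo).
fake_jpeg = b"\xff\xd8\xff\xe0 not a real image \xff\xd9"

# The encoded form survives a JSON round trip, unlike raw bytes.
payload = json.dumps([encode_bytes(fake_jpeg)])
restored = decode_images(json.loads(payload))

print(restored[0] == fake_jpeg)  # True

# With real image data, the bytes could be written out in binary mode:
# with open("image_0.jpeg", "wb") as f:
#     f.write(restored[0])
```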
A run is executed by calling the .run() method on the pipeline object, passing in our input.
Running the pipeline on Pipeline Cloud
We will interact with the Pipeline API using the CLI, and assume you have already authenticated. For more information about how to authenticate using the CLI, see the authentication guide.
Creating the remote Python environment
In order to execute runs in the cloud, we'll need some Python packages that aren't included in the default environment, e.g. a more up-to-date diffusers package. This means that we'll need to create a new custom environment and add all the required packages. The easiest way to achieve this is by using the pipeline-ai CLI. We recommend that you have the latest version of pipeline-ai installed.
To create a new environment, named huggingface say, simply run

pipeline environments create huggingface

in a shell with your local environment (with pipeline-ai installed) activated. You can check that it was created successfully by fetching it by name:

pipeline environments get -n huggingface

Here you should see a response with an empty list of python_requirements, which are the Python packages in your environment. Then create a local requirements.txt file containing the following lines:

transformers==4.26.1
torch==1.13.1
diffusers==0.13.1
accelerate==0.17.1

and then add all these packages to your custom environment by running:

cat requirements.txt | xargs pipeline environments update -n huggingface add

You should now see these packages in the environment's python_requirements. Note that you'll need the ID of your custom environment when uploading the pipeline to PipelineCloud.
Uploading the pipeline
Before we can run the pipeline on Pipeline Cloud, we need to upload it to the servers. Again we 'get' the pipeline, before instantiating a connection to Pipeline Cloud and uploading our pipeline.

from pipeline import Pipeline, PipelineCloud

pipeline = Pipeline.get_pipeline(PIPELINE_NAME)

api = PipelineCloud()
uploaded_pipeline = api.upload_pipeline(pipeline, environment="YOUR_ENVIRONMENT_ID")

print(f"Uploaded pipeline id: {uploaded_pipeline.id}")

Replace YOUR_ENVIRONMENT_ID with the ID of the custom environment you created previously, which you can get using the CLI:

pipeline environments get -n huggingface

The pipeline-ai library will serialise all your code and post your pipeline to an endpoint for creating pipelines on the main API gateway.
Running the pipeline
And now we run the pipeline, supplying an input dictionary of type InputKwargs:

run = api.run_pipeline(
    uploaded_pipeline.id,
    {
        "prompt": "Mountain winds and babbling springs and moonlight seas, futuristic, herge_style.",
        "num_inference_steps": 50,
    },
)

Under the hood, this sends a request to the /v2/runs endpoint on the main API, so if you're building an app in a different language you don't need to worry about dropping the pipeline-ai library.
The first time you run the pipeline, it may take up to a couple of minutes because the custom environment and pipeline won't be cached on the servers. Subsequent runs won't be subject to this cold start though and should be pretty speedy! Just make sure you move the run_pipeline call into another script and don't execute the whole script again, because you'll be uploading a new pipeline each time.
Conclusion
In this guide, we saw how to interface with the HuggingFace DiffusionPipeline to very easily start generating local predictions on a pretrained stable-diffusion pipeline. We then packaged this HuggingFace pipeline into a single deployable pipeline-ai pipeline, getting our Python code into a form ready to be serialised, sent and executed on the PipelineCloud servers. After uploading the pipeline to the cloud, we were quickly able to start running the pipeline remotely.
Complete script

import base64
import io
import os
import random
import typing as t

import numpy as np
import torch
from diffusers.utils import logging
from dotenv import load_dotenv
from PIL.Image import Image
from pipeline import (
    Pipeline,
    PipelineCloud,
    Variable,
    pipeline_function,
    pipeline_model,
)

load_dotenv()

PIPELINE_NAME = "sd-dreambooth"


logging.disable_progress_bar()
logging.set_verbosity_error()


# The shape of the input keyword arguments
class InputKwargs(t.TypedDict):
    prompt: str
    num_images_per_prompt: t.Optional[int]
    height: t.Optional[int]
    width: t.Optional[int]
    num_inference_steps: t.Optional[int]
    guidance_scale: t.Optional[float]
    eta: t.Optional[float]
    seed: t.Optional[int]


DEFAULT_KWARGS: InputKwargs = {
    "prompt": "Mountain winds and babbling springs and moonlight seas.",
    "num_images_per_prompt": 1,
    "height": 512,
    "width": 512,
    "num_inference_steps": 50,
    "guidance_scale": 7.5,
    "eta": 0.0,
    "seed": None,
}


@pipeline_model
class SDDreambooth:
    def __init__(self) -> None:
        self.input_kwargs = None
        self.model = None

    @pipeline_function(run_once=True, on_startup=True)
    def load(self) -> None:
        """
        Load the model into memory. The decorator parameters ensure the
        model is loaded only when needed, i.e. when it is not cached on the GPU.
        """
        from diffusers import DiffusionPipeline

        device = torch.device("cuda:0")
        self.model = DiffusionPipeline.from_pretrained(
            "sd-dreambooth-library/herge-style"
        )
        self.model.to(device)

    @pipeline_function
    def set_kwargs(self, input_kwargs: InputKwargs) -> InputKwargs:
        """
        Set the model kwargs given the input kwargs.
        These are used in other methods.
        """
        self.input_kwargs = {**DEFAULT_KWARGS, **input_kwargs}
        return self.input_kwargs

    @pipeline_function
    def seed_everything(self) -> int:
        """
        Sets seed for pseudo-random number generators in: pytorch, numpy, python.random.
        `PL_GLOBAL_SEED` ensures the seed is passed to any spawned subprocesses.
        """
        seed = self.input_kwargs.pop("seed") or random.randint(1, 1_000_000)
        os.environ["PL_GLOBAL_SEED"] = str(seed)
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        return seed

    @pipeline_function
    def predict(self) -> t.List[Image]:
        """
        A forward pass through the network given the `input_kwargs`.
        """
        # Ensure the input kwargs have been set
        if self.input_kwargs is None:
            raise TypeError(
                "Input kwargs cannot be None. Set them before calling this method."
            )
        seed = self.seed_everything()
        generator = torch.Generator(device=0).manual_seed(seed)

        images = self.model(**self.input_kwargs, generator=generator).images

        return images

    @pipeline_function
    def to_string(self, image: Image) -> str:
        """
        Converts a `PIL` image to a base64 encoded string.
        """
        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")
        img_str = base64.b64encode(buffered.getvalue()).decode()
        return img_str

    @pipeline_function
    def format_images(self, images: t.List[Image]) -> t.List[str]:
        """
        Formats a list of `PIL` images into a list of base64 encoded strings.
        """
        return [self.to_string(image) for image in images]


with Pipeline(PIPELINE_NAME, min_gpu_vram_mb=3040) as pipeline:
    # Define pipeline inputs
    input_kwargs = Variable(dict, is_input=True)
    pipeline.add_variables(input_kwargs)

    # Create and load model
    model = SDDreambooth()
    model.load()

    # Feed inputs to model
    context: InputKwargs = model.set_kwargs(input_kwargs)
    images: t.List[Image] = model.predict()

    # Format the images and output result
    formatted_images: t.List[str] = model.format_images(images)
    pipeline.output(formatted_images)


pipeline = Pipeline.get_pipeline(PIPELINE_NAME)


api = PipelineCloud()
uploaded_pipeline = api.upload_pipeline(pipeline, environment="YOUR_ENVIRONMENT_ID")
print(f"Uploaded pipeline id: {uploaded_pipeline.id}")

run = api.run_pipeline(
    uploaded_pipeline.id,
    {
        "prompt": "Mountain winds and babbling springs and moonlight seas, futuristic, herge_style.",
        "num_inference_steps": 50,
    },
)