
March 14, 2023
Tutorial: How to post Stable Diffusion images to Slack
How to post inference images to a Slack channel within a Stable Diffusion pipeline.
If you've used PipelineCloud, then you know how it can provide a powerful platform for running complex ML tasks without the need for expensive hardware or having to handle any tedious ML-ops infrastructure yourself. You can also execute arbitrary Python, which gives you a lot of flexibility and control over your ML pipelines. For instance, you could set up a notification scheme, where a message is sent to a third-party service at some point during the execution of one of your pipelines.
In this guide, we'll show you how to post your inference images to a Slack channel each time a prediction is made. As our inference model, we'll be using a pretrained HuggingFace stable-diffusion pipeline, building on the guide deploy a stable diffusion pipeline. We'll be updating some of the code laid out in that guide, so make sure you've gone through that example first.
Assumes prior knowledge. This guide assumes you are already familiar with deploying HuggingFace diffusers models on PipelineCloud and will build on deploy a stable diffusion pipeline.
NOTE: This is a walkthrough, so many of the code snippets below are chunks of a larger script. If you're skimming or just want to see code, then skip to the conclusion where you'll find the complete script.
Background
In deploy a stable diffusion pipeline, we saw how to package a pretrained stable-diffusion HuggingFace pipeline into a deployable unit of code and upload that code to PipelineCloud. This enabled us to perform remote inferences in the cloud by making an HTTP POST request to an endpoint on the pipeline-ai API gateway.
The deployable unit of code is a pipeline-ai pipeline. This is where you define the set of instructions that should be followed when an inference call is made to the endpoint. For instance, in the stable diffusion pipeline, the instructions involved creating a model instance, loading the HF model into memory, parsing the input to the pipeline and then passing it to the model to generate a list of images.
To post those images to Slack, we'll add one more instruction to the pipeline: a call to a new pipeline_function which will handle that logic. So that's where we're heading, but we'll get to that later. First, we'll see how to set things up in Slack to allow our Python application to post to a channel and then implement a simple notification client class to interface with Slack. Since we'll be using slack_sdk for this, we'll also need to set up a custom Python environment on PipelineCloud, as it isn't available in any of the public environments.
Posting to a Slack channel
In order to post messages or files to a Slack channel from our own Python application, we'll need to interact with the Slack API in some form or other. There are a few ways to do this, but the simplest is to leverage the Python Slack SDK, which contains a web package with a bunch of methods to interact with various API resources, such as conversations (i.e. channels), chats and files, to name a few.
Setting up the Slack App
The Python application needs to interact with a Slack App, so you'll need to set one up. This guide will walk you through how to set up a basic app in your workspace. The bot token you generate while following this guide will associate your Python application (the bot) with a Slack app installed in a workspace. You'll need to make sure that you have:
- A bot token, which we'll save as an environment variable.
- Added the following to your bot token scopes: chat:write, files:read and files:write. You can always update these later, and the Slack API responds with any missing scopes required when you try to perform an action on a given resource.
- Added your new Slack App to the Slack channel you want your Python application to post to.
- The ID of the Slack channel, which we'll also save as an environment variable.
Note that to get the ID of the Slack channel, you can query the conversations resource:
curl https://slack.com/api/conversations.list -H "Authorization: Bearer xoxb-1234..."
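The same lookup can be done from Python. The following is a minimal sketch, assuming a client that behaves like the slack_sdk WebClient, whose conversations_list method wraps the endpoint queried above. The helper name find_channel_id is hypothetical, and the bot may need an additional read scope (such as channels:read) before Slack will list channels.

```python
import typing as t


def find_channel_id(client: t.Any, name: str) -> t.Optional[str]:
    """Return the ID of the channel called `name`, or None if it is not found.

    `client` is assumed to behave like slack_sdk's WebClient, i.e. it exposes
    `conversations_list` and returns a dict-like response.
    """
    cursor = None
    while True:
        response = client.conversations_list(limit=200, cursor=cursor)
        for channel in response["channels"]:
            if channel["name"] == name:
                return channel["id"]
        # Slack paginates long listings; an empty cursor marks the last page.
        cursor = (response.get("response_metadata") or {}).get("next_cursor")
        if not cursor:
            return None
```

With a real client this could be called as find_channel_id(WebClient(token=...), "my-channel"), where the channel name is a placeholder for your own.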
Once you're sure you have all that, save your bot token and channel ID to a .env file:
SLACK_BOT_TOKEN=xoxb-__fill_me_in__
SLACK_CHANNEL=__fill_me_in__
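A missing or misspelled variable would otherwise only show up later as a failed Slack call. As a minimal sketch (the helper require_env is hypothetical and not part of any library used in this guide), the two variables could be checked once they have been loaded into the environment:

```python
import os
import typing as t


def require_env(
    names: t.Sequence[str], env: t.Mapping[str, str] = os.environ
) -> t.Dict[str, str]:
    """Return the requested variables, raising if any is missing or empty."""
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[name] for name in names}
```

For example, require_env(["SLACK_BOT_TOKEN", "SLACK_CHANNEL"]) raises a RuntimeError naming whichever variable is absent.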
Now that we have everything set up on Slack, let's connect to the App with a Python client.
Implementing a Client wrapper
In this section, we'll build out a simple NotificationClient class which wraps around the Slack WebClient. We'll implement 2 methods for posting content to the channel: one for text messages and another for uploading files. Since we'll be interacting with the Slack API using slack_sdk, make sure you have added it to your local environment, using pip, poetry or whichever package manager you prefer. We'll also be loading the .env environment variables automatically using the dotenv package (installed as python-dotenv), so you'll want to install that too. This saves us from having to export the variables each time we open a new shell.
import os
import typing as t

from dotenv import load_dotenv
from slack_sdk import WebClient

load_dotenv()


class NotificationClient:
    """
    A wrapper class for posting messages and uploading files to a channel.
    Default client is the Slack WebClient. Inject another client if you'd like,
    but ensure the client interface matches the methods called here.
    """

    def __init__(self, Client=WebClient, token=os.getenv("SLACK_BOT_TOKEN")):
        self.client = Client(token=token)

    def post_message(self, text: str, channel=os.getenv("SLACK_CHANNEL")):
        try:
            self.client.chat_postMessage(text=text, channel=channel)
        except Exception as e:
            raise RuntimeError(f"Error posting message: {e}") from e

    def upload_file(
        self,
        file: t.Union[str, bytes],
        title="New Dreambooth image generated",
        channel: t.Optional[str] = os.getenv("SLACK_CHANNEL"),
    ):
        try:
            self.client.files_upload_v2(
                channel=channel, file=file, title=title, filename=title
            )
        except Exception as e:
            raise RuntimeError(f"Error uploading file: {e}") from e
To check that the client works, add the following to the bottom of the script
if __name__ == "__main__":
    client = NotificationClient()
    client.post_message(text="Hello Slack channel")
and then run the script. If everything went well you should see a new chat message in your Slack channel!
From a Slack perspective, this is pretty much the bulk of what we'll need to post inference images of our stable-diffusion pipeline to the channel. The only additional thing we'll need is to embed logic for calling the upload_file method in the workflow of the stable diffusion pipeline. But we'll get to that later, once we've set up the custom environment.
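Because the wrapped client is injectable, the wrapper can also be exercised without calling Slack at all. The following is a minimal sketch, assuming the NotificationClient defined above; RecordingClient is a hypothetical test double, not part of slack_sdk, and it mirrors only the two methods the wrapper calls.

```python
import typing as t


class RecordingClient:
    """A stand-in for the Slack WebClient that records calls instead of sending them."""

    def __init__(self, token: t.Optional[str] = None):
        self.token = token
        self.calls: t.List[t.Tuple[str, t.Dict[str, t.Any]]] = []

    def chat_postMessage(self, **kwargs: t.Any) -> None:
        # Record the call so a test can inspect the arguments later.
        self.calls.append(("chat_postMessage", kwargs))

    def files_upload_v2(self, **kwargs: t.Any) -> None:
        self.calls.append(("files_upload_v2", kwargs))
```

Passing it in as NotificationClient(Client=RecordingClient, token="unused") would let you call post_message or upload_file and then inspect the recorded arguments on the wrapper's client attribute.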
Creating the remote Python environment
For the pipeline that we'll be developing, we'll need some Python packages that aren't included in the default environment, e.g. the slack_sdk. This means that we'll need to create a new custom environment and add all the required packages. The easiest way to achieve this is by using the pipeline-ai CLI. We recommend that you have the latest version of pipeline-ai installed.
First log in with the pipeline CLI. We will be interacting with the Pipeline API using the CLI and assume you have authenticated. For more information about how to authenticate using the CLI, see the authentication guide.
To create a new environment, named sd-slack say, simply run
pipeline environments create sd-slack
in a shell with your local environment (with pipeline-ai) activated. You can check that it was created successfully by fetching it by name:
pipeline environments get -n sd-slack
Here you should see a response with an empty list of python_requirements, which are the Python packages in your environment. Then create a local requirements.txt file containing the following lines:
transformers==4.26.1
torch==1.13.1
diffusers==0.13.1
accelerate==0.17.1
slack-sdk==3.20.2
and then add all these packages to your custom environment by running:
cat requirements.txt | xargs pipeline environments update -n sd-slack add
You should now see these packages in the environment python_requirements. Note that you'll need the ID of your custom environment when uploading the pipeline to PipelineCloud.
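To make explicit what the xargs pipe above does: it appends each whitespace-separated entry of requirements.txt as an argument to the update command. The sketch below builds the equivalent argument list in Python without executing anything; build_update_command is a hypothetical helper, and the command itself is the one shown above.

```python
import typing as t


def build_update_command(requirements_text: str, env_name: str) -> t.List[str]:
    """Build the argument list that the `cat ... | xargs ...` pipe would run."""
    # Like xargs, split the file contents on any whitespace.
    packages = requirements_text.split()
    return ["pipeline", "environments", "update", "-n", env_name, "add", *packages]
```

The resulting list could be handed to subprocess.run if you prefer scripting the step over piping through xargs.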
Adding Slack notification
Now that we've set up our Slack client and custom environment, we're ready to integrate Slack notification into the stable diffusion pipeline:
import typing as t

from PIL.Image import Image
from pipeline import Pipeline, Variable


PIPELINE_NAME = "sd-dreambooth"

# The `pipeline-ai` pipeline
with Pipeline(PIPELINE_NAME, min_gpu_vram_mb=3040) as pipeline:
    # Define pipeline inputs
    input_kwargs = Variable(dict, is_input=True)
    pipeline.add_variables(input_kwargs)

    # Create and load model
    model = SDDreambooth()
    model.load()

    # Feed inputs to model
    context: InputKwargs = model.set_kwargs(input_kwargs)
    images: t.List[Image] = model.predict()

    # Format the images and output result
    formatted_images: t.List[str] = model.format_images(images)
    pipeline.output(formatted_images)

    # Send images to slack
    post_to_channel(formatted_images, context)
Only 2 things have changed from the original pipeline:
- When we set the kwargs on the model with model.set_kwargs, we now also return the context of the run so that we can use it elsewhere in the pipeline.
- We have added a function, post_to_channel, at the end of the pipeline which will be responsible for posting the output of the pipeline, formatted_images, to the Slack channel. We also pass the context of the run to give additional context to the posted images, e.g. the input prompt.
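In the complete script at the end of this guide, set_kwargs builds this context by merging the caller's inputs over a dictionary of defaults. The following is a stripped-down sketch of that merge; the names DEFAULTS and merge_kwargs and the values used are illustrative, not the script's own.

```python
import typing as t

# Illustrative defaults only; the complete script defines its own DEFAULT_KWARGS.
DEFAULTS: t.Dict[str, t.Any] = {
    "prompt": "a default prompt",
    "num_inference_steps": 50,
    "seed": None,
}


def merge_kwargs(input_kwargs: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
    # Later entries win, so caller-supplied values override the defaults.
    return {**DEFAULTS, **input_kwargs}


context = merge_kwargs({"prompt": "herge_style mountains"})
```

Here context carries the caller's prompt alongside the untouched defaults, which is why the prompt can later be read from it when building the Slack title.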
Since we are calling post_to_channel within a Pipeline context manager and want the runtime values to be passed to the function, we need to decorate it with the pipeline_function decorator:
import base64

from pipeline import pipeline_function


@pipeline_function
def post_to_channel(images: t.List[str], context: InputKwargs) -> None:
    """
    Send generated images to a (Slack) channel.
    """
    client = NotificationClient()
    prompt = context.get("prompt", "No prompt text")
    title = f"New generation from {PIPELINE_NAME}: {prompt}"
    for image in images:
        # Each image arrives as a base64 string, so decode it back to bytes
        client.upload_file(file=base64.b64decode(image), title=title)
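The images reach post_to_channel as base64 strings (produced by format_images in the complete script), which is why each one is decoded back to bytes before being uploaded. A small sketch of that round trip, using placeholder bytes rather than a real image:

```python
import base64

# Placeholder bytes standing in for an encoded JPEG; real data comes from the model.
raw = b"\xff\xd8\xff\xe0 placeholder image payload"

encoded: str = base64.b64encode(raw).decode()  # string form, as output by the pipeline
decoded: bytes = base64.b64decode(encoded)  # bytes form, as uploaded to Slack

# The round trip is lossless.
assert decoded == raw
```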
Running the pipeline on Pipeline Cloud
Before we can run the pipeline on Pipeline Cloud, we need to upload it to the servers. Assuming you have authenticated using the CLI, we 'get' the pipeline, before instantiating a connection to Pipeline Cloud and uploading our pipeline:
from pipeline import PipelineCloud

dreambooth_pipeline = Pipeline.get_pipeline(PIPELINE_NAME)
api = PipelineCloud()
uploaded_pipeline = api.upload_pipeline(
    dreambooth_pipeline, environment="YOUR_ENVIRONMENT_ID"
)
print(f"Uploaded pipeline id: {uploaded_pipeline.id}")
Make sure to replace YOUR_ENVIRONMENT_ID with the ID of the custom environment you created previously, which you can get using the CLI:
pipeline environments get -n sd-slack
The pipeline-ai library will serialize all your code and post your pipeline to an endpoint for creating pipelines in our main API gateway. And now we run the pipeline, supplying an input dictionary of type InputKwargs:
run = api.run_pipeline(
    uploaded_pipeline.id,
    {
        "prompt": "Mountain winds and babbling springs and moonlight seas, futuristic, herge_style.",
        "num_inference_steps": 100
    },
)
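A minimal sketch of one way to keep track of an uploaded pipeline between scripts: store its id after uploading and read it back when running. It assumes only that api.run_pipeline accepts a pipeline id and an input dictionary, as in the call above; the file name and both helper names are hypothetical.

```python
import pathlib
import typing as t

ID_FILE = pathlib.Path("pipeline_id.txt")  # hypothetical location for the saved id


def save_pipeline_id(pipeline_id: str, path: pathlib.Path = ID_FILE) -> None:
    """Persist the id printed after uploading, so later scripts can reuse it."""
    path.write_text(pipeline_id)


def run_saved_pipeline(
    api: t.Any, inputs: t.Dict[str, t.Any], path: pathlib.Path = ID_FILE
) -> t.Any:
    """Run an already-uploaded pipeline.

    `api` is assumed to expose `run_pipeline(pipeline_id, inputs)`.
    """
    pipeline_id = path.read_text().strip()
    return api.run_pipeline(pipeline_id, inputs)
```

The upload script would call save_pipeline_id(uploaded_pipeline.id) once, and a separate run script would call run_saved_pipeline(PipelineCloud(), {...}) with the desired inputs.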
If you want to run the pipeline again, move the run_pipeline call into another script and don't execute the whole script again, because you'll be uploading a new pipeline each time!
Conclusion
In this guide, we saw how to connect our Python application to Slack using the slack_sdk and start posting messages to a Slack channel. We created a notification client wrapper class around the Slack WebClient which could also handle uploading files to the channel. We then updated the stable diffusion pipeline by adding a new pipeline_function call which posts the inference images to the channel after each prediction.
Complete script
import base64
import io
import os
import random
import typing as t

import numpy as np
import torch
from diffusers.utils import logging
from dotenv import load_dotenv
from PIL.Image import Image
from pipeline import (
    Pipeline,
    PipelineCloud,
    Variable,
    pipeline_function,
    pipeline_model,
)
from slack_sdk import WebClient

load_dotenv()

PIPELINE_NAME = "sd-dreambooth"


class NotificationClient:
    """
    A wrapper class for posting messages and uploading files to a channel.
    Default client is the Slack WebClient. Inject another client if you'd like,
    but ensure the client interface matches the methods called here.
    """

    def __init__(self, Client=WebClient, token=os.getenv("SLACK_BOT_TOKEN")):
        self.client = Client(token=token)

    def post_message(self, text: str, channel=os.getenv("SLACK_CHANNEL")):
        try:
            self.client.chat_postMessage(text=text, channel=channel)
        except Exception as e:
            raise RuntimeError(f"Error posting message: {e}") from e

    def upload_file(
        self,
        file: t.Union[str, bytes],
        title="New Dreambooth image generated",
        channel: t.Optional[str] = os.getenv("SLACK_CHANNEL"),
    ):
        try:
            self.client.files_upload_v2(
                channel=channel, file=file, title=title, filename=title
            )
        except Exception as e:
            raise RuntimeError(f"Error uploading file: {e}") from e


logging.disable_progress_bar()
logging.set_verbosity_error()


# The shape of the input keyword arguments
class InputKwargs(t.TypedDict):
    prompt: str
    num_images_per_prompt: t.Optional[int]
    height: t.Optional[int]
    width: t.Optional[int]
    num_inference_steps: t.Optional[int]
    guidance_scale: t.Optional[float]
    eta: t.Optional[float]
    seed: t.Optional[int]


DEFAULT_KWARGS: InputKwargs = {
    "prompt": "Mountain winds and babbling springs and moonlight seas.",
    "num_images_per_prompt": 1,
    "height": 512,
    "width": 512,
    "num_inference_steps": 50,
    "guidance_scale": 7.5,
    "eta": 0.0,
    "seed": None,
}


@pipeline_model
class SDDreambooth:
    def __init__(self) -> None:
        self.input_kwargs = None
        self.model = None

    @pipeline_function(run_once=True, on_startup=True)
    def load(self) -> None:
        """
        Load the model into memory. The decorator parameters ensure the
        model is loaded only when needed, i.e. it is not cached on the GPU.
        """
        from diffusers import DiffusionPipeline

        device = torch.device("cuda:0")
        self.model = DiffusionPipeline.from_pretrained(
            "sd-dreambooth-library/herge-style"
        )
        self.model.to(device)

    @pipeline_function
    def set_kwargs(self, input_kwargs: InputKwargs) -> InputKwargs:
        """
        Set the model kwargs given the input kwargs.
        These are used in other methods.
        """
        self.input_kwargs = {**DEFAULT_KWARGS, **input_kwargs}
        return self.input_kwargs

    @pipeline_function
    def seed_everything(self) -> int:
        """
        Sets seed for pseudo-random number generators in: pytorch, numpy, python.random.
        `PL_GLOBAL_SEED` ensures the seed is passed to any spawned subprocesses.
        """
        seed = self.input_kwargs.pop("seed") or random.randint(1, 1_000_000)
        os.environ["PL_GLOBAL_SEED"] = str(seed)
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        return seed

    @pipeline_function
    def predict(self) -> t.List[Image]:
        """
        A forward pass through the network given the `input_kwargs`.
        """
        # Ensure the input kwargs have been set
        if self.input_kwargs is None:
            raise TypeError(
                "Input kwargs cannot be None. Set them before calling this method."
            )
        seed = self.seed_everything()
        generator = torch.Generator(device=0).manual_seed(seed)

        images = self.model(**self.input_kwargs, generator=generator).images

        return images

    @pipeline_function
    def to_string(self, image: Image) -> str:
        """
        Converts a `PIL` image to a base64 encoded string.
        """
        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")
        img_str = base64.b64encode(buffered.getvalue()).decode()
        return img_str

    @pipeline_function
    def format_images(self, images: t.List[Image]) -> t.List[str]:
        """
        Formats a list of `PIL` images into a list of base64 encoded strings.
        """
        return [self.to_string(image) for image in images]


@pipeline_function
def post_to_channel(images: t.List[str], context: InputKwargs) -> None:
    """
    Send generated images to a (Slack) channel.
    """
    client = NotificationClient()
    prompt = context.get("prompt", "No prompt text")
    title = f"New generation from {PIPELINE_NAME}: {prompt}"
    for image in images:
        client.upload_file(file=base64.b64decode(image), title=title)


with Pipeline(PIPELINE_NAME, min_gpu_vram_mb=3040) as pipeline:
    # Define pipeline inputs
    input_kwargs = Variable(dict, is_input=True)
    pipeline.add_variables(input_kwargs)

    # Create and load model
    model = SDDreambooth()
    model.load()

    # Feed inputs to model
    context: InputKwargs = model.set_kwargs(input_kwargs)
    images: t.List[Image] = model.predict()

    # Format the images and output result
    formatted_images: t.List[str] = model.format_images(images)
    pipeline.output(formatted_images)

    # Send images to slack
    post_to_channel(formatted_images, context)


dreambooth_pipeline = Pipeline.get_pipeline(PIPELINE_NAME)


api = PipelineCloud()
uploaded_pipeline = api.upload_pipeline(
    dreambooth_pipeline, environment="YOUR_ENVIRONMENT_ID"
)
print(f"Uploaded pipeline id: {uploaded_pipeline.id}")

run = api.run_pipeline(
    uploaded_pipeline.id,
    {
        "prompt": "Mountain winds and babbling springs and moonlight seas, futuristic, herge_style.",
        "num_inference_steps": 25,
    },
)