March 14, 2023

Tutorial: How to post Stable Diffusion images to Slack

How to post inference images to a Slack channel within a Stable Diffusion pipeline.

If you're already familiar with PipelineCloud, then you know that it provides a platform for running complex ML tasks without the need for expensive hardware or having to handle any tedious ML-ops infrastructure yourself. You can also execute arbitrary Python, which gives you a lot of flexibility and control over your ML pipelines. For instance, you could set up a notification scheme, where a message is sent to a 3rd party service at some point during the execution of one of your pipelines.

In this guide, we'll show you how to post your inference images to a Slack channel each time a prediction is made. As our inference model, we'll be using a pretrained HuggingFace stable-diffusion pipeline, building on the guide "Deploy a stable diffusion pipeline". We'll be updating some of the code laid out in that guide, so make sure you've gone through that example first.

Assumes prior knowledge: this guide assumes you are already familiar with deploying HuggingFace diffusers models on PipelineCloud.

NOTE: This is a walkthrough, so many of the code snippets below are only chunks of a larger script. If you're skimming or just want to see code, then skip to the conclusion where you'll find the complete script.

Background

In "Deploy a stable diffusion pipeline", we saw how to package a pretrained stable-diffusion HuggingFace pipeline into a deployable unit of code and upload that code to PipelineCloud. This enabled us to perform remote inferences in the cloud by making an HTTP POST request to an endpoint on the pipeline-ai API gateway. The deployable unit of code is a pipeline-ai pipeline. This is where you define the set of instructions that should be followed when an inference call is made to the endpoint. For instance, in the stable diffusion pipeline, the instructions involved creating a model instance, loading the HF model into memory, parsing the input to the pipeline and then passing it to the model to generate a list of images.
If we then want those images to be posted to Slack, we just need to add an additional step at the end of the stable diffusion pipeline which calls some other pipeline_function which will handle that logic. So that's where we're heading, but we'll get to that later. First, we'll see how to set things up in Slack to allow our Python application to post to a channel and then implement a simple notification client class to interface with Slack. Seeing as we'll be using slack_sdk for this, we'll also need to set up a custom Python environment on PipelineCloud, as it isn't available in any of the public environments.

Posting to a Slack channel

To post messages or files to a Slack channel from our own Python application, we'll need to interact with the Slack API in some form or other. There are a few ways to do this, but the simplest is to use the Python Slack SDK, which contains a web package with methods for interacting with various API resources, such as conversations (i.e. channels), chats and files, to name a few.

Setting up the Slack App

The Python application needs to interact with a Slack App, so you'll need to set one up. This guide will walk you through how to set up a basic app in your workspace. The bot token you generate while following it will associate your Python application (the bot) with a Slack app installed in a workspace. You'll need to make sure that you have:

- A bot token, which we'll save as an environment variable.

- Added the following to your bot token scopes: chat:write, files:read and files:write. You can always update these later, so don't worry; the Slack API responds with any missing scopes required when you try to perform an action on a given resource.

- Added your new Slack App to the Slack channel you want your Python application to post to.

- The ID of the Slack channel, which we'll also save as an environment variable.

Note that to get the ID of the Slack channel, you can query the conversations resource:
curl https://slack.com/api/conversations.list -H "Authorization: Bearer xoxb-1234..."
There should be a number of channels in the response so just look for the ID of the channel with the right name.
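If the response is long, picking the ID out by eye gets tedious. As a minimal sketch, the lookup could be done in Python instead. The sample payload below is made up for illustration and only mirrors the ok, channels, id and name fields of a conversations.list response; find_channel_id is a hypothetical helper, not part of the Slack SDK.

```python
import json
import typing as t

# Made-up sample of a conversations.list response body (illustrative values only).
SAMPLE_RESPONSE = """
{
  "ok": true,
  "channels": [
    {"id": "C0000000001", "name": "general"},
    {"id": "C0000000002", "name": "sd-images"}
  ]
}
"""


def find_channel_id(response_body: str, channel_name: str) -> t.Optional[str]:
    """Return the ID of the channel called `channel_name`, or None if absent."""
    payload = json.loads(response_body)
    if not payload.get("ok"):
        raise RuntimeError(f"Slack API error: {payload.get('error', 'unknown')}")
    for channel in payload.get("channels", []):
        if channel.get("name") == channel_name:
            return channel.get("id")
    return None


channel_id = find_channel_id(SAMPLE_RESPONSE, "sd-images")
print(channel_id)
```

In practice you would pass in the body returned by the curl command above rather than the hard-coded sample.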

Once you're sure you have all that, save your bot token and channel ID to a .env file:

SLACK_BOT_TOKEN=xoxb-__fill_me_in__
SLACK_CHANNEL=__fill_me_in__
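
A missing or misspelt variable would otherwise only surface later as a failed Slack call, so it can help to check for both up front. This is a small sketch assuming the two variable names above; require_env is a hypothetical helper, and the values set near the bottom are placeholders so that the snippet runs on its own.

```python
import os
import typing as t

REQUIRED_VARS = ("SLACK_BOT_TOKEN", "SLACK_CHANNEL")


def require_env(names: t.Sequence[str]) -> t.Dict[str, str]:
    """Return the named environment variables, failing fast if any is unset or empty."""
    missing = [name for name in names if not os.getenv(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: os.environ[name] for name in names}


# Placeholder values so the sketch runs standalone; in practice these come from .env.
os.environ.setdefault("SLACK_BOT_TOKEN", "xoxb-placeholder")
os.environ.setdefault("SLACK_CHANNEL", "C0000000000")

settings = require_env(REQUIRED_VARS)
print(sorted(settings))
```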

Now that we have everything set up on Slack, let's connect to the App with a Python client.

Implementing a Client wrapper

In this section, we'll build out a simple NotificationClient class which wraps around the Slack WebClient. We'll implement two methods for posting content to the channel: one for text messages and another for uploading files. Since we'll be interacting with the Slack API using slack_sdk, make sure you have added it to your local environment, using pip, poetry or whichever package manager you prefer. We'll also be loading the .env environment variables automatically using the dotenv package, so you'll want to install that too. This saves us from having to export the variables each time we open a new shell.
import os
import typing as t

from dotenv import load_dotenv
from slack_sdk import WebClient

load_dotenv()


class NotificationClient:
    """
    A wrapper class for posting messages and uploading files to a channel.
    Default client is the Slack WebClient. Inject another client if you'd like,
    but ensure the client interface matches the methods called here.
    """

    def __init__(self, Client=WebClient, token=os.getenv("SLACK_BOT_TOKEN")):
        self.client = Client(token=token)

    def post_message(self, text: str, channel=os.getenv("SLACK_CHANNEL")):
        try:
            self.client.chat_postMessage(text=text, channel=channel)
        except Exception as e:
            raise RuntimeError(f"Error posting message: {e}") from e

    def upload_file(
        self,
        file: t.Union[str, bytes],
        title="New Dreambooth image generated",
        channel: t.Optional[str] = os.getenv("SLACK_CHANNEL"),
    ):
        try:
            self.client.files_upload_v2(
                channel=channel, file=file, title=title, filename=title
            )
        except Exception as e:
            raise RuntimeError(f"Error uploading file: {e}") from e
You can test the client and whether you have correctly set up the Slack App from the previous section, by trying to post a text message to your Slack channel. For instance you could add the following lines to the above code:
if __name__ == "__main__":
    client = NotificationClient()
    client.post_message(text="Hello Slack channel")

and then run the script. If everything went well you should see a new chat message in your Slack channel!
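
The class docstring mentions injecting another client, and one place where that is handy is exercising the wrapper without touching Slack at all. The sketch below is an illustration under assumptions: NotificationClient is repeated in trimmed form (no slack_sdk import and no defaults read from the environment) so that the snippet is self-contained, and FakeClient is a hypothetical stand-in that merely records the calls it receives.

```python
import typing as t


class FakeClient:
    """Hypothetical stand-in for the Slack WebClient; records calls instead of sending them."""

    def __init__(self, token: t.Optional[str] = None):
        self.token = token
        self.calls: t.List[t.Tuple[str, dict]] = []

    def chat_postMessage(self, **kwargs):
        self.calls.append(("chat_postMessage", kwargs))

    def files_upload_v2(self, **kwargs):
        self.calls.append(("files_upload_v2", kwargs))


class NotificationClient:
    """Trimmed copy of the wrapper above, with the client class passed in explicitly."""

    def __init__(self, Client, token: t.Optional[str] = None):
        self.client = Client(token=token)

    def post_message(self, text: str, channel: str):
        try:
            self.client.chat_postMessage(text=text, channel=channel)
        except Exception as e:
            raise RuntimeError(f"Error posting message: {e}") from e

    def upload_file(self, file: t.Union[str, bytes], title: str, channel: str):
        try:
            self.client.files_upload_v2(
                channel=channel, file=file, title=title, filename=title
            )
        except Exception as e:
            raise RuntimeError(f"Error uploading file: {e}") from e


notifier = NotificationClient(Client=FakeClient, token="xoxb-placeholder")
notifier.post_message(text="Hello Slack channel", channel="C0000000000")
notifier.upload_file(file=b"not-a-real-image", title="test upload", channel="C0000000000")

# Show which client methods were called and with which keyword arguments.
for method, kwargs in notifier.client.calls:
    print(method, sorted(kwargs))
```

Because the wrapper only relies on the two method names, any object exposing chat_postMessage and files_upload_v2 can be swapped in the same way.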

From a Slack perspective, this is pretty much the bulk of what we'll need to post inference images of our stable-diffusion pipeline to the channel. The only additional thing we'll need is to embed logic for calling the upload_file method in the workflow of the stable diffusion pipeline. But we'll get to that later, once we've set up the custom environment.

Creating the remote Python environment

For the pipeline that we'll be developing, we'll need some Python packages that aren't included in the default environment, e.g. slack_sdk. This means that we'll need to create a new custom environment and add all the required packages. The easiest way to achieve this is by using the pipeline-ai CLI. We recommend that you have the latest version of pipeline-ai installed.

First, log in with the pipeline CLI. We will be interacting with the Pipeline API using the CLI and assume you have authenticated. For more information about how to authenticate using the CLI, see the authentication guide.

To create a new environment named, say, sd-slack, simply run

pipeline environments create sd-slack

in a shell with your local environment (with pipeline-ai) activated. You can check that it was created successfully by fetching it by name:

pipeline environments get -n sd-slack

Here you should see a response with an empty list of python_requirements, which are the Python packages in your environment. Then create a local requirements.txt file containing the following lines:

transformers==4.26.1
torch==1.13.1
diffusers==0.13.1
accelerate==0.17.1
slack-sdk==3.20.2

and then add all these packages to your custom environment by running:

cat requirements.txt | xargs pipeline environments update -n sd-slack add

You should now see these packages in the environment python_requirements. Note that you'll need the ID of your custom environment when uploading the pipeline to PipelineCloud.
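
If the xargs one-liner looks opaque, it simply appends every line of requirements.txt as an argument to the update command. The sketch below rebuilds that command in Python and prints it without running anything; build_update_command is a hypothetical helper, and unlike xargs it also skips blank lines and comment lines.

```python
import shlex
import typing as t

REQUIREMENTS = """\
transformers==4.26.1
torch==1.13.1
diffusers==0.13.1
accelerate==0.17.1
slack-sdk==3.20.2
"""


def build_update_command(requirements_text: str, env_name: str) -> t.List[str]:
    """Build the argv for the update command, skipping blank lines and comments."""
    packages = [
        line.strip()
        for line in requirements_text.splitlines()
        if line.strip() and not line.strip().startswith("#")
    ]
    return ["pipeline", "environments", "update", "-n", env_name, "add", *packages]


command = build_update_command(REQUIREMENTS, "sd-slack")
# Print the command as a single shell-safe string; nothing is executed here.
print(shlex.join(command))
```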

Adding Slack notification

Now that we've set up our Slack client and custom environment, we're ready to integrate Slack notifications into the stable diffusion pipeline:
1import typing as t
2
3from PIL.Image import Image
4from pipeline import Pipeline, Variable
5
6
7PIPELINE_NAME = "sd-dreambooth"
8
9# The `pipeline-ai` pipeline
10with Pipeline(PIPELINE_NAME, min_gpu_vram_mb=3040) as pipeline:
11    # Define pipeline inputs
12    input_kwargs = Variable(dict, is_input=True)
13    pipeline.add_variables(input_kwargs)
14
15    # Create and load model
16    model = SDDreambooth()
17    model.load()
18
19    # Feed inputs to model
20    context: InputKwargs = model.set_kwargs(input_kwargs)
21    images: t.List[Image] = model.predict()
22
23    # Format the images and output result
24    formatted_images: t.List[str] = model.format_images(images)
25    pipeline.output(formatted_images)
26
27    # Send images to slack
28    post_to_channel(formatted_images, context)

Only 2 things have changed from the original pipeline:

- When we set the kwargs to the model on line 20, we now also return the context of the run so that we can use it elsewhere in the pipeline.

- We have added a function, post_to_channel, at the end of the pipeline which will be responsible for posting the output of the pipeline, formatted_images, to the Slack channel. We also pass the context of the run to give additional context to the posted images, e.g. the input prompt.
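
To make the role of the context concrete: in the complete script at the end of this guide, set_kwargs merges the caller's input over a dictionary of defaults and returns the result. Below is a trimmed sketch of that merge; DEFAULT_KWARGS is shortened for brevity, and merge_kwargs and the example prompt are hypothetical names and values used only for illustration.

```python
import typing as t

# Shortened version of the defaults used in the complete script.
DEFAULT_KWARGS: t.Dict[str, t.Any] = {
    "prompt": "Mountain winds and babbling springs and moonlight seas.",
    "num_inference_steps": 50,
    "seed": None,
}


def merge_kwargs(input_kwargs: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
    """Later keys win, so caller-supplied values override the defaults."""
    return {**DEFAULT_KWARGS, **input_kwargs}


context = merge_kwargs(
    {"prompt": "A lighthouse at dusk, herge_style.", "num_inference_steps": 25}
)
print(context["prompt"])
print(context["seed"])
```

The returned dictionary always contains a prompt key, whether supplied by the caller or taken from the defaults, which is what the posting function reads when building the title of each upload.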

Since we are calling the post_to_channel within a Pipeline context manager and want the runtime values to be passed to the function, we need to decorate it with a pipeline_function decorator:

@pipeline_function
def post_to_channel(images: t.List[str], context: InputKwargs) -> None:
    """
    Send generated images to a (Slack) channel.
    """
    client = NotificationClient()
    prompt = context.get("prompt", "No prompt text")
    title = f"New generation from {PIPELINE_NAME}: {prompt}"
    for image in images:
        client.upload_file(file=base64.b64decode(image), title=title)
Here we simply instantiate the client, grab the prompt from the context and upload each generated image to the Slack channel. You could also post additional context data besides the prompt if you want.
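
The b64decode call is there because the pipeline outputs each image as a base64 encoded string (see to_string in the complete script), whereas the upload expects the raw bytes. A minimal round-trip sketch, using made-up stand-in bytes rather than a real image:

```python
import base64

# Stand-in bytes; in the pipeline these would be the JPEG data of a generated image.
raw_bytes = b"\xff\xd8\xff\xe0 pretend JPEG payload"

# What to_string does: bytes -> base64 text, which is safe to return as pipeline output.
encoded: str = base64.b64encode(raw_bytes).decode()

# What post_to_channel does before uploading: base64 text -> original bytes.
decoded: bytes = base64.b64decode(encoded)

print(decoded == raw_bytes)
```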

Running the pipeline on Pipeline Cloud

Before we can run the pipeline on Pipeline Cloud, we need to upload it to the servers. Assuming you have authenticated using the CLI, we 'get' the pipeline, before instantiating a connection to Pipeline Cloud and uploading our pipeline:
from pipeline import PipelineCloud

dreambooth_pipeline = Pipeline.get_pipeline(PIPELINE_NAME)
api = PipelineCloud()
uploaded_pipeline = api.upload_pipeline(
    dreambooth_pipeline, environment="YOUR_ENVIRONMENT_ID"
)
print(f"Uploaded pipeline id: {uploaded_pipeline.id}")
Just be sure to replace YOUR_ENVIRONMENT_ID with the ID of the custom environment you created previously, which you can get using the CLI:
pipeline environments get -n sd-slack
During this stage, the pipeline-ai library will serialize all your code and post your pipeline to an endpoint for creating pipelines in our main API gateway. And now we run the pipeline, supplying an input dictionary of type InputKwargs:
run = api.run_pipeline(
    uploaded_pipeline.id,
    {
        "prompt": "Mountain winds and babbling springs and moonlight seas, futuristic, herge_style.",
        "num_inference_steps": 100
    },
)
After running this you should see your inference images posted to your Slack channel! The first time you run the pipeline, it will take about a minute because the pipeline won't be cached on our servers. Subsequent runs won't be subject to this cold start though and should be pretty speedy! Just make sure you move the run_pipeline call into another script and don't execute the whole script again because you'll be uploading a new pipeline each time!
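
One possible way to split the upload from the run, sketched under assumptions: the upload script writes the returned pipeline ID to a small file, and a separate run script reads it back. The file name pipeline_id.json, the two helper functions and the placeholder ID are all hypothetical; nothing here calls the Pipeline API.

```python
import json
import tempfile
from pathlib import Path


def save_pipeline_id(pipeline_id: str, path: Path) -> None:
    """Called once from the upload script, right after upload_pipeline returns."""
    path.write_text(json.dumps({"pipeline_id": pipeline_id}))


def load_pipeline_id(path: Path) -> str:
    """Called from the run script instead of uploading again."""
    return json.loads(path.read_text())["pipeline_id"]


# Demonstration with a temporary directory and a made-up ID.
with tempfile.TemporaryDirectory() as tmp_dir:
    id_file = Path(tmp_dir) / "pipeline_id.json"
    save_pipeline_id("pipeline_placeholder_id", id_file)
    restored_id = load_pipeline_id(id_file)

print(restored_id)
# In the run script you would then pass the restored ID to api.run_pipeline.
```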

Conclusion

In this guide, we saw how to connect our Python application to Slack using the slack_sdk and start posting messages to a Slack channel. We created a notification client wrapper class around the Slack WebClient which could also handle uploading files to the channel. We then updated the stable diffusion pipeline by adding a new pipeline_function call which posts the inference images to the channel after each prediction.

Complete script

import base64
import io
import os
import random
import typing as t

import numpy as np
import torch
from diffusers.utils import logging
from dotenv import load_dotenv
from PIL.Image import Image
from pipeline import (
    Pipeline,
    PipelineCloud,
    Variable,
    pipeline_function,
    pipeline_model,
)
from slack_sdk import WebClient

load_dotenv()

PIPELINE_NAME = "sd-dreambooth"


class NotificationClient:
    """
    A wrapper class for posting messages and uploading files to a channel.
    Default client is the Slack WebClient. Inject another client if you'd like,
    but ensure the client interface matches the methods called here.
    """

    def __init__(self, Client=WebClient, token=os.getenv("SLACK_BOT_TOKEN")):
        self.client = Client(token=token)

    def post_message(self, text: str, channel=os.getenv("SLACK_CHANNEL")):
        try:
            self.client.chat_postMessage(text=text, channel=channel)
        except Exception as e:
            raise RuntimeError(f"Error posting message: {e}") from e

    def upload_file(
        self,
        file: t.Union[str, bytes],
        title="New Dreambooth image generated",
        channel: t.Optional[str] = os.getenv("SLACK_CHANNEL"),
    ):
        try:
            self.client.files_upload_v2(
                channel=channel, file=file, title=title, filename=title
            )
        except Exception as e:
            raise RuntimeError(f"Error uploading file: {e}") from e


logging.disable_progress_bar()
logging.set_verbosity_error()


# The shape of the input keyword arguments
class InputKwargs(t.TypedDict):
    prompt: str
    num_images_per_prompt: t.Optional[int]
    height: t.Optional[int]
    width: t.Optional[int]
    num_inference_steps: t.Optional[int]
    guidance_scale: t.Optional[float]
    eta: t.Optional[float]
    seed: t.Optional[int]


DEFAULT_KWARGS: InputKwargs = {
    "prompt": "Mountain winds and babbling springs and moonlight seas.",
    "num_images_per_prompt": 1,
    "height": 512,
    "width": 512,
    "num_inference_steps": 50,
    "guidance_scale": 7.5,
    "eta": 0.0,
    "seed": None,
}


@pipeline_model
class SDDreambooth:
    def __init__(self) -> None:
        self.input_kwargs = None
        self.model = None

    @pipeline_function(run_once=True, on_startup=True)
    def load(self) -> None:
        """
        Load the model into memory. The decorator parameters ensure the
        model is loaded only when needed, i.e. it is not cached on the GPU.
        """
        from diffusers import DiffusionPipeline

        device = torch.device("cuda:0")
        self.model = DiffusionPipeline.from_pretrained(
            "sd-dreambooth-library/herge-style"
        )
        self.model.to(device)

    @pipeline_function
    def set_kwargs(self, input_kwargs: InputKwargs) -> InputKwargs:
        """
        Set the model kwargs given the input kwargs.
        These are used in other methods.
        """
        self.input_kwargs = {**DEFAULT_KWARGS, **input_kwargs}
        return self.input_kwargs

    @pipeline_function
    def seed_everything(self) -> int:
        """
        Sets seed for pseudo-random number generators in: pytorch, numpy, python.random.
        `PL_GLOBAL_SEED` ensures the seed is passed to any spawned subprocesses.
        """
        seed = self.input_kwargs.pop("seed") or random.randint(1, 1_000_000)
        os.environ["PL_GLOBAL_SEED"] = str(seed)
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        return seed

    @pipeline_function
    def predict(self) -> t.List[Image]:
        """
        A forward pass through the network given the `input_kwargs`.
        """
        # Ensure the input kwargs have been set
        if self.input_kwargs is None:
            raise TypeError(
                "Input kwargs cannot be None. Set them before calling this method."
            )
        seed = self.seed_everything()
        generator = torch.Generator(device=0).manual_seed(seed)

        images = self.model(**self.input_kwargs, generator=generator).images

        return images

    @pipeline_function
    def to_string(self, image: Image) -> str:
        """
        Converts a `PIL` image to a base64 encoded string.
        """
        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")
        img_str = base64.b64encode(buffered.getvalue()).decode()
        return img_str

    @pipeline_function
    def format_images(self, images: t.List[Image]) -> t.List[str]:
        """
        Formats a list of `PIL` images into a list of base64 encoded strings.
        """
        return [self.to_string(image) for image in images]


@pipeline_function
def post_to_channel(images: t.List[str], context: InputKwargs) -> None:
    """
    Send generated images to a (Slack) channel.
    """
    client = NotificationClient()
    prompt = context.get("prompt", "No prompt text")
    title = f"New generation from {PIPELINE_NAME}: {prompt}"
    for image in images:
        client.upload_file(file=base64.b64decode(image), title=title)


with Pipeline(PIPELINE_NAME, min_gpu_vram_mb=3040) as pipeline:
    # Define pipeline inputs
    input_kwargs = Variable(dict, is_input=True)
    pipeline.add_variables(input_kwargs)

    # Create and load model
    model = SDDreambooth()
    model.load()

    # Feed inputs to model
    context: InputKwargs = model.set_kwargs(input_kwargs)
    images: t.List[Image] = model.predict()

    # Format the images and output result
    formatted_images: t.List[str] = model.format_images(images)
    pipeline.output(formatted_images)

    # Send images to slack
    post_to_channel(formatted_images, context)


dreambooth_pipeline = Pipeline.get_pipeline(PIPELINE_NAME)


api = PipelineCloud()
uploaded_pipeline = api.upload_pipeline(
    dreambooth_pipeline, environment="YOUR_ENVIRONMENT_ID"
)
print(f"Uploaded pipeline id: {uploaded_pipeline.id}")

run = api.run_pipeline(
    uploaded_pipeline.id,
    {
        "prompt": "Mountain winds and babbling springs and moonlight seas, futuristic, herge_style.",
        "num_inference_steps": 25,
    },
)
