Sending Alerts to Slack from Snowflake
- Jeff Skoldberg, Principal Consultant at Green Mountain Data Solutions
Snowflake has recently expanded its alerting capability, allowing alerts to be sent via email or webhook. Webhook alerts are very powerful because they let you send alerts to Slack, Microsoft Teams, PagerDuty, etc. This is useful for a variety of use cases, including pipeline failure alerts and data-driven alerts.
This post provides a step-by-step guide to sending Snowflake alerts to Slack. We will walk through a detailed example that sends an alert when warehouse usage spikes.
Step 1: Create a Webhook in Slack
Log in to your Slack account, then go to this page.
Click Create an App
Choose “From Scratch”
On the next screen, give the app a name, choose a workspace, then click Create App
The next screen will present your Application Credentials.
Click Incoming Webhooks in the left sidebar:
Toggle “Active Incoming Webhooks” to “On”
After you change the toggle, scroll down to the bottom of that page and click “Add New Webhook to Workspace”.
Choose which channel you want the Snowflake alerts to go to. I created a new channel called “snowflake-alerts”, then I refreshed this page to get it to show in the dropdown.
Click Allow.
Test the Webhook
After you click allow above, the next page will give you your Webhook URL and a test CURL command.
Copy and paste the test command into your terminal. If you’re on Windows, use Git Bash.
It should instantly send a message to your new Slack channel.
Congrats, your new Webhook is now working!
Optional: Give the app a more attractive Snowflake ❄️ avatar
- Go back to the “Basic Information” tab.
- Scroll to the bottom.
- Download a Snowflake logo and upload it here.
- Test again.
Now it looks much better!
Step 2: Create Snowflake Secret
The Webhook URL contains a secret: everything after https://hooks.slack.com/services/ is the secret string. Replace the placeholder string below with your own, then run this command in Snowflake.
Reminder: Secrets are schema-level objects. Be careful about your worksheet's database and schema context, or use fully qualified names.
In my case, I created a secret called gmds_slack_secret in the public schema of the analytics database.
use schema analytics.public;
CREATE OR REPLACE SECRET gmds_slack_secret
TYPE = GENERIC_STRING
SECRET_STRING = 'T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX';
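To confirm the secret landed in the right schema, you can list and describe it. This is just a quick sanity check; the output shows the secret's metadata and should not echo the secret string itself:
show secrets in schema analytics.public;
describe secret analytics.public.gmds_slack_secret;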
Step 3: Create a webhook notification integration
Now that we’ve successfully created the secret, let’s create the Notification Integration:
CREATE OR REPLACE NOTIFICATION INTEGRATION gmds_slack_webhook_integration
TYPE=WEBHOOK
ENABLED=TRUE
WEBHOOK_URL='https://hooks.slack.com/services/SNOWFLAKE_WEBHOOK_SECRET'
WEBHOOK_SECRET=analytics.public.gmds_slack_secret
WEBHOOK_BODY_TEMPLATE='{"text": "SNOWFLAKE_WEBHOOK_MESSAGE"}'
WEBHOOK_HEADERS=('Content-Type'='application/json');
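Before wiring the integration into an alert, it is worth a quick check that it exists and is enabled:
show notification integrations;
describe notification integration gmds_slack_webhook_integration;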
Step 4: Send the notification
To send a notification, we use the built-in SYSTEM$SEND_SNOWFLAKE_NOTIFICATION stored procedure. We wrap the message in the SANITIZE_WEBHOOK_CONTENT function, which removes webhook placeholders (e.g. SNOWFLAKE_WEBHOOK_SECRET) from the message so the secret can't leak into the channel.
Here is the code I ran in my account:
CALL SYSTEM$SEND_SNOWFLAKE_NOTIFICATION(
SNOWFLAKE.NOTIFICATION.TEXT_PLAIN(
SNOWFLAKE.NOTIFICATION.SANITIZE_WEBHOOK_CONTENT('This is a test message from my Snowflake Account')
),
SNOWFLAKE.NOTIFICATION.INTEGRATION('gmds_slack_webhook_integration')
);
The notification arrived in Slack instantly!
Now that we have the basic gears in place, let’s move on to a real-world example.
Alerts Example: Send an alert when warehouse usage spikes
Craft a SQL query to identify warehouse usage spikes and wrap it in a serverless task
The following query compares each warehouse's recent usage (the last completed hour) to its average hourly usage over the last month. In this case, we flag warehouses whose usage is more than 50% above their monthly average.
We are using a serverless task because it will save us money! Just omit the warehouse name to make the task serverless.
For example purposes, I’m going to union a dummy record to ensure that every execution of the task produces a row and an alert.
CREATE OR REPLACE TASK monitor_warehouse_spikes
SCHEDULE = 'USING CRON 2 * * * * America/New_York'
SERVERLESS_TASK_MIN_STATEMENT_SIZE = 'XSMALL'
SERVERLESS_TASK_MAX_STATEMENT_SIZE = 'XSMALL'
as
insert into usage_spike_alerts (warehouse_name,last_hour_credits,avg_monthly_credits,credit_diff,percent_increase)
WITH last_hour_usage AS (
SELECT
warehouse_name,
sum(credits_used) AS last_hour_credits
FROM
snowflake.account_usage.warehouse_metering_history
WHERE
start_time >= DATEADD(hour, -2, CURRENT_TIMESTAMP)
AND end_time <= CURRENT_TIMESTAMP
GROUP BY
warehouse_name
),
monthly_avg_usage AS (
SELECT
warehouse_name,
AVG(credits_used) AS avg_monthly_credits
FROM
snowflake.account_usage.warehouse_metering_history
WHERE
start_time >= DATEADD(month, -1, CURRENT_TIMESTAMP)
AND start_time < DATEADD(hour, -1, CURRENT_TIMESTAMP) -- Exclude last hour
GROUP BY
warehouse_name
),
spikes AS (
SELECT
l.warehouse_name,
l.last_hour_credits,
m.avg_monthly_credits,
l.last_hour_credits - m.avg_monthly_credits AS credit_diff,
ROUND((l.last_hour_credits / NULLIF(m.avg_monthly_credits, 0) - 1) * 100, 2) AS percent_increase
FROM
last_hour_usage l
INNER JOIN
monthly_avg_usage m
ON
l.warehouse_name = m.warehouse_name
WHERE
l.last_hour_credits > m.avg_monthly_credits * 1.5 -- Customize spike threshold (e.g., 50% higher)
)
SELECT
warehouse_name,
last_hour_credits,
avg_monthly_credits,
credit_diff,
percent_increase
FROM
spikes
union select 'dummy_row', 0,0,0,0 -- for example purposes only: guarantees at least one row every run so we can test the alert
ORDER BY
percent_increase DESC
;
show tasks;
-- don't forget to enable your task!
alter task monitor_warehouse_spikes resume;
Create a table to store the query results
CREATE or replace TABLE usage_spike_alerts (
alert_id INT AUTOINCREMENT PRIMARY KEY,
warehouse_name STRING NOT NULL,
last_hour_credits FLOAT NOT NULL,
avg_monthly_credits FLOAT NOT NULL,
credit_diff FLOAT NOT NULL,
percent_increase FLOAT NOT NULL,
inserted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
alert_sent boolean default false
);
Note this table has a few helper columns with default values not included in the query results:
- alert_id: primary key
- inserted_at: helps us know when the task inserted the record.
- alert_sent: initially false, will be set to true after the alert is sent.
Test the task, review the results
execute task monitor_warehouse_spikes;
select * from usage_spike_alerts where not alert_sent;
Create a procedure to send the alert if a spike exists
We want to do the following:
- Check the usage_spike_alerts table for unsent records (alert_sent = false).
- If there are unsent records, send them to Slack.
- Mark the record as sent.
- Output the number of alerts sent in the SQL console.
CREATE OR REPLACE PROCEDURE send_usage_spike_alerts()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'send_alerts'
AS $$
import snowflake.snowpark as snowpark

def send_alerts(session):
    query = """
        SELECT warehouse_name, last_hour_credits, avg_monthly_credits, credit_diff, percent_increase
        FROM usage_spike_alerts
        WHERE alert_sent = FALSE
    """
    results = session.sql(query).collect()
    if not results:
        return "No alerts to send."
    alerts_sent = 0
    errors = []
    for row in results:
        try:
            # Construct the single-line message
            message_content = (
                f"🚨 *Warehouse Spike Detected:* "
                f"*Warehouse*: {row['WAREHOUSE_NAME']}, "
                f"*Last Hour Credits*: {row['LAST_HOUR_CREDITS']:.4f}, "
                f"*1 Month Avg Hourly Credits*: {row['AVG_MONTHLY_CREDITS']:.4f}, "
                f"*Difference*: {row['CREDIT_DIFF']:.4f}, "
                f"*Percent Increase*: {row['PERCENT_INCREASE']:.2f}%"
            )
            # Sanitize the message
            sanitized_message_query = f"""
                SELECT SNOWFLAKE.NOTIFICATION.SANITIZE_WEBHOOK_CONTENT('{message_content}')
            """
            sanitized_message = session.sql(sanitized_message_query).collect()[0][0]
            # Send the alert
            notification_query = f"""
                CALL SYSTEM$SEND_SNOWFLAKE_NOTIFICATION(
                    SNOWFLAKE.NOTIFICATION.TEXT_PLAIN('{sanitized_message}'),
                    SNOWFLAKE.NOTIFICATION.INTEGRATION('gmds_slack_webhook_integration')
                )
            """
            session.sql(notification_query).collect()
            # Mark the alert as sent
            update_query = f"""
                UPDATE usage_spike_alerts
                SET alert_sent = TRUE
                WHERE warehouse_name = '{row['WAREHOUSE_NAME']}'
                AND last_hour_credits = {row['LAST_HOUR_CREDITS']}
                AND avg_monthly_credits = {row['AVG_MONTHLY_CREDITS']}
            """
            session.sql(update_query).collect()
            alerts_sent += 1
        except Exception as e:
            # Snowpark's Session object has no logging method; collect the error and keep going
            errors.append(f"{row['WAREHOUSE_NAME']}: {e}")
            continue
    if errors:
        return f"{alerts_sent} alert(s) sent. Errors: " + "; ".join(errors)
    return f"{alerts_sent} alert(s) sent."
$$;
Note
I experienced challenges sending newlines (\n) to Slack via the SANITIZE_WEBHOOK_CONTENT function. Slack expects the text to contain a backslash-escaped newline (\\n), so a Python string of \\\\n should work. However, when Python passes this string to the SQL function, something goes wrong along the way and the alert is not sent. I was not able to get Python-based alerts with newlines working, although this does work when run manually: SANITIZE_WEBHOOK_CONTENT('line1\\nline2')
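For reference, here is a minimal sketch of what that manual call looks like end to end, reusing the integration created above (the \\n should arrive in Slack as a line break):
CALL SYSTEM$SEND_SNOWFLAKE_NOTIFICATION(
    SNOWFLAKE.NOTIFICATION.TEXT_PLAIN(
        SNOWFLAKE.NOTIFICATION.SANITIZE_WEBHOOK_CONTENT('line1\\nline2')
    ),
    SNOWFLAKE.NOTIFICATION.INTEGRATION('gmds_slack_webhook_integration')
);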
Manually test the procedure
We've already inserted a test row (the dummy row) into the usage_spike_alerts table. Let's run the sproc and ensure:
- The records are updated to alert_sent = true
- We receive the message in Slack
execute task monitor_warehouse_spikes; -- if you haven't already...
select * from usage_spike_alerts where not alert_sent; -- review it: sent == false
CALL send_usage_spike_alerts(); -- send the alert
-- wait for the alert to come
select * from usage_spike_alerts where not alert_sent; -- 0 rows
You should have received an alert in Slack!
Chain the task and the procedure together
We want the hourly schedule of the monitor_warehouse_spikes task to handle the end-to-end process, so let's make sure that after monitor_warehouse_spikes runs, the procedure that sends the alert is also run.
First, wrap the procedure in a serverless task (again, just omit the warehouse name):
CREATE TASK send_usage_spike_alerts_task
AS
CALL send_usage_spike_alerts();
Next, chain the tasks together and resume the child task. Snowflake requires the root task of a DAG to be suspended while you modify the DAG, so suspend monitor_warehouse_spikes first and resume it again at the end:
alter task monitor_warehouse_spikes suspend;
ALTER TASK send_usage_spike_alerts_task
ADD AFTER monitor_warehouse_spikes;
alter task send_usage_spike_alerts_task resume;
alter task monitor_warehouse_spikes resume;
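To confirm the DAG is wired up the way you expect, you can query the task graph with the information_schema.task_dependents table function; a quick sketch, assuming your current database and schema contain the tasks:
select *
from table(information_schema.task_dependents(
    task_name => 'monitor_warehouse_spikes',
    recursive => true
));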
Test it:
execute task monitor_warehouse_spikes;
This task should now add the new records to the table and send the alert(s) to Slack!
Note
When calling the send_usage_spike_alerts() procedure directly, the message appears in Slack right away. When send_usage_spike_alerts_task runs as a child of the monitor_warehouse_spikes task, it can take up to 3 minutes for the alert to arrive.
Review the Task DAG and History
From the Data sidebar menu in the Snowsight UI (database icon in the sidebar), navigate to the database and schema that contains your tasks. Expand the tasks container and select either task we created. Then click the “Graph” tab in the main frame. Here you can manually trigger the task.
You can also review the task history on the “Run History” tab:
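If you prefer SQL to the Snowsight UI, the information_schema.task_history table function returns the same run history; a minimal sketch:
select name, state, scheduled_time, completed_time, error_message
from table(information_schema.task_history(task_name => 'monitor_warehouse_spikes'))
order by scheduled_time desc;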
Design Notes
The query to check for spikes and sending the alert could have been done in a single task. One may argue that chaining two tasks adds unnecessary complexity. However, I like this design for the following reasons.
- The spikes are logged in a table permanently. Technically this is not necessary for alerting, but it is really nice to have.
- Logging the spikes to a table is one unique unit of work and gets its own task.
- Abstracting the spike-monitoring logic from the alert itself means we can change the logic without needing to re-test the send-alert functionality.
Wrap Up
Sending Snowflake alerts to Slack is incredibly useful. The use cases for this type of alerting are endless!
This article has equipped you with the knowledge of:
- Creating a webhook in Slack.
- Creating a Secret in Snowflake.
- Creating a Webhook Integration in Snowflake.
- Creating a task to log warehouse usage spikes.
- Creating a procedure to send alerts based on a condition, in this case the existence of rows returned by a query.
- Wrapping a procedure in a task.
- Chaining tasks together.
I’m looking forward to hearing about the use cases you come up with! 🥂