How I used FastAPI to automate phishing email handling in Microsoft Teams
In this post, I will highlight how I used Python`s FastAPI to develop a custom phishing email handling functionality in Microsoft Teams using actionable message cards.
Motivation
Email is one of the most prevalent attack vectors for malicious actors. An attacker only needs one unknowing user to fall for a phishing campaign to gain foothold access to company systems and cause significant damage. Consequently, cybersecurity and IT analysts must always be vigilant about suspicious emails. Many email security providers offer solutions to prevent suspicious emails from reaching users’ inboxes, such as Microsoft’s Defender for Office 365, Proofpoint, Mimecast, and Abnormal. However, these products can introduce the risk of false positives, where legitimate emails are undelivered or stuck in quarantine. This requires SOC analysts to regularly monitor and review quarantined emails to release any legitimate ones. This motivated me to build a simple API that alerts analysts when an email is quarantined and allows quick action on the alert, such as releasing or deleting the email, changing the status of the detection, and more.
Project Architecture
This project will require to develop a FastAPI app instance endpoint to handle our program’s logic and a Python job to continuously query Microsoft Defender for Office 365 for new quarantined emails.
Below is a high-level diagram of the project architecture:
FastAPI setup
FastAPI is a modern, fast (high-performance), web framework for building APIs with Python 3.6+ based on standard Python type hints. It is designed to be easy to use and learn, while also providing the performance of asynchronous code. FastAPI is built on top of Starlette for the web parts and Pydantic for the data parts. It allows you to quickly create robust and production-ready APIs with automatic interactive documentation generated by Swagger UI and ReDoc.
To get started with FastAPI, you need to install it using pip:
pip install fastapi
pip install uvicorn[standard]
You can then create a simple FastAPI application:
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
def read_root():
return {"Hello": "World"}
To run the application, use the following command:
uvicorn main:app --reload
This will start a development server and you can access the interactive API documentation at http://127.0.0.1:8000/docs
.
Now that we have setup the FastAPI instance, we can start building our endpoints. We will start by creating the endpoint that sends a Microsoft Teams alert for an analyzed email object. To do so, we will use the Microsoft Graph analyzedEmail resource type to retrieve the email’s information followed by a Microsoft Teams webhook to send the message card.
To query the Microsoft Graph API, we need to register an application in Microsoft and generate client credentials. We can do so in Azure using the App registrations service. Make sure to give the application the appropriate application permission: SecurityAnalyzedMessage.ReadWrite.All. You can follow this post in Microsoft to guide you in registering an application that uses Microsoft Graph API.
from fastapi import HTTPException
import httpx
MICROSOFT_GRAPH_API_URL = "https://graph.microsoft.com/beta/security/collaboration/analyzedEmails/"
async def get_ms_token():
tenant_id = os.environ["MICROSOFTGRAPH_TENANT_ID"]
client_id = os.environ["MICROSOFTGRAPH_CLIENT_ID"]
client_secret = os.environ["MICROSOFTGRAPH_CLIENT_SECRET"]
scope = "https://graph.microsoft.com/.default"
async with httpx.AsyncClient() as client:
token_result = await client.post(f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token", data={
"grant_type": "client_credentials",
"client_id": client_id,
"client_secret": client_secret,
"scope": scope
}).json()
if 'access_token' not in token_result:
raise Exception(f"Error while getting access token: {json.dumps(token_result, indent=4)}")
return token_result['access_token']
@app.get("/analyzedEmail/{analyzedEmailId}")
async def get_analyzed_email(analyzedEmailId: str):
access_token = await get_ms_token()
headers = {
"Authorization": f"Bearer {access_token}"
}
async with httpx.AsyncClient() as client:
response = await client.get(f"{MICROSOFT_GRAPH_API_URL}{analyzedEmailId}", headers=headers)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail="Error retrieving analyzed email")
Once we have obtained the analyzed email resource object, we are ready to send an alert in Microsoft Teams. The alert will be sent in the format of a message card to allow semi-automated actions by the members of the Teams channel. This will greatly facilitate the need for analysts to jump back and forth through different consoles.
API_ENDPOINT = "<enter your API URL endpoint here>"
WEBHOOK_URL = "<entern your Microsoft Teams webhook URL>"
TENANT_ID = os.environ["MICROSOFTGRAPH_TENANT_ID"]
@app.post("/sendToTeams/analyzedEmail/{analyzedEmailId}")
async def send_analyzedEmail(analyzedEmailId: str):
email_data = await get_email_data(analyzedEmailId)
message_card = {
"@type": "MessageCard",
"@context": "http://schema.org/extensions",
"summary": "Email Sent to Quarantine",
"themeColor": "0076D7",
"sections": [
{
"activityTitle": "Email Sent to Quarantine",
"activitySubtitle": "Microsoft Defender for Office 365 detected a suspicious email and sent it to quarantine",
"activityImage": "https://www.hkmu.edu.hk/ito/wp-content/uploads/sites/10/2021/06/phishingicon1.jpg",
"facts": [
{"name": "Logged Timestamp", "value": email_data["loggedDateTime"]},
{"name": "Network Message ID", "value": email_data["networkMessageId"]},
{"name": "Email Subject", "value": email_data["subject"]},
{"name": "Recipient Address", "value": email_data["recipientEmailAddress"]},
{"name": "Sender Address", "value": email_data["senderDetail"]["fromAddress"]},
{"name": "Return Path", "value": email_data["returnPath"]},
{"name": "Policy", "value": email_data["policy"]},
{"name": "Latest Action", "value": email_data["latestDelivery"]["action"]},
{"name": "Policy", "value": email_data["policy"]},
{"name": "DMARC", "value": email_data["authenticationDetails"]["dmarc"]},
{"name": "DKIM", "value": email_data["authenticationDetails"]["dkim"]},
{"name": "SPF", "value": email_data["authenticationDetails"]["senderPolicyFramework"]},
{"name": "Sender IP Address", "value": email_data["senderDetail"]["ipv4"]},
{"name": "Message URLs", "value": "\n".join([url["url"] for url in email_data["urls"]])},
{"name": "Attachment File Hashes", "value": "\n".join([attachment["sha256"] for attachment in email_data["attachments"]])}
],
"markdown": True
}
],
"potentialAction": [
{
"@type": "OpenUri",
"name": "View Email",
"targets": [
{
"os": "default",
"uri": f"https://security.microsoft.com/emailentity?f=summary&startTime={email_data['loggedDateTime']}&endTime={email_data['loggedDateTime']}&id={email_data['networkMessageId']}&recipient={email_data['recipientEmailAddress']}&tid={TENANT_ID}"
}
]
},
{
"@type": "HttpPOST",
"name": "Release Email",
"target": f"{API_ENDPOINT}/analyzedEmail/remediate",
"body": "{\"networkMessageId\":\"" + email_data["networkMessageId"] + "\", \"recipientEmailAddress\":\"" + email_data["recipientEmailAddress"] + "\", \"action\":\"moveToInbox\"}"
},
{
"@type": "HttpPOST",
"name": "Delete Email",
"target": f"{API_ENDPOINT}/analyzedEmail/remediate",
"body": "{\"networkMessageId\":\"" + email_data["networkMessageId"] + "\", \"recipientEmailAddress\":\"" + email_data["recipientEmailAddress"] + "\", \"action\":\"hardDelete\"}"
},
{
"@type": "HttpPOST",
"name": "Move to Junk",
"target": f"{API_ENDPOINT}/analyzedEmail/remediate",
"body": "{\"networkMessageId\":\"" + email_data["networkMessageId"] + "\", \"recipientEmailAddress\":\"" + email_data["recipientEmailAddress"] + "\", \"action\":\"moveToJunk\"}"
}
]
}
async with httpx.AsyncClient() as client:
response = await client.post(WEBHOOK_URL, json=message_card)
if response.status_code != 200:
raise HTTPException(status_code=404, detail="Error sending message card to Teams")
return {"message": "Message card sent to Teams successfully"}
Here’s a sample of how the message card will look like in Teams:
Once we have sent the alert to Teams, we also need to accept incoming POST requests sent via the user actions in the message card. We will implement a release, hard delete and move to junk functionalities.
@app.post("/analyzedEmail/remediate")
async def email_action(networkMessageId: str, recipientEmailAddress: str, action: str):
access_token = await get_ms_token()
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
payload = {
"action": request.action,
"networkMessageId": request.networkMessageId,
"recipientEmailAddress": request.recipientEmailAddress
}
async with httpx.AsyncClient() as client:
response = await client.post(f"{MICROSOFT_GRAPH_API_URL}remediate", headers=headers, json=payload)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail="Error performing email action")
return {"message": f"Email action '{request.action}' performed successfully"}
This concludes our FastAPI instance. Next, we will implement a python script that will run as a service to continuously poll the Microsoft Graph API for new quarantined emails and use our API endpoint to send the alert to Teams.
Python Job
We will write our Python service script to check analyzed emails that are stuck in quarantine every 5 minutes by manipulating the startTime and endTime query parameters. In practice, I have noticed that the Microsoft Graph API has some latency before showing analyzed emails, so instead of querying from the past 5 minutes, the code queries from the previous 15th to 10th minutes. For every quarantined email, the script calls our FastAPI endpoint to send the relevant detection to Teams.
import httpx
import asyncio
async def main():
while True:
try:
now = time.time()
start_time = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now - 5*60*2))
end_time = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now - 5*60*3))
access_token = await get_ms_token()
headers = {
"Authorization": f"Bearer {access_token}"
}
async with httpx.AsyncClient() as client:
response = await client.get(f"https://graph.microsoft.com/v1.0/security/collaboration/analyzedEmails?startTime={start_time}&endTime={end_time}", headers=headers)
response.raise_for_status()
emails = response.json()["value"]
for email in emails:
if email["latestDelivery"]["location"] == "quarantine":
await send_to_teams(email["id"])
except Exception as e:
print(f"Error: {e}")
await asyncio.sleep(300)
if __name__ == "__main__":
asyncio.run(main())
Conclusion
By leveraging FastAPI and Microsoft Graph, we have created a powerful tool that enhances the efficiency of cybersecurity analysts. This solution not only automates the detection and alerting process but also provides actionable insights directly within Microsoft Teams. This integration reduces the need for analysts to switch between different platforms, allowing them to respond to threats more quickly and effectively. The project can be easily augmented to interact with a backend database and offer more functionalities such as assigning a status, an analyst, tags or comments to a detection facilitating team collaboration against possible attacks. As cyber threats continue to evolve, having such automated and integrated systems in place is crucial for maintaining robust security postures. This project demonstrates the potential of combining modern web frameworks with cloud-based APIs to build scalable and responsive security solutions. I hope this guide inspires you to explore further possibilities with FastAPI and Microsoft Graph in your own projects.
What do you think about this solution? Have you tried something similar? Connect with me on LinkedIn and let me know!
References
Microsoft Graph analyzedEmail resource type