import json
import boto3
import gzip
import base64
import urllib.request
import os
import re
# AWS service clients, created once at module import so every invocation that
# reuses this module shares them.
ec2_client = boto3.client("ec2")
cf_client = boto3.client("cloudformation")
# NOTE(review): cloudtrail_client is not referenced by any function visible in
# this file - confirm whether it is still needed.
cloudtrail_client = boto3.client("cloudtrail")
codepipeline_client = boto3.client("codepipeline")
# Slack webhook URL (set this as an environment variable in Lambda).
# Indexing os.environ directly raises KeyError at import time if it is unset.
SLACK_WEBHOOK_URL = os.environ["SLACK_WEBHOOK_URL"]
def send_to_slack(message, timeout=10):
    """Post ``message`` to the Slack webhook and return the HTTP status code.

    Args:
        message: Text placed in the ``text`` field of the JSON payload.
        timeout: Seconds to wait on the connection before giving up. Without
            a timeout, a stalled connection would block until the Lambda
            invocation itself times out.

    Returns:
        The HTTP status code of the webhook response.

    Raises:
        urllib.error.URLError: If the request cannot be completed; this
            includes ``HTTPError`` for error responses. A timeout may also
            surface as ``TimeoutError``.
    """
    payload = json.dumps({"text": message}).encode("utf-8")
    request = urllib.request.Request(
        SLACK_WEBHOOK_URL,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.getcode()
def extract_user_info(user_arn):
    """Return ``(role, user)`` taken from the last two ``/``-separated ARN parts.

    The ARN is split on ``/``; the final segment is reported as the user and
    the one before it as the role. A string with no ``/`` yields the
    placeholders ``("Unknown Role", "Unknown User")``. When there are exactly
    two segments, the role is whatever precedes the single ``/``.
    """
    segments = user_arn.split("/")
    if len(segments) < 2:
        return "Unknown Role", "Unknown User"
    *_, role, user = segments
    return role, user
def get_stack_info(security_group_id):
    """Look up the CloudFormation stack and pipeline behind a security group.

    Reads the tags of ``security_group_id`` with ``describe_tags``. The
    ``StackName`` tag supplies the stack name and the
    ``aws:cloudformation:stack-id`` tag supplies the stack id. When a stack id
    is present the stack is described to find its root stack, its last update
    time, the root stack's pipeline name and that pipeline's last approver.

    Args:
        security_group_id: Resource id used as the ``resource-id`` tag filter.

    Returns:
        A 7-tuple ``(stack_name, stack_id, is_nested_stack, root_stack_id,
        updated_time, pipeline_name, reviewer_summary)``. Fields that cannot
        be determined keep their defaults: ``"N/A"`` for ``stack_name``,
        ``updated_time`` and ``reviewer_summary``, ``None`` for the ids and
        ``pipeline_name``, and ``False`` for ``is_nested_stack``. If the tag
        lookup itself fails, every field is at its default.
    """
    try:
        response = ec2_client.describe_tags(
            Filters=[{"Name": "resource-id", "Values": [security_group_id]}]
        )
        stack_name = "N/A"
        stack_id = None
        root_stack_id = None
        is_nested_stack = False
        updated_time = "N/A"
        pipeline_name = None
        # Must be initialised here: it is only assigned below when a pipeline
        # is found, and leaving it unbound made the final return raise and
        # discard the stack name that had already been resolved.
        reviewer_summary = "N/A"

        for tag in response.get("Tags", []):
            if tag["Key"] == "StackName":
                stack_name = tag["Value"]
            elif tag["Key"] == "aws:cloudformation:stack-id":
                stack_id = tag["Value"]

        if stack_id:
            try:
                stack_response = cf_client.describe_stacks(StackName=stack_id)
                stack_details = stack_response["Stacks"][0]
                # A 'RootId' key marks a nested stack; otherwise the stack is
                # its own root.
                if "RootId" in stack_details:
                    is_nested_stack = True
                    root_stack_id = stack_details["RootId"]
                else:
                    root_stack_id = stack_id
                updated_time = stack_details.get("LastUpdatedTime", "N/A")
                # The pipeline tag is read from the root stack.
                if root_stack_id:
                    pipeline_name = get_pipeline_name_from_root_stack(root_stack_id)
                    if pipeline_name:
                        reviewer_summary = get_last_approval_summary(pipeline_name)
            except Exception as e:
                # Best effort: keep whatever was resolved from the tags.
                print(f"Error describing CloudFormation stack {stack_id}: {e}")

        return (
            stack_name,
            stack_id,
            is_nested_stack,
            root_stack_id,
            updated_time,
            pipeline_name,
            reviewer_summary,
        )
    except Exception as e:
        print(f"Error fetching stack info for {security_group_id}: {e}")
        return "N/A", None, False, None, "N/A", None, "N/A"
def get_pipeline_name_from_root_stack(root_stack_id):
    """Return the value of the root stack's ``PipelineName`` tag, or ``None``.

    The stack is described through the CloudFormation client and its tags are
    scanned in order; the first tag whose key is ``PipelineName`` wins. Any
    error during the lookup is printed and results in ``None``.
    """
    try:
        described = cf_client.describe_stacks(StackName=root_stack_id)
        root_tags = described["Stacks"][0].get("Tags", [])
        return next(
            (entry["Value"] for entry in root_tags if entry["Key"] == "PipelineName"),
            None,
        )
    except Exception as e:
        print(f"Error fetching PipelineName from root stack {root_stack_id}: {e}")
    return None
def get_last_approval_summary(pipeline_name, stage_name="Approval", action_name="ManualApprove"):
    """Return who last approved the pipeline, taken from its approval action.

    Fetches the pipeline state and looks for the action ``action_name`` inside
    the stage ``stage_name``. If the latest execution summary matches
    ``Approved by arn:aws:sts::<account>:assumed-role/<role>/<session>``, only
    the trailing session segment is returned; otherwise the summary text is
    returned unchanged.

    Args:
        pipeline_name: Name passed to ``get_pipeline_state``.
        stage_name: Name of the stage that holds the approval action.
        action_name: Name of the manual approval action inside that stage.

    Returns:
        The extracted approver, the raw summary, or ``"No summary available"``
        when the stage/action is not found, the action has no summary, or the
        lookup fails (the error is printed).
    """
    try:
        state = codepipeline_client.get_pipeline_state(name=pipeline_name)
        for stage in state.get("stageStates", []):
            if stage.get("stageName") != stage_name:
                continue
            for action_state in stage.get("actionStates", []):
                if action_state.get("actionName") != action_name:
                    continue
                latest_execution = action_state.get("latestExecution", {})
                summary = latest_execution.get("summary", "No summary available")
                # Strip the "Approved by <assumed-role ARN prefix>" part and
                # keep only the final path segment of the ARN.
                match = re.search(r'Approved by arn:aws:sts::\d+:assumed-role/[^/]+/([^/]+)', summary)
                return match.group(1) if match else summary
    except Exception as e:
        print(f"Error fetching approval summary for pipeline {pipeline_name}: {e}")
    return "No summary available"
# Rule-change events handled by parse_event: event name -> (headline, direction).
_RULE_EVENTS = {
    "AuthorizeSecurityGroupIngress": ("Security Group Rule Added", "Ingress"),
    "AuthorizeSecurityGroupEgress": ("Security Group Rule Added", "Egress"),
    "RevokeSecurityGroupIngress": ("Security Group Rule Removed", "Ingress"),
    "RevokeSecurityGroupEgress": ("Security Group Rule Removed", "Egress"),
}

# (range container key, CIDR key inside each item, CIDR meaning "anywhere").
_CIDR_FAMILIES = (
    ("ipRanges", "cidrIp", "0.0.0.0/0"),
    ("ipv6Ranges", "cidrIpv6", "::/0"),
)


def _items(container, key):
    """Return ``container[key]["items"]`` as a list, tolerating missing or null levels."""
    return ((container or {}).get(key) or {}).get("items") or []


def _describe_permission(perm):
    """Render one ``ipPermissions`` item as a list of rule descriptions, one per source."""
    protocol = perm.get("ipProtocol", "N/A")
    from_port = perm.get("fromPort", "N/A")
    to_port = perm.get("toPort", "N/A")
    prefix = f"Protocol: {protocol} \nPort: {from_port}-{to_port} \n"

    sources = [
        f"Referenced SG: {group.get('groupId', 'N/A')}"
        for group in _items(perm, "groups")
    ]
    for ranges_key, cidr_key, open_cidr in _CIDR_FAMILIES:
        for entry in _items(perm, ranges_key):
            cidr = entry.get(cidr_key)
            if not cidr:
                continue
            if cidr == open_cidr:
                # NOTE(review): the alert glyphs that originally surrounded
                # world-open CIDRs were lost from this source; Slack emoji
                # shortcodes are used instead - confirm the preferred marker.
                sources.append(f"CIDR: :warning: {cidr} :warning:")
            else:
                sources.append(f"CIDR: {cidr}")
    if not sources:
        sources.append("No CIDR or SG Reference")
    return [prefix + source for source in sources]


def _format_stack_info(security_group_id, user):
    """Return the stack/pipeline suffix for changes made by CloudFormation, else ``""``."""
    if security_group_id == "N/A" or user != "AWSCloudFormation":
        return ""
    stack_name, _, _, _, _, pipeline_name, reviewer_summary = get_stack_info(security_group_id)
    stack_info = f"\n Stack Name: {stack_name}"
    if pipeline_name:
        stack_info += f"\n Pipeline Name: {pipeline_name}"
        if reviewer_summary:
            stack_info += f"\n Pipeline Approved By: {reviewer_summary}"
    return stack_info


def parse_event(event_data):
    """Build the Slack message for a security-group CloudTrail event.

    Handles ``CreateSecurityGroup`` and the four Authorize/Revoke
    Ingress/Egress events. Every source of every permission (referenced
    groups, IPv4 and IPv6 CIDRs) is listed, and world-open CIDRs are
    highlighted. For changes whose user is ``AWSCloudFormation`` the stack
    name, pipeline name and pipeline approver are appended.

    Args:
        event_data: The CloudTrail record (the ``detail`` of the EventBridge
            event). ``requestParameters``, ``responseElements`` and
            ``userIdentity`` may be missing or null.

    Returns:
        The message text, or ``""`` when the event is not one of the handled
        event names.
    """
    event_name = event_data.get("eventName", "")
    # Bail out before any AWS lookups for events this function ignores.
    if event_name != "CreateSecurityGroup" and event_name not in _RULE_EVENTS:
        return ""

    user_arn = (event_data.get("userIdentity") or {}).get("arn") or "Unknown User"
    role, user = extract_user_info(user_arn)

    request_params = event_data.get("requestParameters") or {}
    # CreateSecurityGroup reports the new group's id in responseElements,
    # not in requestParameters, so fall back to it.
    security_group_id = (
        request_params.get("groupId")
        or (event_data.get("responseElements") or {}).get("groupId")
        or "N/A"
    )
    stack_info = _format_stack_info(security_group_id, user)

    if event_name == "CreateSecurityGroup":
        sg_name = request_params.get("groupName", "N/A")
        vpc_id = request_params.get("vpcId", "N/A")
        return (
            "Security Group Created\n"
            f"SG Name: {sg_name}\n"
            f"SG ID: {security_group_id}\n"
            f"VPC: {vpc_id}\n"
            f"Role: {role}\n"
            f"User: {user}"
            f"{stack_info}"
        )

    headline, action = _RULE_EVENTS[event_name]
    rules = []
    for perm in _items(request_params, "ipPermissions"):
        rules.extend(_describe_permission(perm))
    rules_text = "\n".join(rules)
    return (
        f"{headline}\n"
        f"SG ID: {security_group_id}\n"
        f"Action: {action}\n"
        f"{rules_text}\n"
        f"Role: {role}\n"
        f"User: {user}"
        f"{stack_info}"
    )
def lambda_handler(event, context):
    """Lambda entry point: turn an EventBridge event into a Slack notification.

    The incoming event is printed first. An event without a ``detail`` key is
    reported as unknown and answered with status 400. Otherwise the detail is
    handed to ``parse_event``; a non-empty message is posted to Slack, an
    empty one is only logged, and status 200 is returned in both cases.
    """
    print("Received event:", json.dumps(event, indent=2))

    # Anything that is not an EventBridge event is rejected up front.
    if "detail" not in event:
        print("Unknown event format:", event)
        return {"statusCode": 400, "body": "Unknown event format"}

    detail = event["detail"]
    print("Processing EventBridge event:", detail)
    slack_text = parse_event(detail)
    if not slack_text:
        print("No relevant security group event detected.")
    else:
        status = send_to_slack(slack_text)
        print(f"Sent to Slack, Response: {status}")
    return {"statusCode": 200, "body": "Processed EventBridge event"}
# Comments