One of the pre-configured policies included in the BlueRock Free Tier enables Container Drift Protection.
This mechanism detects, alerts on, and blocks attacks that attempt to deposit and execute binaries or scripts that were not present in the original container image.
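The idea behind drift detection can be illustrated with a toy model. The sketch below is not BlueRock's implementation: the `ExecEvent` record, the `find_drift` helper, and the file list are hypothetical names and data invented for illustration. It only shows the rule described above, that anything executed but not shipped in the image counts as drift.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ExecEvent:
    """A hypothetical record of one program launch inside a container."""
    container: str
    path: str


def find_drift(image_files: set[str], events: list[ExecEvent]) -> list[ExecEvent]:
    """Return the launches whose executable was not shipped in the image."""
    return [event for event in events if event.path not in image_files]


# Files assumed to be present in the original image (illustrative only).
image_files = {"/bin/bash", "/bin/sleep", "/usr/bin/printf"}

events = [
    ExecEvent("demo", "/bin/sleep"),          # shipped with the image
    ExecEvent("demo", "/tmp/sleep"),          # copied in at runtime: drift
    ExecEvent("demo", "/tmp/helloworld.sh"),  # written at runtime: drift
]

drifted = find_drift(image_files, events)
for event in drifted:
    print(f"drift: {event.container} executed {event.path}")
```

A real enforcement layer works at the operating-system level rather than on a list of paths, so treat this purely as a mental model for the tests that follow.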
Testing Container Drift Protection
The steps below trigger a container drift violation, generating alerts that will be delivered to your CloudWatch service.
Step 1: Create a new 'container_drift' tool for the Weather MCP server and client.
Step 2: Run the updated Weather MCP server and client.
Step 3: Execute the new container_drift tool.
Step 4: View container_drift_violation alerts in CloudWatch.
Step 1: Create a new 'container_drift' tool for the Weather MCP server and client
Log in to your BlueRock Free Tier EC2 instance.
ssh -i <ssh-key-file> ec2-user@<ip-address>
Create a new server with the 'container_drift' tool
Create a file named: /opt/bluerock/mcp/weatherMCP/server-weatherMCP.container_drift.py with the source code below.
Create a Weather MCP client with the following source code.
Create a file named: /opt/bluerock/mcp/weatherMCP/client-weatherMCP.container_drift.py with the source code below.
Step 2: Run the updated Weather MCP server and client
Run the new server
In one terminal window, log in to your BlueRock Free Tier EC2 instance and start the new server with the container_drift tool.
Run the new client
In a separate terminal window, log in to your BlueRock Free Tier EC2 instance and start the new client with the container_drift tool.
The client will initialize, list the available tools, and start its interactive CLI.
Step 3: Execute the new container_drift tool
At the weather> prompt, enter container_drift.
This tool will perform three tests, each of which will trigger a container drift violation alert.
TEST 1: Execute a file that was not present in the container image.
TEST 2: Execute a script via a command line interpreter option.
TEST 3: Execute a script not present in the container image.
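The container_drift tool in this guide classifies each test purely by the exit code of the command it runs: zero is reported as permitted, anything else as blocked. The sketch below shows that pattern in isolation, under the assumption that the current Python interpreter stands in for the `docker run` commands so it can be tried on any machine. The `run_check` helper is a hypothetical name, not part of the guide's server.

```python
import subprocess
import sys


def run_check(name: str, argv: list[str]) -> dict:
    """Run one command and classify it by its exit code."""
    result = subprocess.run(argv, capture_output=True, text=True)
    # A non-zero code may mean the command was blocked, but it can also
    # mean an unrelated failure, so the label stays deliberately cautious.
    verdict = "permitted" if result.returncode == 0 else "blocked or failed"
    return {
        "name": name,
        "returncode": result.returncode,
        "stdout": result.stdout.strip(),
        "verdict": verdict,
    }


# Stand-in commands: the Python interpreter replaces `docker run ...` here.
ok = run_check("exits cleanly", [sys.executable, "-c", "print('hello world')"])
denied = run_check("exits non-zero", [sys.executable, "-c", "import sys; sys.exit(126)"])

for outcome in (ok, denied):
    print(f"{outcome['name']}: rc={outcome['returncode']} -> {outcome['verdict']}")
```

Because the exit code alone cannot distinguish a policy block from, say, a missing Docker daemon, it is worth reading the captured stderr alongside the verdict when a test reports a failure.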
from typing import Any
import httpx
from mcp.server.fastmcp import FastMCP
import subprocess
import sys

# Initialize FastMCP server
mcp = FastMCP("weather")

# Constants
NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"


async def make_nws_request(url: str) -> dict[str, Any] | None:
    """Make a request to the NWS API with proper error handling."""
    headers = {
        "User-Agent": USER_AGENT,
        "Accept": "application/geo+json"
    }
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, headers=headers, timeout=30.0)
            response.raise_for_status()
            return response.json()
        except Exception:
            return None


def format_alert(feature: dict) -> str:
    """Format an alert feature into a readable string."""
    props = feature["properties"]
    return f"""
Event: {props.get('event', 'Unknown')}
Area: {props.get('areaDesc', 'Unknown')}
Severity: {props.get('severity', 'Unknown')}
Description: {props.get('description', 'No description available')}
Instructions: {props.get('instruction', 'No specific instructions provided')}
"""


@mcp.tool()
async def get_alerts(state: str) -> str:
    """Get weather alerts for a US state.

    Args:
        state: Two-letter US state code (e.g. CA, NY)
    """
    url = f"{NWS_API_BASE}/alerts/active/area/{state}"
    data = await make_nws_request(url)

    if not data or "features" not in data:
        return "Unable to fetch alerts or no alerts found."

    if not data["features"]:
        return "No active alerts for this state."

    alerts = [format_alert(feature) for feature in data["features"]]
    return "\n---\n".join(alerts)


@mcp.tool()
async def get_forecast(latitude: float, longitude: float) -> str:
    """Get weather forecast for a location.

    Args:
        latitude: Latitude of the location
        longitude: Longitude of the location
    """
    # First get the forecast grid endpoint
    points_url = f"{NWS_API_BASE}/points/{latitude},{longitude}"
    points_data = await make_nws_request(points_url)

    if not points_data:
        return "Unable to fetch forecast data for this location."

    # Get the forecast URL from the points response
    forecast_url = points_data["properties"]["forecast"]
    forecast_data = await make_nws_request(forecast_url)

    if not forecast_data:
        return "Unable to fetch detailed forecast."

    # Format the periods into a readable forecast
    periods = forecast_data["properties"]["periods"]
    forecasts = []
    for period in periods[:5]:  # Only show next 5 periods
        forecast = f"""
{period['name']}:
Temperature: {period['temperature']}°{period['temperatureUnit']}
Wind: {period['windSpeed']} {period['windDirection']}
Forecast: {period['detailedForecast']}
"""
        forecasts.append(forecast)

    return "\n---\n".join(forecasts)


@mcp.tool()
async def container_drift() -> str:
    """
    Runs a series of Docker command execution tests.
    """
    returnstr = ""
    tests = [
        {
            "name": "TEST 1: Execute a file not present in a container image...",
            "command": "docker run --rm fedora bash -c 'cp /bin/sleep /tmp/sleep && /tmp/sleep 1'",
        },
        {
            "name": "TEST 2: Execute a script via command line interpreter option...",
            "command": "docker run --rm fedora bash -c 'bash -c \"echo hello world\"'",
        },
        {
            "name": "TEST 3: Execute a script not present in a container image...",
            "command": "docker run --rm fedora bash -c 'printf \"#!/bin/bash\n\necho Hello world\" > /tmp/helloworld.sh && chmod 755 /tmp/helloworld.sh && /tmp/helloworld.sh'",
        },
    ]

    for test in tests:
        returnstr += f"""\n\n{test['name']}"""
        # Escape newlines for display. This is done outside the f-string because
        # a backslash inside an f-string expression is a syntax error before Python 3.12.
        shown_command = test['command'].replace('\n', '\\n')
        returnstr += f"\n({shown_command})"

        # Run the test command through the shell and capture its output
        result = subprocess.run(test['command'], shell=True, capture_output=True, text=True)
        if result.stdout:
            returnstr += f"""\n{result.stdout}"""
        if result.stderr:
            returnstr += f"""\n{result.stderr}"""

        # A zero exit code means nothing stopped the command
        if result.returncode == 0:
            returnstr += f"""\nCommand execution was permitted."""
        else:
            returnstr += f"""\nCommand execution was blocked with exit code: {result.returncode}"""
        returnstr += f"""\n"""

    return returnstr


def main():
    # Initialize and run the server
    mcp.run(transport='stdio')


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
"""
MCP Weather CLI Client - Direct interaction with MCP weather server without LLM
"""
import asyncio
import json
import sys
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


class WeatherCLI:
    def __init__(self):
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()

    async def connect_to_server(self, server_script_path: str):
        """Connect to the MCP weather server"""
        server_params = StdioServerParameters(
            command="python",
            args=[server_script_path],
            env=None
        )

        stdio_transport = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(self.stdio, self.write)
        )

        await self.session.initialize()

        # List available tools
        response = await self.session.list_tools()
        print("Connected to MCP Weather Server")
        print(f"Available tools: {len(response.tools)}")
        for tool in response.tools:
            print(f" - {tool.name}: {tool.description}")
        print()

    async def get_forecast(self, latitude: float, longitude: float):
        """Get weather forecast for a location"""
        result = await self.session.call_tool(
            "get_forecast",
            arguments={"latitude": latitude, "longitude": longitude}
        )

        if result.content:
            for content in result.content:
                if hasattr(content, 'text'):
                    text = content.text.strip()
                    if not text:
                        print("No forecast data returned")
                        return
                    try:
                        data = json.loads(text)
                        self._display_forecast(data)
                    except json.JSONDecodeError:
                        # Server returned formatted text instead of JSON
                        print("=" * 60)
                        print("WEATHER FORECAST")
                        print("=" * 60)
                        print(text)

    async def get_alerts(self, state: str):
        """Get weather alerts for a US state"""
        result = await self.session.call_tool(
            "get_alerts",
            arguments={"state": state}
        )

        if result.content:
            for content in result.content:
                if hasattr(content, 'text'):
                    text = content.text.strip()
                    if not text:
                        print(f"No alerts data returned for {state}")
                        return
                    try:
                        data = json.loads(text)
                        self._display_alerts(data, state)
                    except json.JSONDecodeError:
                        # Server returned formatted text instead of JSON
                        print("=" * 60)
                        print("WEATHER ALERTS")
                        print("=" * 60)
                        print(text)

    async def container_drift(self):
        """Execute a container drift test"""
        result = await self.session.call_tool(
            "container_drift"
        )

        if result.content and hasattr(result.content[0], 'text'):
            print(result.content[0].text.strip())

    def _display_forecast(self, data: dict):
        """Display formatted forecast data"""
        print("=" * 60)
        print("WEATHER FORECAST")
        print("=" * 60)

        if 'properties' in data and 'periods' in data['properties']:
            periods = data['properties']['periods']
            for period in periods[:5]:  # Show next 5 periods
                print(f"\n{period['name']}:")
                print(f" Temperature: {period['temperature']}°{period['temperatureUnit']}")
                print(f" Wind: {period['windSpeed']} {period['windDirection']}")
                print(f" Forecast: {period['shortForecast']}")
                if period.get('detailedForecast'):
                    print(f" Details: {period['detailedForecast']}")
        else:
            print(json.dumps(data, indent=2))

    def _display_alerts(self, data: dict, state: str):
        """Display formatted weather alerts"""
        print("=" * 60)
        print(f"WEATHER ALERTS FOR {state.upper()}")
        print("=" * 60)

        if isinstance(data, dict) and 'features' in data:
            alerts = data['features']
            if not alerts:
                print("\nNo active weather alerts.")
            else:
                for i, alert in enumerate(alerts, 1):
                    props = alert.get('properties', {})
                    print(f"\nAlert #{i}:")
                    print(f" Event: {props.get('event', 'N/A')}")
                    print(f" Severity: {props.get('severity', 'N/A')}")
                    print(f" Area: {props.get('areaDesc', 'N/A')}")
                    print(f" Headline: {props.get('headline', 'N/A')}")
        else:
            print(json.dumps(data, indent=2))

    async def list_tools(self):
        """List all available tools from the server"""
        response = await self.session.list_tools()
        print("\n" + "=" * 60)
        print("AVAILABLE TOOLS")
        print("=" * 60)

        if not response.tools:
            print("No tools available")
            return

        for tool in response.tools:
            print(f"\nTool: {tool.name}")
            print(f"Description: {tool.description}")
            if hasattr(tool, 'inputSchema') and tool.inputSchema:
                print(f"Input Schema: {json.dumps(tool.inputSchema, indent=2)}")
        print()

    async def interactive_mode(self):
        """Run interactive CLI mode"""
        print("\n" + "=" * 60)
        print("MCP WEATHER CLIENT - INTERACTIVE MODE")
        print("=" * 60)
        print("\nCommands:")
        print(" forecast <latitude> <longitude> - Get weather forecast")
        print(" alerts <state_code> - Get weather alerts (e.g., CA, NY)")
        print(" container_drift - Execute container drift action")
        print(" tools - List all available tools")
        print(" help - Show this help")
        print(" quit - Exit")
        print()

        while True:
            try:
                user_input = input("weather> ").strip()
                if not user_input:
                    continue

                parts = user_input.split()
                command = parts[0].lower()

                if command == "quit" or command == "exit":
                    break
                elif command == "help":
                    print("\nCommands:")
                    print(" forecast <latitude> <longitude> - Get weather forecast")
                    print(" alerts <state_code> - Get weather alerts")
                    print(" container_drift - Execute container drift action")
                    print(" tools - List all available tools")
                    print(" quit - Exit")
                elif command == "tools":
                    await self.list_tools()
                elif command == "forecast":
                    if len(parts) != 3:
                        print("Usage: forecast <latitude> <longitude>")
                        continue
                    try:
                        lat = float(parts[1])
                        lon = float(parts[2])
                        await self.get_forecast(lat, lon)
                    except ValueError:
                        print("Error: Latitude and longitude must be numbers")
                elif command == "alerts":
                    if len(parts) != 2:
                        print("Usage: alerts <state_code>")
                        continue
                    state = parts[1].upper()
                    if len(state) != 2:
                        print("Error: State code must be 2 letters (e.g., CA, NY)")
                        continue
                    await self.get_alerts(state)
                elif command == "container_drift":
                    await self.container_drift()
                else:
                    print(f"Unknown command: {command}. Type 'help' for available commands.")
            except KeyboardInterrupt:
                print("\nExiting...")
                break
            except Exception as e:
                print(f"Error: {e}")

    async def cleanup(self):
        """Cleanup resources"""
        await self.exit_stack.aclose()


async def main():
    if len(sys.argv) < 2:
        print("Usage: python mcp_weather_cli.py <path_to_weather_server.py>")
        print("\nExample: python mcp_weather_cli.py weather.py")
        sys.exit(1)

    server_path = sys.argv[1]
    cli = WeatherCLI()
    try:
        await cli.connect_to_server(server_path)
        await cli.interactive_mode()
    finally:
        await cli.cleanup()


if __name__ == "__main__":
    asyncio.run(main())
ssh -i <ssh-key-file> ec2-user@<ip-address>
cd /opt/bluerock/mcp/weatherMCP/
source .venv/bin/activate
uv run ./server-weatherMCP.container_drift.py
ssh -i <ssh-key-file> ec2-user@<ip-address>
cd /opt/bluerock/mcp/weatherMCP/
source .venv/bin/activate
uv run client-weatherMCP.container_drift.py server-weatherMCP.container_drift.py
[10/31/25 16:47:27] INFO Processing request of type ListToolsRequest server.py:674
Connected to MCP Weather Server
Available tools: 3
- get_alerts: Get weather alerts for a US state.
Args:
state: Two-letter US state code (e.g. CA, NY)
- get_forecast: Get weather forecast for a location.
Args:
latitude: Latitude of the location
longitude: Longitude of the location
- container_drift:
Runs a series of Docker command execution tests.
============================================================
MCP WEATHER CLIENT - INTERACTIVE MODE
============================================================
Commands:
forecast <latitude> <longitude> - Get weather forecast
alerts <state_code> - Get weather alerts (e.g., CA, NY)
container_drift - Execute container drift action
tools - List all available tools
help - Show this help
quit - Exit
weather>
weather> container_drift
weather> container_drift
[10/31/25 16:47:40] INFO Processing request of type CallToolRequest server.py:674
TEST 1: Execute a file not present in a container image...
(docker run --rm fedora bash -c 'cp /bin/sleep /tmp/sleep && /tmp/sleep 1')
Command execution complete (rc: 0)
TEST 2: Execute a script via command line interpreter option...
(docker run --rm fedora bash -c 'bash -c "echo hello world"')
hello world
Command execution complete (rc: 0)
TEST 3: Execute a script not present in a container image...
(docker run --rm fedora bash -c 'printf "#!/bin/bash\n\necho Hello world" > /tmp/helloworld.sh && chmod 755 /tmp/helloworld.sh && /tmp/helloworld.sh')
Hello world
Command execution complete (rc: 0)
weather>
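For Step 4, the alerts can also be looked up programmatically. The sketch below assumes the alerts land in CloudWatch Logs and that each event message contains the text container_drift_violation. Neither the log group name nor the message format is given in this guide, so both are placeholders. The `find_drift_alerts` helper and the `FakeLogsClient` stub are hypothetical names, and the stub's messages are invented placeholders rather than real alert output. The stub only mimics the response shape of the boto3 CloudWatch Logs `filter_log_events` call so the paging logic can be tried offline.

```python
from typing import Any

# A quoted term in CloudWatch Logs filter syntax matches that exact text.
ALERT_PATTERN = '"container_drift_violation"'


def find_drift_alerts(logs_client: Any, log_group: str,
                      pattern: str = ALERT_PATTERN) -> list[str]:
    """Collect the messages of all events in log_group that match pattern."""
    messages: list[str] = []
    kwargs = {"logGroupName": log_group, "filterPattern": pattern}
    while True:
        page = logs_client.filter_log_events(**kwargs)
        messages.extend(event["message"] for event in page.get("events", []))
        token = page.get("nextToken")
        if not token:
            return messages
        kwargs["nextToken"] = token  # request the next page of results


class FakeLogsClient:
    """Offline stand-in returning two pages of invented placeholder events."""

    def __init__(self) -> None:
        self.pages = {
            None: {"events": [{"message": "placeholder event 1"}], "nextToken": "page-2"},
            "page-2": {"events": [{"message": "placeholder event 2"}]},
        }

    def filter_log_events(self, **kwargs: Any) -> dict:
        return self.pages[kwargs.get("nextToken")]


alerts = find_drift_alerts(FakeLogsClient(), "example-log-group")
for message in alerts:
    print(message)
```

Against a real account, the same function could be given `boto3.client("logs")` and the log group your instance actually writes to, provided AWS credentials and a region are configured.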