# Testing BlueRock Container Drift Alerts

One of the pre-configured policies included in the BlueRock Free Tier enables Container Drift Protection.

This policy detects, alerts on, and blocks attacks that attempt to deposit and execute binaries or scripts that were not present in the original container image.
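
To make the idea concrete, the sketch below models drift detection as a comparison between a file that is about to be executed and a manifest recorded from the original image. It is purely conceptual and is not how BlueRock implements the policy; `IMAGE_MANIFEST`, `sha256_of`, and `is_drift` are hypothetical names, and the file contents are placeholders.

```python
import hashlib


def sha256_of(data: bytes) -> str:
    """Return the hex SHA-256 digest of the given bytes."""
    return hashlib.sha256(data).hexdigest()


# Hypothetical manifest: path -> digest of the file as shipped in the image.
IMAGE_MANIFEST = {
    "/bin/sleep": sha256_of(b"sleep-binary-contents"),
    "/bin/bash": sha256_of(b"bash-binary-contents"),
}


def is_drift(path: str, contents: bytes, manifest: dict[str, str]) -> bool:
    """Return True when the file was absent from, or differs from, the image."""
    expected = manifest.get(path)
    return expected is None or expected != sha256_of(contents)


# A binary shipped with the image is not drift.
print(is_drift("/bin/sleep", b"sleep-binary-contents", IMAGE_MANIFEST))  # False
# The same binary copied to a new path was not in the image, so it is drift.
print(is_drift("/tmp/sleep", b"sleep-binary-contents", IMAGE_MANIFEST))  # True
```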

### **Testing Container Drift Protection**

The steps below trigger container drift violations, generating alerts that will be delivered to your CloudWatch service.

Step 1: Create a new 'container\_drift' tool for the Weather MCP server and client.

Step 2: Run the updated Weather MCP server and client.

Step 3: Execute the new container\_drift tool.

Step 4: View container\_drift\_violation alerts in CloudWatch.

#### **Step 1: Create a new 'container\_drift' tool for the Weather MCP server**

#### **Log in to your BlueRock Free Tier EC2 instance.**

```sh
ssh -i <ssh-key-file> ec2-user@<ip-address>
```

#### **Create new server with 'container\_drift' tool**

Create a file named: `/opt/bluerock/mcp/weatherMCP/server-weatherMCP.container_drift.py` with the source code below.

<pre class="language-python"><code class="lang-python">from typing import Any
import httpx
from mcp.server.fastmcp import FastMCP
import subprocess
import sys

# Initialize FastMCP server
mcp = FastMCP("weather")

# Constants
NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"

async def make_nws_request(url: str) -> dict[str, Any] | None:
    """Make a request to the NWS API with proper error handling."""
    headers = {
        "User-Agent": USER_AGENT,
        "Accept": "application/geo+json"
    }
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, headers=headers, timeout=30.0)
            response.raise_for_status()
            return response.json()
        except Exception:
            return None

def format_alert(feature: dict) -> str:
    """Format an alert feature into a readable string."""
    props = feature["properties"]
    return f"""
Event: {props.get('event', 'Unknown')}
Area: {props.get('areaDesc', 'Unknown')}
Severity: {props.get('severity', 'Unknown')}
Description: {props.get('description', 'No description available')}
Instructions: {props.get('instruction', 'No specific instructions provided')}
"""

@mcp.tool()
async def get_alerts(state: str) -> str:
    """Get weather alerts for a US state.

    Args:
        state: Two-letter US state code (e.g. CA, NY)
    """
    url = f"{NWS_API_BASE}/alerts/active/area/{state}"
    data = await make_nws_request(url)

    if not data or "features" not in data:
        return "Unable to fetch alerts or no alerts found."

    if not data["features"]:
        return "No active alerts for this state."

    alerts = [format_alert(feature) for feature in data["features"]]
    return "\n---\n".join(alerts)

@mcp.tool()
async def get_forecast(latitude: float, longitude: float) -> str:
    """Get weather forecast for a location.

    Args:
        latitude: Latitude of the location
        longitude: Longitude of the location
    """
    # First get the forecast grid endpoint
    points_url = f"{NWS_API_BASE}/points/{latitude},{longitude}"
    points_data = await make_nws_request(points_url)

    if not points_data:
        return "Unable to fetch forecast data for this location."

    # Get the forecast URL from the points response
    forecast_url = points_data["properties"]["forecast"]
    forecast_data = await make_nws_request(forecast_url)

    if not forecast_data:
        return "Unable to fetch detailed forecast."

    # Format the periods into a readable forecast
    periods = forecast_data["properties"]["periods"]
    forecasts = []
    for period in periods[:5]:  # Only show next 5 periods
        forecast = f"""
{period['name']}:
Temperature: {period['temperature']}°{period['temperatureUnit']}
Wind: {period['windSpeed']} {period['windDirection']}
Forecast: {period['detailedForecast']}
"""
        forecasts.append(forecast)

    return "\n---\n".join(forecasts)

@mcp.tool()
async def container_drift() -> str:
    """
    Runs a series of Docker command execution tests.
    """
    returnstr = ""
    tests = [
        {
            "name": "TEST 1: Execute a file not present in a container image...",
            "command": "docker run --rm fedora bash -c 'cp /bin/sleep /tmp/sleep &#x26;&#x26; /tmp/sleep 1'",
        },
        {
            "name": "TEST 2: Execute a script via command line interpreter option...",
            "command": "docker run --rm fedora bash -c 'bash -c \"echo hello world\"'",
        },
        {
            "name": "TEST 3: Execute a script not present in a container image...",
            "command": "docker run --rm fedora bash -c 'printf \"#!/bin/bash\n\necho Hello world\" > /tmp/helloworld.sh &#x26;&#x26; chmod 755 /tmp/helloworld.sh &#x26;&#x26; /tmp/helloworld.sh'",
        },
    ]

    for test in tests:
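        returnstr += f"""\n\n{test['name']}"""
        # Escape newlines outside the f-string: a backslash inside an
        # f-string expression is a SyntaxError before Python 3.12.
        escaped_command = test['command'].replace('\n', '\\n')
        returnstr += f"\n({escaped_command})"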

        result = subprocess.run(test['command'], shell=True, capture_output=True, text=True)

        if result.stdout:
            returnstr += f"""\n{result.stdout}"""
        if result.stderr:
            returnstr += f"""\n{result.stderr}"""

        if result.returncode == 0:
            returnstr += f"""\nCommand execution was permitted."""
        else:
            returnstr += f"""\nCommand execution was blocked with exit code: {result.returncode}"""

    returnstr += f"""\n"""
    return returnstr

def main():
    # Initialize and run the server
    mcp.run(transport='stdio')

if __name__ == "__main__":
    main()
</code></pre>
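
The loop in `container_drift` treats any non-zero exit code as a blocked execution. The sketch below isolates that decision so it can be tried without Docker. `classify_result` and `run_and_classify` are hypothetical helpers, and the current Python interpreter stands in for the `docker run` commands. Note that a non-zero code can also come from Docker itself (for example, when the image cannot be pulled), so "blocked" is an interpretation of the exit code rather than a confirmed verdict.

```python
import subprocess
import sys


def classify_result(returncode: int) -> str:
    """Map an exit code to the status line used by the container_drift tool."""
    if returncode == 0:
        return "Command execution was permitted."
    return f"Command execution was blocked with exit code: {returncode}"


def run_and_classify(command: list[str]) -> str:
    """Run a command without a shell and classify its exit code."""
    result = subprocess.run(command, capture_output=True, text=True)
    return classify_result(result.returncode)


# Stand-in commands: a Python child process that exits with a chosen code.
print(run_and_classify([sys.executable, "-c", "raise SystemExit(0)"]))
print(run_and_classify([sys.executable, "-c", "raise SystemExit(3)"]))
```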

#### **Create new client with 'container\_drift' command**

Create a file named: `/opt/bluerock/mcp/weatherMCP/client-weatherMCP.container_drift.py` with the source code below.

```python
#!/usr/bin/env python3
"""
MCP Weather CLI Client - Direct interaction with MCP weather server without LLM
"""
import asyncio
import json
import sys
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


class WeatherCLI:
    def __init__(self):
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()

    async def connect_to_server(self, server_script_path: str):
        """Connect to the MCP weather server"""
        server_params = StdioServerParameters(
            command="python",
            args=[server_script_path],
            env=None
        )

        stdio_transport = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(self.stdio, self.write)
        )

        await self.session.initialize()

        # List available tools
        response = await self.session.list_tools()
        print("Connected to MCP Weather Server")
        print(f"Available tools: {len(response.tools)}")
        for tool in response.tools:
            print(f"  - {tool.name}: {tool.description}")
        print()

    async def get_forecast(self, latitude: float, longitude: float):
        """Get weather forecast for a location"""
        result = await self.session.call_tool(
            "get_forecast",
            arguments={"latitude": latitude, "longitude": longitude}
        )

        if result.content:
            for content in result.content:
                if hasattr(content, 'text'):
                    text = content.text.strip()
                    if not text:
                        print("No forecast data returned")
                        return
                    try:
                        data = json.loads(text)
                        self._display_forecast(data)
                    except json.JSONDecodeError:
                        # Server returned formatted text instead of JSON
                        print("=" * 60)
                        print("WEATHER FORECAST")
                        print("=" * 60)
                        print(text)

    async def get_alerts(self, state: str):
        """Get weather alerts for a US state"""
        result = await self.session.call_tool(
            "get_alerts",
            arguments={"state": state}
        )

        if result.content:
            for content in result.content:
                if hasattr(content, 'text'):
                    text = content.text.strip()
                    if not text:
                        print(f"No alerts data returned for {state}")
                        return
                    try:
                        data = json.loads(text)
                        self._display_alerts(data, state)
                    except json.JSONDecodeError:
                        # Server returned formatted text instead of JSON
                        print("=" * 60)
                        print("WEATHER ALERTS")
                        print("=" * 60)
                        print(text)

    async def container_drift(self):
        """Execute a container drift test"""
        result = await self.session.call_tool(
            "container_drift"
        )
        if result.content and hasattr(result.content[0], 'text'):
            print(result.content[0].text.strip())

    def _display_forecast(self, data: dict):
        """Display formatted forecast data"""
        print("=" * 60)
        print("WEATHER FORECAST")
        print("=" * 60)

        if 'properties' in data and 'periods' in data['properties']:
            periods = data['properties']['periods']
            for period in periods[:5]:  # Show next 5 periods
                print(f"\n{period['name']}:")
                print(f"  Temperature: {period['temperature']}°{period['temperatureUnit']}")
                print(f"  Wind: {period['windSpeed']} {period['windDirection']}")
                print(f"  Forecast: {period['shortForecast']}")
                if period.get('detailedForecast'):
                    print(f"  Details: {period['detailedForecast']}")
        else:
            print(json.dumps(data, indent=2))

    def _display_alerts(self, data: dict, state: str):
        """Display formatted weather alerts"""
        print("=" * 60)
        print(f"WEATHER ALERTS FOR {state.upper()}")
        print("=" * 60)

        if isinstance(data, dict) and 'features' in data:
            alerts = data['features']
            if not alerts:
                print("\nNo active weather alerts.")
            else:
                for i, alert in enumerate(alerts, 1):
                    props = alert.get('properties', {})
                    print(f"\nAlert #{i}:")
                    print(f"  Event: {props.get('event', 'N/A')}")
                    print(f"  Severity: {props.get('severity', 'N/A')}")
                    print(f"  Area: {props.get('areaDesc', 'N/A')}")
                    print(f"  Headline: {props.get('headline', 'N/A')}")
        else:
            print(json.dumps(data, indent=2))


    async def list_tools(self):
        """List all available tools from the server"""
        response = await self.session.list_tools()
        print("\n" + "=" * 60)
        print("AVAILABLE TOOLS")
        print("=" * 60)

        if not response.tools:
            print("No tools available")
            return

        for tool in response.tools:
            print(f"\nTool: {tool.name}")
            print(f"Description: {tool.description}")
            if hasattr(tool, 'inputSchema') and tool.inputSchema:
                print(f"Input Schema: {json.dumps(tool.inputSchema, indent=2)}")
        print()

    async def interactive_mode(self):
        """Run interactive CLI mode"""
        print("\n" + "=" * 60)
        print("MCP WEATHER CLIENT - INTERACTIVE MODE")
        print("=" * 60)
        print("\nCommands:")
        print("  forecast <latitude> <longitude> - Get weather forecast")
        print("  alerts <state_code>             - Get weather alerts (e.g., CA, NY)")
        print("  container_drift                 - Execute container drift action")
        print("  tools                           - List all available tools")
        print("  help                            - Show this help")
        print("  quit                            - Exit")
        print()

        while True:
            try:
                user_input = input("weather> ").strip()

                if not user_input:
                    continue

                parts = user_input.split()
                command = parts[0].lower()

                if command == "quit" or command == "exit":
                    break
                elif command == "help":
                    print("\nCommands:")
                    print("  forecast <latitude> <longitude> - Get weather forecast")
                    print("  alerts <state_code>             - Get weather alerts")
                    print("  container_drift                 - Execute container drift action")
                    print("  tools                           - List all available tools")
                    print("  quit                            - Exit")
                elif command == "tools":
                    await self.list_tools()
                elif command == "forecast":
                    if len(parts) != 3:
                        print("Usage: forecast <latitude> <longitude>")
                        continue
                    try:
                        lat = float(parts[1])
                        lon = float(parts[2])
                        await self.get_forecast(lat, lon)
                    except ValueError:
                        print("Error: Latitude and longitude must be numbers")
                elif command == "alerts":
                    if len(parts) != 2:
                        print("Usage: alerts <state_code>")
                        continue
                    state = parts[1].upper()
                    if len(state) != 2:
                        print("Error: State code must be 2 letters (e.g., CA, NY)")
                        continue
                    await self.get_alerts(state)
                elif command == "container_drift":
                    await self.container_drift()
                else:
                    print(f"Unknown command: {command}. Type 'help' for available commands.")

            except KeyboardInterrupt:
                print("\nExiting...")
                break
            except Exception as e:
                print(f"Error: {e}")

    async def cleanup(self):
        """Cleanup resources"""
        await self.exit_stack.aclose()


async def main():
    if len(sys.argv) < 2:
        print(f"Usage: python {sys.argv[0]} <path_to_weather_server.py>")
        print(f"\nExample: python {sys.argv[0]} server-weatherMCP.container_drift.py")
        sys.exit(1)

    server_path = sys.argv[1]

    cli = WeatherCLI()
    try:
        await cli.connect_to_server(server_path)
        await cli.interactive_mode()
    finally:
        await cli.cleanup()


if __name__ == "__main__":
    asyncio.run(main())
```

#### **Steps 2 and 3: Run the updated server and client, then execute the container\_drift tool**

#### **Run the new server**

In one terminal window, log into your BlueRock Free Tier EC2 instance and start the new server with the container\_drift tool.

```sh
ssh -i <ssh-key-file> ec2-user@<ip-address>
cd /opt/bluerock/mcp/weatherMCP/
source .venv/bin/activate
uv run ./server-weatherMCP.container_drift.py
```

#### **Run the new client**

In a separate terminal window, log into your BlueRock Free Tier EC2 instance and start the new client with the container\_drift tool.

```sh
ssh -i <ssh-key-file> ec2-user@<ip-address>
cd /opt/bluerock/mcp/weatherMCP/
source .venv/bin/activate
uv run client-weatherMCP.container_drift.py server-weatherMCP.container_drift.py
```

The client will initialize, show the available tools and start its interactive CLI.

```
[10/31/25 16:47:27] INFO     Processing request of type ListToolsRequest                                          server.py:674
Connected to MCP Weather Server
Available tools: 3
  - get_alerts: Get weather alerts for a US state.

    Args:
        state: Two-letter US state code (e.g. CA, NY)

  - get_forecast: Get weather forecast for a location.

    Args:
        latitude: Latitude of the location
        longitude: Longitude of the location

  - container_drift:
    Runs a series of Docker command execution tests.



============================================================
MCP WEATHER CLIENT - INTERACTIVE MODE
============================================================

Commands:
  forecast <latitude> <longitude> - Get weather forecast
  alerts <state_code>             - Get weather alerts (e.g., CA, NY)
  container_drift                 - Execute container drift action
  tools                           - List all available tools
  help                            - Show this help
  quit                            - Exit

weather>
```

Run the container\_drift tool.

```
weather> container_drift
```

This tool will perform three tests, each of which will trigger a container drift violation alert.

* TEST 1: Execute a file that was not present in the container image.
* TEST 2: Execute a script via a command line interpreter option.
* TEST 3: Execute a script not present in the container image.

```
weather> container_drift
[10/31/25 16:47:40] INFO     Processing request of type CallToolRequest                                           server.py:674
TEST 1: Execute a file not present in a container image...
(docker run --rm fedora bash -c 'cp /bin/sleep /tmp/sleep && /tmp/sleep 1')
Command execution complete (rc: 0)

TEST 2: Execute a script via command line interpreter option...
(docker run --rm fedora bash -c 'bash -c "echo hello world"')
hello world

Command execution complete (rc: 0)

TEST 3: Execute a script not present in a container image...
(docker run --rm fedora bash -c 'printf "#!/bin/bash\n\necho Hello world" > /tmp/helloworld.sh && chmod 755 /tmp/helloworld.sh && /tmp/helloworld.sh')
Hello world

Command execution complete (rc: 0)
weather>
```

#### **Step 4: View container drift violation logs**

Log in to the AWS console, open CloudWatch, and navigate to the log stream that receives your BlueRock alerts:

* \<YOUR\_AWS\_REGION>
* \<YOUR\_LOG\_GROUP>
* \<YOUR\_LOG\_STREAM>

In the log events filter, enter 'container\_drift\_violation' to show only the drift violation alerts.
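
The same filter can be applied from a script. The sketch below is one possible approach using `boto3`, assuming it is installed and AWS credentials are configured. `build_filter_request` and `fetch_drift_alerts` are hypothetical helper names, and the log group and region are the placeholders from the list above, not values defined by BlueRock.

```python
import time
from typing import Optional


def build_filter_request(
    log_group: str, minutes: int = 15, now_ms: Optional[int] = None
) -> dict:
    """Build keyword arguments for the CloudWatch Logs filter_log_events call."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return {
        "logGroupName": log_group,
        # Double quotes make CloudWatch match the term as an exact phrase.
        "filterPattern": '"container_drift_violation"',
        "startTime": now_ms - minutes * 60 * 1000,  # epoch milliseconds
        "endTime": now_ms,
    }


def fetch_drift_alerts(log_group: str, region: str, minutes: int = 15) -> list[str]:
    """Return matching log messages from the last `minutes` minutes."""
    import boto3  # imported lazily; needs boto3 and AWS credentials

    client = boto3.client("logs", region_name=region)
    paginator = client.get_paginator("filter_log_events")
    messages: list[str] = []
    for page in paginator.paginate(**build_filter_request(log_group, minutes)):
        messages.extend(event["message"] for event in page.get("events", []))
    return messages
```

Calling `fetch_drift_alerts("<YOUR_LOG_GROUP>", "<YOUR_AWS_REGION>")` shortly after running the tool should return the same events that the console filter shows.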
