-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
177 lines (149 loc) · 6.22 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
"""Main server implementation for Helmfile MCP Server.
This module defines the FastMCP server instance and tool functions for Helmfile operations,
providing a standardized interface for helmfile command execution and documentation.
"""
import asyncio
import logging
from typing import Optional
from fastmcp import Context, FastMCP
from pydantic import Field
# Configure logging
logger = logging.getLogger(__name__)
# Create the FastMCP server
mcp = FastMCP(
name="Helmfile MCP Server",
instructions="A server for executing Helmfile commands through MCP",
version="0.1.0",
)
@mcp.tool(
description="Execute Helmfile commands with support for Unix pipes.",
)
async def execute_helmfile(
command: str = Field(description="Complete Helmfile command to execute (including any pipes and flags)"),
timeout: Optional[int] = Field(description="Maximum execution time in seconds (default: 300)", default=None),
ctx: Optional[Context] = None,
) -> dict:
"""Execute Helmfile commands with support for Unix pipes.
Executes Helmfile commands with proper validation, error handling, and resource limits.
Supports piping output to standard Unix utilities for filtering and transformation.
Security considerations:
- Commands are validated against security policies
- Dangerous operations like apply/destroy require confirmation
- Environment-specific commands are validated against allowed environments
Examples:
helmfile list
helmfile status
helmfile diff
helmfile apply --environment prod
Args:
command: Complete Helmfile command to execute (can include Unix pipes)
timeout: Optional timeout in seconds
ctx: Optional MCP context for request tracking
Returns:
Dictionary containing output and status with structured error information
"""
# Convert timeout to integer if it's a FieldInfo object
actual_timeout = timeout.default if hasattr(timeout, 'default') else timeout
actual_timeout = actual_timeout or 300
logger.info(f"Executing Helmfile command: {command}" + (f" with timeout: {actual_timeout}" if actual_timeout else ""))
# Add helmfile prefix if not present
if not command.strip().startswith("helmfile"):
command = f"helmfile {command}"
if ctx:
is_pipe = "|" in command
message = "Executing" + (" piped" if is_pipe else "") + " Helmfile command"
await ctx.info(message + (f" with timeout: {actual_timeout}s" if actual_timeout else ""))
try:
# Execute the command with timeout
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=actual_timeout)
except asyncio.TimeoutError:
# Properly handle async process termination
try:
await process.terminate()
try:
await asyncio.wait_for(process.wait(), timeout=5)
except asyncio.TimeoutError:
# Only kill if process is still running
if process.returncode is None:
await process.kill()
except Exception as e:
logger.error(f"Error terminating process: {str(e)}")
return {
"status": "error",
"error": {
"code": "TERMINATION_ERROR",
"message": f"Failed to terminate process: {str(e)}",
}
}
return {
"status": "error",
"error": {
"code": "TIMEOUT",
"message": f"Command timed out after {actual_timeout} seconds",
}
}
# Check if command was successful
if process.returncode == 0:
return {
"status": "success",
"output": stdout.decode().strip(),
}
else:
return {
"status": "error",
"error": {
"code": "COMMAND_ERROR",
"message": stderr.decode().strip(),
}
}
except Exception as e:
logger.error(f"Error executing Helmfile command: {str(e)}")
return {
"status": "error",
"error": {
"code": "INTERNAL_ERROR",
"message": str(e),
}
}
@mcp.tool(
description="Synchronize Helmfile releases.",
)
async def sync_helmfile(
helmfile_path: str = Field(description="Path to the Helmfile configuration file"),
namespace: Optional[str] = Field(description="Namespace to target", default=None),
timeout: Optional[int] = Field(description="Maximum execution time in seconds (default: 300)", default=None),
ctx: Optional[Context] = None,
) -> dict:
"""Synchronize Helmfile releases.
Executes 'helmfile sync' command to synchronize the cluster state as described
in the Helmfile configuration.
Args:
helmfile_path: Path to the Helmfile configuration file
namespace: Optional namespace to target
timeout: Optional timeout in seconds
ctx: Optional MCP context for request tracking
Returns:
Dictionary containing output and status with structured error information
"""
# Get actual values from FieldInfo objects if needed
actual_namespace = namespace.default if hasattr(namespace, 'default') else namespace
actual_timeout = timeout.default if hasattr(timeout, 'default') else timeout
# Build the command with proper flag handling
command_parts = ["helmfile", "sync", "-f", helmfile_path]
# Only add namespace flag if namespace is provided and not empty
if actual_namespace and str(actual_namespace).strip():
command_parts.extend(["-n", str(actual_namespace).strip()])
# Join the command parts with spaces
command = " ".join(command_parts)
return await execute_helmfile(command, actual_timeout, ctx)
def main():
"""Run the Helmfile MCP Server."""
mcp.run(transport="stdio")
if __name__ == "__main__":
main()