
pydantic_ai.environments

Execution environment abstractions for agents.

This package provides:

  • ExecutionEnvironment — abstract base class for execution environments
  • ExecutionProcess — interactive process handle with bidirectional I/O
  • ExecutionEnvironmentToolset — toolset exposing coding-agent-style tools backed by an environment
  • ExecutionResult — result of executing a shell command (output and exit code)

Implementations:

  • environments.docker.DockerEnvironment — Docker container-based sandbox (isolated)
  • environments.local.LocalEnvironment — local subprocess environment (no isolation, for dev/testing)
  • environments.memory.MemoryEnvironment — in-memory environment for testing

EnvToolName module-attribute

EnvToolName = Literal[
    "shell", "read_file", "write_file", "edit_file"
]

Tool name for an environment capability.

Used in capabilities to declare which methods an environment implements, and by ExecutionEnvironmentToolset for include/exclude filtering.

ExecutionEnvironment

Bases: ABC

Abstract base class for execution environments.

An execution environment provides a place where agents can execute commands, read/write files, and search the filesystem.

Implementations range from in-memory (for testing) to local subprocess, Docker containers, and cloud-hosted VMs.

The only abstract member is capabilities; all tool methods raise NotImplementedError by default. Concrete subclasses override the methods that match their declared capabilities.

Source code in pydantic_ai_slim/pydantic_ai/environments/_base.py
class ExecutionEnvironment(ABC):
    """Abstract base class for execution environments.

    An execution environment provides a place where agents can execute
    commands, read/write files, and search the filesystem.

    Implementations range from in-memory (for testing) to local subprocess,
    Docker containers, and cloud-hosted VMs.

    The only abstract member is `capabilities`; all tool methods raise
    `NotImplementedError` by default. Concrete subclasses override the
    methods that match their declared capabilities.
    """

    # --- Capability introspection ---

    @property
    @abstractmethod
    def capabilities(self) -> frozenset[EnvToolName]:
        """Capabilities this environment supports (high-level).

        Used by toolsets to decide which tools to register. Only methods
        corresponding to declared capabilities need to be implemented.
        """
        ...

    # --- Tool methods ---
    # All raise NotImplementedError by default. Concrete subclasses override
    # the methods that match their declared capabilities.

    async def shell(
        self,
        command: str,
        *,
        timeout: float | None = 120,
        env: dict[str, str] | None = None,
    ) -> ExecutionResult:
        """Execute a shell command and return the result.

        Args:
            command: The shell command to execute.
            timeout: Maximum seconds to wait for completion.
                Pass `None` to disable the timeout.
            env: Additional environment variables for this command.
                Merged with (and overrides) any baseline environment variables.

        Returns:
            An `ExecutionResult` with the command output and exit code.
        """
        raise NotImplementedError(f'{type(self).__name__} does not support shell.')

    async def read_file(
        self,
        path: str,
        *,
        offset: int = 0,
        limit: int = 2000,
    ) -> str | bytes:
        """Read a file from the environment.

        For text files, returns a string with `cat -n` style line numbers.
        For binary files (images), returns raw bytes.

        Args:
            path: The file path within the environment.
            offset: The line number to start reading from (0-indexed).
                Ignored for binary files.
            limit: Maximum number of lines to read.
                Ignored for binary files.

        Returns:
            Text content with line numbers (`str`), or raw bytes for binary files.
        """
        raise NotImplementedError(f'{type(self).__name__} does not support read_file.')

    async def write_file(self, path: str, content: str | bytes) -> None:
        """Create or overwrite a file in the environment.

        Args:
            path: The file path within the environment.
            content: The file content (text or binary).
        """
        raise NotImplementedError(f'{type(self).__name__} does not support write_file.')

    async def replace_str(
        self,
        path: str,
        old: str,
        new: str,
        *,
        replace_all: bool = False,
    ) -> int:
        """Edit a file by exact string replacement.

        Args:
            path: The file path within the environment.
            old: The exact text to find.
            new: The replacement text.
            replace_all: If True, replace all occurrences. If False, the
                old string must appear exactly once or an error is raised.

        Returns:
            The number of replacements made.

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: If `old` is not found, or appears multiple times
                when `replace_all` is False.
        """
        raise NotImplementedError(f'{type(self).__name__} does not support replace_str.')

    # --- Internal helpers (not tools) ---

    async def create_process(
        self,
        command: str,
        *,
        env: dict[str, str] | None = None,
    ) -> ExecutionProcess:
        r"""Create an interactive process with streaming stdin/stdout.

        Args:
            command: The shell command to run.
            env: Additional environment variables for this process.

        Returns:
            An `ExecutionProcess` handle for bidirectional I/O.
        """
        raise NotImplementedError(f'{type(self).__name__} does not support interactive processes.')

    # --- Lifecycle ---

    async def __aenter__(self) -> Self:
        """Start the environment (e.g., create a Docker container)."""
        return self

    async def __aexit__(self, *args: Any) -> None:
        """Stop the environment and clean up resources."""

capabilities abstractmethod property

capabilities: frozenset[EnvToolName]

Capabilities this environment supports (high-level).

Used by toolsets to decide which tools to register. Only methods corresponding to declared capabilities need to be implemented.

shell async

shell(
    command: str,
    *,
    timeout: float | None = 120,
    env: dict[str, str] | None = None
) -> ExecutionResult

Execute a shell command and return the result.

Parameters:

  • command (str, required): The shell command to execute.
  • timeout (float | None, default 120): Maximum seconds to wait for completion. Pass None to disable the timeout.
  • env (dict[str, str] | None, default None): Additional environment variables for this command. Merged with (and overrides) any baseline environment variables.

Returns:

  • ExecutionResult: An ExecutionResult with the command output and exit code.

Source code in pydantic_ai_slim/pydantic_ai/environments/_base.py
async def shell(
    self,
    command: str,
    *,
    timeout: float | None = 120,
    env: dict[str, str] | None = None,
) -> ExecutionResult:
    """Execute a shell command and return the result.

    Args:
        command: The shell command to execute.
        timeout: Maximum seconds to wait for completion.
            Pass `None` to disable the timeout.
        env: Additional environment variables for this command.
            Merged with (and overrides) any baseline environment variables.

    Returns:
        An `ExecutionResult` with the command output and exit code.
    """
    raise NotImplementedError(f'{type(self).__name__} does not support shell.')
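When the toolset (documented below) exposes this method as a tool, it renders the `ExecutionResult` for the model as the output followed by the exit code. That formatting can be sketched with a stand-in result type:

```python
from dataclasses import dataclass


@dataclass
class ExecutionResult:  # stand-in for the real result type
    output: str
    exit_code: int


def render(result: ExecutionResult) -> str:
    """Mirror the toolset's shell tool: output first (if any), then the exit code."""
    parts: list[str] = []
    if result.output:
        parts.append(result.output)
    parts.append(f'Exit code: {result.exit_code}')
    return '\n'.join(parts)


print(render(ExecutionResult(output='hello', exit_code=0)))
```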

read_file async

read_file(
    path: str, *, offset: int = 0, limit: int = 2000
) -> str | bytes

Read a file from the environment.

For text files, returns a string with cat -n style line numbers. For binary files (images), returns raw bytes.

Parameters:

  • path (str, required): The file path within the environment.
  • offset (int, default 0): The line number to start reading from (0-indexed). Ignored for binary files.
  • limit (int, default 2000): Maximum number of lines to read. Ignored for binary files.

Returns:

  • str | bytes: Text content with line numbers (str), or raw bytes for binary files.

Source code in pydantic_ai_slim/pydantic_ai/environments/_base.py
async def read_file(
    self,
    path: str,
    *,
    offset: int = 0,
    limit: int = 2000,
) -> str | bytes:
    """Read a file from the environment.

    For text files, returns a string with `cat -n` style line numbers.
    For binary files (images), returns raw bytes.

    Args:
        path: The file path within the environment.
        offset: The line number to start reading from (0-indexed).
            Ignored for binary files.
        limit: Maximum number of lines to read.
            Ignored for binary files.

    Returns:
        Text content with line numbers (`str`), or raw bytes for binary files.
    """
    raise NotImplementedError(f'{type(self).__name__} does not support read_file.')
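"`cat -n` style with `offset`/`limit`" amounts to a windowed, numbered view of the text. A minimal sketch of that behaviour (the exact column format is implementation-defined; this one uses a 6-wide number and a tab, as `cat -n` does):

```python
def number_lines(text: str, offset: int = 0, limit: int = 2000) -> str:
    """Return lines offset..offset+limit, each prefixed with its 1-based line number."""
    lines = text.splitlines()
    window = lines[offset:offset + limit]
    return '\n'.join(f'{n:6}\t{line}' for n, line in enumerate(window, start=offset + 1))


sample = 'alpha\nbeta\ngamma\ndelta'
print(number_lines(sample, offset=1, limit=2))
```

Line numbers stay absolute (here 2 and 3) even when `offset` skips the start of the file, so a model can quote line numbers back when editing.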

write_file async

write_file(path: str, content: str | bytes) -> None

Create or overwrite a file in the environment.

Parameters:

  • path (str, required): The file path within the environment.
  • content (str | bytes, required): The file content (text or binary).
Source code in pydantic_ai_slim/pydantic_ai/environments/_base.py
async def write_file(self, path: str, content: str | bytes) -> None:
    """Create or overwrite a file in the environment.

    Args:
        path: The file path within the environment.
        content: The file content (text or binary).
    """
    raise NotImplementedError(f'{type(self).__name__} does not support write_file.')

replace_str async

replace_str(
    path: str,
    old: str,
    new: str,
    *,
    replace_all: bool = False
) -> int

Edit a file by exact string replacement.

Parameters:

  • path (str, required): The file path within the environment.
  • old (str, required): The exact text to find.
  • new (str, required): The replacement text.
  • replace_all (bool, default False): If True, replace all occurrences. If False, the old string must appear exactly once or an error is raised.

Returns:

  • int: The number of replacements made.

Raises:

  • FileNotFoundError: If the file does not exist.
  • ValueError: If old is not found, or appears multiple times when replace_all is False.

Source code in pydantic_ai_slim/pydantic_ai/environments/_base.py
async def replace_str(
    self,
    path: str,
    old: str,
    new: str,
    *,
    replace_all: bool = False,
) -> int:
    """Edit a file by exact string replacement.

    Args:
        path: The file path within the environment.
        old: The exact text to find.
        new: The replacement text.
        replace_all: If True, replace all occurrences. If False, the
            old string must appear exactly once or an error is raised.

    Returns:
        The number of replacements made.

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: If `old` is not found, or appears multiple times
            when `replace_all` is False.
    """
    raise NotImplementedError(f'{type(self).__name__} does not support replace_str.')
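The documented semantics (exactly one match unless `replace_all`) can be expressed standalone, operating on a string rather than a file:

```python
def replace_str(text: str, old: str, new: str, *, replace_all: bool = False) -> tuple[str, int]:
    """Apply the documented edit semantics to `text`; return (new_text, count)."""
    count = text.count(old)
    if count == 0:
        raise ValueError(f'{old!r} not found')
    if count > 1 and not replace_all:
        raise ValueError(f'{old!r} appears {count} times; pass replace_all=True or add surrounding context')
    return text.replace(old, new), count


updated, n = replace_str('x = 1\ny = 1\n', 'y = 1', 'y = 2')
print(n, updated)
```

Requiring uniqueness by default is what makes the "include surrounding context lines" advice in the `edit_file` tool description work: a non-unique `old` fails loudly instead of silently editing the wrong occurrence.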

create_process async

create_process(
    command: str, *, env: dict[str, str] | None = None
) -> ExecutionProcess

Create an interactive process with streaming stdin/stdout.

Parameters:

  • command (str, required): The shell command to run.
  • env (dict[str, str] | None, default None): Additional environment variables for this process.

Returns:

  • ExecutionProcess: An ExecutionProcess handle for bidirectional I/O.

Source code in pydantic_ai_slim/pydantic_ai/environments/_base.py
async def create_process(
    self,
    command: str,
    *,
    env: dict[str, str] | None = None,
) -> ExecutionProcess:
    r"""Create an interactive process with streaming stdin/stdout.

    Args:
        command: The shell command to run.
        env: Additional environment variables for this process.

    Returns:
        An `ExecutionProcess` handle for bidirectional I/O.
    """
    raise NotImplementedError(f'{type(self).__name__} does not support interactive processes.')

__aenter__ async

__aenter__() -> Self

Start the environment (e.g., create a Docker container).

Source code in pydantic_ai_slim/pydantic_ai/environments/_base.py
async def __aenter__(self) -> Self:
    """Start the environment (e.g., create a Docker container)."""
    return self

__aexit__ async

__aexit__(*args: Any) -> None

Stop the environment and clean up resources.

Source code in pydantic_ai_slim/pydantic_ai/environments/_base.py
async def __aexit__(self, *args: Any) -> None:
    """Stop the environment and clean up resources."""

ExecutionEnvironmentToolset

Bases: FunctionToolset[Any]

Toolset providing coding-agent-style tools backed by an ExecutionEnvironment.

Tool names and schemas are designed to match what popular coding agents expose, so models are well-trained on them.

Tools are dynamically registered based on the environment's capabilities, filtered by include/exclude.

The environment can be:

  • Passed directly at construction time via shared_environment (shared across concurrent runs)
  • Created per-run via environment_factory (isolated concurrent runs)
  • Set/overridden via context var using use_environment() (for testing or per-call-site config)

Usage
from pydantic_ai import Agent
from pydantic_ai.environments import ExecutionEnvironmentToolset
from pydantic_ai.environments.docker import DockerEnvironment

env = DockerEnvironment(image='python:3.12-slim')
toolset = ExecutionEnvironmentToolset(env)

agent = Agent('openai:gpt-5.2', toolsets=[toolset])

async with env:
    result = await agent.run('Write a script that prints hello')
Source code in pydantic_ai_slim/pydantic_ai/toolsets/execution_environment.py
class ExecutionEnvironmentToolset(FunctionToolset[Any]):
    """Toolset providing coding-agent-style tools backed by an `ExecutionEnvironment`.

    Tool names and schemas are designed to match what popular coding agents
    expose, so models are well-trained on them.

    Tools are dynamically registered based on the environment's `capabilities`,
    filtered by `include`/`exclude`.

    The environment can be:
    - Passed directly at construction time via `shared_environment` (shared across concurrent runs)
    - Created per-run via `environment_factory` (isolated concurrent runs)
    - Set/overridden via context var using `use_environment()` (for testing or per-call-site config)

    Usage:
        ```python {test="skip" lint="skip"}
        from pydantic_ai import Agent
        from pydantic_ai.environments import ExecutionEnvironmentToolset
        from pydantic_ai.environments.docker import DockerEnvironment

        env = DockerEnvironment(image='python:3.12-slim')
        toolset = ExecutionEnvironmentToolset(env)

        agent = Agent('openai:gpt-5.2', toolsets=[toolset])

        async with env:
            result = await agent.run('Write a script that prints hello')
        ```
    """

    def __init__(
        self,
        shared_environment: ExecutionEnvironment | None = None,
        *,
        environment_factory: Callable[[], ExecutionEnvironment] | None = None,
        include: Sequence[EnvToolName] | None = None,
        exclude: Sequence[EnvToolName] | None = None,
        require_shell_approval: bool = False,
        require_write_approval: bool = False,
        image_support: bool = True,
        max_image_bytes: int = 50 * 1024 * 1024,
        max_retries: int = 1,
        id: str | None = None,
    ):
        """Create a new execution environment toolset.

        Args:
            shared_environment: A shared execution environment for tool execution.
                All concurrent runs share this single environment instance.
                Can also be set later via `use_environment()`.
            environment_factory: A callable that creates a fresh environment per
                `async with toolset:` entry. Use this for concurrent runs that need
                isolation (e.g. separate Docker containers). Mutually exclusive with
                `shared_environment`.
            include: Tool names to include. `None` means all tools supported
                by the environment. Pass an explicit sequence to restrict to
                specific tools.
            exclude: Tool names to exclude. `None` defaults to no exclusions.
                Pass an explicit sequence to exclude specific tools.
            require_shell_approval: Whether the `shell` tool requires human-in-the-loop
                approval before execution. Recommended for `LocalEnvironment` where
                commands run directly on the host.
            require_write_approval: Whether `write_file` and edit tools require
                human-in-the-loop approval before execution.
            image_support: Whether `read_file` should return images as `BinaryContent`
                for multimodal models (otherwise returns a placeholder message).
            max_image_bytes: Maximum image file size to return as BinaryContent.
            max_retries: Maximum retries per tool call.
            id: Optional unique ID for the toolset (required for durable execution).
        """
        if shared_environment is not None and environment_factory is not None:
            raise ValueError('Cannot provide both shared_environment and environment_factory.')

        super().__init__(max_retries=max_retries, id=id)
        self._shared_environment = shared_environment
        self._environment_factory = environment_factory
        self._environment_override: ContextVar[ExecutionEnvironment | None] = ContextVar(
            f'_environment_override_{id or "environment"}', default=None
        )
        self._per_run_state: ContextVar[tuple[AsyncExitStack, Token[ExecutionEnvironment | None]] | None] = ContextVar(
            f'_per_run_state_{id or "environment"}', default=None
        )
        self._include: frozenset[EnvToolName] | None = frozenset(include) if include is not None else None
        self._exclude: frozenset[EnvToolName] = frozenset(exclude) if exclude else frozenset()
        self._image_support = image_support
        self._max_image_bytes = max_image_bytes
        self._require_shell_approval = require_shell_approval
        self._require_write_approval = require_write_approval
        self._enter_lock = anyio.Lock()
        self._running_count: int = 0
        self._exit_stack: AsyncExitStack | None = None

        # Register all tools unconditionally so schemas are built eagerly.
        # get_tools() filters at runtime based on the current environment's capabilities.
        self._register_tools()

    def _resolve_tool_names(self, env: ExecutionEnvironment) -> frozenset[str]:
        """Determine which tool names to expose, based on the environment's capabilities and include/exclude."""
        tool_names: set[str] = set(env.capabilities)

        if self._include is not None:
            tool_names &= self._include
        tool_names -= self._exclude

        return frozenset(tool_names)

    def _register_tools(self) -> None:
        """Register all tools unconditionally.

        Filtering based on the environment's capabilities and include/exclude
        is deferred to ``get_tools()``, which runs at request time when the
        active environment is known.
        """
        self._register_shell()
        self._register_read_file()
        self._register_write_file()
        self._register_edit_file()

    def _register_shell(self) -> None:
        async def shell(command: str, timeout: int = 120) -> str:
            """Execute a shell command and return its output.

            Use this for running scripts, installing packages, and other terminal operations.

            Args:
                command: The shell command to execute.
                timeout: Maximum seconds to wait for the command to complete.
            """
            result = await self.required_environment.shell(command, timeout=timeout)
            parts: list[str] = []
            if result.output:
                parts.append(result.output)
            parts.append(f'Exit code: {result.exit_code}')
            return '\n'.join(parts)

        self.tool(requires_approval=self._require_shell_approval)(shell)

    def _register_read_file(self) -> None:
        async def read_file(path: str, offset: int = 0, limit: int = 2000) -> Any:
            """Read a file from the filesystem.

            Returns text files with line numbers, or renders image files for visual inspection.
            Use offset and limit to read specific sections of large files.

            Args:
                path: The file path to read.
                offset: The line number to start reading from (0-indexed).
                limit: Maximum number of lines to read.
            """
            try:
                content = await self.required_environment.read_file(path, offset=offset, limit=limit)
                if isinstance(content, bytes):
                    ext = posixpath.splitext(path)[1].lower()
                    if ext in IMAGE_EXTENSIONS:
                        # Image file — return as BinaryContent or placeholder
                        if self._image_support:
                            if len(content) > self._max_image_bytes:
                                return (
                                    f'Error: Image too large ({len(content)} bytes, max {self._max_image_bytes} bytes).'
                                )
                            media_type = IMAGE_MEDIA_TYPES.get(ext, 'application/octet-stream')
                            return BinaryContent(data=content, media_type=media_type)
                        else:
                            return f'[Image file: {path} — image_support is disabled on this toolset]'
                    else:
                        return f'[Binary file: {path} — cannot display as text]'
                return content
            except (FileNotFoundError, PermissionError, ValueError, OSError) as e:
                return f'Error: {e}'

        self.tool(read_file)

    def _register_write_file(self) -> None:
        async def write_file(path: str, content: str) -> str:
            """Create or overwrite a file.

            The file and any parent directories will be created if they do not exist.

            Args:
                path: The file path to write.
                content: The content to write to the file.
            """
            try:
                await self.required_environment.write_file(path, content)
                return f'File written: {path}'
            except (PermissionError, OSError) as e:
                return f'Error: {e}'

        self.tool(requires_approval=self._require_write_approval)(write_file)

    def _register_edit_file(self) -> None:
        async def edit_file(path: str, old: str, new: str, replace_all: bool = False) -> str:
            """Edit a file by exact string replacement.

            The old string must match exactly (including whitespace and indentation).
            For uniqueness, include surrounding context lines.
            Only use this after reading the file first.

            Args:
                path: The file path to edit.
                old: The exact text to find and replace.
                new: The replacement text.
                replace_all: Replace all occurrences. Defaults to false (old must be unique).
            """
            try:
                count = await self.required_environment.replace_str(path, old, new, replace_all=replace_all)
                return f'Replaced {count} occurrence{"s" if count != 1 else ""} in {path}.'
            except (FileNotFoundError, PermissionError, ValueError, OSError) as e:
                raise ModelRetry(str(e))

        self.tool(requires_approval=self._require_write_approval)(edit_file)

    async def get_tools(self, ctx: RunContext[AgentDepsT]) -> dict[str, ToolsetTool[AgentDepsT]]:
        all_tools = await super().get_tools(ctx)
        env = self.required_environment
        tool_names = self._resolve_tool_names(env)
        filtered = {name: tool for name, tool in all_tools.items() if name in tool_names}

        # Override tool descriptions from environment method docstrings.
        # Each environment subclass can document its tool methods with LLM-facing
        # docstrings (e.g. explaining regex flavor for grep); if present, these
        # replace the generic default description.
        env_type = type(env)
        for tool_name, tool in filtered.items():
            method_name = _TOOL_TO_ENV_METHOD.get(tool_name, tool_name)
            env_method = getattr(env_type, method_name, None)
            base_method = getattr(ExecutionEnvironment, method_name, None)
            if env_method is not None and env_method is not base_method and env_method.__doc__:
                desc = inspect.cleandoc(env_method.__doc__)
                filtered[tool_name] = replace(tool, tool_def=replace(tool.tool_def, description=desc))

        return filtered

    @property
    def tool_name_conflict_hint(self) -> str:
        return 'Wrap the ExecutionEnvironmentToolset in a PrefixedToolset to avoid name conflicts.'

    @property
    def environment(self) -> ExecutionEnvironment | None:
        """The active execution environment, or None if not configured.

        Checks the context var override first (which includes per-run factory
        environments), then falls back to the shared environment.
        """
        override = self._environment_override.get()
        if override is not None:
            return override
        return self._shared_environment

    @property
    def required_environment(self) -> ExecutionEnvironment:
        """The active execution environment, raising if not configured.

        Raises:
            RuntimeError: If no environment is available.
        """
        env = self.environment
        if env is not None:
            return env
        raise RuntimeError(
            'No execution environment configured. Pass one to ExecutionEnvironmentToolset() or use .use_environment().'
        )

    @contextmanager
    def use_environment(self, environment: ExecutionEnvironment) -> Iterator[None]:
        """Override the execution environment for the current context.

        Useful for testing or using different environments at different call sites.

        Usage:
            ```python {test="skip" lint="skip"}
            with toolset.use_environment(test_env):
                result = await agent.run('test prompt', toolsets=[toolset])
            ```

        Args:
            environment: The execution environment to use within this context.
        """
        token = self._environment_override.set(environment)
        try:
            yield
        finally:
            self._environment_override.reset(token)

    # --- Lifecycle ---

    async def __aenter__(self) -> Self:
        if self._environment_factory is not None:
            env = self._environment_factory()
            stack = AsyncExitStack()
            await stack.enter_async_context(env)
            token = self._environment_override.set(env)
            self._per_run_state.set((stack, token))
        else:
            async with self._enter_lock:
                self._running_count += 1
                if self._running_count == 1:
                    # Use _shared_environment directly (not required_environment) to avoid
                    # entering a use_environment() override into the shared exit stack.
                    env = self._shared_environment
                    if env is None:
                        self._running_count -= 1
                        raise RuntimeError(
                            'No execution environment configured. Pass one to ExecutionEnvironmentToolset() or use environment_factory.'
                        )
                    self._exit_stack = AsyncExitStack()
                    try:
                        await self._exit_stack.enter_async_context(env)
                    except Exception:
                        self._running_count -= 1
                        raise
        return self

    async def __aexit__(self, *args: Any) -> bool | None:
        if self._environment_factory is not None:
            state = self._per_run_state.get()
            if state is not None:  # pragma: no branch
                stack, token = state
                await stack.aclose()
                self._environment_override.reset(token)
                self._per_run_state.set(None)
        else:
            async with self._enter_lock:
                self._running_count -= 1
                if self._running_count == 0 and self._exit_stack is not None:
                    await self._exit_stack.aclose()
                    self._exit_stack = None
        return None

__init__

__init__(
    shared_environment: ExecutionEnvironment | None = None,
    *,
    environment_factory: (
        Callable[[], ExecutionEnvironment] | None
    ) = None,
    include: Sequence[EnvToolName] | None = None,
    exclude: Sequence[EnvToolName] | None = None,
    require_shell_approval: bool = False,
    require_write_approval: bool = False,
    image_support: bool = True,
    max_image_bytes: int = 50 * 1024 * 1024,
    max_retries: int = 1,
    id: str | None = None
)

Create a new execution environment toolset.

Parameters:

- `shared_environment` (`ExecutionEnvironment | None`, default `None`): A shared execution environment for tool execution. All concurrent runs share this single environment instance. Can also be set later via `use_environment()`.
- `environment_factory` (`Callable[[], ExecutionEnvironment] | None`, default `None`): A callable that creates a fresh environment per `async with toolset:` entry. Use this for concurrent runs that need isolation (e.g. separate Docker containers). Mutually exclusive with `shared_environment`.
- `include` (`Sequence[EnvToolName] | None`, default `None`): Tool names to include. `None` means all tools supported by the environment. Pass an explicit sequence to restrict to specific tools.
- `exclude` (`Sequence[EnvToolName] | None`, default `None`): Tool names to exclude. `None` means no exclusions. Pass an explicit sequence to exclude specific tools.
- `require_shell_approval` (`bool`, default `False`): Whether the `shell` tool requires human-in-the-loop approval before execution. Recommended for `LocalEnvironment`, where commands run directly on the host.
- `require_write_approval` (`bool`, default `False`): Whether `write_file` and edit tools require human-in-the-loop approval before execution.
- `image_support` (`bool`, default `True`): Whether `read_file` should return images as `BinaryContent` for multimodal models (otherwise a placeholder message is returned).
- `max_image_bytes` (`int`, default `50 * 1024 * 1024`): Maximum image file size to return as `BinaryContent`.
- `max_retries` (`int`, default `1`): Maximum retries per tool call.
- `id` (`str | None`, default `None`): Optional unique ID for the toolset (required for durable execution).
Source code in pydantic_ai_slim/pydantic_ai/toolsets/execution_environment.py
def __init__(
    self,
    shared_environment: ExecutionEnvironment | None = None,
    *,
    environment_factory: Callable[[], ExecutionEnvironment] | None = None,
    include: Sequence[EnvToolName] | None = None,
    exclude: Sequence[EnvToolName] | None = None,
    require_shell_approval: bool = False,
    require_write_approval: bool = False,
    image_support: bool = True,
    max_image_bytes: int = 50 * 1024 * 1024,
    max_retries: int = 1,
    id: str | None = None,
):
    """Create a new execution environment toolset.

    Args:
        shared_environment: A shared execution environment for tool execution.
            All concurrent runs share this single environment instance.
            Can also be set later via `use_environment()`.
        environment_factory: A callable that creates a fresh environment per
            `async with toolset:` entry. Use this for concurrent runs that need
            isolation (e.g. separate Docker containers). Mutually exclusive with
            `shared_environment`.
        include: Tool names to include. `None` means all tools supported
            by the environment. Pass an explicit sequence to restrict to
            specific tools.
        exclude: Tool names to exclude. `None` defaults to no exclusions.
            Pass an explicit sequence to exclude specific tools.
        require_shell_approval: Whether the `shell` tool requires human-in-the-loop
            approval before execution. Recommended for `LocalEnvironment` where
            commands run directly on the host.
        require_write_approval: Whether `write_file` and edit tools require
            human-in-the-loop approval before execution.
        image_support: Whether `read_file` should return images as `BinaryContent`
            for multimodal models (otherwise returns a placeholder message).
        max_image_bytes: Maximum image file size to return as BinaryContent.
        max_retries: Maximum retries per tool call.
        id: Optional unique ID for the toolset (required for durable execution).
    """
    if shared_environment is not None and environment_factory is not None:
        raise ValueError('Cannot provide both shared_environment and environment_factory.')

    super().__init__(max_retries=max_retries, id=id)
    self._shared_environment = shared_environment
    self._environment_factory = environment_factory
    self._environment_override: ContextVar[ExecutionEnvironment | None] = ContextVar(
        f'_environment_override_{id or "environment"}', default=None
    )
    self._per_run_state: ContextVar[tuple[AsyncExitStack, Token[ExecutionEnvironment | None]] | None] = ContextVar(
        f'_per_run_state_{id or "environment"}', default=None
    )
    self._include: frozenset[EnvToolName] | None = frozenset(include) if include is not None else None
    self._exclude: frozenset[EnvToolName] = frozenset(exclude) if exclude else frozenset()
    self._image_support = image_support
    self._max_image_bytes = max_image_bytes
    self._require_shell_approval = require_shell_approval
    self._require_write_approval = require_write_approval
    self._enter_lock = anyio.Lock()
    self._running_count: int = 0
    self._exit_stack: AsyncExitStack | None = None

    # Register all tools unconditionally so schemas are built eagerly.
    # get_tools() filters at runtime based on the current environment's capabilities.
    self._register_tools()

environment property

environment: ExecutionEnvironment | None

The active execution environment, or None if not configured.

Checks the context var override first (which includes per-run factory environments), then falls back to the shared environment.

required_environment property

required_environment: ExecutionEnvironment

The active execution environment, raising if not configured.

Raises:

- `RuntimeError`: If no environment is available.

use_environment

use_environment(
    environment: ExecutionEnvironment,
) -> Iterator[None]

Override the execution environment for the current context.

Useful for testing or using different environments at different call sites.

Usage:

```python
with toolset.use_environment(test_env):
    result = await agent.run('test prompt', toolsets=[toolset])
```

Parameters:

- `environment` (`ExecutionEnvironment`, required): The execution environment to use within this context.
Source code in pydantic_ai_slim/pydantic_ai/toolsets/execution_environment.py
@contextmanager
def use_environment(self, environment: ExecutionEnvironment) -> Iterator[None]:
    """Override the execution environment for the current context.

    Useful for testing or using different environments at different call sites.

    Usage:
        ```python {test="skip" lint="skip"}
        with toolset.use_environment(test_env):
            result = await agent.run('test prompt', toolsets=[toolset])
        ```

    Args:
        environment: The execution environment to use within this context.
    """
    token = self._environment_override.set(environment)
    try:
        yield
    finally:
        self._environment_override.reset(token)

ExecutionProcess

Bases: ABC

Handle to a running process with bidirectional streaming I/O.

Used for interactive execution where a script outputs data, waits for input, processes it, and outputs more data.

Source code in pydantic_ai_slim/pydantic_ai/environments/_base.py
class ExecutionProcess(ABC):
    """Handle to a running process with bidirectional streaming I/O.

    Used for interactive execution where a script outputs data,
    waits for input, processes it, and outputs more data.
    """

    @abstractmethod
    async def send(self, data: bytes) -> None:
        """Write data to the process's stdin.

        Args:
            data: The bytes to write to stdin.
        """

    @abstractmethod
    async def recv(self, timeout: float | None = None) -> bytes:
        """Read available output from stdout.

        Blocks until data is available, the process exits, or the timeout expires.

        Args:
            timeout: Maximum seconds to wait for data. None means wait indefinitely.

        Raises:
            TimeoutError: If the timeout expires with no data available.
        """

    @abstractmethod
    async def recv_stderr(self, timeout: float | None = None) -> bytes:
        """Read available output from stderr.

        Args:
            timeout: Maximum seconds to wait for data. None means wait indefinitely.

        Raises:
            TimeoutError: If the timeout expires with no data available.
        """

    @property
    @abstractmethod
    def returncode(self) -> int | None:
        """Return code if the process has exited, None if still running."""

    @abstractmethod
    async def wait(self, timeout: float | None = None) -> int:
        """Wait for the process to exit.

        Args:
            timeout: Maximum seconds to wait. None means wait indefinitely.

        Returns:
            The process exit code.

        Raises:
            TimeoutError: If the timeout expires before the process exits.
        """

    @abstractmethod
    async def kill(self) -> None:
        """Kill the process."""

    async def __aenter__(self) -> Self:
        return self

    async def __aexit__(self, *args: Any) -> None:
        if self.returncode is None:
            await self.kill()

send abstractmethod async

send(data: bytes) -> None

Write data to the process's stdin.

Parameters:

- `data` (`bytes`, required): The bytes to write to stdin.
Source code in pydantic_ai_slim/pydantic_ai/environments/_base.py
@abstractmethod
async def send(self, data: bytes) -> None:
    """Write data to the process's stdin.

    Args:
        data: The bytes to write to stdin.
    """

recv abstractmethod async

recv(timeout: float | None = None) -> bytes

Read available output from stdout.

Blocks until data is available, the process exits, or the timeout expires.

Parameters:

- `timeout` (`float | None`, default `None`): Maximum seconds to wait for data. `None` means wait indefinitely.

Raises:

- `TimeoutError`: If the timeout expires with no data available.

Source code in pydantic_ai_slim/pydantic_ai/environments/_base.py
@abstractmethod
async def recv(self, timeout: float | None = None) -> bytes:
    """Read available output from stdout.

    Blocks until data is available, the process exits, or the timeout expires.

    Args:
        timeout: Maximum seconds to wait for data. None means wait indefinitely.

    Raises:
        TimeoutError: If the timeout expires with no data available.
    """

recv_stderr abstractmethod async

recv_stderr(timeout: float | None = None) -> bytes

Read available output from stderr.

Parameters:

- `timeout` (`float | None`, default `None`): Maximum seconds to wait for data. `None` means wait indefinitely.

Raises:

- `TimeoutError`: If the timeout expires with no data available.

Source code in pydantic_ai_slim/pydantic_ai/environments/_base.py
@abstractmethod
async def recv_stderr(self, timeout: float | None = None) -> bytes:
    """Read available output from stderr.

    Args:
        timeout: Maximum seconds to wait for data. None means wait indefinitely.

    Raises:
        TimeoutError: If the timeout expires with no data available.
    """

returncode abstractmethod property

returncode: int | None

Return code if the process has exited, None if still running.

wait abstractmethod async

wait(timeout: float | None = None) -> int

Wait for the process to exit.

Parameters:

- `timeout` (`float | None`, default `None`): Maximum seconds to wait. `None` means wait indefinitely.

Returns:

- `int`: The process exit code.

Raises:

- `TimeoutError`: If the timeout expires before the process exits.

Source code in pydantic_ai_slim/pydantic_ai/environments/_base.py
@abstractmethod
async def wait(self, timeout: float | None = None) -> int:
    """Wait for the process to exit.

    Args:
        timeout: Maximum seconds to wait. None means wait indefinitely.

    Returns:
        The process exit code.

    Raises:
        TimeoutError: If the timeout expires before the process exits.
    """

kill abstractmethod async

kill() -> None

Kill the process.

Source code in pydantic_ai_slim/pydantic_ai/environments/_base.py
@abstractmethod
async def kill(self) -> None:
    """Kill the process."""

ExecutionResult dataclass

Result of a completed command execution.

Source code in pydantic_ai_slim/pydantic_ai/environments/_base.py
@dataclass
class ExecutionResult:
    """Result of a completed command execution."""

    output: str
    """The combined stdout/stderr output of the command."""

    exit_code: int
    """The exit code of the command."""

output instance-attribute

output: str

The combined stdout/stderr output of the command.

exit_code instance-attribute

exit_code: int

The exit code of the command.

pydantic_ai.environments.local

Local subprocess-based execution environment for development and testing.

Runs commands directly on the host machine within a specified root directory. No isolation — use DockerEnvironment for untrusted code.

LocalEnvironment

Bases: ExecutionEnvironment

Local subprocess-based execution environment for development and testing.

Runs commands directly on the host machine within a specified root directory. Provides no isolation — use DockerEnvironment for untrusted code.

Usage:

```python
async with LocalEnvironment(root_dir='/tmp/workspace') as env:
    result = await env.shell('python script.py')
    print(result.output)
```
Source code in pydantic_ai_slim/pydantic_ai/environments/local.py
class LocalEnvironment(ExecutionEnvironment):
    """Local subprocess-based execution environment for development and testing.

    Runs commands directly on the host machine within a specified root
    directory. Provides no isolation — use `DockerEnvironment` for untrusted code.

    Usage:
        ```python {test="skip" lint="skip"}
        async with LocalEnvironment(root_dir='/tmp/workspace') as env:
            result = await env.shell('python script.py')
            print(result.output)
        ```
    """

    def __init__(
        self,
        root_dir: str | Path = '.',
        *,
        env_vars: dict[str, str] | None = None,
        inherit_env: bool = True,
    ) -> None:
        """Create a local execution environment.

        Args:
            root_dir: The working directory for all operations.
                Defaults to the current directory.
            env_vars: Baseline environment variables for all commands.
            inherit_env: Whether to inherit the host's environment variables.
                When True (default), `env_vars` and per-call `env` are merged
                on top of `os.environ`. When False, only `env_vars` and per-call
                `env` are used (useful for reproducibility and testing).
        """
        self._root_dir = Path(root_dir).resolve()
        self._env_vars = env_vars or {}
        self._inherit_env = inherit_env

    @property
    def capabilities(self) -> frozenset[EnvToolName]:
        return frozenset({'shell', 'read_file', 'write_file', 'edit_file'})

    async def __aenter__(self) -> Self:
        await anyio.to_thread.run_sync(lambda: self._root_dir.mkdir(parents=True, exist_ok=True))
        return self

    async def __aexit__(self, *_args: Any) -> None:
        pass

    def _resolve_path(self, path: str) -> Path:
        """Resolve a path relative to root_dir, preventing traversal."""
        resolved = (self._root_dir / path).resolve()
        if not resolved.is_relative_to(self._root_dir):
            raise PermissionError(f'Path {path!r} resolves outside the environment root.')
        return resolved

    def _build_env(self, env: dict[str, str] | None) -> dict[str, str] | None:
        """Merge baseline env vars with per-call overrides."""
        if not self._env_vars and not env and self._inherit_env:
            return None  # subprocess inherits naturally
        import os

        merged = {**os.environ} if self._inherit_env else {}
        merged.update(self._env_vars)
        if env:
            merged.update(env)
        return merged

    async def create_process(
        self,
        command: str,
        *,
        env: dict[str, str] | None = None,
    ) -> ExecutionProcess:
        proc = await anyio.open_process(
            command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=self._root_dir,
            env=self._build_env(env),
        )
        return _LocalEnvironmentProcess(proc)

    async def shell(
        self,
        command: str,
        *,
        timeout: float | None = 120,
        env: dict[str, str] | None = None,
    ) -> ExecutionResult:
        """Execute a command using subprocess for simplicity and reliability."""
        if timeout is not None and timeout <= 0:
            raise ValueError(f'timeout must be positive or None, got {timeout}')
        proc = await anyio.open_process(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            cwd=self._root_dir,
            env=self._build_env(env),
        )
        try:
            assert proc.stdout is not None
            chunks: list[bytes] = []
            if timeout is not None:
                with anyio.fail_after(timeout):
                    async for chunk in proc.stdout:
                        chunks.append(chunk)
                    await proc.wait()
            else:
                async for chunk in proc.stdout:
                    chunks.append(chunk)
                await proc.wait()
        except TimeoutError:
            proc.kill()
            with anyio.CancelScope(shield=True):
                await proc.wait()
            _close_subprocess_transport(proc)
            return ExecutionResult(output='[Command timed out]', exit_code=-1)

        _close_subprocess_transport(proc)
        stdout = b''.join(chunks)
        output = stdout.decode('utf-8', errors='replace')
        return ExecutionResult(
            output=output,
            exit_code=proc.returncode if proc.returncode is not None else 0,
        )

    async def read_file(self, path: str, *, offset: int = 0, limit: int = 2000) -> str | bytes:
        resolved = self._resolve_path(path)

        def _read() -> str | bytes:
            if not resolved.is_file():
                if resolved.is_dir():
                    raise FileNotFoundError(f"'{path}' is a directory, not a file.")
                raise FileNotFoundError(f'File not found: {path}')

            if resolved.suffix.lower() in IMAGE_EXTENSIONS:
                return resolved.read_bytes()

            raw = resolved.read_bytes()
            try:
                text = raw.decode('utf-8')
            except UnicodeDecodeError:
                return raw
            return format_lines(text, offset, limit)

        return await anyio.to_thread.run_sync(_read)

    async def write_file(self, path: str, content: str | bytes) -> None:
        resolved = self._resolve_path(path)

        def _write() -> None:
            resolved.parent.mkdir(parents=True, exist_ok=True)
            if isinstance(content, bytes):
                resolved.write_bytes(content)
            else:
                resolved.write_text(content, encoding='utf-8')

        await anyio.to_thread.run_sync(_write)

    async def replace_str(
        self,
        path: str,
        old: str,
        new: str,
        *,
        replace_all: bool = False,
    ) -> int:
        resolved = self._resolve_path(path)

        def _edit() -> int:
            if not resolved.is_file():
                raise FileNotFoundError(f'File not found: {path}')

            text = resolved.read_text(encoding='utf-8')
            new_text, count = apply_edit(text, old, new, path, replace_all=replace_all)
            resolved.write_text(new_text, encoding='utf-8')
            return count

        return await anyio.to_thread.run_sync(_edit)
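The `_resolve_path` guard above resolves `..` segments and symlinks first, and only then checks containment, so traversal attempts cannot escape the root. A self-contained sketch of the same check (the helper name is illustrative):

```python
from pathlib import Path


def resolve_in_root(root: Path, user_path: str) -> Path:
    # Join, normalize '..' and symlinks, then reject anything that
    # lands outside the root directory.
    root = root.resolve()
    resolved = (root / user_path).resolve()
    if not resolved.is_relative_to(root):
        raise PermissionError(f'Path {user_path!r} resolves outside the root.')
    return resolved
```

Checking after `resolve()` matters: a naive string-prefix check before normalization would accept `'../root-lookalike/file'` or symlinked escapes.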

__init__

__init__(
    root_dir: str | Path = ".",
    *,
    env_vars: dict[str, str] | None = None,
    inherit_env: bool = True
) -> None

Create a local execution environment.

Parameters:

- `root_dir` (`str | Path`, default `'.'`): The working directory for all operations. Defaults to the current directory.
- `env_vars` (`dict[str, str] | None`, default `None`): Baseline environment variables for all commands.
- `inherit_env` (`bool`, default `True`): Whether to inherit the host's environment variables. When `True` (default), `env_vars` and per-call `env` are merged on top of `os.environ`. When `False`, only `env_vars` and per-call `env` are used (useful for reproducibility and testing).
Source code in pydantic_ai_slim/pydantic_ai/environments/local.py
def __init__(
    self,
    root_dir: str | Path = '.',
    *,
    env_vars: dict[str, str] | None = None,
    inherit_env: bool = True,
) -> None:
    """Create a local execution environment.

    Args:
        root_dir: The working directory for all operations.
            Defaults to the current directory.
        env_vars: Baseline environment variables for all commands.
        inherit_env: Whether to inherit the host's environment variables.
            When True (default), `env_vars` and per-call `env` are merged
            on top of `os.environ`. When False, only `env_vars` and per-call
            `env` are used (useful for reproducibility and testing).
    """
    self._root_dir = Path(root_dir).resolve()
    self._env_vars = env_vars or {}
    self._inherit_env = inherit_env

shell async

shell(
    command: str,
    *,
    timeout: float | None = 120,
    env: dict[str, str] | None = None
) -> ExecutionResult

Execute a command using subprocess for simplicity and reliability.

Source code in pydantic_ai_slim/pydantic_ai/environments/local.py
async def shell(
    self,
    command: str,
    *,
    timeout: float | None = 120,
    env: dict[str, str] | None = None,
) -> ExecutionResult:
    """Execute a command using subprocess for simplicity and reliability."""
    if timeout is not None and timeout <= 0:
        raise ValueError(f'timeout must be positive or None, got {timeout}')
    proc = await anyio.open_process(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        cwd=self._root_dir,
        env=self._build_env(env),
    )
    try:
        assert proc.stdout is not None
        chunks: list[bytes] = []
        if timeout is not None:
            with anyio.fail_after(timeout):
                async for chunk in proc.stdout:
                    chunks.append(chunk)
                await proc.wait()
        else:
            async for chunk in proc.stdout:
                chunks.append(chunk)
            await proc.wait()
    except TimeoutError:
        proc.kill()
        with anyio.CancelScope(shield=True):
            await proc.wait()
        _close_subprocess_transport(proc)
        return ExecutionResult(output='[Command timed out]', exit_code=-1)

    _close_subprocess_transport(proc)
    stdout = b''.join(chunks)
    output = stdout.decode('utf-8', errors='replace')
    return ExecutionResult(
        output=output,
        exit_code=proc.returncode if proc.returncode is not None else 0,
    )

pydantic_ai.environments.docker

Docker container-based environment for isolated code execution.

Requires the docker package: pip install pydantic-ai-slim[docker-environment]

DockerEnvironment

Bases: ExecutionEnvironment

Docker container-based environment for isolated code execution.

Provides isolated code execution with configurable resource limits, network access, and persistent or ephemeral workspaces.

Usage:

```python
async with DockerEnvironment(image='python:3.12-slim') as env:
    result = await env.shell('python -c "print(42)"')
    print(result.output)
```
Source code in pydantic_ai_slim/pydantic_ai/environments/docker.py
class DockerEnvironment(ExecutionEnvironment):
    """Docker container-based environment for isolated code execution.

    Provides isolated code execution with configurable resource limits,
    network access, and persistent or ephemeral workspaces.

    Usage:
        ```python {test="skip" lint="skip"}
        async with DockerEnvironment(image='python:3.12-slim') as env:
            result = await env.shell('python -c "print(42)"')
            print(result.output)
        ```
    """

    def __init__(
        self,
        *,
        image: str = 'python:3.12-slim',
        env_vars: dict[str, str] | None = None,
        work_dir: str = '/workspace',
        volumes: dict[str, dict[str, str]] | None = None,
        memory_limit: str | None = None,
        cpu_limit: float | None = None,
        pids_limit: int | None = None,
        network_disabled: bool = False,
        read_only: bool = False,
        cap_drop: list[str] | None = None,
        security_opt: list[str] | None = None,
        user: str | None = None,
        tmpfs: dict[str, str] | None = None,
        init: bool = False,
    ) -> None:
        """Create a Docker environment.

        Args:
            image: Docker image to use. Pre-build custom images with any
                required packages before passing them here.
            env_vars: Baseline environment variables to set in the container.
            work_dir: Working directory inside the container.
            volumes: Volume mounts (Docker format).
            memory_limit: Memory limit (e.g. '512m', '1g').
            cpu_limit: CPU limit (e.g. 1.0 for one CPU).
            pids_limit: Maximum number of PIDs in the container (e.g. 256).
                Prevents fork bombs.
            network_disabled: Whether to disable network access.
            read_only: Whether to mount the root filesystem as read-only.
                Use with `tmpfs` to provide writable scratch space.
            cap_drop: Linux capabilities to drop (e.g. `['ALL']`).
            security_opt: Security options (e.g. `['no-new-privileges']`).
            user: User to run as inside the container (e.g. `'nobody'`).
            tmpfs: tmpfs mounts as `{path: options}`
                (e.g. `{'/tmp': 'noexec,nosuid,size=64m'}`).
            init: Whether to use `--init` to run an init process as PID 1.
                Ensures proper signal handling and zombie reaping.
        """
        self._image = image
        self._env_vars = env_vars or {}
        self._work_dir = work_dir
        self._volumes = volumes
        self._memory_limit = memory_limit
        self._cpu_limit = cpu_limit
        self._pids_limit = pids_limit
        self._network_disabled = network_disabled
        self._read_only = read_only
        self._cap_drop = cap_drop
        self._security_opt = security_opt
        self._user = user
        self._tmpfs = tmpfs
        self._init = init

        self._client: docker.DockerClient | None = None
        self._container: Container | None = None

    @classmethod
    def hardened(
        cls,
        *,
        image: str = 'python:3.12-slim',
        env_vars: dict[str, str] | None = None,
        work_dir: str = '/workspace',
        memory_limit: str = '512m',
        cpu_limit: float = 1.0,
        pids_limit: int = 256,
    ) -> DockerEnvironment:
        """Create a hardened Docker environment with security best practices.

        This is a convenience constructor that sets sensible security defaults:
        network disabled, read-only root filesystem, all capabilities dropped,
        no privilege escalation, runs as `nobody`, and uses an init process.

        The root filesystem is read-only; writable tmpfs mounts are provided at
        `/tmp` and the working directory.

        Args:
            image: Docker image to use.
            env_vars: Baseline environment variables to set in the container.
            work_dir: Working directory inside the container.
            memory_limit: Memory limit (e.g. '512m', '1g').
            cpu_limit: CPU limit (e.g. 1.0 for one CPU).
            pids_limit: Maximum number of PIDs in the container.
        """
        return cls(
            image=image,
            env_vars=env_vars,
            work_dir=work_dir,
            network_disabled=True,
            read_only=True,
            cap_drop=['ALL'],
            security_opt=['no-new-privileges'],
            user='nobody',
            pids_limit=pids_limit,
            tmpfs={'/tmp': 'noexec,nosuid,size=64m', work_dir: 'size=128m'},
            init=True,
            memory_limit=memory_limit,
            cpu_limit=cpu_limit,
        )

    @property
    def capabilities(self) -> frozenset[EnvToolName]:  # pragma: lax no cover
        return frozenset(
            {
                'shell',
                'read_file',
                'write_file',
                'edit_file',
            }
        )

    async def __aenter__(self) -> Self:  # pragma: lax no cover
        await anyio.to_thread.run_sync(self._setup)
        return self

    def _setup(self) -> None:
        """Start container (sync, runs in executor)."""
        if self._container is not None:
            return
        self._client = docker.from_env()

        # Create and start container
        kwargs: dict[str, Any] = {
            'image': self._image,
            'command': 'sleep infinity',
            'detach': True,
            'working_dir': self._work_dir,
            'environment': self._env_vars,
            'auto_remove': False,
        }
        if self._volumes:
            kwargs['volumes'] = self._volumes
        if self._memory_limit:
            kwargs['mem_limit'] = self._memory_limit
        if self._cpu_limit:
            kwargs['nano_cpus'] = int(self._cpu_limit * 1e9)
        if self._pids_limit is not None:
            kwargs['pids_limit'] = self._pids_limit
        if self._network_disabled:
            kwargs['network_disabled'] = True
        if self._read_only:
            kwargs['read_only'] = True
        if self._cap_drop:
            kwargs['cap_drop'] = self._cap_drop
        if self._security_opt:
            kwargs['security_opt'] = self._security_opt
        if self._user:
            kwargs['user'] = self._user
        if self._tmpfs:
            kwargs['tmpfs'] = self._tmpfs
        if self._init:
            kwargs['init'] = True

        self._container = cast(Container, self._client.containers.run(**kwargs))

        # Ensure work_dir exists
        self._container.exec_run(['mkdir', '-p', self._work_dir])

    async def __aexit__(self, *_args: Any) -> None:  # pragma: lax no cover
        if self._container is not None:  # pragma: no branch
            await anyio.to_thread.run_sync(self._teardown)

    def _teardown(self) -> None:
        """Stop and remove container (sync, runs in executor)."""
        if self._container is not None:  # pragma: no branch
            try:
                self._container.stop(timeout=5)
            except (DockerException, OSError):
                # Best-effort cleanup: container may already be stopped or removed
                pass
            try:
                self._container.remove(force=True)
            except (DockerException, OSError):
                # Best-effort cleanup: container may already be removed
                pass
            self._container = None
        if self._client is not None:
            try:
                self._client.close()
            except (DockerException, OSError):
                pass
            self._client = None

    @property
    def _required_container(self) -> Container:
        if self._container is None:
            raise RuntimeError('DockerEnvironment not started. Use `async with DockerEnvironment(...) as env:`')
        return self._container

    def _resolve_path(self, path: str) -> str:
        """Resolve a path relative to work_dir for Docker API calls.

        Docker API methods like `put_archive` and `get_archive` resolve
        paths against the container root `/`, not the working directory.
        This helper ensures relative paths are resolved against `work_dir`.
        """
        if not path.startswith('/'):
            return f'{self._work_dir}/{path}'
        return path

    async def create_process(
        self,
        command: str,
        *,
        env: dict[str, str] | None = None,
    ) -> ExecutionProcess:
        return _DockerEnvironmentProcess(self._required_container, command, self._work_dir, env=env)

    async def shell(
        self,
        command: str,
        *,
        timeout: float | None = 120,
        env: dict[str, str] | None = None,
    ) -> ExecutionResult:
        """Execute a command in the container."""
        if timeout is not None and timeout <= 0:
            raise ValueError(f'timeout must be positive or None, got {timeout}')

        def _exec() -> tuple[int, bytes]:
            if timeout is not None:
                # Note: GNU coreutils `timeout 0` means "no timeout" (wait forever),
                # so we validate timeout > 0 above to prevent surprising behavior.
                wrapped = f'timeout {math.ceil(timeout)} sh -c {_shell_escape(command)}'
            else:
                wrapped = command
            exec_kwargs: dict[str, Any] = {'workdir': self._work_dir}
            if env:
                exec_kwargs['environment'] = env
            exit_code, output = self._required_container.exec_run(
                ['sh', '-c', wrapped],
                **exec_kwargs,
            )
            return exit_code, output

        exit_code, output_bytes = await anyio.to_thread.run_sync(_exec)
        output = output_bytes.decode('utf-8', errors='replace')
        # timeout command returns 124 on timeout
        if exit_code == 124 and timeout is not None:
            output += '\n[Command timed out]'
        return ExecutionResult(output=output, exit_code=exit_code)

    async def read_file(self, path: str, *, offset: int = 0, limit: int = 2000) -> str | bytes:
        ext = posixpath.splitext(path)[1].lower()
        if ext in IMAGE_EXTENSIONS:
            return await anyio.to_thread.run_sync(self._read_file_bytes_sync, path)

        def _read() -> str | bytes:
            cmd = _build_read_file_cmd(path, offset=offset, limit=limit)
            exit_code, output = self._required_container.exec_run(['sh', '-c', cmd], workdir=self._work_dir)
            if exit_code != 0:
                raise FileNotFoundError(f'File not found or not readable: {path}')
            try:
                text = output.decode('utf-8')
            except UnicodeDecodeError:
                return self._read_file_bytes_sync(path)
            if text.startswith('__OFFSET_ERROR__:'):
                total_lines = int(text.split(':')[1].strip())
                raise ValueError(f'Offset {offset} exceeds file length ({total_lines} lines).')
            return text

        return await anyio.to_thread.run_sync(_read)

    def _read_file_bytes_sync(self, path: str) -> bytes:
        """Read raw file bytes using Docker's get_archive API."""
        try:
            bits, _ = self._required_container.get_archive(self._resolve_path(path))
        except NotFound:
            raise FileNotFoundError(f'File not found: {path}')
        # get_archive returns a tar stream
        tar_bytes = b''.join(bits)
        with tarfile.open(fileobj=io.BytesIO(tar_bytes)) as tar:
            members = tar.getmembers()
            if not members:  # pragma: no cover
                raise FileNotFoundError(f'File not found: {path}')
            extracted = tar.extractfile(members[0])
            if extracted is None:  # pragma: no cover
                raise FileNotFoundError(f'Cannot read file: {path}')
            return extracted.read()

    async def write_file(self, path: str, content: str | bytes) -> None:
        def _write() -> None:
            full_path = self._resolve_path(path)
            # Ensure parent directory exists
            parent = str(PurePosixPath(full_path).parent)
            self._required_container.exec_run(['mkdir', '-p', parent])

            data = content.encode('utf-8') if isinstance(content, str) else content
            _put_file(self._required_container, full_path, data)

        await anyio.to_thread.run_sync(_write)

    async def replace_str(
        self,
        path: str,
        old: str,
        new: str,
        *,
        replace_all: bool = False,
    ) -> int:
        def _edit() -> int:
            raw = self._read_file_bytes_sync(path)
            text = raw.decode('utf-8')
            new_text, count = apply_edit(text, old, new, path, replace_all=replace_all)
            _put_file(self._required_container, self._resolve_path(path), new_text.encode('utf-8'))
            return count

        return await anyio.to_thread.run_sync(_edit)

    async def is_alive(self) -> bool:
        """Check if the container is running.

        Returns:
            True if the container is running, False otherwise.
        """
        if self._container is None:
            return False

        def _check() -> bool:
            assert self._container is not None
            try:
                self._container.reload()
                return self._container.status == 'running'
            except (DockerException, OSError):
                return False

        return await anyio.to_thread.run_sync(_check)

__init__

__init__(
    *,
    image: str = "python:3.12-slim",
    env_vars: dict[str, str] | None = None,
    work_dir: str = "/workspace",
    volumes: dict[str, dict[str, str]] | None = None,
    memory_limit: str | None = None,
    cpu_limit: float | None = None,
    pids_limit: int | None = None,
    network_disabled: bool = False,
    read_only: bool = False,
    cap_drop: list[str] | None = None,
    security_opt: list[str] | None = None,
    user: str | None = None,
    tmpfs: dict[str, str] | None = None,
    init: bool = False
) -> None

Create a Docker environment.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `image` | `str` | Docker image to use. Pre-build custom images with any required packages before passing them here. | `'python:3.12-slim'` |
| `env_vars` | `dict[str, str] \| None` | Baseline environment variables to set in the container. | `None` |
| `work_dir` | `str` | Working directory inside the container. | `'/workspace'` |
| `volumes` | `dict[str, dict[str, str]] \| None` | Volume mounts (Docker format). | `None` |
| `memory_limit` | `str \| None` | Memory limit (e.g. `'512m'`, `'1g'`). | `None` |
| `cpu_limit` | `float \| None` | CPU limit (e.g. `1.0` for one CPU). | `None` |
| `pids_limit` | `int \| None` | Maximum number of PIDs in the container (e.g. `256`). Prevents fork bombs. | `None` |
| `network_disabled` | `bool` | Whether to disable network access. | `False` |
| `read_only` | `bool` | Whether to mount the root filesystem as read-only. Use with `tmpfs` to provide writable scratch space. | `False` |
| `cap_drop` | `list[str] \| None` | Linux capabilities to drop (e.g. `['ALL']`). | `None` |
| `security_opt` | `list[str] \| None` | Security options (e.g. `['no-new-privileges']`). | `None` |
| `user` | `str \| None` | User to run as inside the container (e.g. `'nobody'`). | `None` |
| `tmpfs` | `dict[str, str] \| None` | tmpfs mounts as `{path: options}` (e.g. `{'/tmp': 'noexec,nosuid,size=64m'}`). | `None` |
| `init` | `bool` | Whether to use `--init` to run an init process as PID 1. Ensures proper signal handling and zombie reaping. | `False` |
Source code in pydantic_ai_slim/pydantic_ai/environments/docker.py
def __init__(
    self,
    *,
    image: str = 'python:3.12-slim',
    env_vars: dict[str, str] | None = None,
    work_dir: str = '/workspace',
    volumes: dict[str, dict[str, str]] | None = None,
    memory_limit: str | None = None,
    cpu_limit: float | None = None,
    pids_limit: int | None = None,
    network_disabled: bool = False,
    read_only: bool = False,
    cap_drop: list[str] | None = None,
    security_opt: list[str] | None = None,
    user: str | None = None,
    tmpfs: dict[str, str] | None = None,
    init: bool = False,
) -> None:
    """Create a Docker environment.

    Args:
        image: Docker image to use. Pre-build custom images with any
            required packages before passing them here.
        env_vars: Baseline environment variables to set in the container.
        work_dir: Working directory inside the container.
        volumes: Volume mounts (Docker format).
        memory_limit: Memory limit (e.g. '512m', '1g').
        cpu_limit: CPU limit (e.g. 1.0 for one CPU).
        pids_limit: Maximum number of PIDs in the container (e.g. 256).
            Prevents fork bombs.
        network_disabled: Whether to disable network access.
        read_only: Whether to mount the root filesystem as read-only.
            Use with `tmpfs` to provide writable scratch space.
        cap_drop: Linux capabilities to drop (e.g. `['ALL']`).
        security_opt: Security options (e.g. `['no-new-privileges']`).
        user: User to run as inside the container (e.g. `'nobody'`).
        tmpfs: tmpfs mounts as `{path: options}`
            (e.g. `{'/tmp': 'noexec,nosuid,size=64m'}`).
        init: Whether to use `--init` to run an init process as PID 1.
            Ensures proper signal handling and zombie reaping.
    """
    self._image = image
    self._env_vars = env_vars or {}
    self._work_dir = work_dir
    self._volumes = volumes
    self._memory_limit = memory_limit
    self._cpu_limit = cpu_limit
    self._pids_limit = pids_limit
    self._network_disabled = network_disabled
    self._read_only = read_only
    self._cap_drop = cap_drop
    self._security_opt = security_opt
    self._user = user
    self._tmpfs = tmpfs
    self._init = init

    self._client: docker.DockerClient | None = None
    self._container: Container | None = None

hardened classmethod

hardened(
    *,
    image: str = "python:3.12-slim",
    env_vars: dict[str, str] | None = None,
    work_dir: str = "/workspace",
    memory_limit: str = "512m",
    cpu_limit: float = 1.0,
    pids_limit: int = 256
) -> DockerEnvironment

Create a hardened Docker environment with security best practices.

This is a convenience constructor that sets sensible security defaults: network disabled, read-only root filesystem, all capabilities dropped, no privilege escalation, runs as nobody, and uses an init process.

The root filesystem is read-only; writable tmpfs mounts are provided at /tmp and the working directory.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `image` | `str` | Docker image to use. | `'python:3.12-slim'` |
| `env_vars` | `dict[str, str] \| None` | Baseline environment variables to set in the container. | `None` |
| `work_dir` | `str` | Working directory inside the container. | `'/workspace'` |
| `memory_limit` | `str` | Memory limit (e.g. `'512m'`, `'1g'`). | `'512m'` |
| `cpu_limit` | `float` | CPU limit (e.g. `1.0` for one CPU). | `1.0` |
| `pids_limit` | `int` | Maximum number of PIDs in the container. | `256` |
Source code in pydantic_ai_slim/pydantic_ai/environments/docker.py
@classmethod
def hardened(
    cls,
    *,
    image: str = 'python:3.12-slim',
    env_vars: dict[str, str] | None = None,
    work_dir: str = '/workspace',
    memory_limit: str = '512m',
    cpu_limit: float = 1.0,
    pids_limit: int = 256,
) -> DockerEnvironment:
    """Create a hardened Docker environment with security best practices.

    This is a convenience constructor that sets sensible security defaults:
    network disabled, read-only root filesystem, all capabilities dropped,
    no privilege escalation, runs as `nobody`, and uses an init process.

    The root filesystem is read-only; writable tmpfs mounts are provided at
    `/tmp` and the working directory.

    Args:
        image: Docker image to use.
        env_vars: Baseline environment variables to set in the container.
        work_dir: Working directory inside the container.
        memory_limit: Memory limit (e.g. '512m', '1g').
        cpu_limit: CPU limit (e.g. 1.0 for one CPU).
        pids_limit: Maximum number of PIDs in the container.
    """
    return cls(
        image=image,
        env_vars=env_vars,
        work_dir=work_dir,
        network_disabled=True,
        read_only=True,
        cap_drop=['ALL'],
        security_opt=['no-new-privileges'],
        user='nobody',
        pids_limit=pids_limit,
        tmpfs={'/tmp': 'noexec,nosuid,size=64m', work_dir: 'size=128m'},
        init=True,
        memory_limit=memory_limit,
        cpu_limit=cpu_limit,
    )
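The set of defaults `hardened()` applies can be collected into a plain kwargs dict, which is useful as a checklist when auditing a container configuration. This is an illustrative sketch (the `hardened_kwargs` helper is hypothetical, not part of the library API); the values mirror the constructor arguments shown in the source above:

```python
def hardened_kwargs(work_dir: str = '/workspace') -> dict:
    """Illustrative: the security defaults hardened() passes to DockerEnvironment."""
    return {
        'network_disabled': True,  # no network access
        'read_only': True,  # read-only root filesystem
        'cap_drop': ['ALL'],  # drop all Linux capabilities
        'security_opt': ['no-new-privileges'],  # block privilege escalation
        'user': 'nobody',  # run unprivileged
        'pids_limit': 256,  # guard against fork bombs
        # writable scratch space to compensate for the read-only root
        'tmpfs': {'/tmp': 'noexec,nosuid,size=64m', work_dir: 'size=128m'},
        'init': True,  # init process as PID 1 for signal handling / zombie reaping
    }

kwargs = hardened_kwargs()
print(sorted(kwargs))
```

Each entry maps one-to-one onto a `__init__` parameter, so relaxing a single restriction (e.g. re-enabling the network) means calling the constructor directly with the remaining defaults copied over.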

shell async

shell(
    command: str,
    *,
    timeout: float | None = 120,
    env: dict[str, str] | None = None
) -> ExecutionResult

Execute a command in the container.

Source code in pydantic_ai_slim/pydantic_ai/environments/docker.py
async def shell(
    self,
    command: str,
    *,
    timeout: float | None = 120,
    env: dict[str, str] | None = None,
) -> ExecutionResult:
    """Execute a command in the container."""
    if timeout is not None and timeout <= 0:
        raise ValueError(f'timeout must be positive or None, got {timeout}')

    def _exec() -> tuple[int, bytes]:
        if timeout is not None:
            # Note: GNU coreutils `timeout 0` means "no timeout" (wait forever),
            # so we validate timeout > 0 above to prevent surprising behavior.
            wrapped = f'timeout {math.ceil(timeout)} sh -c {_shell_escape(command)}'
        else:
            wrapped = command
        exec_kwargs: dict[str, Any] = {'workdir': self._work_dir}
        if env:
            exec_kwargs['environment'] = env
        exit_code, output = self._required_container.exec_run(
            ['sh', '-c', wrapped],
            **exec_kwargs,
        )
        return exit_code, output

    exit_code, output_bytes = await anyio.to_thread.run_sync(_exec)
    output = output_bytes.decode('utf-8', errors='replace')
    # timeout command returns 124 on timeout
    if exit_code == 124 and timeout is not None:
        output += '\n[Command timed out]'
    return ExecutionResult(output=output, exit_code=exit_code)

is_alive async

is_alive() -> bool

Check if the container is running.

Returns:

| Type | Description |
|------|-------------|
| `bool` | `True` if the container is running, `False` otherwise. |

Source code in pydantic_ai_slim/pydantic_ai/environments/docker.py
async def is_alive(self) -> bool:
    """Check if the container is running.

    Returns:
        True if the container is running, False otherwise.
    """
    if self._container is None:
        return False

    def _check() -> bool:
        assert self._container is not None
        try:
            self._container.reload()
            return self._container.status == 'running'
        except (DockerException, OSError):
            return False

    return await anyio.to_thread.run_sync(_check)

pydantic_ai.environments.memory

In-memory execution environment for testing.

All file operations use an in-memory dictionary. Shell commands are handled by an optional callback — if not provided, shell() raises RuntimeError.

MemoryEnvironment

Bases: ExecutionEnvironment

In-memory execution environment for testing.

File operations use an in-memory dictionary, making tests fast and isolated with no filesystem access. Shell commands can optionally be handled by a user-provided callback.

This is the testing counterpart to LocalEnvironment, analogous to how TestModel and FunctionModel relate to real model classes.

Usage:

```python
from pydantic_ai.environments.memory import MemoryEnvironment

env = MemoryEnvironment(files={'main.py': 'print("hello")'})
async with env:
    content = await env.read_file('main.py')
    assert 'hello' in content
```
Source code in pydantic_ai_slim/pydantic_ai/environments/memory.py
class MemoryEnvironment(ExecutionEnvironment):
    """In-memory execution environment for testing.

    File operations use an in-memory dictionary, making tests fast and
    isolated with no filesystem access. Shell commands can optionally be
    handled by a user-provided callback.

    This is the testing counterpart to `LocalEnvironment`, analogous to
    how `TestModel` and `FunctionModel` relate to real model classes.

    Usage:
        ```python {test="skip" lint="skip"}
        from pydantic_ai.environments.memory import MemoryEnvironment

        env = MemoryEnvironment(files={'main.py': 'print("hello")'})
        async with env:
            content = await env.read_file('main.py')
            assert 'hello' in content
        ```
    """

    def __init__(
        self,
        files: dict[str, str | bytes] | None = None,
        *,
        command_handler: Callable[[str], ExecutionResult] | None = None,
    ) -> None:
        """Create an in-memory execution environment.

        Args:
            files: Initial files to populate the environment with.
                Keys are file paths, values are file contents (str or bytes).
            command_handler: Optional callback for `shell()` calls.
                Receives the command string and returns an `ExecutionResult`.
                If not provided, `shell()` raises `RuntimeError`.
        """
        self._files: dict[str, str | bytes] = {}
        if files:
            for path, content in files.items():
                self._files[self._normalize(path)] = content
        self._command_handler = command_handler

    @property
    def capabilities(self) -> frozenset[EnvToolName]:
        caps: set[EnvToolName] = {'read_file', 'write_file', 'edit_file'}
        if self._command_handler is not None:
            caps.add('shell')
        return frozenset(caps)

    @property
    def files(self) -> Mapping[str, str | bytes]:
        """Read-only view of the in-memory file system.

        Keys are normalized file paths, values are file contents.
        Useful for test assertions against raw file content without the
        line-number formatting that [`read_file()`][pydantic_ai.environments.memory.MemoryEnvironment.read_file] adds.
        """
        return self._files

    @staticmethod
    def _normalize(path: str) -> str:
        """Normalize a path for consistent storage."""
        normalized = posixpath.normpath(path)
        # Strip leading './' or '/'
        if normalized.startswith('./'):  # pragma: no cover
            normalized = normalized[2:]
        elif normalized.startswith('/'):
            normalized = normalized[1:]
        return normalized

    async def shell(
        self,
        command: str,
        *,
        timeout: float | None = 120,
        env: dict[str, str] | None = None,
    ) -> ExecutionResult:
        """Execute a command using the configured handler.

        Args:
            command: The shell command to execute.
            timeout: Ignored for MemoryEnvironment.
            env: Ignored for MemoryEnvironment.

        Returns:
            The result from the command handler.

        Raises:
            RuntimeError: If no command_handler was provided.
        """
        if self._command_handler is None:
            raise RuntimeError(
                'MemoryEnvironment has no command_handler configured. '
                'Pass command_handler= to the constructor to handle shell() calls.'
            )
        return self._command_handler(command)

    async def read_file(self, path: str, *, offset: int = 0, limit: int = 2000) -> str | bytes:
        normalized = self._normalize(path)

        # Check if path is a "directory" (any file starts with path/)
        if any(k.startswith(normalized + '/') for k in self._files):
            if normalized not in self._files:
                raise FileNotFoundError(f"'{path}' is a directory, not a file.")

        if normalized not in self._files:
            raise FileNotFoundError(f'File not found: {path}')

        content = self._files[normalized]

        # Return raw bytes for image files
        ext = posixpath.splitext(normalized)[1].lower()
        if ext in IMAGE_EXTENSIONS:
            if isinstance(content, bytes):
                return content
            return content.encode('utf-8')

        # Text mode
        if isinstance(content, bytes):
            try:
                text = content.decode('utf-8')
            except UnicodeDecodeError:
                return content
        else:
            text = content

        return format_lines(text, offset, limit)

    async def write_file(self, path: str, content: str | bytes) -> None:
        self._files[self._normalize(path)] = content

    async def replace_str(
        self,
        path: str,
        old: str,
        new: str,
        *,
        replace_all: bool = False,
    ) -> int:
        normalized = self._normalize(path)
        if normalized not in self._files:
            raise FileNotFoundError(f'File not found: {path}')

        content = self._files[normalized]
        text = content.decode('utf-8') if isinstance(content, bytes) else content
        new_text, count = apply_edit(text, old, new, path, replace_all=replace_all)
        self._files[normalized] = new_text
        return count
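`MemoryEnvironment` keys its file dict on normalized paths, so `/main.py`, `./main.py`, and `main.py` all address the same entry. A stdlib-only sketch of the same rule as `_normalize` above:

```python
import posixpath

def normalize(path: str) -> str:
    """Normalize a path the way MemoryEnvironment keys its file dict."""
    normalized = posixpath.normpath(path)  # collapses '.', '..', and '//'
    # Strip a leading './' or '/' so absolute and relative spellings collide.
    if normalized.startswith('./'):
        normalized = normalized[2:]
    elif normalized.startswith('/'):
        normalized = normalized[1:]
    return normalized

print(normalize('/workspace/../main.py'))  # → main.py
```

This is why `files` (the read-only view) exposes normalized keys: assertions in tests should use the normalized spelling, not whatever form was passed to `write_file`.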

__init__

__init__(
    files: dict[str, str | bytes] | None = None,
    *,
    command_handler: (
        Callable[[str], ExecutionResult] | None
    ) = None
) -> None

Create an in-memory execution environment.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `files` | `dict[str, str \| bytes] \| None` | Initial files to populate the environment with. Keys are file paths, values are file contents (str or bytes). | `None` |
| `command_handler` | `Callable[[str], ExecutionResult] \| None` | Optional callback for `shell()` calls. Receives the command string and returns an `ExecutionResult`. If not provided, `shell()` raises `RuntimeError`. | `None` |
Source code in pydantic_ai_slim/pydantic_ai/environments/memory.py
def __init__(
    self,
    files: dict[str, str | bytes] | None = None,
    *,
    command_handler: Callable[[str], ExecutionResult] | None = None,
) -> None:
    """Create an in-memory execution environment.

    Args:
        files: Initial files to populate the environment with.
            Keys are file paths, values are file contents (str or bytes).
        command_handler: Optional callback for `shell()` calls.
            Receives the command string and returns an `ExecutionResult`.
            If not provided, `shell()` raises `RuntimeError`.
    """
    self._files: dict[str, str | bytes] = {}
    if files:
        for path, content in files.items():
            self._files[self._normalize(path)] = content
    self._command_handler = command_handler

files property

files: Mapping[str, str | bytes]

Read-only view of the in-memory file system.

Keys are normalized file paths, values are file contents. Useful for test assertions against raw file content without the line-number formatting that [read_file()][pydantic_ai.environments.memory.MemoryEnvironment.read_file] adds.

shell async

shell(
    command: str,
    *,
    timeout: float | None = 120,
    env: dict[str, str] | None = None
) -> ExecutionResult

Execute a command using the configured handler.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `command` | `str` | The shell command to execute. | *required* |
| `timeout` | `float \| None` | Ignored for `MemoryEnvironment`. | `120` |
| `env` | `dict[str, str] \| None` | Ignored for `MemoryEnvironment`. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `ExecutionResult` | The result from the command handler. |

Raises:

| Type | Description |
|------|-------------|
| `RuntimeError` | If no `command_handler` was provided. |

Source code in pydantic_ai_slim/pydantic_ai/environments/memory.py
async def shell(
    self,
    command: str,
    *,
    timeout: float | None = 120,
    env: dict[str, str] | None = None,
) -> ExecutionResult:
    """Execute a command using the configured handler.

    Args:
        command: The shell command to execute.
        timeout: Ignored for MemoryEnvironment.
        env: Ignored for MemoryEnvironment.

    Returns:
        The result from the command handler.

    Raises:
        RuntimeError: If no command_handler was provided.
    """
    if self._command_handler is None:
        raise RuntimeError(
            'MemoryEnvironment has no command_handler configured. '
            'Pass command_handler= to the constructor to handle shell() calls.'
        )
    return self._command_handler(command)
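The `command_handler` contract is just "command string in, `ExecutionResult` out", which makes scripted shells easy to fake in tests. A minimal sketch, using a local dataclass as a stand-in for the library's `ExecutionResult` so the snippet runs on its own (with pydantic_ai installed you would import the real type and pass `fake_shell` as `MemoryEnvironment(command_handler=fake_shell)`):

```python
from dataclasses import dataclass

@dataclass
class ExecutionResult:  # stand-in for pydantic_ai's result type
    output: str
    exit_code: int

def fake_shell(command: str) -> ExecutionResult:
    """Scripted handler: recognize known commands, fail on everything else."""
    if command == 'pytest':
        return ExecutionResult(output='3 passed', exit_code=0)
    # Unknown commands get the conventional "command not found" exit code.
    return ExecutionResult(output=f'unknown command: {command}', exit_code=127)

result = fake_shell('pytest')
print(result.exit_code, result.output)
```

Because `timeout` and `env` are ignored here, the handler sees only the command string; if a test needs to assert on environment variables, the real environments (`LocalEnvironment`, `DockerEnvironment`) are the place to exercise that.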