Workspace management commands.
This module provides commands for creating, managing, and cleaning up F1 analysis workspaces.
create_workspace(workspace_id=None, description=None, max_retries=3)
Create a new workspace directory structure.
Implements collision retry logic for auto-generated workspace IDs.
| PARAMETER | DESCRIPTION |
workspace_id | Workspace identifier. Auto-generated if None. TYPE: str | None DEFAULT: None |
description | Optional description for the workspace. TYPE: str | None DEFAULT: None |
max_retries | Maximum retry attempts for UUID collision (default: 3). TYPE: int DEFAULT: 3 |
| RETURNS | DESCRIPTION |
dict | Dictionary with workspace information: |
dict | { "workspace_id": str, "workspace_path": str, "created_at": str, |
dict | |
| RAISES | DESCRIPTION |
ValueError | If workspace already exists for the given workspace_id. |
RuntimeError | If failed to generate unique workspace ID after max_retries. |
Source code in packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
| def create_workspace(workspace_id: str | None = None, description: str | None = None, max_retries: int = 3) -> dict:
"""Create a new workspace directory structure.
Implements collision retry logic for auto-generated workspace IDs.
Args:
workspace_id: Workspace identifier. Auto-generated if None.
description: Optional description for the workspace.
max_retries: Maximum retry attempts for UUID collision (default: 3).
Returns:
Dictionary with workspace information:
{
"workspace_id": str,
"workspace_path": str,
"created_at": str,
}
Raises:
ValueError: If workspace already exists for the given workspace_id.
RuntimeError: If failed to generate unique workspace ID after max_retries.
"""
if workspace_id is not None:
# Explicit workspace_id provided - no retry logic
if workspace_exists(workspace_id):
raise ValueError(f"Workspace already exists for workspace ID: {workspace_id}")
return _create_workspace_internal(workspace_id, description)
# Auto-generate workspace ID with collision retry
for _attempt in range(max_retries):
workspace_id = generate_workspace_id()
if not workspace_exists(workspace_id):
return _create_workspace_internal(workspace_id, description)
raise RuntimeError(f"Failed to generate unique workspace ID after {max_retries} attempts")
|
get_workspace_path(workspace_id)
Get the workspace path for a given workspace ID.
| PARAMETER | DESCRIPTION |
workspace_id | The workspace identifier. TYPE: str |
| RETURNS | DESCRIPTION |
Path | Path to the workspace directory. |
Source code in packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
| def get_workspace_path(workspace_id: str) -> Path:
"""Get the workspace path for a given workspace ID.
Args:
workspace_id: The workspace identifier.
Returns:
Path to the workspace directory.
"""
return get_workspace_base() / workspace_id
|
workspace_exists(workspace_id)
Check if a workspace exists for the given workspace ID.
| PARAMETER | DESCRIPTION |
workspace_id | The workspace identifier. TYPE: str |
| RETURNS | DESCRIPTION |
bool | True if workspace exists, False otherwise. |
Source code in packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
| def workspace_exists(workspace_id: str) -> bool:
"""Check if a workspace exists for the given workspace ID.
Args:
workspace_id: The workspace identifier.
Returns:
True if workspace exists, False otherwise.
"""
workspace_path = get_workspace_path(workspace_id)
return workspace_path.exists() and workspace_path.is_dir()
|
get_workspace_info(workspace_id)
Get information about a workspace.
| PARAMETER | DESCRIPTION |
workspace_id | The workspace identifier. TYPE: str |
| RETURNS | DESCRIPTION |
dict | Dictionary with workspace metadata including: |
dict | { "workspace_id": str, "workspace_path": str, "created_at": str, "last_accessed": str, "description": str (optional), "data_files": list[str], "chart_files": list[str], |
dict | |
| RAISES | DESCRIPTION |
ValueError | If workspace doesn't exist. |
Source code in packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
| def get_workspace_info(workspace_id: str) -> dict:
"""Get information about a workspace.
Args:
workspace_id: The workspace identifier.
Returns:
Dictionary with workspace metadata including:
{
"workspace_id": str,
"workspace_path": str,
"created_at": str,
"last_accessed": str,
"description": str (optional),
"data_files": list[str],
"chart_files": list[str],
}
Raises:
ValueError: If workspace doesn't exist.
"""
if not workspace_exists(workspace_id):
raise ValueError(f"Workspace does not exist for workspace ID: {workspace_id}")
workspace_path = get_workspace_path(workspace_id)
metadata_path = workspace_path / ".metadata.json"
if metadata_path.exists():
with open(metadata_path) as f:
metadata = json.load(f)
else:
metadata = {
"workspace_id": workspace_id,
"created_at": "unknown",
"last_accessed": "unknown",
}
# List data and chart files
data_dir = workspace_path / "data"
chart_dir = workspace_path / "charts"
data_files = [f.name for f in data_dir.iterdir() if f.is_file()] if data_dir.exists() else []
chart_files = [f.name for f in chart_dir.iterdir() if f.is_file()] if chart_dir.exists() else []
return {
**metadata,
"workspace_path": str(workspace_path),
"data_files": sorted(data_files),
"chart_files": sorted(chart_files),
}
|
list_workspaces(show_all=False)
List all workspaces.
| PARAMETER | DESCRIPTION |
show_all | If True, include all workspaces. If False, only recent ones. TYPE: bool DEFAULT: False |
| RETURNS | DESCRIPTION |
list[dict] | List of workspace information dictionaries sorted by last_accessed (newest first). |
Source code in packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
| def list_workspaces(show_all: bool = False) -> list[dict]:
"""List all workspaces.
Args:
show_all: If True, include all workspaces. If False, only recent ones.
Returns:
List of workspace information dictionaries sorted by last_accessed (newest first).
"""
workspace_base = get_workspace_base()
if not workspace_base.exists():
return []
workspaces = []
for workspace_dir in workspace_base.iterdir():
if not workspace_dir.is_dir():
continue
workspace_id = workspace_dir.name
try:
info = get_workspace_info(workspace_id)
workspaces.append(info)
except Exception:
# Skip corrupted workspaces
continue
# Sort by last_accessed (newest first)
def get_last_accessed(ws):
try:
return datetime.fromisoformat(ws["last_accessed"].rstrip("Z"))
except Exception:
return datetime.min
workspaces.sort(key=get_last_accessed, reverse=True)
if not show_all:
# Limit to 10 most recent
workspaces = workspaces[:10]
return workspaces
|
remove_workspace(workspace_id)
Remove a workspace and all its contents.
| PARAMETER | DESCRIPTION |
workspace_id | The workspace identifier. TYPE: str |
| RAISES | DESCRIPTION |
ValueError | If workspace doesn't exist. |
Source code in packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
| def remove_workspace(workspace_id: str) -> None:
"""Remove a workspace and all its contents.
Args:
workspace_id: The workspace identifier.
Raises:
ValueError: If workspace doesn't exist.
"""
if not workspace_exists(workspace_id):
raise ValueError(f"Workspace does not exist for workspace ID: {workspace_id}")
workspace_path = get_workspace_path(workspace_id)
shutil.rmtree(workspace_path)
|
clean_workspaces(older_than_days=None, remove_all=False)
Clean up old workspaces.
| PARAMETER | DESCRIPTION |
older_than_days | Remove workspaces not accessed in this many days. TYPE: int | None DEFAULT: None |
remove_all | If True, remove all workspaces (overrides older_than_days). TYPE: bool DEFAULT: False |
| RETURNS | DESCRIPTION |
dict | Dictionary with cleanup statistics: |
dict | { "removed_count": int, "removed_workspaces": list[str], |
dict | |
Source code in packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
| def clean_workspaces(older_than_days: int | None = None, remove_all: bool = False) -> dict:
"""Clean up old workspaces.
Args:
older_than_days: Remove workspaces not accessed in this many days.
remove_all: If True, remove all workspaces (overrides older_than_days).
Returns:
Dictionary with cleanup statistics:
{
"removed_count": int,
"removed_workspaces": list[str],
}
"""
workspace_base = get_workspace_base()
if not workspace_base.exists():
return {"removed_count": 0, "removed_workspaces": []}
removed_workspaces = []
for workspace_dir in workspace_base.iterdir():
if not workspace_dir.is_dir():
continue
workspace_id = workspace_dir.name
# Check if should remove
should_remove = remove_all
if not should_remove and older_than_days is not None:
try:
info = get_workspace_info(workspace_id)
last_accessed = datetime.fromisoformat(info["last_accessed"].rstrip("Z")).replace(tzinfo=UTC)
age = datetime.now(UTC) - last_accessed
if age > timedelta(days=older_than_days):
should_remove = True
except Exception:
# If can't read metadata, don't remove (play it safe)
continue
if should_remove:
try:
remove_workspace(workspace_id)
removed_workspaces.append(workspace_id)
except Exception:
# Skip if removal fails
continue
return {
"removed_count": len(removed_workspaces),
"removed_workspaces": removed_workspaces,
}
|
Update the last_accessed timestamp in workspace metadata.
| PARAMETER | DESCRIPTION |
workspace_id | The workspace identifier. TYPE: str |
| RAISES | DESCRIPTION |
ValueError | If workspace doesn't exist. |
Source code in packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
| def update_workspace_metadata(workspace_id: str) -> None:
"""Update the last_accessed timestamp in workspace metadata.
Args:
workspace_id: The workspace identifier.
Raises:
ValueError: If workspace doesn't exist.
"""
if not workspace_exists(workspace_id):
raise ValueError(f"Workspace does not exist for workspace ID: {workspace_id}")
workspace_path = get_workspace_path(workspace_id)
metadata_path = workspace_path / ".metadata.json"
# Read existing metadata or create new
if not metadata_path.exists():
# Metadata missing, recreate it
now = datetime.now(UTC)
metadata = {
"workspace_id": workspace_id,
"created_at": now.isoformat() + "Z",
"last_accessed": now.isoformat() + "Z",
}
else:
with open(metadata_path) as f:
metadata = json.load(f)
metadata["last_accessed"] = datetime.now(UTC).isoformat() + "Z"
# Atomic write using tempfile + rename
# Create temp file in same directory to ensure same filesystem
fd, temp_path = tempfile.mkstemp(dir=workspace_path, prefix=".metadata.tmp.", suffix=".json")
try:
with os.fdopen(fd, "w") as f:
json.dump(metadata, f, indent=2)
# Atomic rename (POSIX guarantee)
os.replace(temp_path, metadata_path)
except Exception:
# Clean up temp file on error
with suppress(Exception):
os.unlink(temp_path)
raise
|