The State Machine

Overview

The State Machine is the core component that manages the car's operating modes. It makes mode transitions thread-safe and controls which processes are active in each mode.

Adding a New Mode

To add a new mode, modify the SystemMode class in src/statemachine/systemMode.py.

Add a new entry to the SystemMode Enum. Each mode defines which processes are enabled and their configuration.

NEW_MODE = {
    "mode": "new_mode",
    "camera": {
        "process": {
            "enabled": True,
        },
        "thread": {
            "resolution": "720p",
        }
    },
    # ... other processes
}

Configuration Details:

  • process: The enabled flag is delivered as a message to the process (e.g. processCamera), which uses it to pause or resume the threads it owns.

  • thread: This dictionary holds settings for the threads owned by the process. You can pass custom values here, such as "resolution": "720p".

    Note: The “resolution” key is just an example. Changing this value does not automatically change the camera’s actual resolution unless the thread logic is implemented to read this value and apply the change.
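As a rough illustration of how such an entry can be read back, the sketch below defines a cut-down stand-in for SystemMode (not the real class from src/statemachine/systemMode.py) and looks up a process flag by mode name. The contents of DEFAULT and the helper name is_process_enabled are assumptions made for this example.

```python
from enum import Enum


class SystemMode(Enum):
    """Cut-down stand-in for the real SystemMode enum (illustration only)."""

    # The contents of DEFAULT are assumed for this example.
    DEFAULT = {
        "mode": "default",
        "camera": {"process": {"enabled": False}, "thread": {}},
    }
    NEW_MODE = {
        "mode": "new_mode",
        "camera": {
            "process": {"enabled": True},
            "thread": {"resolution": "720p"},
        },
    }


def is_process_enabled(mode_name: str, process_name: str) -> bool:
    """Hypothetical helper: read the 'enabled' flag of one process in one mode."""
    process_config = SystemMode[mode_name].value[process_name]["process"]
    return bool(process_config["enabled"])
```

SystemMode["NEW_MODE"] looks a member up by name, which is the same form of access used by the handlers later in this page. Note that in a Python Enum, members with equal values become aliases of the first one, so a distinct "mode" string also keeps each entry a separate member.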

Adding a Transition

To allow transitions to your new mode, modify the TransitionTable class in src/statemachine/transitionTable.py.

First, ensure you have created SystemMode.NEW_MODE as described above. Then, add the allowed transitions for each mode in the _TRANSITIONS dictionary.

You need to define:

  1. Which existing modes can transition TO your new mode.

  2. Which modes your new mode can transition TO (so you can switch out of it).

# Define transitions TO your new mode from existing modes
SystemMode.DEFAULT: {  # DEFAULT is the startup mode
    "dashboard_new_button": SystemMode.NEW_MODE,
    # ...
},

# Define transitions FROM your new mode to others
SystemMode.NEW_MODE: {
    "dashboard_default_button": SystemMode.DEFAULT,
    "dashboard_auto_button": SystemMode.AUTO,
    # ...
}
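To make the shape of the table concrete, here is a minimal, self-contained sketch of a nested-dictionary lookup. It is not the TransitionTable class from the repository: the stand-in enum uses plain strings as values, and the function next_mode is a hypothetical name chosen for this example.

```python
from enum import Enum
from typing import Optional


class SystemMode(Enum):
    """Stand-in enum; the real members hold configuration dictionaries."""

    DEFAULT = "default"
    AUTO = "auto"
    NEW_MODE = "new_mode"


# Same shape as the _TRANSITIONS dictionary described above:
# current mode -> {transition name -> target mode}.
_TRANSITIONS = {
    SystemMode.DEFAULT: {
        "dashboard_new_button": SystemMode.NEW_MODE,
    },
    SystemMode.NEW_MODE: {
        "dashboard_default_button": SystemMode.DEFAULT,
        "dashboard_auto_button": SystemMode.AUTO,
    },
}


def next_mode(current: SystemMode, transition: str) -> Optional[SystemMode]:
    """Hypothetical lookup: return the target mode, or None if not allowed."""
    return _TRANSITIONS.get(current, {}).get(transition)
```

In this sketch AUTO has no entry of its own, so once the car is in AUTO no transition name leads anywhere. That is the situation the second point above is meant to prevent for your new mode.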

Usage & Handling State Changes

To use the State Machine, obtain its singleton instance and request mode changes by transition name. Processes and threads must subscribe to state changes in order to react to them.

Tip

For complete implementation details, please refer to the following files in the Brain repository:

  • src/dashboard/processDashboard.py (Requesting mode changes)

  • src/hardware/camera/processCamera.py (Handling process state changes)

  • src/hardware/camera/threads/threadCamera.py (Handling thread state changes)

1. Requesting a Mode Change

This example from processDashboard.py shows how to initialize the State Machine and request a mode change.

from src.statemachine.stateMachine import StateMachine

class processDashboard(WorkerProcess):
    def __init__(self, ...):
        # ...
        # Get the singleton instance of the State Machine
        self.stateMachine = StateMachine.get_instance()
        # ...

    def handle_driving_mode(self, dataDict):
        """Handle driving mode change request."""
        # Request a mode change. The argument is the TRANSITION NAME defined in the TransitionTable.
        self.stateMachine.request_mode(f"dashboard_{dataDict['Value']}_button")
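For readers who want a mental model of what get_instance() and request_mode() might do, the following is a simplified sketch of a lock-protected singleton. It is an assumption-laden illustration, not the StateMachine class from the Brain repository: the class name StateMachineSketch, the string mode names, the boolean return value, and the current_mode() method are all invented here, and it does not show how state would be shared across processes.

```python
import threading


class StateMachineSketch:
    """Illustrative only; not the StateMachine class from the Brain repository."""

    _instance = None
    _instance_lock = threading.Lock()

    def __init__(self, transitions, start_mode):
        self._transitions = transitions     # mode -> {transition name -> mode}
        self._mode = start_mode
        self._mode_lock = threading.Lock()  # guards reads and writes of _mode

    @classmethod
    def get_instance(cls):
        # Create the shared instance once, even if several threads race here.
        with cls._instance_lock:
            if cls._instance is None:
                cls._instance = cls(
                    {
                        "default": {"dashboard_auto_button": "auto"},
                        "auto": {"dashboard_default_button": "default"},
                    },
                    "default",
                )
            return cls._instance

    def request_mode(self, transition_name):
        """Apply the transition if the current mode allows it; return success."""
        with self._mode_lock:
            target = self._transitions.get(self._mode, {}).get(transition_name)
            if target is None:
                return False
            self._mode = target
            return True

    def current_mode(self):
        with self._mode_lock:
            return self._mode
```

Holding the lock for both the lookup and the assignment means two concurrent requests cannot both act on the same stale current mode.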

2. Handling State Changes in a Process

In the process (e.g. processCamera.py), you subscribe to StateChange messages in __init__. The handler checks the enabled flag to pause or resume threads.

from src.utils.messages.allMessages import StateChange
from src.utils.messages.messageHandlerSubscriber import messageHandlerSubscriber
from src.statemachine.systemMode import SystemMode

class processCamera(WorkerProcess):
    def __init__(self, queueList, ...):
        self.queuesList = queueList
        # ...
        # Subscribe to StateChange messages
        self.stateChangeSubscriber = messageHandlerSubscriber(self.queuesList, StateChange, "lastOnly", True)

        super(processCamera, self).__init__(self.queuesList, ...)

    # This method is automatically called by the WorkerProcess (parent class) loop
    def state_change_handler(self):
        message = self.stateChangeSubscriber.receive()
        if message is not None:
            # Access the configuration for this process in the new mode
            modeDict = SystemMode[message].value["camera"]["process"]

            if modeDict["enabled"]:
                self.resume_threads()
            else:
                self.pause_threads()
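The handler above relies on the process being able to pause and resume its threads. One common way to do this in Python is with threading.Event, sketched below. The classes PausableThread and ProcessSketch and the method apply_process_config are hypothetical names for this example; the actual WorkerProcess and ThreadWithStop implementations may work differently.

```python
import threading


class PausableThread(threading.Thread):
    """Hypothetical worker thread that starts paused and can be resumed."""

    def __init__(self):
        super().__init__(daemon=True)
        self._resume_event = threading.Event()  # set -> work, clear -> paused
        self._stop_event = threading.Event()

    def run(self):
        while not self._stop_event.is_set():
            # Block while paused, waking periodically to check for stop.
            if self._resume_event.wait(timeout=0.1):
                self.work_once()

    def work_once(self):
        # Placeholder for real work; a short wait avoids a busy loop.
        self._stop_event.wait(0.01)

    def pause(self):
        self._resume_event.clear()

    def resume(self):
        self._resume_event.set()

    def stop(self):
        self._stop_event.set()

    def is_paused(self):
        return not self._resume_event.is_set()


class ProcessSketch:
    """Hypothetical owner of several pausable threads."""

    def __init__(self, threads):
        self.threads = list(threads)

    def resume_threads(self):
        for thread in self.threads:
            thread.resume()

    def pause_threads(self):
        for thread in self.threads:
            thread.pause()

    def apply_process_config(self, process_config):
        # Mirrors the handler logic: the 'enabled' flag decides pause or resume.
        if process_config["enabled"]:
            self.resume_threads()
        else:
            self.pause_threads()
```

A paused thread in this sketch stays alive and simply waits, so resuming it is cheap compared with stopping and recreating it.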

3. Handling State Changes in a Thread

Threads (e.g. threadCamera.py) can also subscribe to StateChange to receive custom configurations (like the “resolution” example).

from src.utils.messages.allMessages import StateChange
from src.utils.messages.messageHandlerSubscriber import messageHandlerSubscriber
from src.statemachine.systemMode import SystemMode

class threadCamera(ThreadWithStop):
    def __init__(self, queuesList, ...):
        # ...
        self.queuesList = queuesList
        self.subscribe()
        # ...

    def subscribe(self):
        # Subscribe to StateChange messages
        self.stateChangeSubscriber = messageHandlerSubscriber(self.queuesList, StateChange, "lastOnly", True)

    # This method is automatically called by the ThreadWithStop (parent class) loop
    def thread_work(self):
        # ...
        message = self.stateChangeSubscriber.receive()
        if message is not None:
            # Access the thread configuration
            threadConfig = SystemMode[message].value["camera"]["thread"]
            if "resolution" in threadConfig:
                print(f"Requested resolution: {threadConfig['resolution']}")
                # Apply resolution logic here...
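As one possible starting point for the resolution logic, the sketch below turns a label such as "720p" into a (width, height) pair and falls back to the current size when the key is missing or unknown. The label table and the function name resolve_resolution are assumptions for this example; how the resulting size is actually applied depends on the camera driver and is not shown.

```python
# Assumed mapping from resolution labels to (width, height) in pixels.
_RESOLUTIONS = {
    "720p": (1280, 720),
    "1080p": (1920, 1080),
}


def resolve_resolution(thread_config, current_size):
    """Hypothetical helper: pick the frame size requested by the mode.

    Returns current_size unchanged when the thread configuration has no
    "resolution" key or when the label is not in the table.
    """
    label = thread_config.get("resolution")
    return _RESOLUTIONS.get(label, current_size)
```

Returning the current size for unknown labels keeps a typo in a mode definition from breaking the camera thread, at the cost of silently ignoring it; logging the unknown label would be a reasonable addition.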