Message

Handler for non-HTTP interactions.

This module implements a basic handler for JSON payloads that might arrive from Kafka or another queueing system. Unlike an HTTP interaction, the handler is solely responsible for processing the message and does not necessarily need to send a response.
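
As a rough illustration of how a raw queue payload could become the event dictionary the handler works with, here is a minimal sketch. The `decode_message` helper is hypothetical and not part of this module; it assumes the payload is UTF-8 encoded JSON.

```python
import json
from typing import Any, Dict


def decode_message(raw: bytes) -> Dict[str, Any]:
    """Decode a UTF-8 JSON payload into an event dictionary (hypothetical helper)."""
    event = json.loads(raw.decode("utf-8"))
    # The handler expects a dictionary, so reject JSON arrays, strings, etc.
    if not isinstance(event, dict):
        raise TypeError("Payload must decode to a JSON object.")
    return event


raw = b'{"action": "WRITE", "path": "notes.txt", "contents": "hello"}'
event = decode_message(raw)
print(event["action"], event["path"])
```

The resulting dictionary has the shape that `Handler.process` documents: an `action`, a `path`, and optional `contents`.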

Classes

Filesystem()

Filesystem interface.

Source code in examples/src/message.py
def __init__(self) -> None:
    """Initialize the filesystem connection."""

Functions

read(file: str) -> str

Read contents from a file.

Source code in examples/src/message.py
def read(self, file: str) -> str:
    """Read contents from a file."""
    raise NotImplementedError

write(_file: str, _contents: str) -> None

Write contents to a file.

Source code in examples/src/message.py
def write(self, _file: str, _contents: str) -> None:
    """Write contents to a file."""
    raise NotImplementedError
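
Both methods raise NotImplementedError, so a concrete backend has to supply them. The sketch below shows one possible way to do that with a dictionary-backed subclass. `InMemoryFilesystem` is a hypothetical name, the `Filesystem` class here is a stand-in copy of the documented interface so the block runs on its own, and raising `FileNotFoundError` for unknown paths is an assumption of this sketch.

```python
from typing import Dict


class Filesystem:
    """Stand-in for the documented interface; both methods are unimplemented."""

    def read(self, file: str) -> str:
        raise NotImplementedError

    def write(self, _file: str, _contents: str) -> None:
        raise NotImplementedError


class InMemoryFilesystem(Filesystem):
    """Hypothetical backend that keeps file contents in a dictionary."""

    def __init__(self) -> None:
        self._files: Dict[str, str] = {}

    def read(self, file: str) -> str:
        # Unknown paths raise FileNotFoundError (an assumption of this sketch).
        try:
            return self._files[file]
        except KeyError:
            raise FileNotFoundError(file) from None

    def write(self, _file: str, _contents: str) -> None:
        # Overwrite any previous contents stored under the same path.
        self._files[_file] = _contents


fs = InMemoryFilesystem()
fs.write("notes.txt", "hello")
print(fs.read("notes.txt"))
```

A backend like this could be handy in tests, since it needs no real disk access.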

Handler()

Message queue handler.

This class is responsible for handling messages from the queue.

Initializing the handler ensures the underlying filesystem is ready to be used.

Source code in examples/src/message.py
def __init__(self) -> None:
    """
    Initialize the handler.

    This ensures the underlying filesystem is ready to be used.
    """
    self.fs = Filesystem()

Attributes

fs = Filesystem() instance-attribute

Functions

process(event: Dict[str, Any]) -> Union[str, None]

Process an event from the queue.

The event is expected to be a dictionary with the following mandatory keys:

  • action: The action to be performed, either READ or WRITE.
  • path: The path to the file to be read or written.

The event may also contain an optional contents key, which is the contents to be written to the file. If the contents key is not present, an empty file will be written.
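
The READ and WRITE behaviour described above can be sketched as follows. This is a simplified stand-alone function, not the module's `Handler`: it skips validation, and the `store` dictionary is a hypothetical replacement for the filesystem backend.

```python
from typing import Any, Dict, Optional

# Hypothetical dictionary standing in for the filesystem backend.
store: Dict[str, str] = {}


def process(event: Dict[str, Any]) -> Optional[str]:
    """Mirror the documented dispatch: WRITE returns None, READ returns contents."""
    if event["action"] == "WRITE":
        # A missing "contents" key falls back to an empty string.
        store[event["path"]] = event.get("contents", "")
        return None
    if event["action"] == "READ":
        return store[event["path"]]
    raise ValueError(f"Invalid action: {event['action']!r}")


write_result = process({"action": "WRITE", "path": "a.txt", "contents": "hello"})
read_result = process({"action": "READ", "path": "a.txt"})

# Writing without "contents" produces an empty file.
process({"action": "WRITE", "path": "empty.txt"})
empty_result = process({"action": "READ", "path": "empty.txt"})

print(write_result, read_result, repr(empty_result))
```

Note that a WRITE event yields no return value, which matches a queue consumer that does not need to send a response.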

Source code in examples/src/message.py
def process(self, event: Dict[str, Any]) -> Union[str, None]:
    """
    Process an event from the queue.

    The event is expected to be a dictionary with the following mandatory
    keys:

    - `action`: The action to be performed, either `READ` or `WRITE`.
    - `path`: The path to the file to be read or written.

    The event may also contain an optional `contents` key, which is the
    contents to be written to the file. If the `contents` key is not
    present, an empty file will be written.
    """
    self.validate_event(event)

    if event["action"] == "WRITE":
        self.fs.write(event["path"], event.get("contents", ""))
        return None
    if event["action"] == "READ":
        return self.fs.read(event["path"])

    msg = f"Invalid action: {event['action']!r}"
    raise ValueError(msg)

validate_event(event: Union[Dict[str, Any], Any]) -> None staticmethod

Validates the event received from the queue.

The event is expected to be a dictionary with the following mandatory keys:

  • action: The action to be performed, either READ or WRITE.
  • path: The path to the file to be read or written.

Source code in examples/src/message.py
@staticmethod
def validate_event(event: Union[Dict[str, Any], Any]) -> None:  # noqa: ANN401
    """
    Validates the event received from the queue.

    The event is expected to be a dictionary with the following mandatory
    keys:

    - `action`: The action to be performed, either `READ` or `WRITE`.
    - `path`: The path to the file to be read or written.
    """
    if not isinstance(event, dict):
        msg = "Event must be a dictionary."
        raise TypeError(msg)
    if "action" not in event:
        msg = "Event must contain an 'action' key."
        raise ValueError(msg)
    if "path" not in event:
        msg = "Event must contain a 'path' key."
        raise ValueError(msg)
    if event["action"] not in ["READ", "WRITE"]:
        msg = "Event must contain a valid 'action' key."
        raise ValueError(msg)
    try:
        Path(event["path"])
    except TypeError as err:
        msg = "Event must contain a valid 'path' key."
        raise ValueError(msg) from err
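
To see which exception each check produces, the following sketch runs a few sample events through a condensed copy of the validation logic. The `outcome` helper and the sample events are hypothetical, and the error messages are shortened relative to the source above.

```python
from pathlib import Path
from typing import Any


def validate_event(event: Any) -> None:
    """Condensed copy of the documented checks."""
    if not isinstance(event, dict):
        raise TypeError("Event must be a dictionary.")
    for key in ("action", "path"):
        if key not in event:
            raise ValueError(f"Event must contain the {key!r} key.")
    if event["action"] not in ("READ", "WRITE"):
        raise ValueError("Event must contain a valid 'action' key.")
    try:
        # Path() raises TypeError for values that are not str or os.PathLike.
        Path(event["path"])
    except TypeError as err:
        raise ValueError("Event must contain a valid 'path' key.") from err


def outcome(event: Any) -> str:
    """Return 'ok' or the name of the exception raised for the event."""
    try:
        validate_event(event)
    except (TypeError, ValueError) as err:
        return type(err).__name__
    return "ok"


results = {
    "valid": outcome({"action": "READ", "path": "a.txt"}),
    "not a dict": outcome(["READ", "a.txt"]),
    "missing path": outcome({"action": "READ"}),
    "bad action": outcome({"action": "DELETE", "path": "a.txt"}),
    "bad path": outcome({"action": "READ", "path": 123}),
}
for name, result in results.items():
    print(f"{name}: {result}")
```

Only a non-dictionary event raises TypeError; every other failure, including a path of the wrong type, surfaces as ValueError.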