
Message Producer

Message producer for non-HTTP interactions.

This module implements a very basic message producer that could send messages to an eventing system, such as Kafka, or to a message queue.

Note that the code in this module is agnostic of Pact (i.e., this would be your production code). The pact-python dependency only appears in the tests, because the producer is not concerned with Pact; only the tests are.

Classes

FileSystemAction

Bases: Enum

Represents a file system action.

Attributes

READ = 'READ' class-attribute instance-attribute
WRITE = 'WRITE' class-attribute instance-attribute

FileSystemEvent

Bases: NamedTuple

Represents a file system event.

Attributes

action: Literal[FileSystemAction.READ, FileSystemAction.WRITE] instance-attribute
contents: str | None instance-attribute
path: str instance-attribute
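
As a minimal sketch of how these two types fit together, the block below restates them from the attributes documented above and builds one event of each kind. The field order inside the tuple, the file path, and the contents are assumptions made for illustration; the real definitions live in examples/src/message_producer.py.

```python
from __future__ import annotations  # keeps `str | None` valid on older Pythons

from enum import Enum
from typing import Literal, NamedTuple


class FileSystemAction(Enum):
    """Restated from the documented attributes."""

    READ = "READ"
    WRITE = "WRITE"


class FileSystemEvent(NamedTuple):
    """Restated from the documented attributes (field order is assumed)."""

    action: Literal[FileSystemAction.READ, FileSystemAction.WRITE]
    path: str
    contents: str | None


# The path and contents are made-up illustration values.
read_event = FileSystemEvent(
    action=FileSystemAction.READ,
    path="/tmp/example.txt",
    contents=None,  # a read event carries no contents
)
write_event = FileSystemEvent(
    action=FileSystemAction.WRITE,
    path="/tmp/example.txt",
    contents="hello",
)

# Fields are read by name; the enum's string form is available via .value.
print(read_event.action.value, read_event.path)
print(write_event.action.value, write_event.contents)
```

Keyword arguments are used throughout so the sketch does not depend on the actual field order.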

FileSystemMessageProducer()

A message producer for file system events.

Source code in examples/src/message_producer.py
def __init__(self) -> None:
    """
    Initialize the message producer.
    """
    self.queue = MockMessageQueue()

Attributes

queue = MockMessageQueue() instance-attribute

Functions

send_read_event(filename: str) -> None

Send a read event to a message queue.

PARAMETER DESCRIPTION
filename

The name of the file.

TYPE: str

Source code in examples/src/message_producer.py
def send_read_event(self, filename: str) -> None:
    """
    Send a read event to a message queue.

    :param filename: The name of the file.
    """
    message = FileSystemEvent(
        action=FileSystemAction.READ,
        path=filename,
        contents=None,
    )
    self.send_to_queue(message)

send_to_queue(message: FileSystemEvent) -> None

Send a message to a message queue.

PARAMETER DESCRIPTION
message

The message to send.

TYPE: FileSystemEvent

Source code in examples/src/message_producer.py
def send_to_queue(self, message: FileSystemEvent) -> None:
    """
    Send a message to a message queue.

    :param message: The message to send.
    """
    self.queue.send(
        json.dumps({
            "action": message.action.value,
            "path": message.path,
            "contents": message.contents,
        })
    )
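
To make the wire format concrete, the sketch below builds the same dictionary that send_to_queue passes to json.dumps, using the values a read event would supply. The file path is a made-up illustration value; everything else follows the source shown above.

```python
import json

# Values as send_to_queue would read them from a READ event.
payload = json.dumps({
    "action": "READ",            # message.action.value
    "path": "/tmp/example.txt",  # message.path (illustrative)
    "contents": None,            # message.contents, None for read events
})

print(payload)
# {"action": "READ", "path": "/tmp/example.txt", "contents": null}

# A receiver can recover the fields with json.loads; null becomes None again.
decoded = json.loads(payload)
print(decoded["contents"] is None)  # True
```

Note that the enum is serialized through its string value, so the payload contains no Python-specific types.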

send_write_event(filename: str, contents: str) -> None

Send a write event to a message queue.

PARAMETER DESCRIPTION
filename

The name of the file.

TYPE: str

contents

The contents of the file.

TYPE: str

Source code in examples/src/message_producer.py
def send_write_event(self, filename: str, contents: str) -> None:
    """
    Send a write event to a message queue.

    Args:
        filename: The name of the file.
        contents: The contents of the file.
    """
    message = FileSystemEvent(
        action=FileSystemAction.WRITE,
        path=filename,
        contents=contents,
    )
    self.send_to_queue(message)
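
The following end-to-end sketch shows how the three methods might be exercised together. To stay self-contained it restates the classes in condensed form rather than importing them (the tuple's field order is an assumption), and the file path and contents are made-up illustration values.

```python
from __future__ import annotations  # keeps `str | None` valid on older Pythons

import json
from enum import Enum
from typing import NamedTuple


class FileSystemAction(Enum):
    READ = "READ"
    WRITE = "WRITE"


class FileSystemEvent(NamedTuple):
    action: FileSystemAction
    path: str
    contents: str | None


class MockMessageQueue:
    """In-memory stand-in for a real queue, as documented on this page."""

    def __init__(self) -> None:
        self.messages: list[str] = []

    def send(self, message: str) -> None:
        self.messages.append(message)


class FileSystemMessageProducer:
    """Condensed restatement of the producer documented above."""

    def __init__(self) -> None:
        self.queue = MockMessageQueue()

    def send_to_queue(self, message: FileSystemEvent) -> None:
        self.queue.send(json.dumps({
            "action": message.action.value,
            "path": message.path,
            "contents": message.contents,
        }))

    def send_read_event(self, filename: str) -> None:
        self.send_to_queue(FileSystemEvent(FileSystemAction.READ, filename, None))

    def send_write_event(self, filename: str, contents: str) -> None:
        self.send_to_queue(FileSystemEvent(FileSystemAction.WRITE, filename, contents))


producer = FileSystemMessageProducer()
producer.send_read_event("/tmp/example.txt")
producer.send_write_event("/tmp/example.txt", "hello")

# Each queued message is a JSON string; decode them to inspect the fields.
events = [json.loads(raw) for raw in producer.queue.messages]
for event in events:
    print(event["action"], event["path"], event["contents"])
```

Because the queue is a plain attribute on the producer, a test can inspect producer.queue.messages directly after calling the send methods.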

MockMessageQueue()

A mock message queue.

Source code in examples/src/message_producer.py
def __init__(self) -> None:
    """
    Initialize the message queue.
    """
    self.messages: list[str] = []

Attributes

messages: list[str] = [] instance-attribute

Functions

send(message: str) -> None

Send a message to the queue.

PARAMETER DESCRIPTION
message

The message to send.

TYPE: str

Source code in examples/src/message_producer.py
def send(self, message: str) -> None:
    """
    Send a message to the queue.

    Args:
        message: The message to send.
    """
    self.messages.append(message)
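
A short sketch of the mock queue on its own, restated from the source above so it runs without the examples package. It illustrates two properties a test may rely on: messages are kept in the order they were sent, and each instance has its own list. The message strings are arbitrary placeholders.

```python
class MockMessageQueue:
    """Restated from the source shown above."""

    def __init__(self) -> None:
        self.messages: list[str] = []

    def send(self, message: str) -> None:
        self.messages.append(message)


queue = MockMessageQueue()
queue.send("first")
queue.send("second")
print(queue.messages)  # ['first', 'second']

# A second instance starts empty; nothing is shared between queues.
other = MockMessageQueue()
print(other.messages)  # []
```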