Test 03 Message Provider

Producer test of example message.

This test reads the pact between the message handler and the message provider, then verifies the provider against that pact.

Attributes

PACT_DIR = (Path(__file__).parent.parent.parent / 'pacts').resolve() module-attribute

RESPONSES: dict[str, dict[str, str]] = {'a request to write test.txt': {'function_name': 'send_write_event'}, 'a request to read test.txt': {'function_name': 'send_read_event'}} module-attribute

Classes

Functions

message_producer(message: str, metadata: dict[str, Any] | None) -> Message

Function to produce a message for the provider.

This specific implementation is rather simple as it returns static content. In fact, a straight mapping of the message names to the expected responses could be given to the message handler directly. However, this function is provided to demonstrate the capability of the message handler to be very generic.

PARAMETER DESCRIPTION
message

The message name.

TYPE: str

metadata

Any metadata associated with the message which can be used to determine the response.

TYPE: dict[str, Any] | None

Source code in examples/tests/v3/test_03_message_provider.py
def message_producer(message: str, metadata: dict[str, Any] | None) -> Message:  # noqa: ARG001
    """
    Function to produce a message for the provider.

    This specific implementation is rather simple as it returns static content.
    In fact, a straight mapping of the message names to the expected responses
    could be given to the message handler directly. However, this function is
    provided to demonstrate the capability of the message handler to be very
    generic.

    Args:
        message:
            The message name.

        metadata:
            Any metadata associated with the message which can be used to
            determine the response.
    """
    producer = FileSystemMessageProducer()
    producer.queue = MagicMock()

    return Message(
        contents=json.dumps(RESPONSES[message]).encode("utf-8"),
        content_type="application/json",
        metadata=None,
    )
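
The docstring above notes that a straight mapping of message names to expected responses could replace the function. A minimal sketch of that idea follows, using plain dictionaries as a stand-in for the `Message` type so that it runs without Pact installed. The names `encode_response` and `STATIC_MESSAGES` are hypothetical and do not appear in the example source.

```python
import json

# Same mapping as the module-level RESPONSES attribute.
RESPONSES: dict[str, dict[str, str]] = {
    "a request to write test.txt": {"function_name": "send_write_event"},
    "a request to read test.txt": {"function_name": "send_read_event"},
}


def encode_response(name: str) -> dict[str, object]:
    """Hypothetical helper: build a message-shaped dict for one message name."""
    return {
        # JSON-encode the static response, as message_producer does.
        "contents": json.dumps(RESPONSES[name]).encode("utf-8"),
        "content_type": "application/json",
        "metadata": None,
    }


# Precompute one static message per name instead of dispatching in a function.
STATIC_MESSAGES = {name: encode_response(name) for name in RESPONSES}
```

Whether such a mapping can be passed to `message_handler` in place of `message_producer` depends on the installed Pact version; treat that as an assumption to check against its documentation.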

test_producer() -> None

Test the message producer.

Source code in examples/tests/v3/test_03_message_provider.py
def test_producer() -> None:
    """
    Test the message producer.
    """
    verifier = Verifier("provider").message_handler(message_producer)
    verifier.verify()