Lately, I’ve been working on a hobby project that has many scenarios where operations are getting scheduled to be handled asynchronously in the future - sometimes after a few minutes, sometimes after several hours. Since I’m using Behat for the end-to-end tests, I started exploring ways to test these delayed scenarios as well. There are some good libraries for hooking into the Symfony Messenger transports to verify that messages have been dispatched/scheduled, but my goal with Behat is to focus more on the outcome rather than just the scheduling. In this article, we’ll look at how to “travel” through time, process the scheduled jobs, and assert the expected results.

Imagine that we’re working on an application for incident reporting. Every incident should have a priority assigned, which will be automatically escalated unless the incident is resolved, using the following logic:

  • All incidents are created with a low priority
  • After an hour, the priority is escalated to medium
  • After 5 hours, the priority is escalated to high

We can map these requirements in Gherkin scenarios like this:

Feature: Time-based incident priority escalation

    Scenario: Incident has just been created
        When an incident is created
        Then the incident should have a low priority

    Scenario: Incident not resolved for 1 hour
        Given an incident is created
        And an hour has passed
        Then the incident should have a medium priority

    Scenario: Incident not resolved for 5 hours
        Given an incident is created
        And 5 hours have passed
        Then the incident should have a high priority

Note: This is a simplified example for demonstration purposes. The namespaces in the classes will be omitted, but you can organize them in your application as needed. For simplicity, we'll be using the Symfony Messenger functionalities directly. If you're interested in abstracting these functionalities, I wrote an article on that topic a few years ago.

First, let’s define our very basic Incident entity. Besides the properties, it’ll have an escalate() method for updating the priority to the next one. As defined in the requirements, every incident will have a low priority by default.

use Doctrine\ORM\Mapping as ORM;

#[ORM\Entity]
#[ORM\Table(name: 'incidents')]
class Incident
{
    #[ORM\Id]
    #[ORM\GeneratedValue]
    #[ORM\Column]
    private int $id;

    #[ORM\Column]
    private string $title;

    #[ORM\Column(enumType: Priority::class)]
    private Priority $priority = Priority::Low;

    #[ORM\Column]
    private bool $resolved = false;

    // ...

    public function escalate(): void
    {
        // check if already resolved, etc.

        $this->setPriority(
            $this->priority->next()
        );
    }
}

For the list of priorities, we’ll have a Priority enum. The enum will also contain the logic for deciding the next priority.

enum Priority: string
{
    case Low = 'low';
    case Medium = 'medium';
    case High = 'high';

    public function next(): Priority
    {
        return match ($this) {
            self::Low => self::Medium,
            self::Medium => self::High,
            self::High => self::High
        };
    }

    // ...
}

In the application, we’ll define two message buses: one for commands and one for the background jobs. The commands will always be handled synchronously by the command bus, so we’ll define a commands transport that uses Messenger’s sync transport. The background jobs will be handled asynchronously by the other message bus, so we’ll use a transport that supports delayed handling, e.g. Redis.

framework:
    messenger:
        transports:
            background_jobs:
                dsn: '%env(MESSENGER_TRANSPORT_DSN_BACKGROUND_JOBS)%'
            commands: 'sync://'

        buses:
            background_jobs.bus: ~
            command.bus: ~

# ...

Next, let’s go through the list of steps used in the scenarios above, and write the Behat definitions for each of them.

We’ll be using the FriendsOfBehat/SymfonyExtension Behat extension, which is the officially recommended replacement for the legacy (and now abandoned) Behat/Symfony2Extension extension. This extension will automatically register our contexts as services, as well as autowire and autoconfigure them, thus making it possible to easily inject any other service into them. Additionally, we’ll use its integration for Mink as a driver here, but the same should work for any other driver.

Creating the incident

For simplicity, our application will create a new incident whenever a GET request is received on the “/incidents” path, so we’ll use the Mink session to open that page.

use Behat\Behat\Context\Context;
use Behat\Mink\Session;
use Behat\Step\When;

final class IncidentsContext implements Context
{
    public function __construct(
        private Session $session,
        // ...
    ) {
    }

    #[When('an incident is created')]
    public function anIncidentIsCreated(): void
    {
        $this->session->visit('/incidents');
    }

    // ...
}

On that path, we’ll have a route defined in the IncidentsController controller. From here, we’ll dispatch a command for creating the incident to the command bus we defined above.

use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\Attribute\Route;

final class IncidentsController
{
    #[Route('/incidents', name: 'incidents.store', methods: ['GET'])]
    public function store(MessageBusInterface $commandBus): Response
    {
        $commandBus->dispatch(new CreateIncidentCommand('Example incident'));

        return new Response();
    }
}

We’ll define the command class as a message, and we’ll route it to the commands transport.

use Symfony\Component\Messenger\Attribute\AsMessage;

#[AsMessage('commands')]
final readonly class CreateIncidentCommand
{
    public function __construct(
        public string $title
    ) {
    }
}

In the command handler, we’ll initialize a new Incident object, pass the information from the command object, and store it using a repository.

Finally, we’ll use the message bus for the background jobs to schedule two delayed background jobs for escalating the priority. We could dispatch a domain event using another message (event) bus here instead, and have the background jobs be scheduled in a listener, but again, we’re omitting that for simplicity.

Using the Messenger’s DelayStamp, we’ll schedule the first job for an hour - when the incident’s priority may be escalated to medium, and the second job for five hours - when the priority may be escalated to high.

use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;

#[AsMessageHandler]
final class CreateIncidentCommandHandler
{
    public function __construct(
        private IncidentsRepositoryInterface $incidents,
        private MessageBusInterface $backgroundJobsBus
    ) {
    }

    public function __invoke(CreateIncidentCommand $command): void
    {
        $incident = new Incident();
        $incident->setTitle($command->title);
        // ...

        $this->incidents->store($incident);

        $this->backgroundJobsBus->dispatch(
            new EscalateIncidentPriority($incident->getId()), [
                new DelayStamp(3600000), // 1 hour
            ]
        );

        $this->backgroundJobsBus->dispatch(
            new EscalateIncidentPriority($incident->getId()), [
                new DelayStamp(18000000), // 5 hours
            ]
        );
    }
}
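
The millisecond literals above are easy to mistype. As a small sketch, a helper can derive them from hours, since DelayStamp expects its delay in milliseconds. The function name hoursToMilliseconds is hypothetical and not part of Messenger:

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: converts whole hours to the millisecond
 * delay that a DelayStamp expects.
 */
function hoursToMilliseconds(int $hours): int
{
    if ($hours < 0) {
        throw new InvalidArgumentException('The delay cannot be negative.');
    }

    return $hours * 60 * 60 * 1000;
}

// Same values as the literals used in the handler above.
$oneHour = hoursToMilliseconds(1);   // 3600000
$fiveHours = hoursToMilliseconds(5); // 18000000
```

With such a helper, the stamps in the handler could be written as new DelayStamp(hoursToMilliseconds(1)) and new DelayStamp(hoursToMilliseconds(5)).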

Same as we did with the command, we’ll mark the background job as a message, but we’ll route it to the background_jobs transport instead.

use Symfony\Component\Messenger\Attribute\AsMessage;

#[AsMessage('background_jobs')]
final readonly class EscalateIncidentPriority
{
    public function __construct(
        public int $incidentId
    ) {
    }
}

In its handler, we’ll find the Incident record, and use its escalate method that we defined above, then store it back in the database. In a real application, we’ll probably have more checks here as well (e.g. the escalation method may throw an exception that the incident is already resolved).

use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
final class EscalateIncidentPriorityHandler
{
    public function __construct(
        private IncidentsRepositoryInterface $incidents
    ) {
    }

    public function __invoke(EscalateIncidentPriority $job): void
    {
        $incident = $this->incidents->find($job->incidentId);

        $incident->escalate();

        $this->incidents->store($incident);
    }
}

Fast-forwarding the current time

For checking the current time in our tests, we’ll use Symfony’s Clock component and its Symfony\Component\Clock\ClockInterface interface. The component provides a mock clock in the Symfony\Component\Clock\MockClock class, which we’ll use as the implementation in the test environment. To achieve that, we’ll update the interface definition in the config/services_test.yaml file:

services:
# ...

    Symfony\Component\Clock\MockClock: ~
    Symfony\Component\Clock\ClockInterface: '@Symfony\Component\Clock\MockClock'

Now that we’ll be getting the mock clock in our contexts, we’ll use its sleep method to move the current time forward for as many hours (or seconds, minutes, etc.) as we want.

use Behat\Behat\Context\Context;
use Behat\Step\Given;
use Symfony\Component\Clock\ClockInterface;

final class TimeTravelContext implements Context
{
    public function __construct(
        private ClockInterface $clock
    ) {
    }

    #[Given('an hour has passed')]
    #[Given(':hours hours have passed')]
    public function anHourHasPassed(int $hours = 1): void
    {
        $this->clock->sleep($hours * 60 * 60);
    }
}
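
To make the effect of sleep concrete, here is a dependency-free stand-in for a mock clock. FrozenClock is a hypothetical class written for illustration, not Symfony’s MockClock; it only mirrors the idea that sleeping moves a frozen “now” forward instead of blocking the process:

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for illustration; not Symfony's MockClock.
final class FrozenClock
{
    public function __construct(
        private DateTimeImmutable $now
    ) {
    }

    public function now(): DateTimeImmutable
    {
        return $this->now;
    }

    // Advances the frozen time instead of actually waiting.
    public function sleep(int $seconds): void
    {
        $this->now = $this->now->modify(sprintf('+%d seconds', $seconds));
    }
}

$clock = new FrozenClock(new DateTimeImmutable('2025-01-01 12:00:00 UTC'));
$clock->sleep(5 * 60 * 60); // "5 hours have passed"

echo $clock->now()->format('Y-m-d H:i'), PHP_EOL; // 2025-01-01 17:00
```

The step definition above does the same thing with the real mock clock: the test finishes instantly, while every later call to now() sees the shifted time.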

We’ll be using the same clock object later in another context class to get the current time to make sure only the background jobs for which the scheduled timing has passed are handled. At that point, it will already be set to return the time needed for the current scenario.

Verifying the outcome

To verify that only the expected background jobs have been handled, we’ll simply fetch the incident using the repository, and compare its priority to the one defined as expected in the scenario. As we don’t know the ID of the incident, we’ll fetch the last one from the database.

use Behat\Behat\Context\Context;
use Behat\Step\Then;
use Webmozart\Assert\Assert;

final class IncidentsContext implements Context
{
    public function __construct(
        private IncidentsRepositoryInterface $incidents,
        // ...
    ) {
    }

    #[Then('the incident should have a :priority priority')]
    public function theIncidentShouldHaveALowPriority(string $priority): void
    {
        $incident = $this->incidents->last();

        Assert::same(
            Priority::fromString($priority)->value,
            $incident->getPriority()->value,
            "Expected incident to have %s priority, got %s"
        );
    }

    // ...
}
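
The step above relies on a Priority::fromString() method that is hidden behind the “// ...” in the enum. Its real implementation isn’t shown in this article; one possible sketch, assuming it only normalizes the text and delegates to the built-in from() method of backed enums, could look like this:

```php
<?php

declare(strict_types=1);

enum Priority: string
{
    case Low = 'low';
    case Medium = 'medium';
    case High = 'high';

    // Assumed implementation; the original method is not shown.
    public static function fromString(string $priority): self
    {
        // from() throws a ValueError for unknown values, which makes
        // a typo in a scenario fail loudly instead of passing silently.
        return self::from(strtolower(trim($priority)));
    }
}

var_dump(Priority::fromString('medium') === Priority::Medium); // bool(true)
```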

If we now execute the tests, the second and the third scenarios (where we expected the priority to be escalated) will fail.

Feature: Time-based incident priority escalation

    Scenario: Incident has just been created
        When an incident is created
        Then the incident should have a low priority
    
    Scenario: Incident not resolved for 1 hour
        Given an incident is created
        And an hour has passed
        Then the incident should have a medium priority
            Expected incident to have "medium" priority, got "low" (Webmozart\Assert\InvalidArgumentException)
    
    Scenario: Incident not resolved for 5 hours
        Given an incident is created
        And 5 hours have passed
        Then the incident should have a high priority
            Expected incident to have "high" priority, got "low" (Webmozart\Assert\InvalidArgumentException)

This is an expected outcome, as there isn’t any worker running that will be handling the background jobs, so the priorities of the incidents in the failing scenarios never get updated.

Something similar will happen if we update the transport for the background jobs to use the sync one too. In this case, the jobs will be handled immediately after they’re dispatched, thus immediately updating incidents’ priorities to medium and then to high.

Then the incident should have a low priority
    Expected incident to have "low" priority, got "high" (Webmozart\Assert\InvalidArgumentException)

...

Then the incident should have a medium priority
    Expected incident to have "medium" priority, got "high" (Webmozart\Assert\InvalidArgumentException)

Making the tests pass

As we saw, we are already updating the current time in the tests, so the only missing piece is being in control of which background jobs will get handled. That means that we need to be checking the background jobs sent to the message bus, and handling only the ones for which the scheduled time has passed.
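
Stripped of the Messenger details, the selection rule is just a timestamp comparison. Here is a dependency-free sketch of it; the array shape and the dueJobIds function name are made up for illustration:

```php
<?php

declare(strict_types=1);

/**
 * Returns the IDs of the jobs whose scheduled time has been reached.
 *
 * @param array<string, int> $scheduledJobs job ID => dispatch timestamp
 *
 * @return list<string>
 */
function dueJobIds(array $scheduledJobs, DateTimeImmutable $now): array
{
    $due = array_filter(
        $scheduledJobs,
        static fn (int $dispatchAt): bool => $dispatchAt <= $now->getTimestamp()
    );

    return array_keys($due);
}

$createdAt = new DateTimeImmutable('2025-01-01 12:00:00 UTC');
$jobs = [
    'escalate-to-medium' => $createdAt->modify('+1 hour')->getTimestamp(),
    'escalate-to-high' => $createdAt->modify('+5 hours')->getTimestamp(),
];

// Two hours after creation, only the first job is due.
print_r(dueJobIds($jobs, $createdAt->modify('+2 hours')));
```

The rest of this section is about getting the same rule to work with real envelopes and a real message bus.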

To achieve that, we’ll create a new custom Messenger transport, which we’ll use for the background jobs in the test environment.

For a class to be registered and used as Symfony Messenger transport, it has to implement the \Symfony\Component\Messenger\Transport\TransportInterface interface. If we take a look at that interface, we can see that it’s just a combination of the interfaces for the two main parts of the transport - a sender and a receiver.

interface TransportInterface extends ReceiverInterface, SenderInterface
{
}

For our transport implementation to be clearer, we’ll have separate classes for the sender and the receiver. Then, we’ll combine them in a transport class.

A sender is a class responsible for sending the envelopes to the underlying transport, i.e. temporarily persisting the envelopes in some sort of storage, e.g. a database, Redis, memory, etc., until they’re received by the receiver and passed to a message bus for handling. It has to implement the \Symfony\Component\Messenger\Transport\Sender\SenderInterface interface.

A receiver is a class that is responsible for providing the envelopes stored by the sender to whoever needs to process them. For example, if we have a separate worker started by the messenger:consume console command, it uses the selected transport’s receiver to poll for messages and then sends them to the appropriate message bus. In our case, we’ll do the same, but instead of having a worker running, we’ll use the receiver manually in our step definitions.

Let’s start by defining both the classes and combining them into a class that will implement the Transport interface. For now, let’s focus on the bigger picture and leave the actual implementations for later.

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;

final class Sender implements SenderInterface
{
    public function send(Envelope $envelope): Envelope { }
}

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;

final class Receiver implements ReceiverInterface
{
    public function get(): iterable { }

    public function ack(Envelope $envelope): void { }

    public function reject(Envelope $envelope): void { }
}

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\TransportInterface;

final class Transport implements TransportInterface
{
    public function __construct(
        private Sender $sender,
        private Receiver $receiver
    ) {
    }

    public function get(): iterable
    {
        return $this->receiver->get();
    }

    public function ack(Envelope $envelope): void
    {
        $this->receiver->ack($envelope);
    }

    public function reject(Envelope $envelope): void
    {
        $this->receiver->reject($envelope);
    }

    public function send(Envelope $envelope): Envelope
    {
        return $this->sender->send($envelope);
    }
}

Now that we have the new transport, we need to:

  1. In the application, send the background jobs to this transport
  2. In the Behat step definitions, use the transport’s receiver to get the messages and pass them to the message bus

To be able to actually use the transport, we need one more piece - a factory that will be creating instances of it. Here we’ll also define a DSN associated with our transport, which will tell Symfony when to use this factory.

use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

final class TransportFactory implements TransportFactoryInterface
{
    public function __construct(
        private Sender $sender,
        private Receiver $receiver
    ) {
    }

    public function createTransport(
        #[\SensitiveParameter] string $dsn,
        array $options,
        SerializerInterface $serializer
    ): TransportInterface {
        return new Transport($this->sender, $this->receiver);
    }

    public function supports(#[\SensitiveParameter] string $dsn, array $options): bool
    {
        return $dsn === 'time-sensitive-transport://';
    }
}

Next, in config/packages/test/messenger.yaml, using the DSN we defined in the factory, we’ll update the background_jobs transport to use our custom transport.

framework:
    messenger:
        transports:
            background_jobs: 'time-sensitive-transport://'

With that configuration, whenever in our application a background job instance is dispatched to the background jobs message bus, it will be sent using our transport’s sender.

So, now that we know that the background jobs will be reaching it when dispatched, we need to use the same transport in the Behat step definitions to make sure they’re handled when needed. For that, we’ll create a Behat hook that will be executed before each step in our scenarios. In this hook, we’ll use the transport’s get method (implemented in the receiver) which will give us all the messages that have been previously kept by the sender.

Each of the messages given to us by the transport will be in the form of an Envelope object, and will contain a DispatchAtStamp stamp (which, as we’ll see in a bit, we’ll be adding in the sender). Using these stamps, we’ll check if the scheduled time has already passed. If yes, we’ll provide the message to the background jobs message bus for further handling by the appropriate background job handler. If no exception is thrown during the handling of the message, we’ll tell the transport that the handling is successfully done by calling its ack method. Otherwise, we’ll also let the transport know by calling its reject method. In both cases, the transport will remove the message, so it won’t be handled again before the next step. We can additionally add some logging here for better debugging.

use Behat\Behat\Context\Context;
use Behat\Hook\BeforeStep;
use Symfony\Component\Clock\ClockInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;

final class BackgroundJobsContext implements Context
{
    public function __construct(
        private Transport $transport,
        private ClockInterface $clock,
        private MessageBusInterface $backgroundJobsBus
    ) {
    }

    #[BeforeStep]
    public function processJobs(): void
    {
        $now = $this->clock->now();

        foreach ($this->transport->get() as $envelope) {
            if ($this->getDispatchTime($envelope) > $now) {
                continue;
            }

            try {
                $this->backgroundJobsBus->dispatch($envelope);

                $this->transport->ack($envelope);
            } catch (\Throwable) {
                $this->transport->reject($envelope);
            }
        }
    }

    private function getDispatchTime(Envelope $envelope): \DateTimeImmutable
    {
        $dispatchAt = $envelope->last(DispatchAtStamp::class);

        return \DateTimeImmutable::createFromTimestamp($dispatchAt->getTimestamp());
    }
}

An important thing to note here is that when injecting the Transport instance in the hook, it is not the same instance that we’ll have within the application itself. While running Behat with the Symfony extension, there will be two separate containers - one for the application, and one for the actual tests.

As described in the extension docs:

That means there is a first instance of your Symfony kernel with a corresponding container, and that instance is used to configure (and possibly autowire) your context classes.

The second instance of the Symfony kernel and its corresponding container is used when you make requests to your application with the symfony Mink driver.

Because of this, if we keep the scheduled messages in memory, the instance of the transport in the context won’t have access to that list.

A possible workaround here is to inject the application’s container in the contexts, but that will work only for scenarios where the Mink driver is used. Also, it can become unreliable if we have scenarios with multiple Mink requests.

When making multiple Mink requests within a single scenario, the second kernel and container (behat.driver.service_container) needs to be reset to provide a clean state for the second and every additional request. This reset will happen immediately before the second and any subsequent request is handed to the kernel. So, while in general it is possible to inspect the driver’s container state after requests, setting it up (bringing it into desired state) easily is only possible for the first request.

Needing to share the scheduled background jobs between the two instances of the transport, we’ll use the Symfony Cache component to temporarily persist them somewhere else.

We’ll start by defining a new cache pool in the test environment (config/packages/test/cache.yaml). In the example here, we’ll use the filesystem adapter.

framework:
    cache:
        pools:
            scheduled_jobs_pool:
                adapter: cache.adapter.filesystem

Next, let’s take a look at the actual implementation of the sender. When a message is dispatched to a message bus, Messenger calls the send method of the sender, and passes the message wrapped in an envelope. Here, we can adjust that envelope as needed, e.g. by adding more stamps to it. In our case we need to add two more stamps.

When scheduling the background jobs in the command handler above, we added a DelayStamp with the delay value in milliseconds, relative to the current time. As the current time will be modified when we get the messages from the receiver, we need to keep the actual value of the time when that background job should be handled.

Because of that, we’ll calculate that information, and stick it to the envelope object using a new DispatchAtStamp stamp, as defined here:

use Symfony\Component\Messenger\Stamp\StampInterface;

final readonly class DispatchAtStamp implements StampInterface
{
    public function __construct(
        private int $timestamp
    ) {
    }

    public function getTimestamp(): int
    {
        return $this->timestamp;
    }
}

In a second stamp, which will be an instance of the Symfony\Component\Messenger\Stamp\TransportMessageIdStamp class, we’ll store an ID (which we can generate as a UUID using the Symfony UID component) of the message specific to our transport. We’ll later use that ID to identify the specific envelope when we need to remove the message from the cache.

Finally, we’ll use the Messenger’s serializer service to serialize the envelope (which will also clean it from some non-persistable stamps), and store it in the cache pool.

use Psr\Cache\CacheItemPoolInterface;
use Symfony\Component\Clock\ClockInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Uid\Uuid;

final class Sender implements SenderInterface
{
    public function __construct(
        private CacheItemPoolInterface $scheduledJobsPool,
        private ClockInterface $clock,
        private SerializerInterface $serializer
    ) {
    }

    public function send(Envelope $envelope): Envelope
    {
        $dispatchAt = $this->calculateDispatchTime($envelope);
        $messageId = Uuid::v7()->toString();

        $envelope = $envelope->with(
            new DispatchAtStamp($dispatchAt->getTimestamp()),
            new TransportMessageIdStamp($messageId)
        );

        $this->cacheScheduledJob($envelope);

        return $envelope;
    }

    private function calculateDispatchTime(Envelope $envelope): \DateTimeImmutable
    {
        $dispatchAt = $this->clock->now();
        $delayStamp = $envelope->last(DelayStamp::class);

        if ($delayStamp === null) {
            return $dispatchAt;
        }

        $delaySeconds = $delayStamp->getDelay() / 1000;

        return $dispatchAt->modify(
            sprintf("+%d seconds", $delaySeconds)
        );
    }

    private function cacheScheduledJob(Envelope $envelope): void
    {
        $item = $this->scheduledJobsPool->getItem('scheduled_jobs');
        $jobs = $item->isHit() ? $item->get() : [];

        $envelope = $this->serializer->encode($envelope);
        $jobs[] = $envelope;

        $item->set($jobs);

        $this->scheduledJobsPool->save($item);
    }
}
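
The arithmetic in calculateDispatchTime can be checked in isolation. This sketch works with plain values instead of an envelope; the dispatchTimestamp function name and the null-for-no-delay convention are assumptions made for this example:

```php
<?php

declare(strict_types=1);

// Returns the Unix timestamp at which a job becomes due.
function dispatchTimestamp(DateTimeImmutable $now, ?int $delayMilliseconds): int
{
    if ($delayMilliseconds === null) {
        // No DelayStamp: the job is due immediately.
        return $now->getTimestamp();
    }

    // DelayStamp holds milliseconds; timestamps are in seconds.
    $delaySeconds = intdiv($delayMilliseconds, 1000);

    return $now->getTimestamp() + $delaySeconds;
}

$now = new DateTimeImmutable('@1735732800'); // 2025-01-01 12:00:00 UTC

echo dispatchTimestamp($now, 3_600_000), PHP_EOL; // 1735736400
```

Storing the absolute timestamp rather than the relative delay is what lets the Behat hook compare each job against the shifted clock later on.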

The last part of the process we need to cover is the receiver. As we saw above, the main responsibility of the receiver is to provide the messages stored in the transport to whoever wants to process them. Additionally, it should be able to handle message acknowledgements and rejections.

To provide the messages, we’ll read them from the same cache pool as above. Then we’ll deserialize each of them using the same serializer we used in the sender, so we can get them in the form of an envelope again.

An important thing to do here is to add a ReceivedStamp stamp to the envelope. With this stamp added, next time we dispatch the message (in the Behat context above), Messenger will know that it has already been processed by a transport and will directly pass it to the appropriate handler. Without this stamp, the message will again reach the sender and be cached instead of handled.

use Psr\Cache\CacheItemInterface;
use Psr\Cache\CacheItemPoolInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

final class Receiver implements ReceiverInterface
{
    public function __construct(
        private CacheItemPoolInterface $scheduledJobsPool,
        private SerializerInterface $serializer
    ) {
    }

    public function get(): iterable
    {
        return array_map(
            fn(Envelope $envelope) => $envelope->with(
                new ReceivedStamp('time-sensitive-transport')
            ),
            $this->getCachedEnvelopes()
        );
    }

    /** @return Envelope[] */
    private function getCachedEnvelopes(): array
    {
        $item = $this->getCacheItem();
        $envelopes = $item->isHit() ? $item->get() : [];

        return array_map(
            fn(array $envelope) => $this->serializer->decode($envelope),
            $envelopes
        );
    }

    private function getCacheItem(): CacheItemInterface
    {
        return $this->scheduledJobsPool->getItem('scheduled_jobs');
    }

    // ...
}

When acknowledging or rejecting a message, we saw above that we’ll call the ack and reject methods respectively. In them, we’ll share the same logic and we’ll just remove the messages from the cache. During this process, we’ll use the TransportMessageIdStamp stamp we added to each of the messages in the sender, so we can identify and remove the correct message in the cache pool.

use Psr\Cache\CacheItemInterface;
use Psr\Cache\CacheItemPoolInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

final class Receiver implements ReceiverInterface
{
    public function __construct(
        private CacheItemPoolInterface $scheduledJobsPool,
        private SerializerInterface $serializer
    ) {
    }

    // ...

    public function ack(Envelope $envelope): void
    {
        $this->removeFromCache($envelope);
    }

    public function reject(Envelope $envelope): void
    {
        $this->removeFromCache($envelope);
    }

    private function removeFromCache(Envelope $envelope): void
    {
        $cachedEnvelopes = $this->getCachedEnvelopes();
        $idToBeRemoved = $this->getTransportMessageId($envelope);

        $withoutDeleted = array_filter(
            $cachedEnvelopes,
            fn(Envelope $envelope) => $this->getTransportMessageId($envelope) !== $idToBeRemoved
        );

        $serialized = array_map(
            fn(Envelope $envelope) => $this->serializer->encode($envelope),
            $withoutDeleted
        );

        $cacheItem = $this->getCacheItem();
        $cacheItem->set($serialized);

        $this->scheduledJobsPool->save($cacheItem);
    }

    private function getTransportMessageId(Envelope $envelope): string
    {
        return $envelope->last(TransportMessageIdStamp::class)->getId();
    }
}
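
One detail worth knowing about array_filter is that it preserves the original keys, so the list written back to the cache may no longer be indexed from zero. That is harmless for the receiver above, which never relies on the keys. If a sequential list is preferred, array_values can reindex it, as in this sketch on plain arrays (the job shape and the withoutJob function name are made up for illustration):

```php
<?php

declare(strict_types=1);

/**
 * @param list<array{id: string, body: string}> $jobs
 *
 * @return list<array{id: string, body: string}>
 */
function withoutJob(array $jobs, string $idToRemove): array
{
    $remaining = array_filter(
        $jobs,
        static fn (array $job): bool => $job['id'] !== $idToRemove
    );

    // array_filter keeps the original keys; reindex to get a clean list.
    return array_values($remaining);
}

$jobs = [
    ['id' => 'a', 'body' => 'escalate to medium'],
    ['id' => 'b', 'body' => 'escalate to high'],
];

var_dump(array_column(withoutJob($jobs, 'a'), 'id')); // only "b" remains
```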

To ensure that we’ll have an empty list of scheduled jobs before each scenario, we’ll define another hook in which we’ll clear the cache pool.

use Behat\Behat\Context\Context;
use Behat\Hook\BeforeScenario;
use Psr\Cache\CacheItemPoolInterface;

final class CacheContext implements Context
{
    public function __construct(
        private CacheItemPoolInterface $scheduledJobsPool
    ) {
    }

    #[BeforeScenario]
    public function clearCachedMessages(): void
    {
        $this->scheduledJobsPool->clear();
    }
}

If we run the Behat tests again, they will now pass.

Next steps

In this article, we saw how we can get control over which scheduled jobs will be handled and when. If our application is much bigger, checking the cache before each step may become a performance concern, so in such a case we may use Behat tags to mark features/scenarios where this should be applied. The process can also be tweaked to consider retrying strategies, etc.

At the time of writing, the current versions are PHP 8.4, Symfony 7.3, and Behat 3.23.

