Technically Feasible

Implementing and Testing Delayed Processing in Java

Likeness of Michael Oldroyd
Michael Oldroyd

A common pattern for event driven programs is a delay and retry mechanism. A recent problem I encountered involved delayed processing of messages from a kafka stream. The approach explored was through use of ScheduledExecutorService, which is one of the parts of Java's concurrency package. One concrete implementation is ScheduledThreadPoolExecutor. This implementation manages a thread pool, as the name suggests.

To start, you will need an instance of the ScheduledThreadPoolExecutor for use in your application. You set the minimum idle threads for the thread pool during construction.

public ScheduledExecutorService scheduledThreadPoolExecutor() {
    return new ScheduledThreadPoolExecutor(4);
}

Approaching the problem from a test-driven perspective meant finding a method to reliably test that the processing was in fact delayed. This can be achieved using a CountDownLatch.

class BasicScheduledThreadExecutorTest {
    private static final long DELAY_TIME = 1_000_000L;

    ScheduledExecutorService scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);

    @Test
    void testADelayedExecution() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);

        scheduledThreadPoolExecutor.schedule(
                countDownLatch::countDown,
                DELAY_TIME,
                TimeUnit.NANOSECONDS
        );

        LocalDateTime start = LocalDateTime.now();
        countDownLatch.await(2L, TimeUnit.SECONDS);
        LocalDateTime finish = LocalDateTime.now();

        long duration = Duration.between(start, finish).toNanos();

        MatcherAssert.assertThat(duration, Matchers.greaterThanOrEqualTo(DELAY_TIME));
        System.out.println(duration);
    }
}

Essentially, the test will wait until the countdown has been exceeded or the latch times out. If it takes longer than 2 seconds the latch throws an InterruptedException, failing the test.

Problems with conversions #

The above code works well, and provides a fairly stable outcome. When I was experimenting with millisecond durations, I was seeing results which were almost always failing. It seemed like the execution of the task was supposedly always earlier than the delay time defined in the test;

java.lang.AssertionError: 
Expected: a value equal to or greater than <1000L>
     but: <996L> was less than <1000L>

Using nanoseconds fixed this problem. Just something to bear in mind 😉.

Test Double with Mockito #

Obviously this is a toy example. You would likely distribute the executor within your production code, and trigger countDownLatch.countDown() with a test double. If you use Mockito, you can use a spy for this purpose. The syntax is a bit fiddly;

doAnswer((Answer<Void>) invocation -> {
    countDownLatch.countDown();
    return null;
}).when(someMockedInterface).someMethodThatGetsCalled();

Essentially, our test double invokes the latch when it is invoked as part of the test harness instead of the production implementation.

Example Project #

Example code for this article can be found over at gitlab.com

Image of me

Michael Oldroyd

Michael is a Software Engineer working in the North West of England.