Relatively Ignorant A blog on things about which I may or may not know stuff

Extending WireMock for delayed callbacks

WireMock is a flexible Java test double for HTTP APIs that can be run in-process and as a standalone application. It has many built-in features and can also be extended.

Delayed callbacks

A little while back I had set up WireMock in standalone mode as part of testing a client’s service. In some cases the service makes HTTP calls to other services that perform some work and call back later. The delay can be as much as 10 minutes: much longer than an HTTP server timeout.

I wrote a WireMock extension that models these asynchronous APIs so we could test them. The extension is written in Kotlin, and a simplified version is available as the GitHub project WireMock extension for asynchronous APIs with later callbacks.

WireMock configuration

WireMock in standalone mode is usually configured with mappings specified in JSON. Here is an example mapping for the asynchronous API:

{
  "request": {
    "method": "POST",
    "url": "/contract/action"
  },
  "response": {
    "transformers": [
      "DelayedCallback"
    ],
    "transformerParameters": {
      "median": 4000,
      "sigma": 0.4
    }
  }
}

This mapping:

  • Accepts POST requests to the URI /contract/action.
  • Transforms the response using a transformer with name DelayedCallback.
  • Specifies parameters median and sigma for the transformer.

Contract

The asynchronous APIs follow a contract that includes a payload and a callback URL. Here is a simplified version:

data class ContractRequest(
    val correlationId: String,
    val payload: String,
    val callbackUrl: String,
)

data class ContractResponse(
    val correlationId: String,
    val message: String,
    val timestamp: String = Instant.now().toString(),
)

Transformer

The DelayedCallback transformer extends the WireMock ResponseTransformer abstract class.

class DelayedCallback : ResponseTransformer() {

    companion object {
        @JvmStatic
        val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(10)
        val objectMapper = jacksonObjectMapper()
    }

    // Identify this transformer in mapping specifications.
    override fun getName() = "DelayedCallback"
    // Only apply the transformer when specified.
    override fun applyGlobally() = false

    override fun transform(request: Request, response: Response, files: FileSource, parameters: Parameters): Response {

        // 1
        val contractRequest = try {
            objectMapper.readValue<ContractRequest>(request.bodyAsString)
        } catch (ex: Exception) {
            logger.error("Exception reading contract request", ex)
            return Response.Builder.like(response)
                .status(400)
                .body(ex.message)
                .build()
        }

        // 2
        val delayMillis = callbackDelayMillis(parameters)

        // 3
        executor.schedule({ contractRequest.callback() }, delayMillis, TimeUnit.MILLISECONDS)

        // 4
        val result = ContractResponse(
            contractRequest.correlationId,
            "Acknowledged the request. Will call back after $delayMillis ms",
        )
        return Response.Builder.like(response)
            .status(200)
            .body(objectMapper.writeValueAsString(result))
            .headers(HttpHeaders(HttpHeader.httpHeader("Content-Type", "application/json")))
            .build()
    }

    private fun callbackDelayMillis(parameters: Parameters?) =
        parameters?.let {
            val median = it.getDoubleValue("median", 1000.0)
            val sigma = it.getDoubleValue("sigma", 0.1)
            randomLogNormalMillis(median, sigma)
        } ?: 1000L

    private fun randomLogNormalMillis(median: Double, sigma: Double) =
        (exp(ThreadLocalRandom.current().nextGaussian() * sigma) * median).roundToLong()
}

The transform function

  1. Reads the ContractRequest value from the incoming request.

  2. Calculates the callback delay in milliseconds from a lognormal distribution using the supplied median and sigma parameters. If no parameters are supplied at all, it falls back to a fixed 1-second delay. See below for information about reading double parameter values.

  3. Schedules the callback function using the executor service.

  4. Returns a contract response with the specified correlationId.
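Steps 2 and 3 can be tried outside WireMock. The following is a minimal Java sketch of the same idea, not the extension's code: the class and method names are made up for illustration, and the seeded Random is only there to make the sample repeatable. If Z is a standard normal value then exp(sigma × Z) has a median of 1, so multiplying by the configured median gives delays of which about half are shorter than the median and half are longer.

```java
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class DelaySketch {

    /** Draws a lognormal delay whose median is {@code median} ms; sigma is the spread of ln(delay). */
    static long randomLogNormalMillis(double median, double sigma, Random random) {
        return Math.round(Math.exp(random.nextGaussian() * sigma) * median);
    }

    /** Sample median of n draws, to check that the delays centre on the configured median. */
    static long sampleMedian(double median, double sigma, int n, long seed) {
        Random random = new Random(seed);
        long[] delays = new long[n];
        for (int i = 0; i < n; i++) {
            delays[i] = randomLogNormalMillis(median, sigma, random);
        }
        Arrays.sort(delays);
        return delays[n / 2];
    }

    /** Schedules a task after delayMillis, waits for it to finish and returns the elapsed time in ms. */
    static long runAfter(long delayMillis, Runnable task) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            long start = System.nanoTime();
            ScheduledFuture<?> future = executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            // Waiting here is only for the demonstration: the transformer above does not wait,
            // it returns its acknowledgement immediately and lets the callback run later.
            future.get();
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sample median in ms: " + sampleMedian(4000, 0.4, 10001, 42L));
        long elapsed = runAfter(50, () -> System.out.println("Calling back"));
        System.out.println("Callback ran after " + elapsed + " ms");
    }
}
```

With a sigma of 0 the delay is always exactly the median, which is a handy setting when a test needs predictable timing.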

The callback function

This function is defined as an extension on ContractRequest. It uses the OkHttp client from the library already present in WireMock and sends another ContractResponse object with the same correlationId.

fun ContractRequest.callback() {

    val okClient = OkHttpClient()

    val body = objectMapper.writeValueAsString(ContractResponse(correlationId, "All processing complete"))
    val request = Request.Builder()
        .url(callbackUrl)
        .post(body.toRequestBody("application/json".toMediaType()))
        .build()

    okClient.newCall(request).execute().use { response ->
        if (!response.isSuccessful)
            logger.error("Error calling back: ${response.message}")
        else
            logger.info("Callback successful: ${response.message}")
    }
}

WireMock Parameters extension function

An extension to the WireMock Parameters class makes it easy to read double values safely:

fun Parameters.getDoubleValue(key: String, default: Double) = if (key in this)
    when (val value = get(key)) {
        is Double -> value
        is Int -> value.toDouble()
        is String -> value.toDoubleOrNull() ?: default
        else -> default
    }
else default

The tests for this function show how it works.
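To give a feel for the behaviour without opening the tests, here is a rough Java rendering of the same lookup. It is a sketch under assumptions: it works on a plain Map rather than WireMock's Parameters class, the class name and sample values are invented, and unlike the Kotlin version it accepts any Number rather than only Int and Double.

```java
import java.util.HashMap;
import java.util.Map;

public class DoubleParams {

    /** Reads a double from loosely typed parameters, falling back to the default when absent or unparseable. */
    static double getDoubleValue(Map<String, Object> parameters, String key, double defaultValue) {
        Object value = parameters.get(key);
        if (value instanceof Number) {
            return ((Number) value).doubleValue(); // covers Integer, Long and Double
        }
        if (value instanceof String) {
            try {
                return Double.parseDouble(((String) value).trim());
            } catch (NumberFormatException e) {
                return defaultValue;
            }
        }
        return defaultValue; // missing key, null, or an unsupported type such as Boolean
    }

    /** Hypothetical parameters, roughly as they might arrive from a JSON mapping. */
    static Map<String, Object> sample() {
        Map<String, Object> parameters = new HashMap<>();
        parameters.put("median", 4000);   // JSON integer
        parameters.put("sigma", "0.4");   // number supplied as a string
        parameters.put("bad", "abc");     // not a number
        parameters.put("flag", true);     // wrong type
        return parameters;
    }

    public static void main(String[] args) {
        Map<String, Object> parameters = sample();
        System.out.println(getDoubleValue(parameters, "median", 1000.0));
        System.out.println(getDoubleValue(parameters, "sigma", 0.1));
        System.out.println(getDoubleValue(parameters, "bad", 0.1));
        System.out.println(getDoubleValue(parameters, "missing", 0.1));
    }
}
```

The point of the fallback is that a mistyped mapping still produces a usable delay instead of failing the request.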

Putting it together

A Gradle ‘uber-JAR’ task bundles WireMock standalone and extension code into one large JAR:

tasks.register<Jar>("uberJar") {
    archiveClassifier.set("uber")

    from(sourceSets.main.get().output)

    dependsOn(configurations.runtimeClasspath)
    from({
        configurations.runtimeClasspath.get().filter { it.name.endsWith("jar") }.map { zipTree(it) }
    })
}

The command line to start it becomes:

java -cp wiremock-async-uber.jar \
  com.github.tomakehurst.wiremock.standalone.WireMockServerRunner \
  --extensions mjs.wiremock.DelayedCallback

  • The main class needs to be specified explicitly.
  • WireMock is passed a comma-separated list of extension class names.

Running private Code With Me servers

JetBrains is developing Code With Me, which performs a function very similar to Visual Studio Code Live Share:

Code With Me is a new collaborative development and pair programming service. It enables you to share the currently opened project in your IDE with others, and work on it together in real time.

It has been in the Early Access Program for a few months and works pretty well.

Code With Me uses central servers to connect users with each other. Once connected they can communicate directly with each other.

Running servers in a closed environment

By default, Code With Me uses public servers provided by JetBrains, but you can set up your own.

I have been working with a large company that uses JetBrains IDEs extensively and has almost exclusively switched to remote work. They could really benefit from using Code With Me but are understandably concerned to protect their code and sensitive information and might not be comfortable using external servers.

I wanted to see how easy it would be to run private Code With Me servers on the company’s managed cloud infrastructure. The company has a platform for deploying containerised workloads on AWS that made it easy to set up. By default, services deployed on it are only visible to users on the internal network and VPN.

Can we use only a lobby server?

Code With Me uses two kinds of servers:

  • a lobby server that connects parties who want to code together; and
  • one or more relay servers that connect users in case direct P2P connections don’t work or are forbidden.

I also wanted to test the idea that relay servers are not needed for internal company users. Developers who work together are all on internal company networks or the corporate VPN, which are both in the private 172.16.0.0/12 address range, so this idea seemed possible.
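For anyone who wants to check whether a given machine's address really is in that range, here is a small Java sketch. The class and method names are made up for illustration, and it handles only dotted-quad IPv4 addresses. A /12 prefix fixes the top 12 bits, so the range runs from 172.16.0.0 to 172.31.255.255.

```java
public class PrivateRange {

    /** Packs a dotted-quad IPv4 address into an int; throws IllegalArgumentException on bad input. */
    static int toInt(String ipv4) {
        String[] parts = ipv4.split("\\.");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Not an IPv4 address: " + ipv4);
        }
        int result = 0;
        for (String part : parts) {
            int octet = Integer.parseInt(part);
            if (octet < 0 || octet > 255) {
                throw new IllegalArgumentException("Octet out of range in: " + ipv4);
            }
            result = (result << 8) | octet;
        }
        return result;
    }

    /** True when the address falls inside 172.16.0.0/12. */
    static boolean inPrivate172(String ipv4) {
        int mask = 0xFFF00000; // the top 12 bits of the address
        return (toInt(ipv4) & mask) == (toInt("172.16.0.0") & mask);
    }

    public static void main(String[] args) {
        System.out.println(inPrivate172("172.20.1.15")); // inside the range
        System.out.println(inPrivate172("172.32.0.1"));  // just outside it
    }
}
```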

I modified the instructions in the Code With Me administration guide:

  • I downloaded the latest version of the lobby server from the download link on that page. (The link takes you to an obligatory name and email form before revealing links to lobby and relay servers.)
  • I set up a simplified Dockerfile for the lobby server, reduced from the one on that page:

FROM debian:buster-slim

ARG DISTRIBUTION_VERSION=1400
ADD lobby-server-linux-x64.${DISTRIBUTION_VERSION}.tar.gz /home/lobby-server

RUN apt-get update && apt-get install -y unzip net-tools procps && apt-get clean

WORKDIR /home/lobby-server

ENV JAVA_HOME /home/lobby-server/jbr
ENV SERVER_PORT 8080
ENV ENABLED_FEATURES p2p_quic,direct_tcp

ENTRYPOINT ["bin/lobby-server"]

EXPOSE 8080

Compared with the example in the administration guide:

  • There was no need for a config.json file because we were not setting up relay servers.
  • The platform can provision Redis instances for services, so REDIS_HOST and REDIS_PORT were set in platform configuration.
  • BASE_URL was set per environment in platform configuration.
  • The platform provisions load balancers with certificates so there is no need for NGINX and certificate configuration.
  • ws_relay was removed from the list of ENABLED_FEATURES because there are no relay servers.

Did it work?

Yes!

We tried using direct_tcp without p2p_quic but it stopped working: the latter was also required for P2P communication to work in this environment.

Tracing Spring asynchronous code with New Relic – a better way

In my earlier post about tracing Spring asynchronous code with New Relic I showed a simple solution using a subclass of ApplicationEvent to carry a New Relic token. It has some disadvantages:

  1. Code that uses it must explicitly declare New Relic tracing using the @Trace annotation, must create subclasses of TracedEvent and must call the TracedEvent#linkToken method on the event object.

  2. Each token can only be expired once, even if an event is listened to by multiple listeners.

A better way

This method uses an implementation of java.util.concurrent.Executor that wraps a delegate instance.

  1. The NewRelicTraceExecutor#execute method is called in the parent thread. It constructs a TracedRunnable that wraps the Runnable instance it is given.

  2. The TracedRunnable#run method is called in the child thread. It calls the Token#linkAndExpire method before calling run on its delegate Runnable.

All the New Relic-specific code is in this one class, which can be wired into a Spring Boot application to be used with ApplicationEventMulticaster. Each event listener has its own Runnable instance with its own New Relic token.

package com.example.tracing;

import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Token;
import com.newrelic.api.agent.Trace;

import java.util.concurrent.Executor;

public class NewRelicTraceExecutor implements Executor {

    private final Executor delegate;

    public NewRelicTraceExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable command) {
        Token token = NewRelic.getAgent().getTransaction().getToken();
        delegate.execute(new TracedRunnable(command, token));
    }

    static class TracedRunnable implements Runnable {

        private final Runnable delegate;
        private final Token token;

        TracedRunnable(Runnable delegate, Token token) {
            this.delegate = delegate;
            this.token = token;
        }

        @Trace(async = true)
        @Override
        public void run() {
            token.linkAndExpire();
            delegate.run();
        }
    }
}

As before, there is a dependency on the New Relic API. In Gradle:

    implementation 'com.newrelic.agent.java:newrelic-api:5.11.0'
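
The capture-in-parent, restore-in-child pattern can be seen in isolation without New Relic or Spring. The sketch below is only an analogy: a ThreadLocal string stands in for the New Relic token, and every name in it is hypothetical. It shows that a plain executor loses the caller's context while the wrapping executor carries it across.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class ContextExecutorSketch {

    /** Hypothetical stand-in for the per-thread tracing context. */
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Wraps a delegate Executor and carries the caller's TRACE_ID into the worker thread. */
    static class ContextExecutor implements Executor {
        private final Executor delegate;

        ContextExecutor(Executor delegate) {
            this.delegate = delegate;
        }

        @Override
        public void execute(Runnable command) {
            String captured = TRACE_ID.get(); // runs in the parent thread
            delegate.execute(() -> {
                TRACE_ID.set(captured); // runs in the child thread, before the real work
                try {
                    command.run();
                } finally {
                    TRACE_ID.remove(); // avoid leaking context into the next task on a pooled thread
                }
            });
        }
    }

    /** Sets a trace id in the calling thread, runs a task via the executor and returns what the task saw. */
    static String traceSeenByWorker(String traceId, boolean wrap) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Executor executor = wrap ? new ContextExecutor(pool) : pool;
        AtomicReference<String> seen = new AtomicReference<>();
        TRACE_ID.set(traceId);
        try {
            executor.execute(() -> seen.set(TRACE_ID.get()));
            pool.shutdown(); // already submitted tasks still run
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            TRACE_ID.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println("Plain executor saw:   " + traceSeenByWorker("trace-1", false));
        System.out.println("Wrapped executor saw: " + traceSeenByWorker("trace-1", true));
    }
}
```

Because each call to execute captures its own value, every listener's Runnable gets an independent copy, which is the property that removes the expire-once problem described above.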

Tracing Spring asynchronous code with New Relic

I have been working with Spring Boot microservices in an environment that is monitored using New Relic. Applications instrumented by New Relic are deployed with agents that send status and other information to a central server for monitoring and analysis.

New Relic’s Distributed Tracing enables complex request flows to be traced through multiple services instrumented with its agents. This is a powerful tool for quickly finding interesting or anomalous traces so they can be examined. We instrumented the Spring Boot services with New Relic and were able to follow synchronous calls made to downstream services.

The problem

But it didn’t trace all calls to other services. We executed some code asynchronously using Spring’s custom application events. Events are published by an ApplicationEventMulticaster configured with a task executor, and subscribed to by asynchronous listeners. We found that New Relic trace context was not being transferred with the events to the listeners in different threads.

When the asynchronous listener code called other services, those services were not recognised by New Relic as participating in the same distributed trace.

A simple solution

Our solution was to extend the Spring ApplicationEvent class to carry with it a New Relic trace token, and for the listener code to link that token to its New Relic context.

Prerequisite

Include the New Relic API in the project’s dependencies. In Gradle:

    implementation 'com.newrelic.agent.java:newrelic-api:5.10.0'

The TracedEvent class

package com.example.events;

import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Token;
import org.springframework.context.ApplicationEvent;

public class TracedEvent extends ApplicationEvent {
    
    private Token traceToken;
    
    TracedEvent(Object eventObject) {
        super(eventObject);
        traceToken = NewRelic.getAgent().getTransaction().getToken();
    }
 
    public void linkToken() {
        traceToken.linkAndExpire();
    }
}

There is no need for null checking on New Relic classes because NewRelic.getAgent() always returns a usable object. When the code executes without an actual agent connected, it returns an instance of NoOpAgent that returns a safe instance of Transaction that itself returns a safe, do-nothing instance of Token.

Listener code

Important parts of the code:

public class SomeEvent extends TracedEvent {
    // etc.
}

import com.newrelic.api.agent.Trace;

@Service
public class ExampleListener {

    @Trace(async = true) // Ensure New Relic traces this method’s thread
    public void onEvent(SomeEvent event) {
        event.linkToken(); // Do this first

        // Act on the event
    }
}

Future improvements

This simple solution was adequate for our immediate purposes but is not complete. With ApplicationEventMulticaster an event may be listened to by multiple listeners but the token will be expired by the first listener that uses it. In our case each event had only one listener.

It is valid to retrieve multiple tokens from a single New Relic transaction and use each one independently. We could fetch a token for each listener, or fetch a token for each thread used by the event multicaster’s task executor.

It is better to use Spring configuration to automatically fetch tokens and use them in new contexts. Spring Cloud Sleuth uses this technique to ensure tracing information is propagated to new threads.

Octopus Deploy server in AWS and polling tentacles

I am using Octopus Deploy on a current project to deploy to a number of targets in tightly-controlled, on-premises environments. We are using polling tentacles so we don’t need to get ingress firewall rules manually created for every deployment target.

Tentacle-to-server communication

Octopus Deploy server is deployed into AWS and needs to be configured to securely accept two kinds of connection from polling tentacles:

  • to the Octopus web portal on its assigned port
  • to the Octopus server for tentacle instructions, usually on port 10943

The first connection is HTTP or HTTPS and can be secured simply in AWS with any load balancer that presents a certificate and offloads TLS, forwarding HTTP requests to the server.

The second connection is HTTPS but must be secured from end to end. On installation, both server and tentacle generate a self-signed certificate, which they use to secure all communication with each other. This means the Octopus Deploy server cannot be deployed behind a device that offloads TLS for this connection.

AWS Load Balancers

The current generation of AWS Elastic Load Balancers comes in two types: Application Load Balancers and Network Load Balancers.

Application Load Balancers can route traffic based on host, header, path etc. and are very flexible. But they can only accept HTTP and HTTPS connections and always offload TLS in the latter case.

Network Load Balancers do not support complex routing rules but can offload certificates for some requests and allow TCP passthrough of others. This solution meets our needs:

  • A TLS listener on port 443 offloads the certificate on requests to the web portal, which are forwarded over HTTP.

  • A TCP listener on port 10943 passes requests through unchanged to port 10943 on the same server.

Security Group differences

AWS application and network load balancers work differently with security groups. Application load balancers have security groups attached to them and apply ingress rules. In contrast, network load balancers do not have security groups attached; here the security rules of target instances apply, using their listening ports.

In our configuration the security group for the server specifies port 10943 for the traffic that passes through the load balancer, and port 80 for the web portal traffic.