The current version is 7.4.0

This document is maintained at https://github.com/muoncore/muon-java. Please submit issues at that repository.

Muon Java

Muon Java is the implementation of the Muon microservices toolkit for the Java platform. Muon lets you build microservices in many languages with richer, more performant and fully reactive communication semantics. Your services can also call Muon compatible services written in other languages without sacrificing those semantics or performance.

Contributing

Muon and the ecosystem services are open source. Contribute to Muon Java and this documentation at http://github.com/muoncore/muon-java

A Microservice based system in 5 minutes

The quickest way to start a new Muon Java Microservice is to clone the example repo at https://github.com/muoncore/muon-java-gradle-example

This is set up to use Gradle as the build tool and Java as the application language.

First, install the Muon CLI, then clone this repository.

This project has an example service already in there that looks similar to

public class MuonServerExample {

  public static void main(String[] args) {

    AutoConfiguration config = MuonConfigBuilder.withServiceIdentifier("example-service")               (1)
      .addWriter(conf -> {
        conf.getProperties().put("muon.discovery.factories", "io.muoncore.discovery.amqp.AmqpDiscoveryFactory"); (2)
        conf.getProperties().put("amqp.discovery.url", "amqp://muon:microservices@localhost");

        conf.getProperties().put("muon.transport.factories", "io.muoncore.transport.amqp.AmqpMuonTransportFactory");  (3)
        conf.getProperties().put("amqp.transport.url", "amqp://muon:microservices@localhost");

      })
      .build();

    Muon muon = MuonBuilder.withConfig(config).build();              (4)

    RpcServer rpc = new RpcServer(muon);                             (5)

    rpc.handleRequest(path("/"))                                     (6)
      .addResponseType(Map.class)
      .handler(request -> {
        request.ok(Collections.singletonMap("message", "hello world"));  (7)
      });
  }
}
1 Create a muon configuration using a fluent interface builder.
2 Configure the AMQP discovery to connect to the correct broker (see discovery, below)
3 Configure the AMQP transport to connect to the correct broker (see transport, below)
4 Create a Muon instance on the AMQP transport/discovery
5 Add an RPC Server to this Muon instance (see protocols, below)
6 Add an RPC endpoint to the server
7 When invoked, respond to the RPC request with some manufactured data.

You can run this from a prebuilt Gradle task

> ./gradlew runServer

You can then use the Muon CLI to interact with the new service, using the name example-service set above as its identifier

> muon rpc example-service /

This accesses the rpc protocol and interacts with the / endpoint

You will see the results from the example service that should look like

david@patmos:~/$ muon rpc rpc://example-service/hello

┌────────────┬──────────────────────────────┬───────────────────────────────────────────────┐
│ STATUS     │ CONTENT/TYPE                 │ BODY                                          │
├────────────┼──────────────────────────────┼───────────────────────────────────────────────┤
│ 200        │ application/json             │ {"Hello":"world 1481639846708"}               │
└────────────┴──────────────────────────────┴───────────────────────────────────────────────┘

========= RESPONSE FULL BODY: ========================================================

{ Hello: 'world 1481639846708' }

Congratulations, you have a running Muon service!

Of APIs and Protocols

Muon, at heart, is a toolkit for building new Reactive APIs. These are message based and let you build APIs that conform to the guiding principles of the Reactive Manifesto.

To get you started, Muon provides a set of prebuilt API types that give you semantics beyond HTTP or other RPC-style protocols.

Once you have constructed a Muon instance, connected it to a Discovery and enabled a transport, you can begin to wrap it with Protocols. These give you different types of behaviour and richer semantics.

In addition to the ones described here, you can write your own!

Example Protocols

Here we look at RPC and Reactive Streams. These are hosted at http://github.com/muoncore/stack-rpc and http://github.com/muoncore/stack-reactive-streams

You will find the Java implementations of those API types alongside the implementations for other languages.

RPC

Request/response is a well understood communication style where you make a single request and expect to receive a single response.

Muon supports this style of communication over its naturally scalable, reactive and event based channel communication.

Simple RPC

Here is the simplest possible Muon RPC endpoint. It accepts any data pushed to it, and responds with a simple text message

public void server(Muon muon) throws ExecutionException, InterruptedException {
  RpcServer rpcServer = new RpcServer(muon);

  //request handler
  rpcServer.handleRequest(HandlerPredicates.all(), request -> {
    request.ok("Hi There");
  });

}

The client for this looks like

public void client(Muon muon) throws ExecutionException, InterruptedException {
  RpcClient rpcClient = new RpcClient(muon);

  Response data = rpcClient.request("request://myservice/").get();
  System.out.println("The Data is " + data.getPayload(String.class));

}

The Response object contains metadata about the response, for example whether it succeeded.

Making it Reactive

The handler does not need to respond synchronously as in the above example. The response can be sent from any context, and by any thread.

This will cause an event to flow back down the channel and complete the request/ response cycle.

An example of this in action is

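RpcServer rpcServer = new RpcServer(muon);

// Thread-safe queue shared between the Muon handler thread and the worker thread
BlockingQueue<RequestWrapper> requestQueue = new LinkedBlockingQueue<>();

rpcServer.handleRequest(all(), requestQueue::add);    (1)

new Thread(() -> {   (2)
  try {
    while (true) {
      // take() blocks until a request is available, so there is no busy waiting
      RequestWrapper wrapper = requestQueue.take();

      wrapper.ok("Hello");       (3)
    }
  } catch (InterruptedException e) {
    // Restore the interrupt flag and stop processing
    Thread.currentThread().interrupt();
  }
}).start();   (4)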
1 As requests arrive, add them to a Queue for processing
2 Start a thread to process the requests in serial
3 Send a response. This is on a different thread
4 Original thread drops out of this method and is re-used by Muon

This demonstrates adding the requests onto a queue and processing them asynchronously.

Be aware that the request will time out on both the client and server side, depending on your configuration.
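The deferred-response pattern and its interaction with timeouts can be sketched without Muon at all. The following is a minimal illustration using only the JDK; DeferredReplySketch, PendingRequest, submit and await are hypothetical names invented for this sketch and are not part of the Muon API. A caller that waits with a bounded timeout gives up if no worker answers in time, while a request picked up by the worker thread is answered from that thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DeferredReplySketch {

  /** Hypothetical stand-in for a request whose reply can be sent later, from any thread. */
  static final class PendingRequest {
    final String body;
    final CompletableFuture<String> reply = new CompletableFuture<>();
    PendingRequest(String body) { this.body = body; }
  }

  private final BlockingQueue<PendingRequest> queue = new LinkedBlockingQueue<>();

  /** Plays the role of the request handler: enqueue the request and return immediately. */
  CompletableFuture<String> submit(String body) {
    PendingRequest request = new PendingRequest(body);
    queue.add(request);
    return request.reply;
  }

  /** Starts a daemon worker that answers queued requests one at a time. */
  void startWorker() {
    Thread worker = new Thread(() -> {
      try {
        while (true) {
          PendingRequest request = queue.take();            // blocks until work arrives
          request.reply.complete("Hello " + request.body);  // reply sent from the worker thread
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    worker.setDaemon(true);
    worker.start();
  }

  /** Waits for a reply, returning a fallback value if it does not arrive in time. */
  static String await(CompletableFuture<String> reply, long millis) throws Exception {
    try {
      return reply.get(millis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      return "timed out";
    }
  }

  public static void main(String[] args) throws Exception {
    DeferredReplySketch server = new DeferredReplySketch();
    // No worker is running yet, so this caller gives up after 50 ms.
    System.out.println(await(server.submit("early"), 50));
    server.startWorker();
    System.out.println(await(server.submit("world"), 2000));
  }
}
```

The design point is the same as in the Muon example: the thread that accepts the request never blocks on the work, and the caller decides how long it is prepared to wait.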

Reactive Streams

Muon is built to enable the creation of streams easily. Internally everything is treated as a channel, a naturally streaming construction.

This is best accessed via the Reactive Streams API, a cross industry effort to standardise streaming communication with back pressure.

To publish a new stream, create a Publisher and pass it into the publishSource method, giving it a name and the semantics of the stream.

public void server(Muon muon) throws ExecutionException, InterruptedException {
  ReactiveStreamServer rxServer = new ReactiveStreamServer(muon);                  (1)

  rxServer.publishSource("/counter", COLD, Flowable.range(5, 10)); (2)

  rxServer.publishGeneratedSource("/dynamic-counter", COLD, subscriptionRequest -> {         (3)

    int start = Integer.parseInt(subscriptionRequest.getArgs().getOrDefault("counter", "5")); (4)

    // Flowable.range takes (start, count): emit 100 values beginning at start
    return Flowable.range(start, 100);        (5)
  });
}
1 Create the ReactiveStreamServer. The protocol is then attached to the running Muon instance and will be visible in the introspection report.
2 Create a re-usable new RxJava Flowable (which implements Publisher) and expose it on a Muon endpoint.
3 Dynamically generate a Publisher. Parameters can be passed to the endpoint by the client, which allows the endpoint to load data, transform the stream or perform other tasks before providing the Publisher that the remote Subscriber connects to
4 Access the passed params
5 Dynamically build a new Publisher

Here, we use RxJava to demonstrate the creation of a Publisher, however any Reactive Streams compatible framework or library could be used.

To access the data from another service, use the subscribe method, passing in the logical Muon discovery url.

public void client(Muon muon) throws URISyntaxException {
  ReactiveStreamClient rxClient = new ReactiveStreamClient(muon);

  rxClient.subscribe(new URI("stream://my-server/counter"), new Subscriber<StreamData>() {  (1)
    @Override
    public void onSubscribe(Subscription s) {
      s.request(Long.MAX_VALUE);                (2)
    }

    @Override
    public void onNext(StreamData streamData) {  (3)
      Map payload = streamData.getPayload(Map.class);

      //do something with the data ..
      System.out.println(payload);
    }

    @Override
    public void onError(Throwable t) {
      t.printStackTrace();
    }

    @Override
    public void onComplete() {
      // stream is done. clean up resources.
    }
  });

  // subscriber is any Subscriber<StreamData>, such as the anonymous one defined above
  rxClient.subscribe(new URI("stream://my-server/dynamic-counter?counter=30"),   (4)
    subscriber
  );

}
1 Subscribe to the given Rx endpoint. Pass in a Reactive Streams Subscriber
2 Manage back pressure from the client. Here, we just request as much data as possible.
3 Receive the data. StreamData lets you selectively decode it in various ways to allow heterogeneous data flow over the stream and be correctly decoded.
4 Subscribe to a dynamic endpoint, passing a parameter in the query string.

Again, this example uses Java and shows two separate services communicating using a reactive message stream, with back pressure support managed by the Reactive Stream Subscriber.
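The client above requests Long.MAX_VALUE items, which effectively switches back pressure off. As a hedged illustration of a Subscriber that does manage demand, the sketch below uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams interfaces used in the Muon examples. It does not use Muon; BackPressureSketch, BatchingSubscriber and consume are hypothetical names, and SubmissionPublisher stands in for a remote stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class BackPressureSketch {

  /** Requests items in fixed-size batches instead of asking for everything up front. */
  static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private final int batchSize;
    private Flow.Subscription subscription;
    private int outstanding;

    BatchingSubscriber(int batchSize) { this.batchSize = batchSize; }

    @Override
    public void onSubscribe(Flow.Subscription s) {
      subscription = s;
      outstanding = batchSize;
      s.request(batchSize);              // initial demand
    }

    @Override
    public void onNext(Integer item) {
      received.add(item);
      if (--outstanding == 0) {          // batch consumed, signal more demand
        outstanding = batchSize;
        subscription.request(batchSize);
      }
    }

    @Override
    public void onError(Throwable t) { done.countDown(); }

    @Override
    public void onComplete() { done.countDown(); }
  }

  /** Publishes count integers beginning at start and returns what the subscriber received. */
  static List<Integer> consume(int start, int count, int batchSize) throws InterruptedException {
    BatchingSubscriber subscriber = new BatchingSubscriber(batchSize);
    try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(subscriber);
      for (int i = start; i < start + count; i++) {
        publisher.submit(i);             // blocks if the subscriber's buffer is full
      }
    }                                    // close() completes the stream
    if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
      throw new IllegalStateException("stream did not complete in time");
    }
    return subscriber.received;
  }

  public static void main(String[] args) throws InterruptedException {
    // Same values as Flowable.range(5, 10), pulled three at a time
    System.out.println(consume(5, 10, 3));
  }
}
```

A batch size tuned to how quickly the subscriber can process items keeps the amount of in-flight data bounded, which is the purpose of back pressure.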

Transports

Muon services connect with others to do their work, ordinarily, across a network. The core concept in Muon systems is the channel, a bi-directional message pipe. If you dig into protocols, you will see many of these bichannels being constructed and attached together. One of these bichannels will be obtained from a MuonTransport, which represents a network communication.

As with protocols, transports come in two parts. The MuonTransport interface represents the client side of the transport, and is responsible for connecting to some remote service on demand and transferring messages back and forth.

The server side of the transport is wholly internal to the transport, and will, in some way, accept connections from another Muon instance and then request an internal bichannel from the ServerStacks interface. This interface facades over the server side of the API types. When creating a Muon server protocol (eg, ReactiveStreamServer), you are providing a source of bichannels for the ServerStacks implementation to give to transports.

So long as you can implement the interface ChannelConnection with MuonMessage flowing through it, you can build a transport for Muon.

Initialising Transports

The most common way of initialising transports is to use MuonConfigBuilder and pass in the transport factories to use. These will use the properties that are set in the config to construct the appropriate factories and register them with the Muon instance.

For example, this service configures the AMQP transport and instructs it to connect to a particular broker.

    AutoConfiguration config = MuonConfigBuilder.withServiceIdentifier("my-test-service")               (1)
      .addWriter(conf -> {
        conf.getProperties().put("muon.discovery.factories", "io.muoncore.discovery.amqp.AmqpDiscoveryFactory"); (2)
        conf.getProperties().put("amqp.discovery.url", "amqp://muon:microservices@localhost");

        conf.getProperties().put("muon.transport.factories", "io.muoncore.transport.amqp.AmqpMuonTransportFactory");  (3)
        conf.getProperties().put("amqp.transport.url", "amqp://muon:microservices@localhost");

      })
      .build();

You can manually construct the Muon instance to programmatically configure the transport, although you also have to programmatically configure the Discovery and Codecs as well.

    // Event bus shared by the in-memory transports
    EventBus bus = new EventBus();

    // Create the configuration
    AutoConfiguration config = MuonConfigBuilder
           .withServiceIdentifier("my-test-service").build();

    // Programmatically construct a Muon instance with an explicit discovery, transport list and codecs
    Muon muon = new MultiTransportMuon(
      config,
      new InMemDiscovery(),
      Collections.singletonList(new InMemTransport(config, bus)),  // the transports to use
      new JsonOnlyCodecs());

    // The same, using MultiDiscovery to combine discovery sources
    Muon muondisco = new MultiTransportMuon(
      config,
      new MultiDiscovery(Collections.singletonList(new InMemDiscovery())),
      Collections.singletonList(new InMemTransport(config, bus)),
      new JsonOnlyCodecs());

Multiple Transports

Muon Java supports multiple transports at the same time, and will select the best transport to use based on the order of the transport connection URLs in the discovery packet. Use muon d --raw to see this information from the CLI.
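One way to picture order-based selection is shown below. This is a sketch under stated assumptions, not Muon's actual selection code: it assumes the rule is "take the first advertised connection URL whose scheme has a locally configured transport". TransportSelectionSketch and select are hypothetical names, and the URLs are placeholders.

```java
import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class TransportSelectionSketch {

  /**
   * Returns the first advertised connection URL, in advertised order,
   * whose scheme matches a transport that is configured locally.
   */
  static Optional<URI> select(List<URI> advertised, Set<String> localSchemes) {
    return advertised.stream()
        .filter(uri -> uri.getScheme() != null && localSchemes.contains(uri.getScheme()))
        .findFirst();
  }

  public static void main(String[] args) {
    // Placeholder connection URLs, in the order a remote service might advertise them
    List<URI> advertised = List.of(
        URI.create("ws://gateway.example/muon"),
        URI.create("amqp://broker.example"));

    // Only an AMQP transport is configured locally, so the ws URL is skipped
    System.out.println(select(advertised, Set.of("amqp")));
  }
}
```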

MuonMessages

In common with other network models, Muon operates in levels. Each level conceptually speaks to its peer in the other service and implements this by interacting with the level below it.

In Muon, each protocol speaks to its counterpart on the other side by interacting with a set of channels, ultimately rooted in a transport channel. The interface between "protocol channels" and the transport channel passes a message with a well defined schema, represented in Muon Java as MuonMessage. The transport's job is to send and receive these messages across a network. How it does this doesn't matter as far as the rest of Muon is concerned.

The minimal MuonMessage schema looks like this

Name                      Type     Description
id                        String   The message ID. Used for correlation
created                   long     When the message was created
target_service            String   The logical service name this message is heading for
target_service_instance   String   The instance of the target service this is heading to, if applicable.
origin_service            String   The logical service that created this message.
protocol                  String   The protocol this message was created by
step                      String   Used by protocols, most commonly to indicate a message type or protocol state machine transition
status                    String   Used by some protocols to indicate error states, deprecated
content_type              String   The content type of the payload, eg "application/json"
channel_op                String   Indicate if the message should be considered a terminator, ie dispatch then shut down the channel
payload                   byte[]   The payload of the message, encoded as a byte array.

Messages flowing between services are fully symmetrical: all of them use this common schema. Transports are permitted to trim this information for performance once a channel has been established, so long as it is inferred and reconstructed on the receiving side.
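To make the schema concrete, here is a minimal sketch of an immutable Java value type carrying the same fields. It is an illustration only and is not the MuonMessage class from Muon Java; MessageSketch and its accessor are hypothetical, and the step and channel_op values used in main are placeholders rather than values defined by Muon.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public final class MessageSketch {
  final String id;                     // message ID, used for correlation
  final long created;                  // when the message was created
  final String targetService;          // logical name of the destination service
  final String targetServiceInstance;  // specific instance, or null if not applicable
  final String originService;          // logical service that created the message
  final String protocol;               // protocol that created the message
  final String step;                   // message type or state machine transition
  final String status;                 // error state (deprecated)
  final String contentType;            // e.g. "application/json"
  final String channelOp;              // whether the message terminates the channel
  private final byte[] payload;        // encoded body

  MessageSketch(String id, long created, String targetService, String targetServiceInstance,
                String originService, String protocol, String step, String status,
                String contentType, String channelOp, byte[] payload) {
    this.id = id;
    this.created = created;
    this.targetService = targetService;
    this.targetServiceInstance = targetServiceInstance;
    this.originService = originService;
    this.protocol = protocol;
    this.step = step;
    this.status = status;
    this.contentType = contentType;
    this.channelOp = channelOp;
    this.payload = payload.clone();    // defensive copy keeps the message immutable
  }

  /** Returns a copy so callers cannot modify the message's payload. */
  byte[] payload() { return payload.clone(); }

  public static void main(String[] args) {
    byte[] body = "{\"message\":\"hello world\"}".getBytes(StandardCharsets.UTF_8);
    MessageSketch msg = new MessageSketch(
        UUID.randomUUID().toString(), System.currentTimeMillis(),
        "example-service", null, "my-client",
        "rpc", "request" /* placeholder step */, null,
        "application/json", "normal" /* placeholder channel_op */, body);
    System.out.println(msg.targetService + " <- " + msg.originService
        + " (" + msg.payload().length + " bytes)");
  }
}
```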

To give better usability when constructing channels, Muon Java uses the extended generic types MuonInboundMessage and MuonOutboundMessage. Protocols will construct MuonOutboundMessage instances as needed and consume MuonInboundMessage instances.

If you wish to construct a new transport, you will be transporting these messages.

Discovery

Muon services connect with others to do their work, ordinarily, across a network. To do this, services need to be able to find each other. Muon takes the position that hardcoding a network address, port or name is a terrible idea. Specifically, Muon supports transport across a network both directly, point to point, and via intermediaries, such as a broker or message router (eg, the muonjs websocket gateway).

This means that all service names must be logical, and so must be dynamically resolvable by services. Muon also takes the general position that fully consistent views of the surrounding system are expensive to produce and maintain, and are often wrong anyway.

There is a common subsystem in all Muon implementations handling discovery. Similar to Transports, Muon Discoveries define a portable data schema that is used to describe services enough to be able to select the appropriate transport to use to connect to it.

Initialising Discoveries

The most common way of initialising discoveries is to use MuonConfigBuilder and pass in the discovery factories to use. These will use the properties that are set in the config to construct the appropriate factories and register them with the Muon instance.

For example, this service configures the AMQP discovery and instructs it to connect to a particular broker.

    AutoConfiguration config = MuonConfigBuilder.withServiceIdentifier("my-test-service")               (1)
      .addWriter(conf -> {
        conf.getProperties().put("muon.discovery.factories", "io.muoncore.discovery.amqp.AmqpDiscoveryFactory"); (2)
        conf.getProperties().put("amqp.discovery.url", "amqp://muon:microservices@localhost");

        conf.getProperties().put("muon.transport.factories", "io.muoncore.transport.amqp.AmqpMuonTransportFactory");  (3)
        conf.getProperties().put("amqp.transport.url", "amqp://muon:microservices@localhost");

      })
      .build();

You can manually construct the Muon instance to programmatically configure the discovery, although you also have to programmatically configure the Transports and Codecs as well.

    Muon muondisco = new MultiTransportMuon(                         (1)
      config,
      new MultiDiscovery(Collections.singletonList(new InMemDiscovery())), (2)
      Collections.singletonList(new InMemTransport(config, bus)),
      new JsonOnlyCodecs());
1 Programmatically initialise a Muon instance
2 Use MultiDiscovery to combine discovery information. In this case, from InMemDiscovery only.

Multiple Discoveries

Muon Java supports multiple discoveries at the same time. All of them are active and provide information on how to locate a service. If multiple discoveries are active at once, their information is merged to give a comprehensive view.
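Merging can be pictured as taking the union of what each discovery reports. The sketch below is a simplified illustration and not the MultiDiscovery implementation: it models each discovery's view as a map from service name to connection URLs, which is an assumption made for this example, and DiscoveryMergeSketch and merge are hypothetical names.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class DiscoveryMergeSketch {

  /**
   * Combines several views of the system into one. A service known to any
   * discovery appears in the result, with the union of its connection URLs.
   */
  static Map<String, Set<String>> merge(List<Map<String, Set<String>>> views) {
    Map<String, Set<String>> merged = new TreeMap<>();
    for (Map<String, Set<String>> view : views) {
      view.forEach((service, urls) ->
          merged.computeIfAbsent(service, name -> new LinkedHashSet<>()).addAll(urls));
    }
    return merged;
  }

  public static void main(String[] args) {
    // Placeholder views, as two different discoveries might report them
    Map<String, Set<String>> viaBroker = Map.of(
        "example-service", Set.of("amqp://broker.example"));
    Map<String, Set<String>> viaGateway = Map.of(
        "example-service", Set.of("ws://gateway.example/muon"),
        "other-service", Set.of("ws://gateway.example/muon"));

    System.out.println(merge(List.of(viaBroker, viaGateway)).keySet());
  }
}
```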

Advanced Features

Wiretap

The transport subsystem of Muon injects a WiretapChannel into every connection that is made. This allows you to optionally read the messages moving in and out of the transport and further interpret them. Because the messages are immutable, you receive the message itself, rather than a copy, for processing.

Ordinarily, this is used to record the way a service communicates with the rest of the distributed application. This is particularly useful when designing and building a new communication protocol.

Imagine a service with a single RPC endpoint that accepts an undefined object data structure, which we represent using a Map. It then responds inline with the number 42. A very simple service.

rpc.handleRequest(all(), request -> {
  request.ok(42);
});

We would like to generate a list of all the services that are calling this one, without altering the business method.

We could implement this using a wiretap, which extracts a stream of all the messages that match a particular filter. The stream provided implements the Reactive Streams interface.

Set<String> remoteServices = new HashSet<>();     (1)
Subscriber<MuonMessage> sub = new Subscriber<MuonMessage>() {
  @Override
  public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); }

  @Override
  public void onNext(MuonMessage msg) {
    remoteServices.add(msg.getSourceServiceName());  (2)
  }

  @Override
  public void onError(Throwable t) {

  }

  @Override
  public void onComplete() {

  }
};

muon.getTransportControl().tap(                      (3)
  msg ->
    msg.getStep().equals(REQUEST))                   (4)
  .subscribe(sub);
1 The services that have connected to this one via the RPC endpoint.
2 A Reactive Streams Subscriber, converting the messages into the list of origin service names
3 Adding the tap into the Muon transport subsystem
4 The filter matches for the step field on the MuonMessage, picking out particular message types for processing.

Whenever you then perform an RPC communication, the request MuonMessage events will be selected by your wiretap and passed into the Subscriber, in this case the Reactive Streams Subscriber defined above.

Encoding Support

To go across a network, all payloads used in Muon protocols ultimately need to be converted to arrays of bytes.

Muon can use a variety of encoding methods, depending on your configuration and the types that you want to use.

The default codec in Muon Java for internal types is now Avro. Use Avro if you can: it gives an improved experience for integration with external tooling, because Muon can negotiate the sending of Avro schemas and provide them to clients via the Introspection protocol.

Muon can use the following sources of Avro Schema information:

  • You generate a SpecificRecord extending class from an existing Schema, using the Avro tools.

  • For regular POJOs with no more information, Muon will auto create a schema from your Java type and send that. This does not include any validation rules, and the Schema types are generated automatically from the Java types, which may be incorrect. All fields are marked as nullable.

The fallback codec is JSON, encoded using GSON. This is used when no other codec can be negotiated, such as for Muon Node/MuonJS.

For Muon implementations that don’t support Avro, Muon will send JSON encoded messages and process the responses from JSON into Java types.
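The fallback behaviour can be sketched as a simple negotiation: walk the local codecs in preference order, pick the first one the remote side also supports, and otherwise use JSON. This is an illustration of the idea only, not Muon's negotiation code. CodecNegotiationSketch and negotiate are hypothetical names, and the Avro content type string is a placeholder assumed for this example.

```java
import java.util.List;
import java.util.Set;

public class CodecNegotiationSketch {

  static final String JSON = "application/json";
  // Placeholder content type for Avro, assumed for this sketch
  static final String AVRO = "application/avro+binary";

  /**
   * Returns the first locally preferred content type that the remote side
   * also supports, or JSON when nothing else can be agreed on.
   */
  static String negotiate(List<String> localPreference, Set<String> remoteSupported) {
    for (String contentType : localPreference) {
      if (remoteSupported.contains(contentType)) {
        return contentType;
      }
    }
    return JSON;
  }

  public static void main(String[] args) {
    List<String> local = List.of(AVRO, JSON);

    // A peer that understands Avro
    System.out.println(negotiate(local, Set.of(AVRO, JSON)));

    // A peer that only understands JSON, such as a JavaScript implementation
    System.out.println(negotiate(local, Set.of(JSON)));
  }
}
```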