The current version is 7.3.2

This document is maintained at https://github.com/muoncore/muon-java. Please submit issues at that repository.

Muon Java

Muon Java is the Java implementation of the Muon microservices toolkit. Muon lets you build microservices in many languages with rich, performant and fully reactive communication semantics, and call Muon-compatible services written in other languages without sacrificing those semantics or performance.

Contributing

Muon and the ecosystem services are open source. Contribute to Muon Java and this documentation at http://github.com/muoncore/muon-java

A Microservice based system in 5 minutes

The quickest way to start a new Muon Java Microservice is to clone the example repo at https://github.com/muoncore/muon-java-gradle-example

This is set up to use Gradle as the build tool and Java as the application language.

First, install the Muon CLI, then clone this repository.

The project already contains an example service that looks similar to this:

public class MuonServerExample {


    public static void main(String[] args) throws Exception {
        AutoConfiguration config = MuonConfigBuilder
                      .withServiceIdentifier("example-service").build(); (1)

        Muon muon = MuonBuilder.withConfig(config).build();              (2)

        muon.handleRequest(all(), request -> {                           (3)
            System.out.println("A request has been made ");
            Map<String, Object> data = new HashMap<>();
            data.put("Hello", "world " + System.currentTimeMillis());
            request.ok(data);                                            (4)
        });
        System.out.println("Use Ctrl-C to exit");
    }
}
1 Create a muon configuration using a fluent interface builder.
2 Construct a Muon instance from the config. It will appear in the network soon after this.
3 Set up an RPC protocol handler for all request paths.
4 When invoked, respond to the RPC request with some manufactured data.

You can run this from a prebuilt Gradle task

> ./gradlew runServer

You can then use the Muon CLI to interact with the new service, using the name example-service set above as its identifier

> muon rpc rpc://example-service/

The results from the example service should look like this:

david@patmos:~/$ muon rpc rpc://example-service/hello

┌────────────┬──────────────────────────────┬───────────────────────────────────────────────┐
│ STATUS     │ CONTENT/TYPE                 │ BODY                                          │
├────────────┼──────────────────────────────┼───────────────────────────────────────────────┤
│ 200        │ application/json             │ {"Hello":"world 1481639846708"}               │
└────────────┴──────────────────────────────┴───────────────────────────────────────────────┘

========= RESPONSE FULL BODY: ========================================================

{ Hello: 'world 1481639846708' }

Congratulations, you have a running Muon service!

Core Features

Reactive RPC

Request/Response is a well-understood communication style in which you make a single request and expect to receive a single response.

Muon supports this style of communication over its naturally scalable, reactive and event-based channel communication.

Simple RPC

Here is the simplest possible Muon RPC endpoint. It accepts any data pushed to it, and responds with a simple text message.

package io.muoncore.example;

import io.muoncore.Muon;

import java.util.concurrent.ExecutionException;

// Imports for all() and Response are not shown.
public class ReactiveRPC {

    public void exec(Muon muon) throws ExecutionException, InterruptedException {

        // Request handler: accept any request and reply with a text message.
        muon.handleRequest(all(), request -> {
            request.ok("Hi There");
        });

        // Request client: send a request and block until the response arrives.
        Response data = muon.request("request://myservice/").get();

        System.out.println("The Data is " + data.getPayload(String.class));
    }
}

The client is the second half of the same listing: it sends a request to the service URL with muon.request and blocks on get() until the Response arrives.

The Response object contains metadata about the response, such as whether it succeeded.

Making it Reactive

The handler does not need to respond synchronously as in the above example. The response can be sent from any context, and by any thread.

This will cause an event to flow back down the channel and complete the request/ response cycle.

An example of this in action is

package io.muoncore.example;

import io.muoncore.Muon;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;

// Imports for all(), RequestWrapper and Response are not shown.
public class ReactiveRPCAsync {

    public void exec(Muon muon) throws ExecutionException, InterruptedException {

        // Thread-safe queue shared between the handler and the worker thread.
        BlockingQueue<RequestWrapper> requestQueue = new LinkedBlockingQueue<>();

        // Request handler: park each request on the queue instead of answering inline.
        muon.handleRequest(all(), requestQueue::add);

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    // take() blocks until a request is available.
                    RequestWrapper wrapper = requestQueue.take();
                    wrapper.ok("Hello");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();

        // Request client
        Response data = muon.request("request://myservice/").get();

        System.out.println("The Data is " + data.getPayload(String.class));
    }
}

This demonstrates adding the requests onto a queue and processing them asynchronously.

Be aware that the request will time out on both the client and server side, depending on your configuration.
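
The asynchronous completion and the client-side timeout described above can be sketched without Muon at all. In this sketch a CompletableFuture stands in for the pending request; PendingReply, its methods and the timeout values are hypothetical names chosen for illustration, not Muon API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a pending request; not part of Muon.
final class PendingReply {

    private final CompletableFuture<String> reply = new CompletableFuture<>();

    // "Server" side: may be called later, and from any thread.
    void ok(String payload) {
        reply.complete(payload);
    }

    // "Client" side: wait for the reply, but give up once the timeout expires.
    String await(long timeout, TimeUnit unit) {
        try {
            return reply.get(timeout, unit);
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        // Answered from another thread, well inside the timeout.
        PendingReply answered = new PendingReply();
        new Thread(() -> answered.ok("Hello")).start();
        System.out.println(answered.await(1, TimeUnit.SECONDS));

        // Never answered, so the client gives up.
        PendingReply ignored = new PendingReply();
        System.out.println(ignored.await(50, TimeUnit.MILLISECONDS));
    }
}
```

The point of the sketch is that the thread that answers and the thread that waits are decoupled, so a handler that queues work must still answer before the waiting side gives up.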

Batch handling RPC

A common failure of RPC-based systems is that they attempt to perform too much work concurrently, and in logical isolation. This ends up causing thread thrashing, exhaustion of the thread pool or overload on some backing data store.

Taking the above mechanism to its logical conclusion, it becomes trivial to batch up the processing of requests.

package io.muoncore.example;

import io.muoncore.Muon;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutionException;


public class ReactiveRPCBatch {

    public void exec(Muon muon) throws ExecutionException, InterruptedException {

        Queue<RequestWrapper> requestQueue = new LinkedList<>();

        // Request handler: queue every request for later batch processing.
        muon.handleRequest(all(), requestQueue::add);


        //TODO, a good example of the pattern


    }
}

The intended pattern is to drain the queue every 5 seconds, generate a single answer and send it to every queued request.
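
A minimal sketch of the drain-and-answer step follows, written without any Muon types so that it runs on its own. BatchResponder is a hypothetical helper, and a Consumer<String> stands in for the reply callback of a queued request; how the answer is actually sent back through Muon is not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical batching helper; Consumer<String> stands in for a queued request's reply callback.
final class BatchResponder {

    private final BlockingQueue<Consumer<String>> pending = new LinkedBlockingQueue<>();

    // Request handler side: park the request instead of answering it.
    void enqueue(Consumer<String> replyCallback) {
        pending.add(replyCallback);
    }

    // Drain everything queued so far, compute one answer and send it to all.
    // Returns the number of requests answered.
    int flush() {
        List<Consumer<String>> batch = new ArrayList<>();
        pending.drainTo(batch);
        if (batch.isEmpty()) {
            return 0;
        }
        String answer = "Handled " + batch.size() + " request(s) in one batch";
        batch.forEach(reply -> reply.accept(answer));
        return batch.size();
    }

    public static void main(String[] args) {
        BatchResponder responder = new BatchResponder();
        responder.enqueue(System.out::println);
        responder.enqueue(System.out::println);
        responder.flush();
    }
}
```

In a service, flush() would typically be run on a timer, for example with ScheduledExecutorService.scheduleAtFixedRate(responder::flush, 5, 5, TimeUnit.SECONDS), so that the expensive work happens once per interval rather than once per request.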

Reactive Streams

Muon is built to make creating streams easy. Internally everything is treated as a channel, a naturally streaming construct.

This is best accessed via the Reactive Streams API, a cross-industry effort to standardise streaming communication with back pressure.

To publish a new stream, create a Publisher and pass it into the publishStream method, giving it a name, and the semantics of the stream.

[Example listing missing: src/main/java/io/muoncore/example/ReactiveStreams.java]

Here, we use Spring Reactor to demonstrate the creation of a Publisher, however any Reactive Streams compatible framework or library could be used.

To access the data from another service, use the subscribe method, passing in the logical Muon discovery url.

[Example listing missing: src/main/java/io/muoncore/example/ReactiveStreams.java]

Again, this example uses Java and shows two separate services communicating using a reactive message stream, with back pressure support managed by Reactor.
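
Back pressure itself can be illustrated independently of Muon and Reactor. The sketch below uses the JDK's java.util.concurrent.Flow interfaces (Java 9 and later), which mirror the Reactive Streams interfaces; it does not use Muon's publishStream or subscribe methods, and OneAtATimeSubscriber is a hypothetical class written for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Subscriber that applies back pressure by requesting one item at a time.
final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {

    final List<Integer> received = new CopyOnWriteArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // signal demand for the first item only
    }

    @Override
    public void onNext(Integer item) {
        received.add(item);
        subscription.request(1); // ask for the next item once this one is handled
    }

    @Override
    public void onError(Throwable error) {
        done.countDown();
    }

    @Override
    public void onComplete() {
        done.countDown();
    }

    // Publishes 1..count to a single subscriber and returns what it received.
    static List<Integer> run(int count) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // blocks if the subscriber's buffer is full
            }
        } // close() signals onComplete once the buffered items have been delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(5));
    }
}
```

The subscriber controls the pace: the publisher may only deliver as many items as have been requested, which is the contract a Reactive Streams library manages on your behalf.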

Wiretap

The transport subsystem of Muon injects a WiretapChannel into every connection that is made. This allows you to optionally read the messages moving in and out of the transport and further interpret them. The messages themselves are immutable, so you receive the message itself for processing rather than a copy.

Ordinarily, this is used to record the way a service communicates with the rest of the distributed application. This is particularly useful when designing and building a new communication protocol.

Imagine a service with a single RPC endpoint that accepts an undefined object data structure, which we represent using a Map. It then responds inline with the number 42. A very simple service.

        Broadcaster<String> publisher = Broadcaster.create();

        muon.handleRequest(all(), request -> {
            request.ok(42);
        });

We would like to generate a list of all the services that are calling this one, without altering the business method.

We could implement this using a wiretap, which extracts a stream of all the messages that match a particular filter. The stream provided implements the Reactive Streams interface.

        Set<String> remoteServices = new HashSet<>();     (1)
        Broadcaster<MuonMessage> requests = Broadcaster.create();

        requests.consume(msg -> {
            remoteServices.add(msg.getSourceServiceName());  (2)
        });

        muon.getTransportControl().tap(                      (3)
                msg -> msg.getStep().equals(RRPEvents.REQUEST)).subscribe(requests);  (4)
1 The services that have connected to this one via the RPC endpoint.
2 A Spring Reactor Broadcaster consumer, adding each message's source service to the set.
3 Adding the tap into the Muon transport subsystem.
4 The filter matches on the message's step, picking out only the request messages.

Whenever you then perform an RPC communication, the request messages will be selected by your wiretap and passed into the Subscriber, in this case the Reactor Broadcaster.

        int value = muon.request("request://myservice/").get().getPayload(Integer.class);

As many wiretaps as you like can be active. You may add another wiretap later on to select the response messages, and it will then begin to receive all the matching messages.

        Broadcaster<MuonMessage> responses = Broadcaster.create();

        responses.consume(msg -> {
            System.out.println("Sent a response to " + msg.getTargetServiceName());
        });

        muon.getTransportControl().tap(
                msg -> msg.getStep().equals(RRPEvents.RESPONSE)).subscribe(responses);
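
The filter-and-forward idea behind a wiretap can be modelled in a few lines of plain Java. This is only a sketch of the concept: TapDemo, Message and the step names are hypothetical and are not Muon classes, and the real transport hands taps a Reactive Streams publisher rather than a callback.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical model of a wiretapped transport; none of these types are Muon classes.
final class TapDemo {

    // Immutable message, so every tap can safely be handed the same instance.
    static final class Message {
        final String step;
        final String source;
        final String target;

        Message(String step, String source, String target) {
            this.step = step;
            this.source = source;
            this.target = target;
        }
    }

    private final List<Consumer<Message>> taps = new ArrayList<>();

    // Register a tap; any number may be active at once.
    void tap(Predicate<Message> filter, Consumer<Message> sink) {
        taps.add(message -> {
            if (filter.test(message)) {
                sink.accept(message);
            }
        });
    }

    // Called by the "transport" for every message passing through it.
    void onMessage(Message message) {
        taps.forEach(tap -> tap.accept(message));
    }

    // Collects the distinct callers seen in REQUEST messages.
    static Set<String> callersOf(List<Message> traffic) {
        TapDemo transport = new TapDemo();
        Set<String> callers = new LinkedHashSet<>();
        transport.tap(m -> m.step.equals("REQUEST"), m -> callers.add(m.source));
        traffic.forEach(transport::onMessage);
        return callers;
    }

    public static void main(String[] args) {
        List<Message> traffic = List.of(
                new Message("REQUEST", "orders", "myservice"),
                new Message("RESPONSE", "myservice", "orders"),
                new Message("REQUEST", "billing", "myservice"));
        System.out.println(callersOf(traffic));
    }
}
```

The business handler is untouched: the tap only observes traffic that the transport was already carrying.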

TODO, finish this event guide with a concrete example: users registering and being removed, first as a simple streaming aggregation, then with a projection instead.

Event Based Systems

Event-based architectures have many powerful and useful properties when building distributed and scalable systems. The Muon ecosystem provides special support for building event-based systems, in the form of:

  • Microservices providing optimised event storage, transformation and query.

  • Muon protocols and APIs to interact with event based data.

Eventing in Muon is taken to mean persisted event architectures, where the streams of events are stored and used at future points. This is often called event sourcing.

Running an Event Store

To get started with

Setting up the event client

[Example listing missing: src/main/java/io/muoncore/example/event/EventClientExample.java, tag createclient]

Emitting Events

[Example listing missing: src/main/java/io/muoncore/example/event/EventClientExample.java, tag emitevent]

Replaying Events

[Example listing missing: src/main/java/io/muoncore/example/event/EventClientExample.java, tag replay]

Event Sourcing

TODO, describe replaying event data and ingesting it into an internal model.

TODO, link core event sourcing docs.
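
As a rough illustration of replaying stored events into an internal model, the sketch below rebuilds a set of current users from a stream of registration and removal events. The Event class, the event type names and UserDirectory are hypothetical and are not part of the Muon event API; the replay source here is just an in-memory list.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical event type and model; not part of the Muon event API.
final class UserDirectory {

    static final class Event {
        final String type; // "UserRegistered" or "UserRemoved"
        final String user;

        Event(String type, String user) {
            this.type = type;
            this.user = user;
        }
    }

    private final Set<String> users = new TreeSet<>();

    // Apply a single event to the in-memory model.
    void apply(Event event) {
        switch (event.type) {
            case "UserRegistered":
                users.add(event.user);
                break;
            case "UserRemoved":
                users.remove(event.user);
                break;
            default:
                break; // ignore event types this model does not care about
        }
    }

    // Rebuild the model from scratch by replaying the stored stream in order.
    static Set<String> replay(List<Event> stream) {
        UserDirectory directory = new UserDirectory();
        stream.forEach(directory::apply);
        return directory.users;
    }

    public static void main(String[] args) {
        List<Event> stream = List.of(
                new Event("UserRegistered", "alice"),
                new Event("UserRegistered", "bob"),
                new Event("UserRemoved", "alice"));
        System.out.println(replay(stream));
    }
}
```

Because the model is derived entirely from the events, it can be discarded and rebuilt at any time, or replaced by a different model built from the same stream.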

Apart from the recommended event store, Photon, Muon Java provides a simplified in-memory event store that implements only the core ingest and stream functionality of a Muon-compatible event store. You can read more about it here

This is most useful when writing tests.

Using Event Projections

Extending Muon Java

In common with all Muon libraries, Muon Java is a restricted implementation of Communicating Sequential Processes. Extension then mostly involves creating and exposing various combinations of processes and channels.
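
To make the process-and-channel vocabulary concrete, here is a small sketch in plain Java. It is an analogy only: a BlockingQueue plays the role of a channel and a thread plays the role of a process. ChannelSketch and its methods are hypothetical and do not reflect Muon's internal channel API.

```java
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of processes and channels; not Muon's internal API.
final class ChannelSketch {

    // A process: loops, reading from its inbound channel and writing to its outbound one.
    static Thread startUpperCaser(BlockingQueue<String> inbound, BlockingQueue<String> outbound) {
        Thread process = new Thread(() -> {
            try {
                while (true) {
                    outbound.put(inbound.take().toUpperCase(Locale.ROOT));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted: shut the process down
            }
        });
        process.setDaemon(true);
        process.start();
        return process;
    }

    // Sends one message through the process and returns its reply (null if none arrives in time).
    static String roundTrip(String message) {
        BlockingQueue<String> requests = new LinkedBlockingQueue<>();
        BlockingQueue<String> replies = new LinkedBlockingQueue<>();
        Thread process = startUpperCaser(requests, replies);
        try {
            requests.put(message);
            return replies.poll(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            process.interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

The process shares no state with its caller; everything it knows arrives on one channel and everything it produces leaves on another, which is what makes such pieces easy to recombine.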

Encoding

Transports

Discovery

Adding a new interaction protocol

Before reading on, note that many systems won’t need to do this. The built-in protocols (RPC, Streaming etc) are good enough in most circumstances. If you do have a specific need to control the way your services interact in a more granular way, then read on!

To implement a new protocol, you need to define the API(s) that you want to provide to your users, define the event interchange that will need to happen to communicate that between services, and then create the processes needed to bring those events to life.

This guide focuses on how to implement this in Muon-Java. For a fuller discussion on event protocols, see the main guide.

TODO, link to main guide on protocols

Designing a Protocol

Client Side

Designing Building a Muon Client Process
Designing the User API and Protocol Stack

Server Side

Designing Building a Muon Server Process
Registering the ServerStack