MSG-RS

Introduction

📖 MSG-RS is a flexible and lightweight messaging library for distributed systems built with Rust and Tokio, designed for performance and reliability.

This library is built and maintained by Chainbound, and is licensed under the MIT License.

Overview

MSG-RS is inspired by projects like ZeroMQ and Nanomsg. It was built because at Chainbound we needed a Rust-native messaging library that was flexible and customizable enough without relying on C bindings or external dependencies.

Another reason for building this library was to modularize the messaging components of our Fiber project. Lastly, it was an opportunity to dive deeper into async Rust patterns and to measure optimizations in isolation from other distributed system components.

Features

1. Flexible and modular

MSG-RS is designed to be modular by leveraging Rust's traits and generics. This modularity can be seen in all aspects, from the transport layer to the socket patterns.

2. Durable

Durability is a critical aspect in distributed systems. By using smart abstractions like the ones from StubbornIO, MSG is able to provide reliable reconnections and retries out of the box.

3. Extensible

Even though MSG comes with different socket patterns and transport layers, it is also designed to be easily extensible if you want to bring your own options like authentication, encryption, and compression to the table.

Development Status

MSG is currently in ALPHA, and is not yet recommended for use in production.

Contributing

Contributors are welcome! Please see the contributing guide for more information.

Usage

This section will guide you through the process of adding MSG-RS to your project and using it to build networked distributed systems.

Getting started with MSG-RS

To add MSG-RS to your project, add the following to your Cargo.toml file:

[dependencies]
msg = { git = "https://github.com/chainbound/msg-rs" }

Warning: MSG-RS is currently in ALPHA, and is not yet recommended for use in production.

We plan to release MSG-RS to Crates.io once it reaches a beta stage. Until then, we recommend using the git dependency as shown above.

Socket types

MSG-RS supports the following socket types:

Request/Reply

The request/reply socket type is used for sending a request to a server and receiving a response.

Example:

use bytes::Bytes;
use tokio_stream::StreamExt;

use msg::{RepSocket, ReqSocket, Tcp};

#[tokio::main]
async fn main() {
    // Initialize the reply socket (server side) with a transport
    let mut rep = RepSocket::new(Tcp::default());
    rep.bind("0.0.0.0:4444").await.unwrap();

    // Initialize the request socket (client side) with a transport
    let mut req = ReqSocket::new(Tcp::default());
    req.connect("0.0.0.0:4444").await.unwrap();

    tokio::spawn(async move {
        // Receive the request and respond with "world"
        // (RepSocket implements `Stream`)
        let req = rep.next().await.unwrap();
        println!("Message: {:?}", req.msg());

        req.respond(Bytes::from("world")).unwrap();
    });

    let res: Bytes = req.request(Bytes::from("hello")).await.unwrap();
    println!("Response: {:?}", res);
}

Publish/Subscribe

The publish/subscribe socket type is used for sending a message to multiple subscribers. It works by defining topics over which messages can be sent and received. Subscribers can subscribe to one or more topics, and will receive all messages sent to those topics by publishers.

Example:

use bytes::Bytes;
use tokio_stream::StreamExt;

use msg::{PubSocket, SubSocket, Tcp};

#[tokio::main]
async fn main() {
    // Initialize the publisher socket (server side) with a transport
    let mut pub_socket = PubSocket::new(Tcp::default());
    pub_socket.bind("0.0.0.0:4444").await.unwrap();

    // Initialize the subscriber socket (client side) with a transport
    let mut sub_socket = SubSocket::new(Tcp::default());
    sub_socket.connect("0.0.0.0:4444").await.unwrap();

    let topic = "some_interesting_topic".to_string();

    // Subscribe to a topic
    sub_socket.subscribe(topic.clone()).await.unwrap();

    tokio::spawn(async move {
        // Values are `bytes::Bytes`
        pub_socket.publish(topic, Bytes::from("hello_world")).await.unwrap();
    });

    let msg = sub_socket.next().await.unwrap();
    println!("Received message: {:?}", msg);
}

Transport layers

MSG-RS supports multiple transport layers. The transport layer is the one that handles the actual sending and receiving of messages. The following transport layers are supported:

TCP

Why choose TCP?

The TCP transport layer is ideal for scenarios where reliable, ordered, and error-checked delivery of a stream of data is crucial. It ensures that data is delivered in the order it was sent and retransmits lost packets. This makes TCP suitable for applications where data integrity and accuracy are more important than speed.

TCP is especially useful if messages are going to be sent over public internet links, where the quality of the connection cannot be guaranteed and a significant portion of packets may be lost or corrupted.

How to use TCP

In MSG, here is how you can set up any socket type with the TCP transport:

use msg::{RepSocket, ReqSocket, Tcp};

#[tokio::main]
async fn main() {
    // Initialize the reply socket (server side) with default TCP
    let mut rep = RepSocket::new(Tcp::default());
    // Bind the socket to the address. This will start listening for incoming connections.
    // This method does DNS resolution internally, so you can use hostnames here.
    rep.bind("0.0.0.0:4444").await.unwrap();

    // Initialize the request socket (client side) with default TCP
    let mut req = ReqSocket::new(Tcp::default());
    // Connect the socket to the address. This will initiate a connection to the server.
    // This method does DNS resolution internally, so you can use hostnames here.
    req.connect("0.0.0.0:4444").await.unwrap();

    // ...
}

QUIC

Why choose QUIC?

QUIC is a new transport layer protocol that is built on top of UDP. It is designed to provide the same reliability and security guarantees as TCP + TLS, while solving some of the issues of that combination, such as:

  • Head-of-line blocking: If a packet is lost, all subsequent packets are held up until the lost packet is retransmitted. This can be a problem especially when multiplexing multiple streams over a single connection because it can cause a single slow stream to block all other streams.
  • Slow connection setup: TCP + TLS requires 2-3 round trips to establish a connection, which can be slow on high latency networks.
  • No support for multiplexing: TCP does not support multiplexing multiple streams over a single connection. This means that if you want to send multiple streams of data over a single connection, you have to implement your own multiplexing layer on top of TCP, which can run into issues like head-of-line blocking that we've seen above.
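The difference between a single ordered stream and independent streams can be illustrated with a toy model. This is only a sketch: the `Packet` type and both functions are hypothetical, and they count deliverable packets rather than implement TCP or QUIC.

```rust
/// One packet in send order, tagged with the logical stream it belongs to.
/// (Hypothetical type, used only for this illustration.)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Packet {
    stream: char,
    arrived: bool,
}

/// Single ordered stream (TCP-like): delivery stops at the first gap,
/// regardless of which logical stream the missing packet belonged to.
fn deliverable_single_stream(packets: &[Packet]) -> usize {
    packets.iter().take_while(|p| p.arrived).count()
}

/// Independent streams (QUIC-like): a gap only stalls the stream it is in.
fn deliverable_independent_streams(packets: &[Packet]) -> usize {
    let mut stalled: Vec<char> = Vec::new();
    let mut delivered = 0;
    for p in packets {
        if stalled.contains(&p.stream) {
            // Everything behind a gap in the same stream has to wait.
            continue;
        }
        if p.arrived {
            delivered += 1;
        } else {
            stalled.push(p.stream);
        }
    }
    delivered
}

fn main() {
    // Packets in send order; the first packet of stream 'a' was lost.
    let packets = [
        Packet { stream: 'a', arrived: false },
        Packet { stream: 'b', arrived: true },
        Packet { stream: 'a', arrived: true },
        Packet { stream: 'b', arrived: true },
    ];

    println!("single ordered stream: {}", deliverable_single_stream(&packets));
    println!("independent streams:   {}", deliverable_independent_streams(&packets));
}
```

In this model the lost packet of stream 'a' blocks everything on the single ordered stream, while stream 'b' keeps making progress when the streams are independent.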

QUIC in MSG

The MSG QUIC implementation is based on quinn. It relies on self-signed certificates and does not verify server certificates. Also, due to how our Transport abstraction works, we don't support QUIC connections with multiple streams. This means that the Quic transport implementation will do all its work over a single, bi-directional stream for now.

How to use QUIC

In MSG, here is how you can set up any socket type with the QUIC transport:

use msg::{RepSocket, ReqSocket, Quic};

#[tokio::main]
async fn main() {
    // Initialize the reply socket (server side) with default QUIC
    let mut rep = RepSocket::new(Quic::default());
    // Bind the socket to the address. This will start listening for incoming connections.
    // This method does DNS resolution internally, so you can use hostnames here.
    rep.bind("0.0.0.0:4444").await.unwrap();

    // Initialize the request socket (client side) with default QUIC
    let mut req = ReqSocket::new(Quic::default());
    // Connect the socket to the address. This will initiate a connection to the server.
    // This method does DNS resolution internally, so you can use hostnames here.
    req.connect("0.0.0.0:4444").await.unwrap();

    // ...
}

IPC

More precisely, MSG-RS supports Unix Domain Sockets (UDS) for IPC.

Why choose IPC?

IPC is a transport layer that allows for communication between processes on the same machine. The main difference between IPC and other transport layers is that IPC sockets use the filesystem as the address namespace.

IPC is useful when you want to avoid the overhead of network sockets and want to have a low-latency communication link between processes on the same machine, all while being able to use the same API as the other transport layers that MSG-RS supports.

Due to its simplicity, IPC is typically faster than TCP and QUIC, but the exact performance improvement also depends on the throughput of the underlying UDS implementation. We only recommend using IPC when all communicating processes run on the same machine and you know that the performance benefits outweigh the flexibility of a network socket.
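To make the "filesystem as address namespace" point concrete, here is a minimal sketch that uses only the Rust standard library (Unix only) and does not involve MSG at all. The function name `uds_echo` and the socket path are illustrative assumptions.

```rust
use std::io::{self, Read, Write};
use std::net::Shutdown;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::Path;
use std::thread;

/// Binds a Unix Domain Socket at `path`, sends `payload` through it from a
/// client in the same process, and returns what the server echoed back.
fn uds_echo(path: &Path, payload: &[u8]) -> io::Result<Vec<u8>> {
    // A stale socket file from a previous run would make `bind` fail.
    let _ = std::fs::remove_file(path);

    // The "address" is just a filesystem path.
    let listener = UnixListener::bind(path)?;

    let server = thread::spawn(move || -> io::Result<()> {
        let (mut conn, _) = listener.accept()?;
        let mut buf = Vec::new();
        // Read until the client shuts down its write half, then echo.
        conn.read_to_end(&mut buf)?;
        conn.write_all(&buf)
    });

    let mut client = UnixStream::connect(path)?;
    client.write_all(payload)?;
    client.shutdown(Shutdown::Write)?;

    let mut echoed = Vec::new();
    client.read_to_end(&mut echoed)?;

    server.join().expect("server thread panicked")?;
    // Clean up the socket file so the path can be reused.
    std::fs::remove_file(path)?;
    Ok(echoed)
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("msg_uds_sketch.sock");
    let echoed = uds_echo(&path, b"hello")?;
    println!("echoed {} bytes", echoed.len());
    Ok(())
}
```

Note that the socket file persists after the listener is dropped, which is why the sketch removes it explicitly before binding and after finishing.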

How to use IPC

In MSG, here is how you can set up any socket type with the IPC transport:

use msg::{RepSocket, ReqSocket, Ipc};

#[tokio::main]
async fn main() {
    // Initialize the reply socket (server side) with default IPC
    let mut rep = RepSocket::new(Ipc::default());
    // Bind the socket to the address. This will start listening for incoming connections.
    // You can use any path that is valid for a Unix Domain Socket.
    rep.bind("/tmp/msg.sock").await.unwrap();

    // Initialize the request socket (client side) with default IPC
    let mut req = ReqSocket::new(Ipc::default());
    // Connect the socket to the address. This will initiate a connection to the server.
    req.connect("/tmp/msg.sock").await.unwrap();

    // ...
}

Authentication

Authentication is the process of verifying the identity of a user or process before allowing access to resources.

In MSG, authentication is handled by users of the library through the Authenticator trait. This trait is implemented by the user and passed to the socket type when it is created.

Here is how the Authenticator trait is defined:

// msg-socket/src/lib.rs
pub trait Authenticator: Send + Sync + Unpin + 'static {
    fn authenticate(&self, id: &Bytes) -> bool;
}

The authenticate method is called by the library whenever a new connection is established. The id parameter is the identity of the connecting peer.

Note: the Authenticator is used by the server-side socket only!

Here is an example of how you can add an authenticator to a client-server application:

use bytes::Bytes;
use msg::{tcp::Tcp, Authenticator, RepSocket, ReqOptions, ReqSocket};

// Define some custom authentication logic
#[derive(Default)]
struct Auth;

impl Authenticator for Auth {
    fn authenticate(&self, id: &Bytes) -> bool {
        println!("Auth request from: {:?}", id);
        // Custom authentication logic goes here
        // ...
        true
    }
}

#[tokio::main]
async fn main() {
    // Initialize the reply socket (server side) with a transport
    // and an authenticator that we just implemented:
    let mut rep = RepSocket::new(Tcp::default()).with_auth(Auth);
    rep.bind("0.0.0.0:4444").await.unwrap();

    // Initialize the request socket (client side) with a transport
    // and an identifier. This will implicitly turn on client authentication.
    // The identifier will be sent to the server when the connection is established.
    let mut req = ReqSocket::with_options(
        Tcp::default(),
        ReqOptions::default().auth_token(Bytes::from("client1")),
    );

    // ...
}
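As an illustration of what the "custom authentication logic" could look like, here is a dependency-free sketch of an allowlist authenticator. It assumes a local stand-in trait that mirrors the shape of MSG's `Authenticator` but takes `&[u8]` instead of `bytes::Bytes`; the `AllowList` type is hypothetical and not part of the library.

```rust
use std::collections::HashSet;

/// Local stand-in mirroring the shape of MSG's `Authenticator` trait.
/// It uses `&[u8]` instead of `bytes::Bytes` so the sketch has no dependencies.
trait Authenticator: Send + Sync + Unpin + 'static {
    fn authenticate(&self, id: &[u8]) -> bool;
}

/// Hypothetical authenticator that accepts only known client identifiers.
struct AllowList {
    allowed: HashSet<Vec<u8>>,
}

impl AllowList {
    fn new<I, T>(ids: I) -> Self
    where
        I: IntoIterator<Item = T>,
        T: AsRef<[u8]>,
    {
        Self {
            allowed: ids.into_iter().map(|id| id.as_ref().to_vec()).collect(),
        }
    }
}

impl Authenticator for AllowList {
    fn authenticate(&self, id: &[u8]) -> bool {
        // Accept the peer only if its identifier is on the list.
        self.allowed.contains(id)
    }
}

fn main() {
    let auth = AllowList::new(["client1", "client2"]);
    println!("client1 accepted: {}", auth.authenticate(b"client1"));
    println!("intruder accepted: {}", auth.authenticate(b"intruder"));
}
```

With the real trait, the same logic would receive a `&Bytes` identifier, which is the token the client configured on its side.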

Message compression

Sometimes, you may want to compress messages before sending them over the network. MSG-RS supports message compression out of the box, and it is very easy to use.

Compression is most useful in scenarios where you are sending large messages over the network. It can also help reduce the amount of bandwidth used by your application.

In MSG, compression is handled by the socket type:


Request/Reply

You can also find a complete example in msg/examples/reqrep_compression.rs.

use msg::{compression::GzipCompressor, ReqSocket, RepSocket, Tcp};

#[tokio::main]
async fn main() {
    // Initialize the reply socket (server side) with a transport
    let mut rep = RepSocket::new(Tcp::default())
            // Enable Gzip compression (compression level 6).
            .with_compressor(GzipCompressor::new(6));

    rep.bind("0.0.0.0:4444").await.unwrap();

    // Initialize the request socket (client side) with a transport
    let mut req = ReqSocket::new(Tcp::default())
            // Enable Gzip compression (compression level 6).
            // The request and response sockets *don't have to*
            // use the same compression algorithm or level.
            .with_compressor(GzipCompressor::new(6));

    req.connect("0.0.0.0:4444").await.unwrap();

    // ...
}

Publish/Subscribe

You can also find a complete example in msg/examples/pubsub_compression.rs.

use msg::{compression::GzipCompressor, PubSocket, SubSocket, Tcp};

#[tokio::main]
async fn main() {
    // Configure the publisher socket with options
    let mut pub_socket = PubSocket::new(Tcp::default())
        // Enable Gzip compression (compression level 6)
        .with_compressor(GzipCompressor::new(6));

    // Configure the subscribers with options
    let mut sub_socket = SubSocket::new(Tcp::default());

    // ...
}

By looking at this example, you might be wondering: "how does the subscriber know that the publisher is compressing messages, if the subscriber is not configured with Gzip compression?"

The answer is that in MSG, compression is defined by the publisher for each message that is sent. In practice, each message contains info in its Header that tells the subscriber whether the message payload is compressed - and if so, which compression algorithm was used. The subscriber then uses this info to decompress the message payload before making it available to the user.

All of this is done automatically by MSG and it works out of the box with the default compression methods:

  • Gzip
  • Zstd
  • Snappy
  • LZ4
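The idea of a self-describing message can be sketched as follows. This is not MSG's wire format: the one-byte tag, the `Compression` enum, and the toy run-length codec (standing in for Gzip, Zstd, Snappy, or LZ4) are all assumptions made for illustration.

```rust
/// Hypothetical one-byte compression tag carried at the start of each frame.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Compression {
    None = 0,
    /// Toy run-length encoding, standing in for a real algorithm.
    Rle = 1,
}

/// Encodes `data` as (run length, byte) pairs.
fn rle_compress(data: &[u8]) -> Vec<u8> {
    let mut out = Vec::new();
    let mut iter = data.iter().copied().peekable();
    while let Some(byte) = iter.next() {
        let mut run: u8 = 1;
        while run < u8::MAX && iter.peek() == Some(&byte) {
            iter.next();
            run += 1;
        }
        out.extend_from_slice(&[run, byte]);
    }
    out
}

/// Reverses `rle_compress`; returns `None` on malformed input.
fn rle_decompress(data: &[u8]) -> Option<Vec<u8>> {
    if data.len() % 2 != 0 {
        return None;
    }
    let mut out = Vec::new();
    for pair in data.chunks_exact(2) {
        out.extend(std::iter::repeat(pair[1]).take(pair[0] as usize));
    }
    Some(out)
}

/// Publisher side: compress (or not) and prefix the frame with the tag.
fn encode(payload: &[u8], compression: Compression) -> Vec<u8> {
    let body = match compression {
        Compression::None => payload.to_vec(),
        Compression::Rle => rle_compress(payload),
    };
    let mut frame = vec![compression as u8];
    frame.extend(body);
    frame
}

/// Subscriber side: no configuration needed, the tag says how to decode.
fn decode(frame: &[u8]) -> Option<Vec<u8>> {
    let (&tag, body) = frame.split_first()?;
    match tag {
        0 => Some(body.to_vec()),
        1 => rle_decompress(body),
        _ => None, // Unknown compression type.
    }
}

fn main() {
    let payload = b"aaaaaaaabbbb";
    let frame = encode(payload, Compression::Rle);
    let decoded = decode(&frame).expect("frame should decode");
    assert_eq!(decoded, payload.to_vec());
    println!("{} payload bytes -> {} frame bytes", payload.len(), frame.len());
}
```

Because the tag travels with every message, a publisher could in principle switch algorithms per message without the subscriber being reconfigured.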

Custom compression algorithms are not yet exposed through a public API. If you need this, please open an issue on GitHub and we will prioritize it!

Encryption

Message encryption is not yet supported in MSG-RS unless you use a transport layer that provides it (like QUIC).

Logging

MSG-RS uses the tracing crate ecosystem for logging.

The tracing targets are configured using the tracing-subscriber crate. They are named after the crates that they are used in, and can be configured using the RUST_LOG environment variable.

For example, to enable logging for the msg_socket crate in your program, simply set:

RUST_LOG=msg_socket=debug cargo run
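To show how a directive such as `msg_socket=debug` selects log output, here is a deliberately simplified, standard-library-only model of target filtering. It is not the tracing-subscriber implementation: the `Level` enum and both functions are hypothetical, and only plain comma-separated `target=level` pairs are handled.

```rust
/// Log levels ordered from least to most verbose.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Level {
    Error,
    Warn,
    Info,
    Debug,
    Trace,
}

fn parse_level(s: &str) -> Option<Level> {
    match s.to_ascii_lowercase().as_str() {
        "error" => Some(Level::Error),
        "warn" => Some(Level::Warn),
        "info" => Some(Level::Info),
        "debug" => Some(Level::Debug),
        "trace" => Some(Level::Trace),
        _ => None,
    }
}

/// Parses comma-separated `target=level` pairs; malformed entries are skipped.
fn parse_directives(spec: &str) -> Vec<(String, Level)> {
    spec.split(',')
        .filter_map(|directive| {
            let (target, level) = directive.trim().split_once('=')?;
            Some((target.to_string(), parse_level(level)?))
        })
        .collect()
}

/// An event is enabled if the most specific directive whose target is a
/// prefix of the event's target allows its level.
fn enabled(directives: &[(String, Level)], target: &str, level: Level) -> bool {
    directives
        .iter()
        .filter(|(prefix, _)| target.starts_with(prefix.as_str()))
        .max_by_key(|(prefix, _)| prefix.len())
        .map_or(false, |(_, max)| level <= *max)
}

fn main() {
    let directives = parse_directives("msg_socket=debug");

    // Events from modules inside the msg_socket crate pass at debug or below.
    println!("{}", enabled(&directives, "msg_socket::req", Level::Debug));
    // More verbose levels and other crates stay silent.
    println!("{}", enabled(&directives, "msg_socket::req", Level::Trace));
    println!("{}", enabled(&directives, "msg_transport", Level::Error));
}
```

In this model, several crates can be configured at once by separating directives with commas, for example `msg_socket=debug,msg_transport=info`.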

Metrics

MSG-RS doesn't currently expose any metrics by default.

If you are interested in this feature, please let us know by opening an issue.

FAQ

How do I pronounce MSG-RS?

You can either pronounce it as "message", or as "em-es-gee". We prefer the latter.

Is MSG-RS production ready?

MSG-RS is currently in ALPHA, and is not yet recommended for use in production.

We plan to release MSG-RS to Crates.io once it reaches a beta stage. Until then, we recommend using the git dependency as shown in the Getting started section.

What is the minimum supported Rust version (MSRV)?

MSG-RS currently supports Rust 1.75 or later.

Contribution guidelines

Thanks for your interest in contributing to making MSG better!

How to contribute

Getting started

To get started with MSG, you will need the Rust toolchain installed on your machine. You can find the installation instructions here.

Once you have the necessary tools installed, you can clone the repository and run the tests:

git clone git@github.com:chainbound/msg-rs.git
cd msg-rs

cargo test --all

Development workflow

We use GitHub for our entire development workflow. If you are not familiar with GitHub, you can find a great guide here.

We use GitHub issues to track all our work. If you want to contribute, you can find a list of open issues here. If you want to work on an issue, please leave a comment on the specific issue so that it can be assigned to you.

When testing your changes, please use the following commands and make sure that they all pass:

cargo check --all
cargo test --all
cargo +nightly fmt -- --check
cargo +nightly clippy --all --all-features -- -D warnings

Once you are done with your changes, you can open a pull request. We will review your changes and provide feedback. Once the changes are approved, your pull request will be merged.

Asking for help

If you have any questions, you can open a new issue or join our Discord server.

Code of conduct

MSG adheres to the Rust Code of Conduct. This document describes the minimum behavior expected from all contributors.

License

By contributing to MSG, you agree that your contributions will be licensed under its MIT license.