MSG-RS
Introduction
📖 MSG-RS is a flexible and lightweight messaging library for distributed systems built with Rust and Tokio, designed for performance and reliability.
This library is built and maintained by Chainbound, and is licensed under the MIT License.
Overview
MSG-RS is inspired by projects like ZeroMQ and Nanomsg. It was built because at Chainbound we needed a Rust-native messaging library that was flexible and customizable enough without relying on C bindings or external dependencies.
Another reason for building this library was to modularize the messaging components of our Fiber project. Lastly, it was also an opportunity to dive deeper into some async Rust patterns and to measure optimizations in isolation from other distributed system components.
Features
1. Flexible and modular
MSG-RS is designed to be modular by leveraging Rust's traits and generics. This modularity can be seen in all aspects, from the transport layer to the socket patterns.
2. Durable
Durability is a critical aspect in distributed systems. By using smart abstractions like the ones from StubbornIO, MSG is able to provide reliable reconnections and retries out of the box.
3. Extensible
Even though MSG comes with different socket patterns and transport layers, it is also designed to be easily extensible if you want to bring your own options like authentication, encryption, and compression to the table.
Development Status
MSG is currently in ALPHA, and is not yet recommended for use in production.
Contributing
Contributors are welcome! Please see the contributing guide for more information.
Usage
This section will guide you through the process of adding MSG-RS to your project and using it to build networked distributed systems.
- Getting started
- Transport Layers
- Socket Types
- Connection Authentication
- Message Compression
- Encryption
- Logging
- FAQ
Getting started with MSG-RS
To add MSG-RS to your project, add the following to your Cargo.toml file:
[dependencies]
msg = { git = "https://github.com/chainbound/msg-rs" }
Warning: MSG-RS is currently in ALPHA, and is not yet recommended for use in production.
We plan to release MSG-RS to Crates.io once it reaches a beta stage. Until then, we recommend using the git dependency as shown above.
Socket types
MSG-RS supports the following socket types:
Request/Reply
The request/reply socket type is used for sending a request to a server and receiving a response.
Example:
use bytes::Bytes;
use tokio_stream::StreamExt;

use msg::{RepSocket, ReqSocket, Tcp};

#[tokio::main]
async fn main() {
    // Initialize the reply socket (server side) with a transport
    let mut rep = RepSocket::new(Tcp::default());
    rep.bind("0.0.0.0:4444").await.unwrap();

    // Initialize the request socket (client side) with a transport
    let mut req = ReqSocket::new(Tcp::default());
    req.connect("0.0.0.0:4444").await.unwrap();

    tokio::spawn(async move {
        // Receive the request and respond with "world"
        // (RepSocket implements `Stream`)
        let req = rep.next().await.unwrap();
        println!("Message: {:?}", req.msg());
        req.respond(Bytes::from("world")).unwrap();
    });

    let res: Bytes = req.request(Bytes::from("hello")).await.unwrap();
    println!("Response: {:?}", res);
}
Publish/Subscribe
The publish/subscribe socket type is used for sending a message to multiple subscribers. It works by defining topics over which messages can be sent and received. Subscribers can subscribe to one or more topics, and will receive all messages sent to those topics by publishers.
Example:
use bytes::Bytes;
use tokio_stream::StreamExt;

use msg::{PubSocket, SubSocket, Tcp};

#[tokio::main]
async fn main() {
    // Initialize the publisher socket (server side) with a transport
    let mut pub_socket = PubSocket::new(Tcp::default());
    pub_socket.bind("0.0.0.0:4444").await.unwrap();

    // Initialize the subscriber socket (client side) with a transport
    let mut sub_socket = SubSocket::new(Tcp::default());
    sub_socket.connect("0.0.0.0:4444").await.unwrap();

    let topic = "some_interesting_topic".to_string();

    // Subscribe to a topic
    sub_socket.subscribe(topic.clone()).await.unwrap();

    tokio::spawn(async move {
        // Values are `bytes::Bytes`
        pub_socket.publish(topic, Bytes::from("hello_world")).await.unwrap();
    });

    let msg = sub_socket.next().await.unwrap();
    println!("Received message: {:?}", msg);
}
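To make the topic semantics concrete, here is a minimal in-memory sketch of topic-based fan-out using only the standard library. The `Broker` type and its methods are hypothetical and exist only for illustration; they are not part of MSG-RS, and the real sockets do this over a transport rather than over channels.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical in-memory broker (not an MSG-RS type): maps each topic
/// to the sending halves of its subscribers' channels.
#[derive(Default)]
struct Broker {
    topics: HashMap<String, Vec<Sender<Vec<u8>>>>,
}

impl Broker {
    /// Registers a new subscriber for `topic` and returns its receiving end.
    fn subscribe(&mut self, topic: &str) -> Receiver<Vec<u8>> {
        let (tx, rx) = channel();
        self.topics.entry(topic.to_string()).or_default().push(tx);
        rx
    }

    /// Sends `payload` to every live subscriber of `topic` and returns
    /// how many subscribers received it.
    fn publish(&mut self, topic: &str, payload: &[u8]) -> usize {
        let Some(subs) = self.topics.get_mut(topic) else {
            return 0;
        };
        // Drop subscribers whose receiver has gone away.
        subs.retain(|tx| tx.send(payload.to_vec()).is_ok());
        subs.len()
    }
}

fn main() {
    let mut broker = Broker::default();
    let blocks_a = broker.subscribe("blocks");
    let blocks_b = broker.subscribe("blocks");
    let txs = broker.subscribe("txs");

    let delivered = broker.publish("blocks", b"hello_world");
    println!("delivered to {delivered} subscribers");
    println!("a: {:?}", blocks_a.recv().unwrap());
    println!("b: {:?}", blocks_b.recv().unwrap());
    // The "txs" subscriber never sees messages published to "blocks".
    println!("txs has a message: {}", txs.try_recv().is_ok());
}
```

Every subscriber of a topic gets its own copy of each message, and subscribers of other topics see nothing.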
Transport layers
MSG-RS supports multiple transport layers. The transport layer is the one that handles the actual sending and receiving of messages. The following transport layers are supported:
TCP
Why choose TCP?
The TCP transport layer is ideal for scenarios where reliable, ordered, and error-checked delivery of a stream of data is crucial. It ensures that data is delivered in the order it was sent and retransmits lost packets. This makes TCP suitable for applications where data integrity and accuracy are more important than speed.
TCP is especially useful if messages are going to be sent over public internet links, where the quality of the connection cannot be guaranteed and a significant portion of packets may be lost or corrupted.
How to use TCP
In MSG, here is how you can set up any socket type with the TCP transport:
use msg::{RepSocket, ReqSocket, Tcp};

#[tokio::main]
async fn main() {
    // Initialize the reply socket (server side) with default TCP
    let mut rep = RepSocket::new(Tcp::default());
    // Bind the socket to the address. This will start listening for incoming connections.
    // This method does DNS resolution internally, so you can use hostnames here.
    rep.bind("0.0.0.0:4444").await.unwrap();

    // Initialize the request socket (client side) with default TCP
    let mut req = ReqSocket::new(Tcp::default());
    // Connect the socket to the address. This will initiate a connection to the server.
    // This method does DNS resolution internally, so you can use hostnames here.
    req.connect("0.0.0.0:4444").await.unwrap();

    // ...
}
QUIC
Why choose QUIC?
QUIC is a new transport layer protocol that is built on top of UDP. It is designed to provide the same reliability and security guarantees as TCP + TLS, while solving some of the issues of that combination, such as:
- Head-of-line blocking: If a packet is lost, all subsequent packets are held up until the lost packet is retransmitted. This can be a problem especially when multiplexing multiple streams over a single connection because it can cause a single slow stream to block all other streams.
- Slow connection setup: TCP + TLS requires 2-3 round trips to establish a connection, which can be slow on high latency networks.
- No support for multiplexing: TCP does not support multiplexing multiple streams over a single connection. This means that if you want to send multiple streams of data over a single connection, you have to implement your own multiplexing layer on top of TCP, which can run into issues like head-of-line blocking that we've seen above.
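The head-of-line blocking issue can be illustrated with a small simulation. This is a sketch under simplifying assumptions, not a model of real TCP or QUIC stacks: each packet has a fixed final arrival time (including any retransmission delay), and the `Packet` type and both functions are hypothetical names used only here.

```rust
/// A packet tagged with the logical stream it belongs to and the time
/// (in ms) at which it finally arrives, including any retransmission delay.
#[derive(Clone, Copy)]
struct Packet {
    stream: usize,
    arrival_ms: u64,
}

/// One ordered byte stream (TCP-like): a packet can only be delivered to
/// the application after every packet sent before it has arrived.
fn deliver_single_ordered(packets: &[Packet]) -> Vec<u64> {
    let mut latest = 0u64;
    packets
        .iter()
        .map(|p| {
            latest = latest.max(p.arrival_ms);
            latest
        })
        .collect()
}

/// Independent streams (QUIC-like): ordering is enforced per stream only,
/// so a late packet delays just the stream it belongs to.
fn deliver_per_stream(packets: &[Packet], streams: usize) -> Vec<u64> {
    let mut latest = vec![0u64; streams];
    packets
        .iter()
        .map(|p| {
            latest[p.stream] = latest[p.stream].max(p.arrival_ms);
            latest[p.stream]
        })
        .collect()
}

fn main() {
    // Send order: A0, B0, A1, B1. A0 is lost and retransmitted,
    // so it only arrives at 150 ms.
    let packets = [
        Packet { stream: 0, arrival_ms: 150 },
        Packet { stream: 1, arrival_ms: 20 },
        Packet { stream: 0, arrival_ms: 30 },
        Packet { stream: 1, arrival_ms: 40 },
    ];
    println!("single ordered stream: {:?}", deliver_single_ordered(&packets));
    println!("independent streams:   {:?}", deliver_per_stream(&packets, 2));
}
```

With a single ordered stream, the late packet on stream 0 also delays the delivery of stream 1's packets. With per-stream ordering, stream 1 is unaffected.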
QUIC in MSG
The MSG QUIC implementation is based on quinn. It relies on self-signed certificates and does not verify server certificates. Also, due to how our Transport abstraction works, we don't support QUIC connections with multiple streams. This means that the Quic transport implementation will do all its work over a single, bi-directional stream for now.
How to use QUIC
In MSG, here is how you can set up any socket type with the QUIC transport:
use msg::{RepSocket, ReqSocket, Quic};

#[tokio::main]
async fn main() {
    // Initialize the reply socket (server side) with default QUIC
    let mut rep = RepSocket::new(Quic::default());
    // Bind the socket to the address. This will start listening for incoming connections.
    // This method does DNS resolution internally, so you can use hostnames here.
    rep.bind("0.0.0.0:4444").await.unwrap();

    // Initialize the request socket (client side) with default QUIC
    let mut req = ReqSocket::new(Quic::default());
    // Connect the socket to the address. This will initiate a connection to the server.
    // This method does DNS resolution internally, so you can use hostnames here.
    req.connect("0.0.0.0:4444").await.unwrap();

    // ...
}
IPC
More precisely, MSG-RS supports Unix Domain Sockets (UDS) for IPC.
Why choose IPC?
IPC is a transport layer that allows for communication between processes on the same machine. The main difference between IPC and other transport layers is that IPC sockets use the filesystem as the address namespace.
IPC is useful when you want to avoid the overhead of network sockets and want to have a low-latency communication link between processes on the same machine, all while being able to use the same API as the other transport layers that MSG-RS supports.
Due to its simplicity, IPC is typically faster than TCP and QUIC, but the exact performance improvements also depend on the throughput of the underlying UDS implementation. We only recommend using IPC when you know that the performance benefits outweigh the overhead of using a network socket.
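To show what "the filesystem as the address namespace" means in practice, here is a minimal sketch that uses the standard library's Unix Domain Socket types directly, without MSG-RS. The `echo_over_uds` helper and the socket file name are made up for this example, and it only runs on Unix-like systems.

```rust
use std::io::{Read, Write};
use std::net::Shutdown;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::Path;
use std::thread;

/// Binds a Unix Domain Socket at `path`, serves one echo request on a
/// background thread, and returns what the client read back.
fn echo_over_uds(path: &Path, msg: &[u8]) -> std::io::Result<Vec<u8>> {
    // The address is a filesystem path, so a stale socket file left over
    // from an earlier run would make `bind` fail. Remove it first.
    let _ = std::fs::remove_file(path);
    let listener = UnixListener::bind(path)?;

    let server = thread::spawn(move || -> std::io::Result<()> {
        let (mut conn, _addr) = listener.accept()?;
        let mut buf = Vec::new();
        // Read until the client shuts down its write half, then echo back.
        conn.read_to_end(&mut buf)?;
        conn.write_all(&buf)
    });

    let mut client = UnixStream::connect(path)?;
    client.write_all(msg)?;
    client.shutdown(Shutdown::Write)?;
    let mut reply = Vec::new();
    client.read_to_end(&mut reply)?;

    server.join().expect("server thread panicked")?;
    // Clean up the socket file so the path can be reused.
    std::fs::remove_file(path)?;
    Ok(reply)
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join(format!("msg-ipc-sketch-{}.sock", std::process::id()));
    let reply = echo_over_uds(&path, b"hello")?;
    println!("{}", String::from_utf8_lossy(&reply));
    Ok(())
}
```

Note that the socket file has to be cleaned up explicitly, which is one practical difference from binding a TCP port.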
How to use IPC
In MSG, here is how you can set up any socket type with the IPC transport:
use msg::{RepSocket, ReqSocket, Ipc};

#[tokio::main]
async fn main() {
    // Initialize the reply socket (server side) with default IPC
    let mut rep = RepSocket::new(Ipc::default());
    // Bind the socket to the address. This will start listening for incoming connections.
    // You can use any path that is valid for a Unix Domain Socket.
    rep.bind("/tmp/msg.sock").await.unwrap();

    // Initialize the request socket (client side) with default IPC
    let mut req = ReqSocket::new(Ipc::default());
    // Connect the socket to the address. This will initiate a connection to the server.
    req.connect("/tmp/msg.sock").await.unwrap();

    // ...
}
Authentication
Authentication is the process of verifying the identity of a user or process before allowing access to resources.
In MSG, authentication is handled by users of the library through the Authenticator trait. This trait is implemented by the user and passed to the socket type when it is created.
Here is how the Authenticator trait is defined:
// msg-socket/src/lib.rs
pub trait Authenticator: Send + Sync + Unpin + 'static {
    fn authenticate(&self, id: &Bytes) -> bool;
}
The authenticate method is called by the library whenever a new connection is established. The id parameter is the identity of the connecting peer.
Note: the Authenticator is used by the server-side socket only!
Here is an example of how you can add an authenticator to a client-server application:
use bytes::Bytes;

use msg::{Authenticator, RepSocket, ReqOptions, ReqSocket, Tcp};

// Define some custom authentication logic
#[derive(Default)]
struct Auth;

impl Authenticator for Auth {
    fn authenticate(&self, id: &Bytes) -> bool {
        println!("Auth request from: {:?}", id);
        // Custom authentication logic goes here
        // ...
        true
    }
}

#[tokio::main]
async fn main() {
    // Initialize the reply socket (server side) with a transport
    // and an authenticator that we just implemented:
    let mut rep = RepSocket::new(Tcp::default()).with_auth(Auth);
    rep.bind("0.0.0.0:4444").await.unwrap();

    // Initialize the request socket (client side) with a transport
    // and an identifier. This will implicitly turn on client authentication.
    // The identifier will be sent to the server when the connection is established.
    let mut req = ReqSocket::with_options(
        Tcp::default(),
        ReqOptions::default().auth_token(Bytes::from("client1")),
    );

    // ...
}
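As an illustration of what the "custom authentication logic" could look like, here is a sketch of an allow-list authenticator. To keep it runnable without the msg crate, it declares a local stand-in trait with the same bounds that takes `&[u8]` instead of `&Bytes`; the `AllowList` type is hypothetical and not part of MSG.

```rust
use std::collections::HashSet;

/// Local stand-in for MSG's `Authenticator` trait so that this sketch
/// compiles on its own. The real trait takes `&bytes::Bytes`.
trait Authenticator: Send + Sync + Unpin + 'static {
    fn authenticate(&self, id: &[u8]) -> bool;
}

/// Hypothetical authenticator that accepts only pre-registered client IDs.
struct AllowList {
    allowed: HashSet<Vec<u8>>,
}

impl AllowList {
    /// Builds the allow-list from anything that can be viewed as bytes.
    fn new<I, T>(ids: I) -> Self
    where
        I: IntoIterator<Item = T>,
        T: AsRef<[u8]>,
    {
        Self {
            allowed: ids.into_iter().map(|id| id.as_ref().to_vec()).collect(),
        }
    }
}

impl Authenticator for AllowList {
    fn authenticate(&self, id: &[u8]) -> bool {
        // Accept the connection only if the peer's ID was registered.
        self.allowed.contains(id)
    }
}

fn main() {
    let auth = AllowList::new(["client1", "client2"]);
    println!("client1 accepted: {}", auth.authenticate(b"client1"));
    println!("intruder accepted: {}", auth.authenticate(b"intruder"));
}
```

Returning `false` from `authenticate` is how the server-side socket would be told to reject a peer. A plain ID lookup like this only identifies clients, so a real deployment would likely want a secret token rather than a guessable name.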
Message compression
Sometimes, you may want to compress messages before sending them over the network. MSG-RS supports message compression out of the box, and it is very easy to use.
Compression is most useful in scenarios where you are sending large messages over the network. It can also help reduce the amount of bandwidth used by your application.
In MSG, compression is handled by the socket type:
Request/Response
You can also find a complete example in msg/examples/reqrep_compression.rs.
use msg::{compression::GzipCompressor, ReqSocket, RepSocket, Tcp};

#[tokio::main]
async fn main() {
    // Initialize the reply socket (server side) with a transport
    let mut rep = RepSocket::new(Tcp::default())
        // Enable Gzip compression (compression level 6).
        .with_compressor(GzipCompressor::new(6));
    rep.bind("0.0.0.0:4444").await.unwrap();

    // Initialize the request socket (client side) with a transport
    let mut req = ReqSocket::new(Tcp::default())
        // Enable Gzip compression (compression level 6).
        // The request and response sockets *don't have to*
        // use the same compression algorithm or level.
        .with_compressor(GzipCompressor::new(6));
    req.connect("0.0.0.0:4444").await.unwrap();

    // ...
}
Publish/Subscribe
You can also find a complete example in msg/examples/pubsub_compression.rs.
use msg::{compression::GzipCompressor, PubSocket, SubSocket, Tcp};

#[tokio::main]
async fn main() {
    // Configure the publisher socket with options
    let mut pub_socket = PubSocket::new(Tcp::new())
        // Enable Gzip compression (compression level 6)
        .with_compressor(GzipCompressor::new(6));

    // Configure the subscribers with options
    let mut sub_socket = SubSocket::new(Tcp::default());

    // ...
}
By looking at this example, you might be wondering: "how does the subscriber know that the publisher is compressing messages, if the subscriber is not configured with Gzip compression?"
The answer is that in MSG, compression is defined by the publisher for each message that is sent.
In practice, each message contains info in its Header that tells the subscriber whether the message payload is compressed and, if so, which compression algorithm was used. The subscriber then uses this info to decompress the message payload before making it available to the user.
All of this is done automatically by MSG and it works out of the box with the default compression methods:
- Gzip
- Zstd
- Snappy
- LZ4
Custom compression algorithms are not yet exposed through a public API. If you need this, please open an issue on GitHub and we will prioritize it!
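The per-message compression tag described earlier in this section can be sketched as follows. This is an illustration of the general idea only: the one-byte tag, the `Compression` enum, and the frame layout are assumptions made for this example and do not reflect MSG's actual wire format. Run-length encoding stands in for the real algorithms so that the sketch needs no external crates.

```rust
/// Hypothetical one-byte tag identifying how the payload was compressed.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Compression {
    None = 0,
    /// Run-length encoding, a stand-in for Gzip, Zstd, Snappy, or LZ4.
    Rle = 1,
}

/// Encodes `data` as (run length, byte) pairs.
fn rle_compress(data: &[u8]) -> Vec<u8> {
    let mut out = Vec::new();
    let mut iter = data.iter().copied().peekable();
    while let Some(byte) = iter.next() {
        let mut run = 1u8;
        while run < u8::MAX && iter.peek() == Some(&byte) {
            iter.next();
            run += 1;
        }
        out.extend_from_slice(&[run, byte]);
    }
    out
}

/// Expands (run length, byte) pairs back into the original bytes.
fn rle_decompress(data: &[u8]) -> Vec<u8> {
    data.chunks_exact(2)
        .flat_map(|pair| std::iter::repeat(pair[1]).take(pair[0] as usize))
        .collect()
}

/// Publisher side: optionally compress the payload and prefix the tag.
fn encode(payload: &[u8], compression: Compression) -> Vec<u8> {
    let body = match compression {
        Compression::None => payload.to_vec(),
        Compression::Rle => rle_compress(payload),
    };
    let mut frame = vec![compression as u8];
    frame.extend(body);
    frame
}

/// Subscriber side: read the tag and pick the matching decompressor.
/// Returns `None` for an empty frame or an unknown tag.
fn decode(frame: &[u8]) -> Option<Vec<u8>> {
    let (&tag, body) = frame.split_first()?;
    match tag {
        0 => Some(body.to_vec()),
        1 => Some(rle_decompress(body)),
        _ => None,
    }
}

fn main() {
    let payload = b"aaaaabbb";
    let frame = encode(payload, Compression::Rle);
    println!("frame on the wire: {:?}", frame);
    println!("decoded: {:?}", decode(&frame));
}
```

Because the tag travels with each message, the receiving side needs no compression configuration of its own, which matches the behavior of the subscriber in the example above.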
Encryption
Message encryption is not yet supported in MSG-RS unless you're using a transport layer that provides it (like QUIC).
Logging
MSG-RS uses the tracing crate ecosystem for logging.
The tracing targets are configured using the tracing-subscriber crate.
They are named after the crates that they are used in, and can be configured using the RUST_LOG environment variable.
For example, to enable logging for the msg_socket crate in your program, simply set:
RUST_LOG=msg_socket=debug cargo run
Metrics
MSG-RS doesn't currently expose any metrics by default.
If you are interested in this feature, please let us know by opening an issue.
FAQ
How do I pronounce MSG-RS?
You can either pronounce it as "message", or as "em-es-gee". We prefer the latter.
Is MSG-RS production ready?
MSG-RS is currently in ALPHA, and is not yet recommended for use in production.
We plan to release MSG-RS to Crates.io once it reaches a beta stage. Until then, we recommend using the git dependency as shown in the Getting started section.
What is the minimum supported Rust version (MSRV)?
MSG-RS currently supports Rust 1.75 or later.
Contribution guidelines
Thanks for your interest in contributing to making MSG better!
How to contribute
Getting started
To get started with MSG, you will need the Rust toolchain installed on your machine. You can find the installation instructions here.
Once you have the necessary tools installed, you can clone the repository and run the tests:
git clone git@github.com:chainbound/msg-rs.git
cd msg-rs
cargo test --all
Development workflow
We use GitHub for all our development workflow. If you are not familiar with GitHub, you can find a great guide here.
We use GitHub issues to track all our work. If you want to contribute, you can find a list of open issues here. If you want to work on an issue, please leave a comment on the specific issue so that it can be assigned to you.
When testing your changes, please use the following commands and make sure that they all pass:
cargo check --all
cargo test --all
cargo +nightly fmt -- --check
cargo +nightly clippy --all --all-features -- -D warnings
Once you are done with your changes, you can open a pull request. We will review your changes and provide feedback. Once the changes are approved, your pull request will be merged.
Asking for help
If you have any questions, you can open a new issue or join our Discord server.
Code of conduct
MSG adheres to the Rust Code of Conduct. This document describes the minimum behavior expected from all contributors.
License
By contributing to MSG, you agree that your contributions will be licensed under its MIT license.