
Fast, Scalable Networking in Go with Mangos

2016-01-15 20:35
// mangos_example.go: Pub/Sub fan-out. The server publishes the current
// date once per second; every connected client prints what it receives.
package main

import (
	"fmt"
	"os"
	"time"

	"github.com/gdamore/mangos"
	"github.com/gdamore/mangos/protocol/pub"
	"github.com/gdamore/mangos/protocol/sub"
	"github.com/gdamore/mangos/transport/ipc"
	"github.com/gdamore/mangos/transport/tcp"
)

// die prints a formatted message to stderr and exits with status 1.
func die(format string, v ...interface{}) {
	fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
	os.Exit(1)
}

// date returns the current time in ANSI C format.
func date() string {
	return time.Now().Format(time.ANSIC)
}

// server listens on url with a PUB socket and publishes the date forever.
func server(url string) {
	var sock mangos.Socket
	var err error
	if sock, err = pub.NewSocket(); err != nil {
		die("can't get new pub socket: %s", err)
	}
	sock.AddTransport(ipc.NewTransport())
	sock.AddTransport(tcp.NewTransport())
	if err = sock.Listen(url); err != nil {
		die("can't listen on pub socket: %s", err.Error())
	}
	for {
		// Could also use sock.RecvMsg to get header
		d := date()
		fmt.Printf("SERVER: PUBLISHING DATE %s\n", d)
		if err = sock.Send([]byte(d)); err != nil {
			die("Failed publishing: %s", err.Error())
		}
		time.Sleep(time.Second)
	}
}

// client dials url with a SUB socket and prints every message it receives.
func client(url string, name string) {
	var sock mangos.Socket
	var err error
	var msg []byte
	if sock, err = sub.NewSocket(); err != nil {
		die("can't get new sub socket: %s", err.Error())
	}
	sock.AddTransport(ipc.NewTransport())
	sock.AddTransport(tcp.NewTransport())
	if err = sock.Dial(url); err != nil {
		die("can't dial on sub socket: %s", err.Error())
	}
	// Empty byte array effectively subscribes to everything
	err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
	if err != nil {
		die("cannot subscribe: %s", err.Error())
	}
	for {
		if msg, err = sock.Recv(); err != nil {
			die("Cannot recv: %s", err.Error())
		}
		fmt.Printf("CLIENT(%s): RECEIVED %s\n", name, string(msg))
	}
}

func main() {
	if len(os.Args) > 2 && os.Args[1] == "server" {
		server(os.Args[2])
		os.Exit(0)
	}
	if len(os.Args) > 3 && os.Args[1] == "client" {
		client(os.Args[2], os.Args[3])
		os.Exit(0)
	}
	fmt.Fprintf(os.Stderr, "Usage: pubsub server|client <URL> <ARG>\n")
	os.Exit(1)
}
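
// mangos_service_discovery.go: surveyor side of service discovery.
package main

import (
	"fmt"
	"time"

	"github.com/gdamore/mangos"
	"github.com/gdamore/mangos/protocol/surveyor"
	"github.com/gdamore/mangos/transport/tcp"
)

func main() {
	survey, err := surveyor.NewSocket()
	if err != nil {
		panic(err)
	}
	survey.AddTransport(tcp.NewTransport())
	if err = survey.Listen("tcp://:8000"); err != nil {
		panic(err)
	}
	// Respondents get one second to answer each survey.
	survey.SetOption(mangos.OptionSurveyTime, time.Second)
	for {
		survey.Send([]byte("Is anybody out there?"))
		for {
			msg, err := survey.Recv() // fails once the survey deadline passes
			if err != nil {
				break
			}
			fmt.Printf("Discovered: %s\n", string(msg))
		}
	}
}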

In the past, I’ve looked at nanomsg and why it’s a formidable alternative to the well-regarded ZeroMQ. Like ZeroMQ, nanomsg is a native library which markets itself as a way to build fast and scalable networking layers. I won’t go into detail on how nanomsg accomplishes this since my analysis of it already covers that fairly extensively, but instead I want to talk about a Go implementation of the protocol called Mangos. [1] If you’re not familiar with nanomsg or Scalability Protocols, I recommend reading my overview of those first.

nanomsg is a shared library written in C. This, combined with its zero-copy API, makes it an extremely low-latency transport layer. While there are a lot of client bindings which allow you to use nanomsg from other languages, dealing with shared libraries can often be a pain, not to mention that it complicates deployment.

More and more companies are starting to use Go for backend development because of its speed and concurrency primitives. It’s really good at building server components that scale. Go obviously provides the APIs needed for socket networking, but building a scalable distributed system that’s reliable using these primitives can be somewhat onerous. Solutions like nanomsg’s Scalability Protocols and ZeroMQ attempt to make this much easier by providing useful communication patterns and by taking care of other messaging concerns like queueing.

Naturally, there are Go bindings for nanomsg and ZeroMQ, but, as noted above, dealing with shared libraries can be fraught with peril. In Go (and often other languages), we tend to avoid loading native libraries if we can. It’s much easier to reason about, debug, and deploy a single binary than several separate artifacts. Fortunately, there’s a really nice implementation of nanomsg’s Scalability Protocols in pure Go called Mangos by Garrett D’Amore of illumos fame.

Mangos offers an idiomatic Go implementation and interface which affords us the same messaging patterns that nanomsg provides while maintaining compatibility. Pub/Sub, Pair, Req/Rep, Pipeline, Bus, and Survey are all there. It also supports the same pluggable transport model, allowing additional transports to be added (and extended [2]) on top of the base TCP, IPC, and inproc ones. [3] Mangos has been tested for interoperability with nanomsg using the nanocat command-line interface.
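
To make the idea of a pluggable transport model a little more concrete, here is a minimal sketch of a scheme-keyed registry, written against the standard library only. It is not how Mangos is implemented: the `registry` type, its `add` and `resolve` methods, and the mapping of `ipc` onto a Unix domain socket are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// registry maps a URL scheme to the network name that Go's net package
// would use. The type and its methods are hypothetical.
type registry map[string]string

// add plugs in support for one more scheme.
func (r registry) add(scheme, network string) { r[scheme] = network }

// resolve splits "scheme://address" and looks the scheme up.
func (r registry) resolve(url string) (string, string, error) {
	parts := strings.SplitN(url, "://", 2)
	if len(parts) != 2 || parts[1] == "" {
		return "", "", fmt.Errorf("malformed address %q", url)
	}
	network, ok := r[parts[0]]
	if !ok {
		return "", "", fmt.Errorf("no transport registered for %q", parts[0])
	}
	return network, parts[1], nil
}

func main() {
	r := registry{}
	r.add("tcp", "tcp")
	r.add("ipc", "unix") // assumption: IPC backed by a Unix domain socket

	urls := []string{"tcp://127.0.0.1:40899", "ipc:///tmp/pubsub.ipc", "ws://example"}
	for _, url := range urls {
		network, address, err := r.resolve(url)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(network, address)
	}
}
```

The point of the design is that callers only ever see a URL; supporting a new transport means one more `add` call rather than a change to every call site.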

One of the advantages of using a language like C is that it’s not garbage collected. However, if you’re using Go with nanomsg, you’re already paying the cost of GC. Mangos makes use of object pools in order to reduce pressure on the garbage collector. We can’t turn Go’s GC off, but we can make an effort to minimize pauses. This is critical for high-throughput systems, and Mangos tends to perform quite comparably to nanomsg.
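
As a rough illustration of the object-pool technique in general, the sketch below reuses buffers through the standard library's `sync.Pool`. It says nothing about how Mangos structures its own pools; `bufPool` and `encode` are hypothetical names, and the `topic|body` framing is invented for the example.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool hands out reusable buffers so that building a message does not
// have to grow a fresh buffer every time. Hypothetical name.
var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// encode assembles "topic|body" in a pooled buffer and returns a copy of
// the bytes, so the buffer can be returned to the pool immediately.
func encode(topic, body string) []byte {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset() // a pooled buffer may still hold old data
	defer bufPool.Put(buf)

	buf.WriteString(topic)
	buf.WriteByte('|')
	buf.WriteString(body)

	out := make([]byte, buf.Len())
	copy(out, buf.Bytes())
	return out
}

func main() {
	fmt.Printf("%s\n", encode("time", "12:00")) // prints "time|12:00"
}
```

The copy at the end still allocates once per message; what the pool saves is the repeated growth of the scratch buffer. A real messaging library would likely pool the message objects themselves and hand them back after sending.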

Mangos (and nanomsg) has a very familiar, socket-like API. To show what this looks like, the pub/sub program at the top of this post (mangos_example.go) gives a simple example of how the Pub/Sub protocol is used to build a fan-out messaging system.

My message queue test framework, Flotilla, uses the Req/Rep protocol to allow clients to send requests to distributed daemon processes, which handle them and respond. While this is a very simple use case where you could just as easily get away with raw TCP sockets, there are more advanced cases where Scalability Protocols make sense. We also get the added advantage of transport abstraction, so we’re not strictly tied to TCP sockets.

I’ve been building a distributed messaging system using Mangos as a means of federated communication. Pub/Sub enables a fan-out, interest-based broadcast and Bus facilitates many-to-many messaging. Both of these are exceptionally useful for connecting disparate systems. Mangos also supports an experimental new protocol called Star. This pattern is like Bus, but when a message is received by an immediate peer, it’s propagated to all other members of the topology.
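
The interest-based part of Pub/Sub comes down to prefix matching: a subscriber receives a message when the message starts with one of its subscribed topics, and an empty topic matches everything. The following is a minimal in-process sketch of that rule, with no networking involved. The `subscriber` type and `publish` function are hypothetical, and the filter is applied inside `publish` purely to keep the example short.

```go
package main

import (
	"bytes"
	"fmt"
)

// subscriber keeps the topic prefixes it is interested in. Hypothetical type.
type subscriber struct {
	name   string
	topics [][]byte
	inbox  [][]byte
}

// wants reports whether msg starts with any subscribed prefix.
// An empty prefix matches every message.
func (s *subscriber) wants(msg []byte) bool {
	for _, t := range s.topics {
		if bytes.HasPrefix(msg, t) {
			return true
		}
	}
	return false
}

// publish fans msg out to every subscriber whose filter matches.
func publish(subs []*subscriber, msg []byte) {
	for _, s := range subs {
		if s.wants(msg) {
			s.inbox = append(s.inbox, msg)
		}
	}
}

func main() {
	all := &subscriber{name: "all", topics: [][]byte{[]byte("")}}
	dates := &subscriber{name: "dates", topics: [][]byte{[]byte("date|")}}
	subs := []*subscriber{all, dates}

	publish(subs, []byte("date|Fri Jan 15"))
	publish(subs, []byte("load|0.42"))

	for _, s := range subs {
		fmt.Printf("%s received %d message(s)\n", s.name, len(s.inbox))
	}
}
```

Here `all` ends up with both messages and `dates` with only the first one.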

My favorite Scalability Protocol is Survey. As I discussed in my nanomsg overview, there are a lot of really interesting applications of this. Survey allows a process to query the state of multiple peers in one shot. It’s similar to Pub/Sub in that the surveyor publishes a single message which is received by all the respondents (although there are no topic subscriptions). The respondents then send a message back, and the surveyor collects these responses. We can also enforce a deadline on the respondent replies, which makes Survey particularly useful for service discovery.
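
The collect-until-deadline behaviour can be sketched with goroutines and channels alone, which may help if the pattern is new. This is an in-process model, not Mangos code: `respondent`, `survey`, and `demo` are hypothetical names, and the delays simply stand in for network latency.

```go
package main

import (
	"fmt"
	"time"
)

// respondent is a hypothetical peer that answers after some delay.
type respondent struct {
	name  string
	delay time.Duration
}

// survey asks every respondent at once and collects the answers that
// arrive before the deadline; later answers are ignored.
func survey(peers []respondent, deadline time.Duration) []string {
	replies := make(chan string, len(peers)) // buffered so late senders never block
	for _, p := range peers {
		go func(p respondent) {
			time.Sleep(p.delay)
			replies <- p.name
		}(p)
	}

	var got []string
	timeout := time.After(deadline)
	for len(got) < len(peers) {
		select {
		case name := <-replies:
			got = append(got, name)
		case <-timeout:
			return got
		}
	}
	return got
}

// demo runs one survey against three made-up brokers.
func demo() []string {
	peers := []respondent{
		{"broker-a", 0},
		{"broker-b", 10 * time.Millisecond},
		{"broker-c", 2 * time.Second}, // misses the deadline
	}
	return survey(peers, 500*time.Millisecond)
}

func main() {
	fmt.Println("discovered:", demo())
}
```

With these delays, `broker-a` and `broker-b` are discovered and `broker-c` is not, because its reply arrives after the 500 ms deadline.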

Listing: mangos_service_discovery.go (the surveyor snippet that follows the pub/sub program at the top of this post).

With my messaging system, I’ve used Survey to implement a heartbeat protocol. When a broker spins up, it begins broadcasting a heartbeat using a Survey socket. New brokers can connect to existing ones, and they reply to the heartbeat which allows brokers to “discover” each other. If a heartbeat isn’t received before the deadline, the peer is removed. Mangos also handles reconnects, so if a broker goes offline and comes back up, peers will automatically reconnect.
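
The bookkeeping behind "if a heartbeat isn’t received before the deadline, the peer is removed" might look something like the sketch below. It is not taken from the messaging system described here: `peerTable`, `heard`, `expire`, and `demo` are hypothetical, and timestamps are passed in explicitly so the behaviour is easy to follow.

```go
package main

import (
	"fmt"
	"time"
)

// peerTable remembers when each peer last answered a heartbeat.
type peerTable struct {
	deadline time.Duration
	lastSeen map[string]time.Time
}

func newPeerTable(deadline time.Duration) *peerTable {
	return &peerTable{deadline: deadline, lastSeen: make(map[string]time.Time)}
}

// heard records a heartbeat reply from peer at time now.
func (t *peerTable) heard(peer string, now time.Time) {
	t.lastSeen[peer] = now
}

// expire removes every peer that has been silent for longer than the
// deadline and returns how many were dropped.
func (t *peerTable) expire(now time.Time) int {
	dropped := 0
	for peer, seen := range t.lastSeen {
		if now.Sub(seen) > t.deadline {
			delete(t.lastSeen, peer) // deleting while ranging over a map is safe in Go
			dropped++
		}
	}
	return dropped
}

// demo registers two brokers, lets one of them go quiet, and expires it.
func demo() (dropped, remaining int) {
	start := time.Now()
	table := newPeerTable(time.Second)
	table.heard("broker-a", start)
	table.heard("broker-b", start)

	// Two seconds later only broker-a has replied again.
	later := start.Add(2 * time.Second)
	table.heard("broker-a", later)
	dropped = table.expire(later)
	return dropped, len(table.lastSeen)
}

func main() {
	dropped, remaining := demo()
	fmt.Println("dropped:", dropped, "remaining:", remaining) // dropped: 1 remaining: 1
}
```

A table like this would need a mutex if heartbeat replies and expiry ran on different goroutines; that is left out to keep the sketch short.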

To summarize, if you’re building distributed systems in Go, consider taking a look at Mangos. You can certainly roll your own messaging layer with raw sockets, but you’re going to end up writing a lot of logic for a robust system. Mangos, and nanomsg in general, gives you the right abstraction to quickly build systems that scale and are fast.