ZeroMQ

From Omnia

ZeroMQ

ZeroMQ (ØMQ, 0MQ, ZMQ) is a minimal message-oriented middleware (MOM) library.

zeromq - Distributed Computing Made Simple - http://zeromq.org/

Installation

python-zmq - Python bindings for 0MQ library

Ubuntu/Debian/Raspberry Pi:

apt-get install python-zmq
# or build the latest pyzmq with pip (needs the Python headers):
apt-get install python-dev
pip install pyzmq

Redhat: [1]

# redhat 5
wget http://download.opensuse.org/repositories/home:/fengshuo:/zeromq/CentOS_CentOS-5/home:fengshuo:zeromq.repo -O /etc/yum.repos.d/zeromq.repo
# redhat 6
wget http://download.opensuse.org/repositories/home:/fengshuo:/zeromq/CentOS_CentOS-6/home:fengshuo:zeromq.repo -O /etc/yum.repos.d/zeromq.repo 
yum install zeromq
yum install zeromq-devel  # needed for pyzmq
pip install pyzmq

Manual Installation

Note: I couldn't get it to work.

Dependencies: libtool, autoconf, automake, e2fsprogs

Library install:

mkdir -p ~/.src ; cd ~/.src
wget http://download.zeromq.org/zeromq-4.0.3.tar.gz
tar -zvxf zeromq-4.0.3.tar.gz
cd zeromq-4.0.3
./configure
make
sudo make install
sudo ldconfig

--

Python binding install: [2]

easy_install pyzmq
# or
pip install pyzmq

---

Alternate path (install libzmq under /opt/zeromq)

# ...
tar -zvxf zeromq-4.0.3.tar.gz
cd zeromq-4.0.3
./configure --prefix=/opt/zeromq
make clean
make
sudo make install
sudo /sbin/ldconfig -v -n /opt/zeromq/lib/  # creates generic links
sudo PKG_CONFIG_PATH=/opt/zeromq/lib/pkgconfig/ pip install --upgrade pyzmq  # set the variable after sudo, which usually resets the environment

Python Guide

ØMQ - The Guide - http://zguide.zeromq.org/py:all

Examples

git clone --depth=1 https://github.com/imatix/zguide.git

eBooks

ZeroMQ by Pieter Hintjens

ZeroMQ By Pieter Hintjens - O'Reilly Media - http://shop.oreilly.com/product/0636920026136.do

"Dive into ZeroMQ, the smart socket library that gives you fast, easy, message-based concurrency for your applications. With this quick-paced guide, you'll learn hands-on how to use this scalable, lightweight, and highly flexible networking tool for exchanging messages among clusters, the cloud, and other multi-system environments.

ZeroMQ maintainer Pieter Hintjens takes you on a tour of real-world applications, using extended examples in C to help you work with ZeroMQ's API, sockets, and patterns. Learn how to use specific ZeroMQ programming techniques, build multithreaded applications, and create your own messaging architectures. You'll discover how ZeroMQ works with several programming languages and most operating systems - with little or no cost."

ZeroMQ - Free Download eBook - pdf http://filepi.com/i/qo5Uv2G

ZeroMQ by Faruk Akgul

"ØMQ (also spelled ZeroMQ, 0MQ, or ZMQ) is a high-performance asynchronous messaging library aimed at use in scalable distributed or concurrent applications. It provides a message queue, but unlike message-oriented middleware, a ØMQ system can run without a dedicated message broker. The library is designed to have a familiar socket-style API.

ZeroMQ teaches you to use ZeroMQ through examples in C programming language. You will learn how to use fundamental patterns of message / queuing with a step-by-step tutorial approach and how to apply them. Then, you'll learn how to use high level APIs and to work with multiple sockets and multithreaded programs through many examples."

Download:

Patterns

The built-in core ØMQ patterns are:

  • Request-reply, which connects a set of clients to a set of services. This is a remote procedure call and task distribution pattern.
  • Pub-sub, which connects a set of publishers to a set of subscribers. This is a data distribution pattern.
  • Pipeline, which connects nodes in a fan-out/fan-in pattern that can have multiple steps and loops. This is a parallel task distribution and collection pattern.
  • Exclusive pair, which connects two sockets exclusively. This is a pattern for connecting two threads in a process, not to be confused with "normal" pairs of sockets.
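Exclusive pair sketch. A minimal example of a PAIR connection between a parent thread and a child thread, assuming Python 3 and pyzmq. It uses one shared context, binds the parent's socket before starting the child, and lets the threads share nothing except the message itself. `inproc://child-pipe`, `child` and `run_parent` are made-up names for illustration.

```python
import threading
import zmq


def child(ctx, url):
    """Child thread: connect its own PAIR socket, do some work, report the result."""
    pipe = ctx.socket(zmq.PAIR)
    pipe.connect(url)
    total = sum(range(1, 101))  # stand-in for real work
    pipe.send_string("done %d" % total)
    pipe.close()


def run_parent():
    ctx = zmq.Context()  # one context, shared by both threads
    url = "inproc://child-pipe"  # hypothetical endpoint name
    pipe = ctx.socket(zmq.PAIR)
    pipe.bind(url)  # bind the parent's socket first...
    thread = threading.Thread(target=child, args=(ctx, url))
    thread.start()  # ...then start the child, which connects its own socket
    reply = pipe.recv_string()  # the threads share only this message
    thread.join()
    pipe.close()
    ctx.term()
    return reply


reply = run_parent()
```

Each thread creates and closes its own socket; only the context crosses the thread boundary.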

Request-Reply

REQuest REPly

fig2.png

This is the request-reply pattern, probably the simplest way to use ØMQ. It maps to RPC and the classic client/server model.

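Warning: REQ and REP must stay synchronized. By default both send and receive block. A REQ socket cannot send a new request until it has received the reply to the previous one, and a REP socket cannot receive a new request until it has sent its reply. Each request must be met with exactly one reply.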

"The REQ-REP socket pair is in lockstep. The client issues zmq_send() and then zmq_recv(), in a loop (or once if that's all it needs). Doing any other sequence (e.g., sending two messages in a row) will result in a return code of -1 from the send or recv call. Similarly, the service issues zmq_recv() and then zmq_send() in that order, as often as it needs to."

zmq_hello_server.py :

import zmq

context = zmq.Context()

print("starting server...")
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
print("server started.")

while True:
    print("waiting for data...")
    message = socket.recv()
    print("recv:", message)

    # A REP socket must send exactly one reply before it can receive again
    print("send:", message)
    socket.send(message)

zmq_hello_client.py :

import time
import zmq

context = zmq.Context()

print("connecting to server...")
socket = context.socket(zmq.REQ)
# connect() returns at once; the TCP connection is made in the background
socket.connect("tcp://localhost:5555")
print("connected to server.")

i = 1
while True:
    i = i + 1
    print("send:", i)
    socket.send_string(str(i))

    message = socket.recv_string()
    print("message:", message)

    time.sleep(1)
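Lockstep check. The rule in the warning above can be tried in a single process. This is a minimal sketch assuming Python 3 and pyzmq; `inproc://lockstep-demo` is a made-up endpoint so that no TCP port is needed. Where the C API returns -1, pyzmq raises `zmq.ZMQError`; for a second send on a REQ socket its errno should be `zmq.EFSM`.

```python
import zmq


def req_rep_lockstep_demo():
    """Send twice on a REQ socket without receiving, then finish the exchange."""
    ctx = zmq.Context()
    rep = ctx.socket(zmq.REP)
    rep.bind("inproc://lockstep-demo")  # hypothetical endpoint name
    req = ctx.socket(zmq.REQ)
    req.connect("inproc://lockstep-demo")

    req.send(b"first")
    second_send_errno = None
    try:
        req.send(b"second")  # breaks the send/recv lockstep
    except zmq.ZMQError as err:
        second_send_errno = err.errno  # expected: zmq.EFSM

    request = rep.recv()  # the server only ever sees b"first"
    rep.send(b"reply to " + request)
    reply = req.recv()  # after this, the REQ socket may send again

    ctx.destroy(linger=0)
    return second_send_errno, reply


err_code, reply = req_rep_lockstep_demo()
print(err_code == zmq.EFSM, reply)
```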

Publish-Subscribe

One-to-many communication: a publisher sends each update to every subscriber whose subscription matches it.

fig4.png

You get an error if you try to send on a SUB socket or receive on a PUB socket.

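The PUB-SUB socket pair is asynchronous. Messages queue up if subscribers don't receive them fast enough, but only up to the high-water mark (ZMQ_SNDHWM / ZMQ_RCVHWM, 1000 messages by default); beyond that, PUB drops messages for that subscriber instead of blocking.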

Subscribe - "Note that when you use a SUB socket you must set a subscription using zmq_setsockopt() and SUBSCRIBE, as in this code. If you don't set any subscription, you won't get any messages. It's a common mistake for beginners. The subscriber can set many subscriptions, which are added together. That is, if an update matches ANY subscription, the subscriber receives it. The subscriber can also cancel specific subscriptions. A subscription is often, but not necessarily a printable string."

Subscribe Filter - "The ZMQ_SUBSCRIBE option shall establish a new message filter on a ZMQ_SUB socket. Newly created ZMQ_SUB sockets shall filter out all incoming messages, therefore you should call this option to establish an initial message filter. An empty option_value of length zero shall subscribe to all incoming messages. A non-empty option_value shall subscribe to all messages beginning with the specified prefix. Multiple filters may be attached to a single ZMQ_SUB socket, in which case a message shall be accepted if it matches at least one filter."
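Prefix filter sketch. A minimal single-process example of how the filter behaves, assuming Python 3 and pyzmq. The topic strings and `inproc://filter-demo` are made up, and the 0.2 s sleep is only a crude workaround for the slow-joiner effect, not real synchronization. Note that `b"NASDAQ-X test"` also gets through, because matching is by prefix, not by whole word.

```python
import time
import zmq


def prefix_filter_demo():
    """Publish a few messages and return what a SUB subscribed to the NASDAQ prefix receives."""
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    pub.bind("inproc://filter-demo")  # hypothetical endpoint name
    sub = ctx.socket(zmq.SUB)
    sub.connect("inproc://filter-demo")
    sub.setsockopt(zmq.SUBSCRIBE, b"NASDAQ")  # prefix match, not exact match
    sub.setsockopt(zmq.RCVTIMEO, 500)  # recv() gives up after 500 ms with zmq.Again

    time.sleep(0.2)  # crude slow-joiner workaround

    for msg in (b"NYSE IBM 150", b"NASDAQ AAPL 190", b"NASDAQ-X test", b"LSE BP 5"):
        pub.send(msg)

    received = []
    while True:
        try:
            received.append(sub.recv())
        except zmq.Again:  # timed out: nothing more is coming
            break

    ctx.destroy(linger=0)
    return received


received = prefix_filter_demo()
print(received)
```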

Binding - "In theory with ØMQ sockets, it does not matter which end connects and which end binds. However, in practice there are undocumented differences that I'll come to later. For now, bind the PUB and connect the SUB, unless your network design makes that impossible."

Slow joiner - "Even if you start a subscriber, wait a while, and then start the publisher, the subscriber will always miss the first messages that the publisher sends."

Synchronize - "In Chapter 2 - Sockets and Patterns we'll explain how to synchronize a publisher and subscribers so that you don't start to publish data until the subscribers really are connected and ready. The alternative to synchronization is to simply assume that the published data stream is infinite and has no start and no end. One also assumes that the subscriber doesn't care what transpired before it started up."

zmq_pub_server.py

#
# Weather update server
# Binds PUB socket to tcp://*:5556
# Publishes random weather updates
#

import zmq
from random import randrange

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")

while True:
    zipcode = randrange(1, 100000)
    temperature = randrange(1, 215) - 80
    relhumidity = randrange(1, 50) + 10

    # The leading zipcode is what subscribers filter on (prefix match)
    socket.send_string("%d %d %d" % (zipcode, temperature, relhumidity))

zmq_pub_client.py:

#
# Weather update client
# Connects SUB socket to tcp://localhost:5556
# Collects weather updates and finds avg temp in zipcode
#

import sys
import zmq

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print("Collecting updates from weather server…")
socket.connect("tcp://localhost:5556")

# Subscribe to zipcode, default is NYC, 10001
zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001"
socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)

# Process 5 updates
total_temp = 0
for update_nbr in range(5):
    string = socket.recv_string()
    zipcode, temperature, relhumidity = string.split()
    total_temp += int(temperature)

# update_nbr ends at 4, so divide by update_nbr + 1 (= 5 updates)
print("Average temperature for zipcode '%s' was %dF" % (
        zip_filter, total_temp / (update_nbr + 1)))

---

This shows a filter that collects data from two different remote publishers and sends it to local subscribers: [3]

import zmq

context = zmq.Context()

subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://192.168.55.112:5556")
subscriber.connect("tcp://192.168.55.201:7721")
subscriber.setsockopt(zmq.SUBSCRIBE, b"NASDAQ")

publisher = context.socket(zmq.PUB)
publisher.bind("ipc://nasdaq-feed")

while True:
    message = subscriber.recv()
    publisher.send(message)

--

# Subscribe to everything (an empty prefix matches every message)
frontend.setsockopt(zmq.SUBSCRIBE, b"")

Parallel Pipeline

Divide and conquer: task distribution

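PUSH (send) and PULL (recv): a PUSH send blocks while no PULL peer is connected (or every peer is at its high-water mark), and a PULL recv blocks until a message arrives.

PUSH hands tasks out round-robin among the connected workers, and PULL fair-queues the results coming back from them.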

fig5.png

Our supercomputing application is a fairly typical parallel processing model. We have:

  • A ventilator that produces tasks that can be done in parallel
  • A set of workers that process tasks
  • A sink that collects results back from the worker processes
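Ventilator, workers and sink can be sketched in one process, with threads standing in for processes and inproc endpoints instead of tcp. This is a minimal sketch assuming Python 3 and pyzmq; the endpoint names, the `STOP` sentinel and the squaring "work" are all made up. A `threading.Barrier` makes sure every worker has connected before tasks go out: otherwise the first worker to connect can receive all the early tasks, and a `STOP` sentinel could end up at the wrong worker.

```python
import threading
import zmq

TASKS_URL = "inproc://tasks"      # hypothetical endpoint names
RESULTS_URL = "inproc://results"
STOP = b"STOP"                    # hypothetical sentinel, one per worker


def worker(ctx, ready):
    """Pull numbers from the ventilator, square them, push results to the sink."""
    receiver = ctx.socket(zmq.PULL)
    receiver.connect(TASKS_URL)
    sender = ctx.socket(zmq.PUSH)
    sender.connect(RESULTS_URL)
    ready.wait()  # tell the ventilator this worker is connected

    while True:
        task = receiver.recv()
        if task == STOP:
            break
        sender.send_string(str(int(task) ** 2))  # the "work"

    receiver.close()
    sender.close()


def run_pipeline(numbers, n_workers=3):
    ctx = zmq.Context()  # one context shared by all threads
    ventilator = ctx.socket(zmq.PUSH)
    ventilator.bind(TASKS_URL)
    sink = ctx.socket(zmq.PULL)
    sink.bind(RESULTS_URL)

    ready = threading.Barrier(n_workers + 1)
    threads = [threading.Thread(target=worker, args=(ctx, ready))
               for _ in range(n_workers)]
    for t in threads:
        t.start()
    ready.wait()  # all workers are connected before the first task goes out

    for n in numbers:
        ventilator.send_string(str(n))  # PUSH deals tasks out round-robin
    results = [int(sink.recv()) for _ in numbers]  # PULL fair-queues results

    for _ in range(n_workers):
        ventilator.send(STOP)  # round-robin again: one STOP per worker
    for t in threads:
        t.join()

    ventilator.close()
    sink.close()
    ctx.term()
    return sorted(results)


squares = run_pipeline(range(1, 6))
print(squares)
```

Results come back in whatever order the workers finish, which is why the sketch sorts them.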

Server (this sequence test uses PUB/SUB rather than PUSH/PULL):

import zmq

context = zmq.Context()

print("starting server...")
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")
print("server started.")

i = 1
while True:
    try:
        print(i)
        socket.send_string(str(i))
        i = i + 1
    except zmq.error.ZMQError:
        pass

Client (prints "err count" whenever a number is missing from the sequence, i.e. messages were dropped, either by the slow-joiner effect at startup or because the publisher outruns this one-message-per-second reader):

import time
import zmq

context = zmq.Context()

print("connecting to server...")
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt(zmq.SUBSCRIBE, b"")
print("connected to server.")

last = 0
while True:
    try:
        message = socket.recv_string()
        print("message:", message)
        current = int(message)
        if current != last + 1:
            print("err count", current, last)
        last = current
    except zmq.error.ZMQError:
        pass

    time.sleep(1)

code

version

$ python -c 'import zmq; print(zmq.zmq_version())'
4.0.4
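The libzmq C library and the pyzmq binding are versioned separately, so it can help to print both. A small sketch assuming Python 3 and pyzmq; `zmq.zmq_version_info()` gives the libzmq version as a tuple of ints, which is easier to compare than the string.

```python
import zmq

libzmq_version = zmq.zmq_version()      # libzmq as a string, e.g. "4.0.4"
libzmq_info = zmq.zmq_version_info()    # the same version as a tuple of ints
pyzmq_version = zmq.pyzmq_version()     # version of the Python binding itself

print("libzmq", libzmq_version, "pyzmq", pyzmq_version)
if libzmq_info < (4, 0, 0):
    print("warning: libzmq is older than 4.0")
```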

non blocking receive

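import time
import zmq

# receiver is an already-connected socket, e.g. a PULL socket
while True:
  # Process every message that is already waiting
  while True:
    try:
      rc = receiver.recv(zmq.DONTWAIT)
    except zmq.Again:
      break  # queue is empty: leave the inner loop only

    # do work
    ...

  # No activity, so sleep for 1 msec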
  time.sleep(0.001)
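A self-contained version of the inner "drain what is waiting" loop, assuming Python 3 and pyzmq. `drain` and `inproc://drain-demo` are made-up names; pyzmq signals EAGAIN by raising `zmq.Again`, a subclass of `zmq.ZMQError`.

```python
import zmq


def drain(socket):
    """Return every message already queued on `socket`, never blocking."""
    messages = []
    while True:
        try:
            messages.append(socket.recv(zmq.DONTWAIT))
        except zmq.Again:  # EAGAIN: the queue is empty right now
            return messages


ctx = zmq.Context()
pull = ctx.socket(zmq.PULL)
pull.bind("inproc://drain-demo")  # hypothetical endpoint name
push = ctx.socket(zmq.PUSH)
push.connect("inproc://drain-demo")

empty = drain(pull)  # nothing sent yet, so this returns at once
for word in (b"a", b"b", b"c"):
    push.send(word)
pull.poll(1000)  # wait up to 1 s for the first message to become readable
batch = drain(pull)
ctx.destroy(linger=0)
```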

polling

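# Initialize poll set
poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)

while True:
    socks = dict(poller.poll())

    #if receiver in socks and socks[receiver] == zmq.POLLIN:
    if socks.get(receiver) == zmq.POLLIN:
        message = receiver.recv()
        # process task
        ...

    # Every readable socket must be read, or poll() keeps returning at once for it
    if socks.get(subscriber) == zmq.POLLIN:
        message = subscriber.recv()
        # process update
        ...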

Poll Timeout:

poller.poll(5000)  # 5 second timeout
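The timeout is in milliseconds, and when it expires without any event `poll()` returns an empty list. A minimal sketch assuming Python 3 and pyzmq, with a made-up `inproc://poll-demo` endpoint:

```python
import zmq

ctx = zmq.Context()
receiver = ctx.socket(zmq.PULL)
receiver.bind("inproc://poll-demo")  # hypothetical endpoint name
sender = ctx.socket(zmq.PUSH)
sender.connect("inproc://poll-demo")

poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)

# Nothing has been sent yet: poll() waits about 100 ms, then returns an empty list
timed_out = dict(poller.poll(100))

sender.send(b"ping")
events = dict(poller.poll(1000))  # returns as soon as receiver is readable
message = receiver.recv() if events.get(receiver) == zmq.POLLIN else None

ctx.destroy(linger=0)
```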

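Simpler check, if only one socket is registered with the poller (use a timeout, otherwise poll() only returns once there is an event):

evts = poller.poll(5000)  # empty list if the timeout expires
if evts:
    message = receiver.recv()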

Tutorials

Nicholas Piël » ZeroMQ an introduction - http://nichol.as/zeromq-an-introduction

"ZeroMQ is a messaging library, which allows you to design a complex communication system without much effort. It has been wrestling with how to effectively describe itself in the recent years. In the beginning it was introduced as ‘messaging middleware’ later they moved to ‘TCP on steroids’ and right now it is a ‘new layer on the networking stack’."

Message Queue Comparisons

Message Queue Evaluation Notes - Second Life Wiki - http://wiki.secondlife.com/wiki/Message_Queue_Evaluation_Notes

Failure Philosophy

"ØMQ's error handling philosophy is a mix of fail-fast and resilience. Processes, we believe, should be as vulnerable as possible to internal errors, and as robust as possible against external attacks and errors. To give an analogy, a living cell will self-destruct if it detects a single internal error, yet it will resist attack from the outside by all means possible.

Assertions, which pepper the ØMQ code, are absolutely vital to robust code; they just have to be on the right side of the cellular wall. And there should be such a wall. If it is unclear whether a fault is internal or external, that is a design flaw to be fixed." [4]

Sender Receiver with Kill

# Adds pub-sub flow to receive and respond to kill signal

import sys
import time
import zmq

context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")

# Socket for control input
controller = context.socket(zmq.SUB)
controller.connect("tcp://localhost:5559")
controller.setsockopt(zmq.SUBSCRIBE, b"")

# Process messages from receiver and controller
poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(controller, zmq.POLLIN)
# Process messages from both sockets
while True:
    socks = dict(poller.poll())

    if socks.get(receiver) == zmq.POLLIN:
        message = receiver.recv()

        # Process task
        workload = int(message) # Workload in msecs

        # Do the work
        time.sleep(workload / 1000.0)

        # Send results to sink
        sender.send(message)

        # Simple progress indicator for the viewer
        sys.stdout.write(".")
        sys.stdout.flush()

    # Any waiting controller command acts as 'KILL'
    if socks.get(controller) == zmq.POLLIN:
        break
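The other side of this flow (the sink that collects results and then broadcasts the kill signal) can be sketched in one process, with a thread standing in for the worker. This is a minimal sketch assuming Python 3 and pyzmq, using made-up inproc endpoints instead of ports 5557 to 5559 and echoing each task instead of sleeping. Because a PUB drops messages until the subscription has arrived, the sketch simply re-sends `KILL` until the worker thread has exited.

```python
import threading
import zmq


def kill_aware_worker(ctx):
    """Cut-down version of the worker above: echo tasks until any control message arrives."""
    receiver = ctx.socket(zmq.PULL)
    receiver.connect("inproc://tasks")        # hypothetical inproc endpoints
    sender = ctx.socket(zmq.PUSH)
    sender.connect("inproc://results")
    controller = ctx.socket(zmq.SUB)
    controller.connect("inproc://control")
    controller.setsockopt(zmq.SUBSCRIBE, b"")

    poller = zmq.Poller()
    poller.register(receiver, zmq.POLLIN)
    poller.register(controller, zmq.POLLIN)
    while True:
        socks = dict(poller.poll())
        if socks.get(receiver) == zmq.POLLIN:
            sender.send(receiver.recv())  # "process" a task by echoing it
        if socks.get(controller) == zmq.POLLIN:
            break  # any control message acts as KILL

    for sock in (receiver, sender, controller):
        sock.close()


def run_with_kill(tasks):
    """Ventilator + sink + controller: send tasks, collect results, then send KILL."""
    ctx = zmq.Context()
    ventilator = ctx.socket(zmq.PUSH)
    ventilator.bind("inproc://tasks")
    sink = ctx.socket(zmq.PULL)
    sink.bind("inproc://results")
    control = ctx.socket(zmq.PUB)
    control.bind("inproc://control")

    thread = threading.Thread(target=kill_aware_worker, args=(ctx,))
    thread.start()
    for task in tasks:
        ventilator.send(task)  # blocks until the worker has connected
    results = [sink.recv() for _ in tasks]  # wait for every result first

    # Slow joiner: keep re-sending KILL until the worker thread has really exited
    while thread.is_alive():
        control.send(b"KILL")
        thread.join(0.1)

    for sock in (ventilator, sink, control):
        sock.close(linger=0)
    ctx.term()
    return results


echoed = run_with_kill([b"10", b"20", b"30"])
```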

Handle Ctrl-C

#
# Shows how to handle Ctrl-C
#
import signal
import time
import zmq

interrupted = False

def signal_handler(signum, frame):
    global interrupted
    interrupted = True

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5558")

# SIGINT will normally raise a KeyboardInterrupt, just like any other Python call
try:
    message = socket.recv()
    socket.send(message)  # a REP socket must reply before it can receive again
except KeyboardInterrupt:
    print("W: interrupt received, proceeding…")

# or you can use a custom handler
counter = 0
signal.signal(signal.SIGINT, signal_handler)
while True:
    try:
        message = socket.recv(zmq.DONTWAIT)
        socket.send(message)  # reply, keeping the REQ/REP lockstep
    except zmq.Again:
        time.sleep(0.001)  # nothing waiting; avoid spinning at 100% CPU
    counter += 1
    if interrupted:
        print("W: interrupt received, killing server…")
        break
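Instead of polling with `recv(zmq.DONTWAIT)`, the loop can block in `recv()` for a bounded time using the `RCVTIMEO` socket option, then re-check a stop flag. A minimal sketch assuming Python 3 and pyzmq; `serve_until`, `should_stop` and `inproc://interruptible-demo` are hypothetical names. In the Ctrl-C example you would pass something like `lambda: interrupted` as `should_stop`.

```python
import zmq


def serve_until(socket, should_stop, timeout_ms=100):
    """Echo REP loop that re-checks should_stop() at least every timeout_ms."""
    socket.setsockopt(zmq.RCVTIMEO, timeout_ms)  # recv() raises zmq.Again on timeout
    handled = 0
    while not should_stop():
        try:
            request = socket.recv()
        except zmq.Again:
            continue  # no request in time: loop round and re-check the flag
        socket.send(request)  # reply, keeping the REQ/REP lockstep
        handled += 1
    return handled


# Demo: nobody sends requests, so every recv() times out and the loop ends
# the fourth time should_stop() is checked.
checks = []

def stop_after_three_checks():
    checks.append(True)
    return len(checks) > 3

ctx = zmq.Context()
rep = ctx.socket(zmq.REP)
rep.bind("inproc://interruptible-demo")  # hypothetical endpoint name
handled = serve_until(rep, stop_after_three_checks, timeout_ms=20)
ctx.destroy(linger=0)
```

The trade-off is latency: a stop request is noticed within roughly `timeout_ms`, while the process sleeps inside libzmq instead of burning CPU.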

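In C, if your code is blocked in a blocking call (sending a message, receiving a message, or polling) when a signal arrives, the call returns -1 with errno set to EINTR. In Python, the default SIGINT handler instead surfaces as a KeyboardInterrupt, as the first part of the example shows.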

Multithreading with ØMQ

Multithreading (MT) Programming

If there's one lesson we've learned from 30+ years of concurrent programming, it is: just don't share state. It's like two drunkards trying to share a beer. It doesn't matter if they're good buddies. Sooner or later, they're going to get into a fight. And the more drunkards you add to the table, the more they fight each other over the beer. The tragic majority of MT applications look like drunken bar fights.

You should follow some rules to write happy multithreaded code with ØMQ:

  • Isolate data privately within its thread and never share data in multiple threads. The only exception to this are ØMQ contexts, which are threadsafe.
  • Stay away from the classic concurrency mechanisms such as mutexes, critical sections, semaphores, etc. These are an anti-pattern in ØMQ applications.
  • Create one ØMQ context at the start of your process, and pass that to all threads that you want to connect via inproc sockets.
  • Use attached threads to create structure within your application, and connect these to their parent threads using PAIR sockets over inproc. The pattern is: bind parent socket, then create child thread which connects its socket.
  • Use detached threads to simulate independent tasks, with their own contexts. Connect these over tcp. Later you can move these to stand-alone processes without changing the code significantly.
  • All interaction between threads happens as ØMQ messages, which you can define more or less formally.
  • Don't share ØMQ sockets between threads. ØMQ sockets are not threadsafe. Technically it's possible to migrate a socket from one thread to another, but it demands skill. The only place where it's remotely sane to share sockets between threads is in language bindings that need to do magic like garbage collection on sockets.

Threading

The MT version of the Hello World service basically collapses the broker and workers into a single process:

fig20.png

"""

Multithreaded Hello World server

Author: Guillaume Aubert (gaubert) <guillaume(dot)aubert(at)gmail(dot)com>

"""
import time
import threading
import zmq

def worker_routine(worker_url, context):
    """ Worker routine """

    # Socket to talk to dispatcher
    socket = context.socket(zmq.REP)

    socket.connect(worker_url)

    while True:

        string = socket.recv()

        print("Received request: [%s]\n" % (string))

        # do some 'work'
        time.sleep(1)

        #send reply back to client
        socket.send(b"World")

def main():
    """ server routine """

    url_worker = "inproc://workers"
    url_client = "tcp://*:5555"

    # Prepare our context and sockets
    context = zmq.Context(1)

    # Socket to talk to clients
    clients = context.socket(zmq.ROUTER)
    clients.bind(url_client)

    # Socket to talk to workers
    workers = context.socket(zmq.DEALER)
    workers.bind(url_worker)

    # Launch pool of worker threads
    for i in range(5):
        thread = threading.Thread(target=worker_routine, args=(url_worker, context, ))
        thread.start()

    # Forward requests and replies between clients and workers (blocks forever)
    zmq.proxy(clients, workers)

    # We never get here but clean up anyhow
    clients.close()
    workers.close()
    context.term()

if __name__ == "__main__":
    main()

keywords

ZeroMQ 0MQ ZMQ MOM Message-oriented middleware