Nei titoli e nei testi troverete qualche rimando cinematografico (ebbene si, sono un cinefilo). Se non vi interessano fate finta di non vederli, già che non sono fondamentali per la comprensione dei post...

Di questo blog ho mandato avanti, fino a Settembre 2018, anche una versione in Spagnolo. Potete trovarla su El arte de la programación en C. Buona lettura.

lunedì 30 maggio 2022

ZeroMQ Festival
come si usa ZeroMQ in C - pt.1

Sue: Lo hai sentito parlare del suo prossimo film? Vuole tentare di far riconciliare Arabi e Israeliani.
Mort: Sono contento che si dedichi alla fantascienza.

Rifkin's Festival è un bel film del Maestro Woody Allen appartenente alla serie di quelli diretti da lui e interpretati da un suo alter-ego (in questo caso il bravissimo Wallace Shawn). È un ode al Cinema (con la C maiuscola), è pieno di citazioni dotte, è brillante. Ed è brillante come l'argomento di questo articolo, la libreria ZeroMQ, che è un festival di sorprese, di ecletticitá e di prestazioni, una libreria "Super" che si distingue nel mondo della messaggistica (e no: non sono un azionista di ZeroMQ, che, tra l'altro, è open-source e gratuita. Ne parlo bene solo perché merita).

ZeroMQ Festival
...e nella prossima scena useremo ZeroMQ...

Veniamo al dunque: cos'è esattamente ZeroMQ? Visto che non amo reinventare la ruota lo faccio dire ai suoi stessi creatori/curatori, che nella loro pagina Web la descrivono così:

ZeroMQ (scritto anche ØMQ, 0MQ o ZMQ) è una libreria di messaggistica asincrona ad alte prestazioni, destinata all'uso in applicazioni distribuite o concorrenti. Fornisce una coda di messaggi, ma a differenza dei middleware orientati ai messaggi, un sistema ZeroMQ può funzionare senza un broker di messaggi dedicato.

ZeroMQ supporta gli schemi di messaggistica più comuni (pub/sub, request/reply, client/server e altri) su una varietà di transports (TCP, in-process, inter-process, multicast, WebSocket e altri), rendendo la messaggistica inter-process semplice come quella inter-thread. In questo modo il codice rimane chiaro, modulare ed estremamente facile da scalare.

Ecco, è evidente che la descrizione qui sopra è di parte ("Ogne scarrafone è bell' a mamma soja") e quindi, per definizione, poco attendibile. Però io non sono di parte e vi assicuro che è tutto vero! ZeroMQ è un unico pacchetto che permette di scrivere codice di messaggistica di tutti i tipi fondamentali: inter-process, inter-thread e inter-network, e senza scomodare le librerie specializzate (come le POSIX IPC o i BSD Socket). E, oltretutto, ZeroMQ è leggera, ed è (relativamente) semplice da usare. E ha altissime prestazioni (veramente molto alte, grazie all'architettura brokerless e a un codice interno genialmente progettato). ZeroMQ non è un prodotto di nicchia semisconosciuto: è usato in vari progetti da una impressionante lista di aziende (come Samsung, AT&T, Spotify, Microsoft e molte altre).

ZeroMQ è multi-piattaforma ed è veramente multi-linguaggio, nel senso che sono disponibili un sacco (ma veramente un sacco!) di interfacce; ad esempio abbiamo: C, C++, C#, Java, Python, Ruby, Dart, Go, NodeJS e molti altri. Ovviamente in questo articolo ci soffermeremo sul suo uso in C, dove abbiamo a disposizione due librerie: libzmq e CZMQ. E perché due librerie? Perché ZeroMQ è un progetto molto versatile e con due personalità: con libzmq si può scrivere codice "classico" usando lo stile low-level  che si usa per programmare, ad esempio, con i BSD Sockets; invece con CZMQ si può scrivere codice di tipo high-level usando funzioni semplici che eseguono attività molto complesse (e che sarebbero complicate da scrivere in modo low-level).

(...apro una doverosa parentesi: per chi non l'avesse ancora capito, io sono un super-fan del C, e quindi, anche se mi costa un po' dirlo, lo dico: ZeroMQ è stato scritto usando il lato oscuro della forza, il C++ (che uso moltissimo anch'io, e anche abbastanza bene, nonostante non sia esattamente il mio linguaggio preferito...). E qui viene il bello: nonostante gli ottimi risultati ottenuti, il creatore del progetto (il bravissimo Martin Sústrik) si è pentito della scelta, affermando che sarebbe venuto meglio in C (l'ha detto lui! Io non centro nulla!). I motivi li ha spiegati qui (leggere con attenzione, è molto interessante). Chiudo la doverosa parentesi...).

Ok, e a questo punto come si procede? Nella pagina Web di ZeroMQ ci sono un sacco di esempi base, e sarebbe troppo facile fare il copia-incolla di qualcuno. Piuttosto mi interessa l'idea di fare un bel benchmark (che, indirettamente, fornisce già un esempio base). Poi, nel prossimo articolo (spoiler: ci sarà una seconda parte) potremo vedere un esempio d'uso un po' più interessante. Per il benchmark ho deciso di seguire, pari pari, il mio vecchio ciclo di articoli sulla POSIX IPC, sia perché l'argomento è (quasi) lo stesso, sia per dimostrare che ZeroMQ è perfettamente integrabile nello stesso codice che usai a suo tempo (e, come detto sopra, ZeroMQ funziona anche in modo inter-process). Ovviamente userò la libreria low-level per ottenere dei sorgenti quasi sovrapponibili a quelli del ciclo POSIX IPC .

E allora, sulla falsariga dei vecchi articoli (che potete trovare qui, qui e qui), useremo i seguenti listati:

  1. Il main di un processo padre (processes.c) che crea ed segue due processi figli con fork + exec. I due processi figli si chiameranno writer e reader.
  2. Il main del processo writer (writer.c).
  3. Il main del processo reader (reader.c).
  4. Un header file (data.h).

Ok, cominciamo da processes.c: vai col codice!

// processes.c - main processo padre
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/wait.h>
#include "data.h"

// funzione main()
int main(int argc, char* argv[])
{
// crea i processi figli
pid_t pid1, pid2;
(pid1 = fork()) && (pid2 = fork());

// test pid processi
if (pid1 == 0) {
// sono il figlio 1
printf("sono il figlio 1 (%d): eseguo il nuovo processo\n", getpid());
char *pathname = "reader";
char *newargv[] = { pathname, NULL };
execv(pathname, newargv);
exit(EXIT_FAILURE); // exec non ritorna mai
}
else if (pid2 == 0) {
// sono il figlio 2
printf("sono il figlio 2 (%d): eseguo il nuovo processo\n", getpid());
char *pathname = "writer";
char *newargv[] = { pathname, NULL };
execv(pathname, newargv);
exit(EXIT_FAILURE); // exec non ritorna mai
}
else if (pid1 > 0 && pid2 > 0) {
// sono il padre
printf("sono il padre (%d): attendo la terminazione dei figli\n", getpid());
int status;
pid_t wpid;
while ((wpid = wait(&status)) > 0)
printf("sono il padre (%d): figlio %d terminato (%d)\n", getpid(),
(int)wpid, status);

// esco
printf("%s: processi terminati\n", argv[0]);
exit(EXIT_SUCCESS);
}
else {
// errore nella fork(): esco
printf("%s: fork error (%s)\n", argv[0], strerror(errno));
exit(EXIT_FAILURE);
}
}

Come avrete notato è identico (come era da sperare) a quello della POSIX IPC Socket, dei vecchi articoli, quindi è il più semplice di tutti i vari processes.c mostrati a suo tempo.

E ora vediamo l’header file data.h che ci mostra il tipo di dati scambiati e definisce il path usato da ZeroMQ per il modo IPC (è un file temporaneo che è consigliabile creare in /tmp):

#ifndef DATA_H
#define DATA_H

// path del file temporaneo per IPC
#define ZEROMQ_PATH "ipc:///tmp/0mq.ipc"

// numero di messaggi da scambiare per il benchmark
#define N_MESSAGES 2000000

// struttura Data per i messaggi
typedef struct {
unsigned long index; // indice dei dati
char text[1024]; // testo dei dati
} Data;

#endif /* DATA_H */

E adesso siamo pronti per vedere il codice di writer.c:

// writer.c - main processo figlio (il Writer (è un Client))
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <zmq.h>
#include "data.h"

// funzione main()
int main(int argc, char *argv[])
{
// creo il context, il socket e la connessione
printf("processo %d partito\n", getpid());
void *context = zmq_ctx_new();
void *writer = zmq_socket(context, ZMQ_PUSH);
zmq_connect(writer, ZEROMQ_PATH);

// loop di scrittura messaggi per il reader
Data my_data;
my_data.index = 0;
do {
// test index per forzare l'uscita
if (my_data.index == N_MESSAGES) {
// il processo chiude il socket ed esce per indice raggiunto
printf("processo %d terminato (text=%s index=%ld)\n",
getpid(), my_data.text, my_data.index);
zmq_close (writer);
zmq_ctx_destroy (context);
exit(EXIT_SUCCESS);
}

// compongo il messaggio e lo invio
my_data.index++;
snprintf(my_data.text, sizeof(my_data.text), "un-messaggio-di-test:%ld",
my_data.index);
} while (zmq_send(writer, &my_data, sizeof(Data), 0) != -1);

// il processo chiude il socket ed esce per altro motivo (errore)
printf("processo %d terminato con errore (%s)\n", getpid(), strerror(errno));
zmq_close(writer);
zmq_ctx_destroy(context);
exit(EXIT_FAILURE);
}

Anche questo codice è semplicissimo e auto-esplicativo (specialmente se avete letto i vecchi articoli): dopo aver aperto il context , il socket  ZeroMQ ed eseguito il bind, esegue un loop di 2000000 (due milioni! Chi offre di più?) di send del messaggio definito in data.h, dopodiché esce. Per ridurre all'osso l’attività della CPU e non falsare i risultati, si potrebbe omettere anche la snprintf() di ogni messaggio e aggiornare solo il campo index: ho fatto dei test e, come era prevedibile, la snprintf() locale è molto più` veloce della scrittura IPC, e quindi, praticamente, i risultati non vengono alterati. Notare che il loop pseudo-infinito prevede una uscita forzata del processo al raggiungimento del numero di scritture prestabilite. In caso di send error  si ferma il loop anticipatamente e si va all'uscita generica di errore. La fase di apertura iniziale dell'ambiente ZeroMQ (context, socket, bind) l'ho eseguita "secca" senza testare gli errori: è un programma di test, non andrà in produzione.

Ma si, dai, passiamo al reader!

// reader.c - main processo figlio (il Reader (è un Server))
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <zmq.h>
#include <time.h>
#include <sys/time.h>
#include "data.h"

// funzione main()
int main(int argc, char *argv[])
{
// creo il context, il socket ed eseguo il bind
printf("processo %d partito\n", getpid());
void *context = zmq_ctx_new();
void *reader = zmq_socket(context, ZMQ_PULL);
zmq_bind(reader, ZEROMQ_PATH);

// set clock e time per calcolare il tempo di CPU e il tempo di sistema
clock_t t_start = clock();
struct timeval tv_start;
gettimeofday(&tv_start, NULL);

// loop di lettura messaggi dal writer
Data my_data;
while (zmq_recv(reader, &my_data, sizeof(Data), 0) != -1) {
// test index per forzare l'uscita
if (my_data.index == N_MESSAGES) {
// get clock e time per calcolare il tempo di CPU e il tempo di sistema
clock_t t_end = clock();
double t_passed = ((double)(t_end - t_start)) / CLOCKS_PER_SEC;
struct timeval tv_end, tv_elapsed;
gettimeofday(&tv_end, NULL);
timersub(&tv_end, &tv_start, &tv_elapsed);

// il processo chiude il socket ed esce per indice raggiunto
printf("reader: ultimo messaggio ricevuto: %s\n", my_data.text);
printf("processo %d terminato (index=%ld CPU time elapsed: "
"%.3f - total time elapsed:%ld.%ld)\n",
getpid(), my_data.index, t_passed, tv_elapsed.tv_sec,
tv_elapsed.tv_usec / 1000);
zmq_close(reader);
zmq_ctx_destroy(context);
exit(EXIT_SUCCESS);
}
}

// il processo chiude il socket ed esce per altro motivo (errore)
printf("processo %d terminato con errore (%s)\n", getpid(), strerror(errno));
zmq_close(reader);
zmq_ctx_destroy(context);
exit(EXIT_FAILURE);
}

Il reader è strutturalmente identico (e speculare) al writer, quindi il loop di lettura ha due possibili uscite (forzata e per errore), ed esegue simmetricamente le stesse operazioni, solo che legge invece di scrivere. Ed ha in più la gestione del calcolo dei tempi di esecuzione, che ci servono per effettuare il benchmark che ci eravamo proposti.

In definitiva si può` affermare che la IPC attraverso ZeroMQ è veramente semplice da implementare, perché è pura attività di read/write su un file. Tutto Ok, quindi… e i risultati? Il test l’ho effettuato su una macchina Linux (Kernel 5.4.0) con un i7 (4 core/8 thread) con 8GB di RAM. Ok, vediamo come è andata (con la stessa forma del vecchio ciclo di articoli):

ZeroMQ (modo IPC)
-----------------
sono il padre (1896): attendo la terminazione dei figli
sono il figlio 1 (1897): eseguo il nuovo processo
sono il figlio 2 (1898): eseguo il nuovo processo
processo 1897 partito
processo 1898 partito
processo 1898 terminato (text=un-messaggio-di-test:2000000 index=2000000)
reader: ultimo messaggio ricevuto: un-messaggio-di-test:2000000
processo 1897 terminato (index=2000000 CPU time elapsed: 2.935 - total time elapsed:1.759)
sono il padre (1896): figlio 1897 terminato (0)
sono il padre (1896): figlio 1898 terminato (0)
./processes: processi terminati

Allora: il nostro test mostra che, usando ZeroMQ in modo IPC, abbiamo scambiato due milioni di messaggi tra i due processi in 1.759 secondi (quindi un messaggio ogni 0,89 us). ZeroMQ è velocissimo! A questo punto forse qualcuno noterà la stranezza del CPU time che è molto più alto del total time: non è un errore, è che su una macchina multi-core come quella di test, ZeroMQ distribuisce opportunamente il carico su thread diversi della CPU e il tempo calcolato nel codice è la somma dei tempi "reali".

Già che c'ero ho pensato che sarebbe stato interessante fare una comparazione diretta con un prodotto similare, quindi ho rieseguito i test con POSIX Message Queue. Ecco i risultati:

POSIX Message Queue
-------------------
sono il figlio 1 (8824): eseguo il nuovo processo
sono il padre (8823): attendo la terminazione dei figli
sono il figlio 2 (8825): eseguo il nuovo processo
processo 8824 partito
processo 8825 partito
processo 8825 terminato (text=un-messaggio-di-test:2000000 index=2000000)
reader: ultimo messaggio ricevuto: un-messaggio-di-test:2000000
processo 8824 terminato (index=2000000 CPU time elapsed: 1.881 - total time elapsed:1.923)
sono il padre (8823): figlio 8825 terminato (0)
sono il padre (8823): figlio 8824 terminato (0)
./processes: processi terminati

I risultati assoluti sono simili (come c'era da aspettarsi) con una leggera superiorità di ZeroMQ. Il multithreading sembra usato differentemente (pare che la Message Queue non lo usa o lo usa in maniera diversa).

A questo punto, per chiudere in bellezza, ho deciso di sfruttare la versatilità di ZeroMQ che permette di cambiare il modo d'uso (con la libreria low-level) con una facilità disarmante, semplicemente giocando con i parametri di zmq_connect(). Vediamo come: writer e reader  IPC usano questo:

writer:
zmq_connect(writer, "ipc:///tmp/0mq.ipc");

reader:
zmq_bind(reader, "ipc:///tmp/0mq.ipc");
Ebbene, per passare al modo TCP è sufficiente fare questo:
writer:
zmq_connect(writer, "tcp://localhost:5555");

reader:
zmq_bind(reader, "tcp://*:5555");
Semplicissimo, no? Così semplice che, in quattro e quattr’otto, ho rifatto i test in modo TCP e ho anche rispolverato il codice del POSIX IPC Socket per fare un confronto di prestazioni. Vediamo::
ZeroMQ (modo TCP)
-----------------
sono il padre (4249): attendo la terminazione dei figli
sono il figlio 1 (4250): eseguo il nuovo processo
sono il figlio 2 (4251): eseguo il nuovo processo
processo 4250 partito
processo 4251 partito
processo 4251 terminato (text=un-messaggio-di-test:2000000 index=2000000)
sono il padre (4249): figlio 4251 terminato (0)
reader: ultimo messaggio ricevuto: un-messaggio-di-test:2000000
processo 4250 terminato (index=2000000 CPU time elapsed: 3.251 - total time elapsed:2.389)
sono il padre (4249): figlio 4250 terminato (0)
./processes: processi terminati

POSIX IPC Socket (modo UDP)
---------------------------
sono il padre (2068): attendo la terminazione dei figli
sono il figlio 1 (2069): eseguo il nuovo processo
sono il figlio 2 (2070): eseguo il nuovo processo
processo 2069 partito
processo 2070 partito
processo 2070 terminato (text=un-messaggio-di-test:2000000 index=2000000)
reader: ultimo messaggio ricevuto: un-messaggio-di-test:2000000
processo 2069 terminato (index=2000000 CPU time elapsed: 4.004 - total time elapsed:5.197)
sono il padre (2068): figlio 2070 terminato (0)
sono il padre (2068): figlio 2069 terminato (0)
./processes: processi terminati

Allora, pare che ZeroMQ in modo TCP è veloce quasi quanto in modo IPC (notevole), e batte nettamente POSIX IPC Socket in modo UDP (un modo più veloce ma meno affidabile di TCP)... che dire? Veramente sorprendente.

E, prima di chiudere questa prima parte, approfitto per far notare che il parametro "type" di zmq_socket() ha un uso importante: ad esempio nel nostro semplice benchmark con un writer che scrive (e basta) e un reader che legge (e basta), si usa il modo push-pull (PUSH sul writer e PULL sul reader, attraverso il parametro type). Invece con un più classico sistema client/server, con un client che scrive (e aspetta una risposta) e un server che legge (e invia una risposta), si dovrebbe usare il modo request-reply (ZMQ_REQ sul client e ZMQ_REP sul server, attraverso il parametro type). Quindi il codice del modo TCP diventa così:

client:
void *requester = zmq_socket(context, ZMQ_REQ);
zmq_connect(requester, "tcp://localhost:5555");

server:
void *responder = zmq_socket(context, ZMQ_REP);
int rc = zmq_bind(responder, "tcp://*:5555");

Semplice ed efficace, direi.

Ok, per oggi può bastare. Nella seconda parte dell'articolo penso che tratteremo usi di ZeroMQ più sofisticati e usando un approccio high-level... ma non trattenete il respiro nell'attesa: non sarebbe certo la prima volta (ahimè) che rimando la seconda parte "a più avanti" perché ho qualche argomento che mi preme trattare prima: cose che succedono...

Ciao, e al prossimo post!

Nessun commento:

Posta un commento