Nei titoli e nei testi troverete qualche rimando cinematografico (ebbene si, sono un cinefilo). Se non vi interessano fate finta di non vederli, già che non sono fondamentali per la comprensione dei post...

Di questo blog ho mandato avanti, fino a Settembre 2018, anche una versione in Spagnolo. Potete trovarla su El arte de la programación en C. Buona lettura.

domenica 7 marzo 2021

Il buono, il brutto, l'IPC
considerazioni sulle prestazioni della POSIX IPC - pt.3

Il Biondo: Vedi, il mondo si divide in due categorie: chi ha la pistola carica, e chi scava. Tu scavi

E siamo arrivati (finalmente! uff...) all'ultima parte della nostra personale Trilogia del dollaro... ebbene si, lo ammetto: la Trilogia del IPC, non diventerà una pietra miliare come i mitici tre film del grandissimo Sergio Leone, ma comunque io ce l'ho messa tutta, e spero che a qualcuno tutta questa sbrodolata torni utile. E, se non siete d'accordo, questa volta vi manderò a casa il cattivo a convincervi!

...il mondo si divide in due categorie: chi sa usare l'IPC e chi no...

Allora: chi ha letto le prime due parti della serie (qui e qui), oltre a concorrere per il premio "il lettore più paziente dell'anno" può leggere in scioltezza quanto segue. Per gli altri raccomando una veloce lettura previa (almeno per capire de che se sta a parla') e, per punizione, lettura "in ginocchio sui ceci"  (un po' di cultura popolare non guasta mai... anzi: promemoria per un futuro articolo: "Considerazioni sull'uso dei detti popolari nella divulgazione informatica").

E dunque: abbiamo già fornito tre (gagliardi) esempi d'uso di POSIX IPC: FIFO (Named Pipe), Message Queue  e UNIX domain socket (IPC socket). Oggi tocca alla quarta e ultima, la Shared Memory, che ho lasciato in fondo perché è un po' diversa dalle altre, non essendo un tipico mezzo di scambio di messaggi ma, come dice il nome, un sistema di condivisione della memoria. Rivediamo un attimo la definizione:

Shared Memory: la comunicazione tra due o più processi viene raggiunta attraverso un pezzo di memoria condiviso tra tutti i processi. La memoria condivisa deve essere protetta dagli accessi simultanei usando meccanismi di sincronizzazione.

E infatti, in un altro articolo, avevamo già visto un esempio d'uso nella sua forma tipica. Beh, ho deciso di inserire ugualmente in questa specie di "Olimpiadi dell'IPC"  anche la Shared Memory forzandola a scambiare messaggi. Visto che questo uso è una forzatura non potremo aspettarci né grandi prestazioni né un codice particolarmente lineare e senza ridondanze, ma sarà, comunque, un interessante esercizio di programmazione.

E cominciamo con la parte "normale" di questo ciclo di articoli, ossia l’header data.h, il padre processes.c e i due figli writer.c reader.c… vai col codice!

#ifndef DATA_H
#define DATA_H

// nome del memory mapped file
#define MMAP_NAME "mymmap"

// numero di messaggi da scambiare per il benchmark
#define N_MESSAGES 2000000

// struttura Data per i messaggi
typedef struct {
unsigned long index; // indice dei dati
char text[1024]; // testo dei dati
} Data;

#endif /* DATA_H */
// processes.c - main processo padre
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/wait.h>
#include "mmap.h"
#include "data.h"

// funzione main()
int main(int argc, char* argv[])
{
// creo il memory mapped file
shdata *data;
if ((data = memMapOpen(MMAP_NAME, sizeof(Data), true)) == NULL) {
// errore di creazione
printf("%s: non posso creare il memory mapped file (%s)\n", argv[0],
strerror(errno));
exit(EXIT_FAILURE);
}

// crea i processi figli
pid_t pid1, pid2;
(pid1 = fork()) && (pid2 = fork());

// test pid processi
if (pid1 == 0) {
// sono il figlio 1
printf("sono il figlio 1 (%d): eseguo il nuovo processo\n", getpid());
char *pathname = "reader";
char *newargv[] = { pathname, NULL };
execv(pathname, newargv);
exit(EXIT_FAILURE); // exec non ritorna mai
}
else if (pid2 == 0) {
// sono il figlio 2
printf("sono il figlio 2 (%d): eseguo il nuovo processo\n", getpid());
char *pathname = "writer";
char *newargv[] = { pathname, NULL };
execv(pathname, newargv);
exit(EXIT_FAILURE); // exec non ritorna mai
}
else if (pid1 > 0 && pid2 > 0) {
// sono il padre
printf("sono il padre (%d): attendo la terminazione dei figli\n", getpid());
int status;
pid_t wpid;
while ((wpid = wait(&status)) > 0)
printf("sono il padre (%d): figlio %d terminato (%d)\n", getpid(),
(int)wpid, status);

// rimuovo il memory mapped file ed esco
printf("%s: processi terminati\n", argv[0]);
memMapClose(MMAP_NAME, data);
exit(EXIT_SUCCESS);
}
else {
// errore nella fork(): rimuovo il memory mapped file ed esco
printf("%s: fork error (%s)\n", argv[0], strerror(errno));
memMapClose(MMAP_NAME, data);
exit(EXIT_FAILURE);
}
}
// writer.c - main processo figlio
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include "mmap.h"
#include "data.h"

// funzione main()
int main(int argc, char* argv[])
{
// apro il memory mapped file
printf("processo %d partito\n", getpid());
shdata *data;
if ((data = memMapOpen(MMAP_NAME, sizeof(Data), false)) == NULL) {
// errore di apertura
printf("%s: non posso aprire il memory mapped file (%s)\n", argv[0],
strerror(errno));
exit(EXIT_FAILURE);
}

// loop di scrittura messaggi per il reader
Data my_data;
my_data.index = 0;
for (;;) {
// test index per forzare l'uscita
if (my_data.index == N_MESSAGES) {
// il processo esce per indice raggiunto
printf("processo %d terminato (text=%s index=%ld)\n", getpid(),
my_data.text, my_data.index);
exit(EXIT_SUCCESS);
}

// compongo il messaggio e lo invio
my_data.index++;
snprintf(my_data.text, sizeof(my_data.text), "un-messaggio-di-test:%ld",
my_data.index);
memMapWrite(data, &my_data, sizeof(Data));
}

// il processo esce per altro motivo (errore: non gestito in questa versione)
printf("processo %d terminato con errore (%s)\n", getpid(), strerror(errno));
exit(EXIT_FAILURE);
}
// reader.c - main processo figlio
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <time.h>
#include <sys/time.h>
#include "mmap.h"
#include "data.h"

// funzione main()
int main(int argc, char* argv[])
{
// apro il memory mapped file
printf("processo %d partito\n", getpid());
shdata *data;
if ((data = memMapOpen(MMAP_NAME, sizeof(Data), false)) == NULL) {
// errore di apertura
printf("%s: non posso aprire il memory mapped file (%s)\n", argv[0],
strerror(errno));
exit(EXIT_FAILURE);
}

// set clock e time per calcolare il tempo di CPU e il tempo di sistema
clock_t t_start = clock();
struct timeval tv_start;
gettimeofday(&tv_start, NULL);

// loop di lettura messaggi dal writer
Data my_data;
for (;;) {
// leggo un messaggio
memMapRead(data, &my_data, sizeof(Data));

// test index per forzare l'uscita
if (my_data.index == N_MESSAGES) {
// get clock e time per calcolare il tempo di CPU e il tempo di sistema
clock_t t_end = clock();
double t_passed = ((double)(t_end - t_start)) / CLOCKS_PER_SEC;
struct timeval tv_end, tv_elapsed;
gettimeofday(&tv_end, NULL);
timersub(&tv_end, &tv_start, &tv_elapsed);

// il processo esce per indice raggiunto
printf("reader: ultimo messaggio ricevuto: %s\n", my_data.text);
printf("processo %d terminato "
"(index=%ld CPU time elapsed: %.3f s - total time elapsed:%ld.%ld s)\n",
getpid(), my_data.index, t_passed, tv_elapsed.tv_sec,
tv_elapsed.tv_usec / 1000);
exit(EXIT_SUCCESS);
}
}

// il processo esce per altro motivo (errore: non gestito in questa versione)
printf("processo %d terminato con errore (%s)\n", getpid(), strerror(errno));
exit(EXIT_FAILURE);
}

Ok, fino ad adesso nessuna sorpresa, il codice è mooolto simile a quello degli altri esempi visti finora, tranne che le funzioni di read/write sono usate (come avranno notato i lettori più attenti) senza testare il valore di ritorno. E adesso, per rompere la monotonia (e prima che qualcuno si addormenti) vediamo la "forzatura"  di questo codice, ovvero la mini-libreria mmap che ho scritto per usare la memoria condivisa per leggere e scrivere messaggi: puro masochismo però è un codice interessante (spero). Vediamolo, sono due file, mmap.h e mmap.c:

// mmap.h - header mini-libreria IPC con memory mapped file
#include <pthread.h>
#include <stdbool.h>

// struttura per i dati condivisi
typedef struct {
pthread_mutex_t mutex; // mutex comune ai processi
pthread_cond_t cond; // condition variable comune ai processi
bool data_ready; // flag per dati disponibili (true=ready)
size_t len; // lunghezza campo data
char data[1]; // dati da condividere
} shdata;

// prototipi globali
shdata *memMapOpen(const char *mmname, size_t len, bool create);
void memMapClose(const char *mmname, shdata *ptr);
void memMapRead(shdata *ptr, void *buf, size_t count);
void memMapWrite(shdata *ptr, const void *buf, size_t count);
// mmap.c - implementazione mini-libreria IPC con memory mapped file
#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/mman.h>
#include "mmap.h"

// memMapOpen() - apre un memory mapped file
shdata *memMapOpen(
const char *mmname,
size_t len,
bool create)
{
shdata *ptr;

// test se modo create o modo open di un mmfile già creato
if (create) {
// apre un memory mapped file (il file "mmname" è creato in /dev/shm)
int fd;
if ((fd = shm_open(mmname, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR)) == -1)
return NULL; // esce con errore

// tronca un memory mapped file
if (ftruncate(fd, sizeof(shdata) + len) == -1)
return NULL; // esce con errore

// mappa un memory mapped file
if ((ptr = mmap(NULL, sizeof(shdata) + len, PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0)) == MAP_FAILED)
return NULL; // esce con errore

// chiude la shared memory: questo non compromette il map eseguito
close(fd);

// init mutex in modo "shared memory"
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(&ptr->mutex, &attr);
pthread_mutexattr_destroy(&attr);

// init condition variable in modo "shared memory"
pthread_condattr_t attrcond;
pthread_condattr_init(&attrcond);
pthread_condattr_setpshared(&attrcond, PTHREAD_PROCESS_SHARED);
pthread_cond_init(&ptr->cond, &attrcond);
pthread_condattr_destroy(&attrcond);

// init altri dati
ptr->data_ready = false;
ptr->len = len;
}
else {
// apre un memory mapped file (il file "shmname" è creato in /dev/shm)
int fd;
if ((fd = shm_open(mmname, O_RDWR, S_IRUSR | S_IWUSR)) == -1)
return NULL; // esce con errore

// mappa un memory mapped file
if ((ptr = mmap(NULL, sizeof(shdata) + len, PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0)) == MAP_FAILED)
return NULL; // esce con errore

// chiude la shared memory: questo non compromette il map eseguito
close(fd);
}

// ritorna il pointer
return ptr;
}

// memMapClose() - chiude un memory mapped file
void memMapClose(
const char *mmname,
shdata *ptr)
{
// rilascio tutte le risorse acquisite
pthread_mutex_destroy(&ptr->mutex);
pthread_cond_destroy(&ptr->cond);
munmap(ptr, sizeof(shdata) + ptr->len);
shm_unlink(mmname);
}

// memMapRead() - legge dati dal mapped-file
void memMapRead(
shdata *ptr,
void *buf,
size_t count)
{
// lock mutex
pthread_mutex_lock(&ptr->mutex);

// aspetta la condizione
while (!ptr->data_ready)
pthread_cond_wait(&ptr->cond, &ptr->mutex);

// legge i dati dal mapped-file e segnala la condizione
memcpy(buf, ptr->data, count);
ptr->data_ready = false;
pthread_cond_signal(&ptr->cond);

// unlock mutex
pthread_mutex_unlock(&ptr->mutex);
}

// memMapWrite() - scrive dati nel mapped-file
void memMapWrite(
shdata *ptr,
const void *buf,
size_t count)
{
// lock mutex
pthread_mutex_lock(&ptr->mutex);

// aspetta la condizione
while (ptr->data_ready)
pthread_cond_wait(&ptr->cond, &ptr->mutex);

// scrive i dati sul mapped-file esegnala la condizione
memcpy(ptr->data, buf, count);
ptr->data_ready = true;
pthread_cond_signal(&ptr->cond);

// unlock mutex
pthread_mutex_unlock(&ptr->mutex);
}

Ok, no? È un codice abbastanza semplificato che, per essere messo in produzione, necessiterebbe di un po' più di controllo degli errori... ma è solo un esercizio, chi userebbe un sistema così per mandare messaggi quando ci sono le funzioni ad-hoc della famiglia Message Queues? Comunque, è un esempio funzionante (compilare ed eseguire per credere, eh!). Ovviamente, visto il tipo di funzionamento della Shared Memory (vedi la definizione qua sopra) nella struttura base shdata sono previsti un mutex e una condition variable per gestire la sincronizzazione degli accessi (meccanismo che è, invece, intrinseco con le altre POSIX IPC viste negli articoli precedenti).

Ci manca solo da descrivere una cosa: il trucco del "char data[1]"  (in mmap.h) usato per rendere generici i dati da condividere. Questo campo è (non a caso) l'ultimo della struttura dati "shdata" che descrive il mapped-file, e funziona così: quando si crea il file si passa, alla memMapOpen(), il size dei dati da scambiare (usando un sizeof): quindi nel nostro caso la dimensione passata è sizeof(Data). Dentro la memMapOpen() il mapped-file viene mappato (usando la system call mmap()) con la dimensione totale richiesta, che è composta da una parte base fissa (la struttura shdata) e dalla parte che potremmo definire "variabile" (la struttura Data). Il risultato finale è un mapped-file impostato per scambiare dati nella sua parte variabile "char data[1]", che di base è lunga "un char" ma che, in realtà, è lunga "sizeof(Data) char" una volta mappato il file. Un trucchetto da niente.

E abbiamo anche i risultati!

sono il padre (14836): attendo la terminazione dei figli
sono il figlio 1 (14837): eseguo il nuovo processo
sono il figlio 2 (14838): eseguo il nuovo processo
processo 14837 partito
processo 14838 partito
processo 14838 terminato (text=un-messaggio-di-test:2000000 index=2000000)
reader: ultimo messaggio ricevuto: un-messaggio-di-test:2000000
processo 14837 terminato (index=2000000 CPU time elapsed: 4.077 s - total time elapsed:4.657 s)
sono il padre (14836): figlio 14837 terminato (0)
sono il padre (14836): figlio 14838 terminato (0)
./processes: processi terminati

Evidentemente i risultati non sono stupefacenti, ma neanche disastrosi: un messaggio ogni 2 us, e siamo quasi al livello delle IPC socket. Quindi: l'uso improprio della Shared Memory viene bocciato a causa dei risultati non proprio eccellenti e, soprattutto, per la inutile complicazione del codice. Però vi assicuro che, se non la usiamo per leggere/scrivere messaggi, ma la usiamo "come si deve" (tipo l'esempio dell'articolo citato sopra) accedendo  ad essa con un pointer, consente di scrivere codice elegante ed efficiente, che può risultare utile in molti progetti.

Teoricamente l'articolo e il ciclo dovrebbero terminare qui, ma quell'irascibile di mio cuggino mi ha ricordato (con male parole, al solito)  che avevo promesso di fare un confronto finale con i thread:

mio cuggino: E tu vorresti perdere una occasione per sfatare la leggenda urbana che i thread sono più veloci dei processi? Le promesse si mantengono! Non ti leggo più!
io: va bene, ma visto che un confronto di codice e prestazioni coi thread l'avevo già fatto in quell'altro articolo, ti va bene se qui lo sfumo un po'?

E allora sfumerò e sarò brevissimo: senza stare a ripetere codice già visto, provate a immaginare (e magari provate a scriverlo, è un utile e semplice esercizio) un confronto con la Message Queue, che è velocissima e ben si presta all'uso sia multiprocess che multithread (è anche thread-safe!). Scrivete un semplice processo padre che crea la coda esattamente come nell'esempio già visto, e che invece di generare due processi figli, crea due thread. I due thread eseguono due funzioni che devono essere (praticamente) identiche ai main() dell'esempio con la Message Queue (si può fare, ve lo assicuro). Aggiungete un po' di pepe e sale, compilate ed eseguite. I risultati dorrebbero essere questi:

thread 140026602206976 partito
thread 140026593814272 partito
thread 8884 terminato (text=un-messaggio-di-test:2000000 index=2000000)
reader: ultimo messaggio ricevuto: un-messaggio-di-test:2000000
thread 8884 terminato (index=2000000 CPU time elapsed: 3.848 - total time elapsed:1.931)
./threads: thread terminati

Ma guarda un po'... le prestazioni sono identiche alla versione multiprocess! E non fatevi ingannare dal CPU time che è il doppio del total time: non è un errore, è che su una macchina multi-core come quella che ho usato per i test (un i7 4 core/8 thread) i due thread vengono eseguiti su due thread diversi della CPU e il tempo calcolato nel codice è la somma dei due tempi "reali". E quindi è vero: come già visto anteriormente, i processi (almeno su Linux) hanno una velocità paragonabile a quella dei thread, e la scelta multithread/multiprocess si deve fare secondo altri criteri (ricordate le regole che avevo descritto in quell'altro articolo?). Meditate gente, meditate...

Ciao, e al prossimo post!

P.S. (…comunque, i sorgenti di questo articolo, inclusi quelli non mostrati del test in multithread, li trovate nel mio repository GitHub. Buona lettura!…)

sabato 6 febbraio 2021

Per qualche IPC in più
considerazioni sulle prestazioni della POSIX IPC - pt.2

Colonnello Mortimer: È un posto ideale per un'imboscata. Lo prenderemo tra due fuochi.
Il Monco:Ah già... tu dall'esterno, io dall'interno ... come al solito...

E siamo arrivati alla seconda parte della Trilogia del dollaro... oops: la Trilogia del IPC, da non confondere con i mitici tre film del grandissimo Sergio Leone (film che sono un must-view per qualsiasi cinefilo che si rispetti). Il titolo di questo articolo somiglia molto a un altro che trattava un argomento abbastanza diverso. Non è che sono a corto di idee, ma, dividendo in tre parti la trama della POSIX IPC, non posso fare a meno di citare una delle più belle trilogie della storia del Cinema (quello con C maiuscola con il nostro amato C Language). Dovete scusarmi. E, se non volete scusarmi, stavolta vi mando a casa Il Monco per convincervi...

...prima la Message Queue o gli IPC Socket? Tanto sono spacciati lo stesso...

E come anticipato dalla frase introduttiva, questa volta tratteremo altre due POSIX IPC, la Message Queue e la UNIX domain socket (IPC socket)  (nella prima parte avevamo parlato di FIFO (Named Pipe), ricordate?). Cominceremo dalla Message Queue che, come già detto, è:

Message Queue: comunicazione tra due o più processi con capacità full duplex. I processi comunicano tra loro pubblicando un messaggio e recuperandolo dalla coda. Una volta recuperato, il messaggio non è più disponibile nella coda.

Il meccanismo di uso è molto (ma molto) simile a quello della FIFO e, praticamente, basta sostituire le chiamate open(), close(), read(), write() e remove() con le corrispondenti chiamate della famiglia mq: mq_open(), mq_close(), mq_send(), mq_receive() e mq_unlink(). Ok, ci siamo: possiamo vedere in una sola botta l'header data.h, il padre processes.c e i due figli writer.c e reader.c... vai col codice!

#ifndef DATA_H
#define DATA_H

// path della message queue
#define MQUEUE_PATH "/mymqueue"

// numero di messaggi da scambiare per il benchmark
#define N_MESSAGES 2000000

// struttura Data per i messaggi
typedef struct {
unsigned long index; // indice dei dati
char text[1024]; // testo dei dati
} Data;

#endif /* DATA_H */


// processes.c - main processo padre
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <mqueue.h>
#include <sys/wait.h>
#include "data.h"

// funzione main()
int main(int argc, char* argv[])
{
// set attributi della message queue
struct mq_attr ma;
ma.mq_maxmsg = 10; // numero massimo di messaggi permessi nella coda
ma.mq_msgsize = sizeof(Data); // massimo size di un messaggio (in byte)
// creo la message queue
if (mq_open(MQUEUE_PATH, O_RDWR | O_CREAT, 0700, &ma) == -1) {
// errore di creazione
printf("%s: non posso creare la message queue (%s)\n", argv[0], strerror(errno));
exit(EXIT_FAILURE);
}

// crea i processi figli
pid_t pid1, pid2;
(pid1 = fork()) && (pid2 = fork());

// test pid processi
if (pid1 == 0) {
// sono il figlio 1
printf("sono il figlio 1 (%d): eseguo il nuovo processo\n", getpid());
char *pathname = "reader";
char *newargv[] = { pathname, NULL };
execv(pathname, newargv);
exit(EXIT_FAILURE); // exec non ritorna mai
}
else if (pid2 == 0) {
// sono il figlio 2
printf("sono il figlio 2 (%d): eseguo il nuovo processo\n", getpid());
char *pathname = "writer";
char *newargv[] = { pathname, NULL };
execv(pathname, newargv);
exit(EXIT_FAILURE); // exec non ritorna mai
}
else if (pid1 > 0 && pid2 > 0) {
// sono il padre
printf("sono il padre (%d): attendo la terminazione dei figli\n", getpid());
int status;
pid_t wpid;
while ((wpid = wait(&status)) > 0)
printf("sono il padre (%d): figlio %d terminato (%d)\n", getpid(),
(int)wpid, status);

// rimuovo la message queue ed esco
printf("%s: processi terminati\n", argv[0]);
mq_unlink(MQUEUE_PATH);
exit(EXIT_SUCCESS);
}
else {
// errore nella fork(): rimuovo la message queue ed esco
printf("%s: fork error (%s)\n", argv[0], strerror(errno));
mq_unlink(MQUEUE_PATH);
exit(EXIT_FAILURE);
}
}


// writer.c - main processo figlio
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <mqueue.h>
#include "data.h"

// funzione main()
int main(int argc, char *argv[])
{
// apro la message queue in modo scrittura
printf("processo %d partito\n", getpid());
mqd_t mq;
if ((mq = mq_open(MQUEUE_PATH, O_WRONLY)) == -1) {
// errore di apertura
printf("%s: non posso aprire la message queue (%s)\n", argv[0], strerror(errno));
exit(EXIT_FAILURE);
}

// loop di scrittura messaggi per il reader
Data my_data;
my_data.index = 0;
do {
// test index per forzare l'uscita
if (my_data.index == N_MESSAGES) {
// il processo chiude la message queue ed esce per indice raggiunto
printf("processo %d terminato (text=%s index=%ld)\n",
getpid(), my_data.text, my_data.index);
mq_close(mq);
exit(EXIT_SUCCESS);
}

// compongo il messaggio e lo invio
my_data.index++;
snprintf(my_data.text, sizeof(my_data.text), "un-messaggio-di-test:%ld",
my_data.index);
} while (mq_send(mq, (char *)(&my_data), sizeof(Data), 1) != -1);

// il processo chiude la message queue ed esce per altro motivo (errore)
printf("processo %d terminato con errore (%s)\n", getpid(), strerror(errno));
mq_close(mq);
exit(EXIT_FAILURE);
}


// reader.c - main processo figlio
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <mqueue.h>
#include <time.h>
#include <sys/time.h>
#include "data.h"

// funzione main()
int main(int argc, char *argv[])
{
// apro la message queue in modo lettura
printf("processo %d partito\n", getpid());
mqd_t mq;
if ((mq = mq_open(MQUEUE_PATH, O_RDONLY)) == -1) {
// errore di apertura
printf("%s: non posso aprire la message queue (%s)\n", argv[0], strerror(errno));
exit(EXIT_FAILURE);
}

// set clock e time per calcolare il tempo di CPU e il tempo di sistema
clock_t t_start = clock();
struct timeval tv_start;
gettimeofday(&tv_start, NULL);

// loop di lettura messaggi dal writer
Data my_data;
while (mq_receive(mq, (char *)(&my_data), sizeof(Data), NULL) != -1) {
// test index per forzare l'uscita
if (my_data.index == N_MESSAGES) {
// get clock e time per calcolare il tempo di CPU e il tempo di sistema
clock_t t_end = clock();
double t_passed = ((double)(t_end - t_start)) / CLOCKS_PER_SEC;
struct timeval tv_end, tv_elapsed;
gettimeofday(&tv_end, NULL);
timersub(&tv_end, &tv_start, &tv_elapsed);

// il processo chiude la message queue ed esce per indice raggiunto
printf("reader: ultimo messaggio ricevuto: %s\n", my_data.text);
printf("processo %d terminato "
"(index=%ld CPU time elapsed: %.3f - total time elapsed:%ld.%ld)\n",
getpid(), my_data.index, t_passed, tv_elapsed.tv_sec,
tv_elapsed.tv_usec / 1000);
mq_close(mq);
exit(EXIT_SUCCESS);
}
}

// il processo chiude la message queue ed esce per altro motivo (errore)
printf("processo %d terminato con errore (%s)\n", getpid(), strerror(errno));
mq_close(mq);
exit(EXIT_FAILURE);
}

Tutto chiaro? Sicuramente, perché, come promesso, è quasi sovrapponibile al codice visto nell'ultimo articolo, quindi è veramente molto semplice da interpretare, supponendo che anche il primo esempio lo fosse (e lo spero vivamente!). La differenza maggiore (e più interessante) sta nell'inizializzazione della struttura mq_attr (che ho messo nel processo padre), che permette, rispetto alla Named Pipe, una maggiore personalizzazione del funzionamento. E già che ci siamo vi faccio notare anche un piccolo “trucchetto” che ho applicato:

Anche se la famiglia di funzioni mq_ è orientata a manipolare messaggi di testo, è possibile "ingannare" le funzioni per manipolare quello che si vuole: nel nostro caso vogliamo scrivere/leggere una struct Data e per realizzarlo è sufficiente fare (nelle send/recv) un cast a char* del buffer: non bisogna mai dimenticare che una struttura, anche complessa, è pur sempre un insieme (anzi, un array) di byte, quindi questa operazione è del tutto legale (ma non cercate mai di spedire strutture che contengono pointer... questa si che non è una buona idea).

Quindi, riepilogando: semplicità d’uso e e flessibilità e… i risultati? Vediamoli:

sono il padre (19567): attendo la terminazione dei figli
sono il figlio 1 (19568): eseguo il nuovo processo
sono il figlio 2 (19569): eseguo il nuovo processo
processo 19568 partito
processo 19569 partito
processo 19569 terminato (text=un-messaggio-di-test:2000000 index=2000000)
reader: ultimo messaggio ricevuto: un-messaggio-di-test:2000000
processo 19568 terminato (index=2000000 CPU time elapsed: 1.832 - total time elapsed:1.860)
sono il padre (19567): figlio 19568 terminato (0)
sono il padre (19567): figlio 19569 terminato (0)
./processes: processi terminati

Non male, no? È solo leggermente più lenta della Named Pipe (che era velocissima). In questo caso abbiamo un messaggio ogni 0,92 us (e con la Pipe era uno ogni 0,87 us). Insomma, la Message Queue passa a pieni voti l'esame per modo d'uso e prestazioni e si pone allo stesso livello della Fifo.

Ok, e adesso a che POSIX IPC tocca? Ma alla UNIX Domain Socket! E anche in questo caso può tornare utile un piccolo promemoria:

UNIX domain socket (IPC socket): comunicazione tra processi che usa i socket del Kernel (che non usano TCP/IP). Nel nostro caso (IPC tra processi locali) è preferibile, per ovvi motivi, usare questi socket invece dei Network socket.

Il meccanismo di uso è, ancora una volta, simile a quello della Fifo, anche se il modello utilizzato è il classico dei socket Server/Client. Comunque, per mantenere la coerenza della trattazione, ho mantenuto la nomenclatura reader/writer riuscendo a ottenere, tutto sommato, un codice abbastanza simile a quello degli esempi già visti. E anche in questo caso vediamo in una botta sola l'header data.h, il padre processes.c e i due figli writer.c e reader.c... vai col codice!

#ifndef DATA_H
#define DATA_H

// path del file per ipc socket
#define IPCS_PATH "myipcs"

// numero di messaggi da scambiare per il benchmark
#define N_MESSAGES 2000000

// struttura Data per i messaggi
typedef struct {
unsigned long index; // indice dei dati
char text[1024]; // testo dei dati
} Data;

#endif /* DATA_H */
// processes.c - main processo padre
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/wait.h>
#include "data.h"

// funzione main()
int main(int argc, char* argv[])
{
// crea i processi figli
pid_t pid1, pid2;
(pid1 = fork()) && (pid2 = fork());

// test pid processi
if (pid1 == 0) {
// sono il figlio 1
printf("sono il figlio 1 (%d): eseguo il nuovo processo\n", getpid());
char *pathname = "reader";
char *newargv[] = { pathname, NULL };
execv(pathname, newargv);
exit(EXIT_FAILURE); // exec non ritorna mai
}
else if (pid2 == 0) {
// sono il figlio 2
printf("sono il figlio 2 (%d): eseguo il nuovo processo\n", getpid());
char *pathname = "writer";
char *newargv[] = { pathname, NULL };
execv(pathname, newargv);
exit(EXIT_FAILURE); // exec non ritorna mai
}
else if (pid1 > 0 && pid2 > 0) {
// sono il padre
printf("sono il padre (%d): attendo la terminazione dei figli\n", getpid());
int status;
pid_t wpid;
while ((wpid = wait(&status)) > 0)
printf("sono il padre (%d): figlio %d terminato (%d)\n", getpid(),
(int)wpid, status);

// esco
printf("%s: processi terminati\n", argv[0]);
exit(EXIT_SUCCESS);
}
else {
// errore nella fork(): esco
printf("%s: fork error (%s)\n", argv[0], strerror(errno));
exit(EXIT_FAILURE);
}
}
// writer.c - main processo figlio
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include "data.h"

// funzione main()
int main(int argc, char *argv[])
{
// creo il socket in modo IPC e Datagram
printf("processo %d partito\n", getpid());
int sock;
if ((sock = socket(AF_UNIX, SOCK_DGRAM, 0)) == -1) {
// errore di creazione
printf("%s: non posso creare il socket (%s)\n", argv[0], strerror(errno));
exit(EXIT_FAILURE);
}

// prepara la struttura sockaddr_un per il reader (è un server) remoto
struct sockaddr_un reader;
memset(&reader, 0, sizeof(struct sockaddr_un));
reader.sun_family = AF_UNIX;
strcpy(reader.sun_path, IPCS_PATH);

// loop di scrittura messaggi per il reader
Data my_data;
my_data.index = 0;
do {
// test index per forzare l'uscita
if (my_data.index == N_MESSAGES) {
// il processo chiude il socket ed esce per indice raggiunto
printf("processo %d terminato (text=%s index=%ld)\n",
getpid(), my_data.text, my_data.index);
close(sock);
exit(EXIT_SUCCESS);
}

// compongo il messaggio e lo invio
my_data.index++;
snprintf(my_data.text, sizeof(my_data.text), "un-messaggio-di-test:%ld",
my_data.index);
} while (sendto(sock, &my_data, sizeof(my_data), 0,
(struct sockaddr *)&reader, sizeof(reader)) != -1);

// il processo chiude il socket ed esce per altro motivo (errore)
printf("processo %d terminato con errore (%s)\n", getpid(), strerror(errno));
close(sock);
exit(EXIT_FAILURE);
}
// reader.c - main processo figlio
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <time.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <sys/un.h>
#include "data.h"

// funzione main()
int main(int argc, char *argv[])
{
// creo il socket in modo IPC e Datagram
printf("processo %d partito\n", getpid());
int sock;
if ((sock = socket(AF_UNIX, SOCK_DGRAM, 0)) == -1) {
// errore di creazione
printf("%s: non posso creare il socket (%s)\n", argv[0], strerror(errno));
exit(EXIT_FAILURE);
}

// prepara la struttura sockaddr_un per questo reader (è un server)
struct sockaddr_un reader;
memset(&reader, 0, sizeof(struct sockaddr_un));
reader.sun_family = AF_UNIX;
strcpy(reader.sun_path, IPCS_PATH);

// associa l'indirizzo del reader al socket (questo crea il file IPCS_PATH)
if (bind(sock, (struct sockaddr *)&reader, sizeof(reader)) == -1) {
// errore di bind
printf("%s: errore di bind (%s)", argv[0], strerror(errno));
exit(EXIT_FAILURE);
}

// set clock e time per calcolare il tempo di CPU e il tempo di sistema
clock_t t_start = clock();
struct timeval tv_start;
gettimeofday(&tv_start, NULL);

// loop di lettura messaggi dal writer
Data my_data;
while (recv(sock, &my_data, sizeof(my_data), 0) != -1) {
// test index per forzare l'uscita
if (my_data.index == N_MESSAGES) {
// get clock e time per calcolare il tempo di CPU e il tempo di sistema
clock_t t_end = clock();
double t_passed = ((double)(t_end - t_start)) / CLOCKS_PER_SEC;
struct timeval tv_end, tv_elapsed;
gettimeofday(&tv_end, NULL);
timersub(&tv_end, &tv_start, &tv_elapsed);

// il processo chiude e cancella il socket ed esce per indice raggiunto
printf("reader: ultimo messaggio ricevuto: %s\n", my_data.text);
printf("processo %d terminato "
"(index=%ld CPU time elapsed: %.3f - total time elapsed:%ld.%ld)\n",
getpid(), my_data.index, t_passed, tv_elapsed.tv_sec,
tv_elapsed.tv_usec / 1000);
close(sock);
unlink(IPCS_PATH);
exit(EXIT_SUCCESS);
}
}

// il processo chiude e cancella il socket ed esce per altro motivo (errore)
printf("processo %d terminato con errore (%s)\n", getpid(), strerror(errno));
close(sock);
unlink(IPCS_PATH);
exit(EXIT_FAILURE);
}

Visto? È un esempio abbastanza classico di Server + Client  come quello già mostrato in alcuni miei vecchi articoli (qui e qui), solo che in questo caso ho optato per l'uso del modo datagram-oriented che, con l'obiettivo di una comunicazione veloce in locale, è sicuramente il più adatto e fornisce le prestazioni migliori (la "reliability" in UNIX Domain è molto alta, e non c'è motivo di scegliere la versione stream-oriented). Rispetto ai vari esempi di di questo tipo che si trovano in rete faccio notare due dettagli interessanti che spesso si omettono:

  1. Nel reader  bisogna rimuovere, in chiusura, il file (corrispondente al socket) che crea automaticamente la bind(): senza questo passo le chiamate successive al reader fallirebbero (con un errore EINVAL "The socket is already bound to an address").
  2. Nel reader si può usare la recv() invece della recvFrom() (che è comune nei Server datagram-oriented) perché il nostro reader si limita a leggere e non scrive risposte, quindi non gli interessa l'indirizzo del writer (ottenibile usando la rcvfrom()). Tra l'altro si potrebbe addirittura usare la read() (con qualche accorgimento, però). Infatti, come dice il manuale della recv(2) di Linux:

The only difference between recv() and read(2) is the presence of flags. With a zero flags argument, recv() is generally equivalent to read(2) (but see NOTES). Also, the following call 
recv(sockfd, buf, len, flags); 
is equivalent to
recvfrom(sockfd, buf, len, flags, NULL, NULL);

E adesso è venuto il momento dei risultati... squillino le trombe!

sono il padre (20035): attendo la terminazione dei figli
sono il figlio 1 (20036): eseguo il nuovo processo
sono il figlio 2 (20037): eseguo il nuovo processo
processo 20036 partito
processo 20037 partito
processo 20037 terminato (text=un-messaggio-di-test:2000000 index=2000000)
reader: ultimo messaggio ricevuto: un-messaggio-di-test:2000000
processo 20036 terminato (index=2000000 CPU time elapsed: 3.607 - total time elapsed:4.552)
sono il padre (20035): figlio 20037 terminato (0)
sono il padre (20035): figlio 20036 terminato (0)
./processes: processi terminati

Ahi ahi ahi... pare che la IPC socket, sia pur veloce in assoluto (scambia un messaggio ogni 1,8 us) è un po' più lenta delle Fifo e Message queue. E vabbè, ce ne faremo una ragione: promossa ma con riserva, anche considerando che è un po' più complicata da usare.

E anche per oggi abbiamo terminato. Nel prossimo articolo parleremo dell'ultimo metodo in analisi, la Shared Memory, e, se avanzano tempo e voglia, faremo anche un interessante confronto con una versione multithread molto simile. Stay tuned!

Ciao, e al prossimo post!