Nei titoli e nei testi troverete qualche rimando cinematografico (ebbene si, sono un cinefilo). Se non vi interessano fate finta di non vederli, già che non sono fondamentali per la comprensione dei post...

Di questo blog ho mandato avanti, fino a Settembre 2018, anche una versione in Spagnolo. Potete trovarla su El arte de la programación en C. Buona lettura.

Visualizzazione post con etichetta IPC socket. Mostra tutti i post
Visualizzazione post con etichetta IPC socket. Mostra tutti i post

giovedì 11 luglio 2024

Furiosa Go
come spedire una struttura in Go - pt.2

Smeg: Chi sono?
Dementus: Qualcuno competente ed eccessivamente risentito.
Smeg: Cosa pensi che vogliano?
Dementus: Me, senza il mio equipaggio.

Ed eccoci di nuovo sul pezzo. E, visto che si tratta della seconda parte dell'articolo Fury Go, non posso non agganciarmi allo splendido prequel di Mad Max: Fury Road, e cioè a Furiosa: A Mad Max Saga diretto sempre dal Maestro George Miller. E, in effetti, anche questo articolo è un prequel: nell'altro avevo descritto come spedire una struttura in Go via IPC socket (UNIX domain socket), e il Software l'avevo scritto partendo da una mia versione "base" che inviava solo dei semplici testi. Ed ecco, in questa seconda parte vi mostrerò, come prequel, la versione base, dimostrandovi, come promesso, che le prestazioni buone ma non eccellenti del benchmark erano dovute (spoiler) solo alla codifica della struttura, perché con i soli testi il Go va come un treno!

...adesso vi faccio vedere cosa sa fare il Go...

Ok, e allora andiamo con il prequel, descrivendo la versione base che ho scritto prima di quella che usa le strutture. Siamo pronti? Si? E allora partiamo direttamente con il codice!

// reader.go - main processo figlio: è un reader (un server) su IPC socket
package main

import (
"bufio"
"fmt"
"net"
"os"
"time"
)

// funzione main
func main() {

// start ascolto sul file di scambio "myipcs" (con UNIX domain socket)
fmt.Printf("processo %d partito (reader)\n", os.Getpid())
addr := net.UnixAddr{Name: "./myipcs", Net: "unix"}
lner, err := net.ListenUnix("unix", &addr)
if err != nil {
// errore listen
fmt.Println(err)
return
}

// prenoto la chiusura del listener e rimuovo (eventualmente) il file di scambio
defer lner.Close()
defer os.Remove("./myipcs")

// accetta connessioni da un writer entrante
conn, err := lner.AcceptUnix()
if err != nil {
// errore accept
fmt.Println(err)
return
}

// set time di partenza per calcolare il tempo impiegato
start := time.Now()

// loop di lettura messaggi dal writer
n_msg := 0
connrdr := bufio.NewReader(conn) // reader sulla connessione
for {
// leggo con il conn reader
client_msg, err := connrdr.ReadString('\n')
if err != nil {
// errore di lettura
fmt.Println(err)
return
}

// test numero messaggi per forzare l'uscita
n_msg++
if n_msg == 2000000 {
// il processo chiude la connessione ed esce per numero raggiunto
fmt.Printf("reader: ultimo messaggio ricevuto: %s", client_msg)
fmt.Printf("reader: processo %d terminato (messaggi=%d tempo totale:%s)\n",
os.Getpid(), n_msg,
time.Since(start).Truncate(time.Millisecond).String())
conn.Close()
return
}
}
}
// writer.go - main processo figlio: è un writer (un client) su IPC socket
package main

import (
"fmt"
"net"
"os"
"time"
)

// funzione main
func main() {

// mi assicuro che il writer parta dopo il reader
fmt.Printf("processo %d partito (writer)\n", os.Getpid())
time.Sleep(100 * time.Millisecond)

// connessione al server remoto sul file di scambio "myipcs"
addr := net.UnixAddr{Name: "./myipcs", Net: "unix"}
conn, err := net.DialUnix("unix", nil, &addr)
if err != nil {
// errore dial
fmt.Println(err)
return
}

// loop di scrittura messaggi per il reader
var my_text string
index := 0
for {
// test index per forzare l'uscita
if index == 2000000 {
// il processo chiude la connessione ed esce per indice raggiunto
fmt.Printf("writer: processo %d terminato (text=%s messaggi=%d)\n",
os.Getpid(), my_text, index)
conn.Close()
return
}

// compongo il messaggio e lo invio
index++
my_text = fmt.Sprintf("un-messaggio-di-test:%d\n", index)

// invio il messaggio al server remoto
_, err = conn.Write([]byte(my_text))
if err != nil {
fmt.Println("errore di invio: ", err)
return
}
}
}

Come avrete notato dalla descrizione nella prima linea (e anche dal codice, spero!) ho usato anche questa volta gli IPC socket. Poi ho anche scritto la versione con i Network Socket, ma non vi mostrerò il codice perché è quasi identico. Effettivamente, per la magia del Go, il codice è semplicissimo rispetto alla analoga versione in C vista qui (in questo caso era la versione "fast"), ed è anche quasi identico alla versione con le strutture dello scorso articolo. Come sempre il codice è stra-commentato, e credo che possa essere facilmente compreso anche da chi non conosce il Go, però mi interessa, a questo punto, aggiungere qualche dettaglio per far notare le (poche) differenze rispetto a quello dello scorso articolo.

Cominciamo, allora, con il writer: in entrambe versioni si crea, inizialmente, un oggetto "connessione" conn:

// connessione al server remoto sul file di scambio "myipcs"
addr := net.UnixAddr{Name: "./myipcs", Net: "unix"}
conn, err := net.DialUnix("unix", nil, &addr)

Poi, nella versione con le strutture, si invia il messaggio attraverso il codec encoding/gob passandogli la connessione:

// set encoder e spedizione dall'encoder
encoder := gob.NewEncoder(conn)
err = encoder.Encode(message)

Invece, nella versione base si scrive, direttamente, con l'oggetto connessione creato all'inizio:

// invio il messaggio al server remoto
_, err = conn.Write([]byte(my_text))

E ora passiamo al reader: anche qui, in entrambe versioni, si crea un oggetto "connessione" conn:

// accetta connessioni da un writer entrante
conn, err := lner.AcceptUnix()

Poi, nella versione con le strutture, si riceve il messaggio attraverso il codec passandogli la connessione:

// set decoder e ricezione dal decoder
decoder := gob.NewDecoder(conn)
decoder.Decode(&message)

Invece, nella versione base, si legge con un oggetto "Reader" (della libreria bufio) creato sulla connessione vista sopra:

connrdr := bufio.NewReader(conn) // reader sulla connessione
for {
// leggo con il conn reader
client_msg, err := connrdr.ReadString('\n')

Come avrete notato le differenze sono poche ma significative. Notare anche che nella versione con le strutture il testo del messaggio è una stringa dentro la struttura Message, mentre nella versione base il messaggio è direttamente una stringa terminata con un "newline": questo è un particolare importante, perché in fase di ricezione con la funzione ReadString appena vista è necessario specificare qual'è il terminatore di stringa.

E vabbé, so che siete curiosi, è ora di passare ai risultati! Di seguito i risultati dei benchmark delle "versioni base" che inviano solo testi in Go:

aldo@Linux $ cd ../go-ipcsocketbase/
aldo@Linux $ ./processes
sono il padre (15381): attendo la terminazione dei figli
sono il figlio 1 (15382): eseguo il nuovo processo
sono il figlio 2 (15383): eseguo il nuovo processo
processo 15382 partito (reader)
processo 15383 partito (writer)
writer: processo 15383 terminato (text=un-messaggio-di-test:2000000
messaggi=2000000)
reader: ultimo messaggio ricevuto: un-messaggio-di-test:2000000
reader: processo 15382 terminato (messaggi=2000000 tempo totale:3.058s)
sono il padre (15381): figlio 15382 terminato (0)
sono il padre (15381): figlio 15383 terminato (0)
./processes: processi terminati
aldo@Linux $ cd ../go-socketbase/
aldo@Linux $ ./processes
sono il padre (15408): attendo la terminazione dei figli
sono il figlio 1 (15409): eseguo il nuovo processo
sono il figlio 2 (15410): eseguo il nuovo processo
processo 15410 partito (writer)
processo 15409 partito (reader)
writer: processo 15410 terminato (text=un-messaggio-di-test:2000000
messaggi=2000000)
reader: ultimo messaggio ricevuto: un-messaggio-di-test:2000000
reader: processo 15409 terminato (messaggi=2000000 tempo totale:8.46s)
sono il padre (15408): figlio 15409 terminato (0)
sono il padre (15408): figlio 15410 terminato (0)
./processes: processi terminati

A questo punto i risultati parlano senza temi di smentite: i tempi realizzati sono decisamente migliori di quelli mostrati per le versioni che spedivano strutture complesse, quindi è evidente che la maggior parte del tempo di CPU se la mangiava la libreria specializzata encoding/gob, che funziona bene però, a quanto pare, non è un fulmine. E, grazie ai test appena mostrati sopra si può affermare (come anticipato nella prima parte dell'articolo) che il Go è un linguaggio notevolmente veloce, alla faccia di chi pensa il contrario... Notare che la versione IPC con i suoi 3.058s è addirittura veloce come la versione in C (che impiegava 3.309s ma con messaggi leggermente più lunghi a causa della presenza dell'indice)! Anche in questo caso (come già nello scorso articolo) la versione con i Network socket è un po' più lenta (8.46s) ma è, comunque, sufficientemente veloce.

Ok, credo che, per il momento si può chiudere la parentesi Go sulla comunicazione tra processi: credo che i risultati siano stati interessanti, specialmente per il fatto di avere confrontato codici analoghi per C e Go. Non so di cosa parlerò` prossimamente: in questo momento ho in mente solo le prossime (meritate) vacanze. Ci sentiremo più avanti, ben rilassati e pronti per nuove avventure in C (o in Go...)!

Ciao, e al prossimo post!

sabato 22 giugno 2024

Fury Go
come spedire una struttura in Go - pt.1

Toast: Che stai facendo?
Dag: Prego.
Toast: Chi preghi?
Dag: Chiunque ci ascolti.

Nell'ultimo articolo avevo giurato che l'argomento Fast IPC era, "almeno momentaneamente", chiuso. Poi, mentre riguardavo, per l'ennesima volta, lo stupendo Mad Max: Fury Road del Maestro George Miller, ho avuto un flash (si, ma non vi preoccupate, è durato solo un microsecondo, mentre guardo un film sono sempre molto concentrato). Il flash era questo: "E se ripetessi i test IPC socket usando il Go?" L'idea era intrigante e, alla fine, ho ceduto a me stesso. E così ho anche capito che riesco, con grande facilità, a non mantenere le promesse... avrò mica un gran futuro come politico? ah ah ah.

...e se provassi a farlo con il Go?...

E allora veniamo al dunque: il titolo qui sopra "come spedire una struttura in Go" è un po' fuorviante riguardo alla premessa iniziale: l'idea è ripetere alcuni benchmark usando il Go (Golang per gli amici) però per farlo bisogna scontrarsi un po' con una delle differenze che ha questo linguaggio rispetto al C (e al C++); come ricorderete (e se no potreste fare una rapida rilettura di quel vecchio post) i test erano basati sull'invio "a raffica" di molti messaggi (2000000!) composti così:

// struttura Data per i messaggi
typedef struct {
unsigned long index; // indice dei dati
char text[1024]; // testo dei dati
} Data;

La scelta di questa struttura non era casuale: avrei potuto, più semplicemente, inviare solo dei messaggi di testo, ma avevo deciso di inviare dati complessi ("Data" contiene solo due campi ma potrebbe contenerne moltissimi) per rimarcare che con il C è usuale trattare dati di qualsiasi tipo, e chi riceve un messaggio "complesso" lo può ricostruire semplicemente depositandolo in una variabile dello stesso tipo (ah, la potenza del C...). E poi, grazie alla struttura "Data" ho potuto anche mostrare come gestire un indice dei messaggi, il che non guasta mai.

E il Go? Come ben sapete (e ne ho parlato qui) il Go è un vero linguaggio ad alto livello, con tutti i pro e i contro che questo comporta. Tra i pro c'è, ovviamente, il fatto che è possibile scrivere applicazioni anche complesse con notevole semplicità e compattezza, sicuramente più del C (e C++). Però, quando si tratta di maneggiare dati a livello base (o meglio binario) il Go entra un po' in difficoltà, e questo è il caso che stiamo trattando: spedire (e ricevere) messaggi complessi (strutture) non è per nulla semplice e scontato come lo è per il nostro amato C. Ma è, comunque, possibile: e tra poco vedremo come e con quali prestazioni.

E ora, bando alle ciance, facciamo cantare il codice! Vediamo come sono i nostri reader.go e writer.go (equivalenti, più o meno, ai reader.c e writer.c visti qui). Per eseguire il benchmark è presente anche l'onnipresente processes.c, che vi risparmio perché è rimasto invariato. Vai col codice!

// reader.go - main processo figlio: è un reader (un server) su IPC socket
package main

import (
"encoding/gob"
"fmt"
"net"
"os"
"time"
)

// struttura Message per i messaggi
type Message struct {
Index int // indice dei dati
Text string // testo dei dati
}

// funzione main
func main() {

// start ascolto sul file di scambio "myipcs" (con UNIX domain socket)
fmt.Printf("processo %d partito (reader)\n", os.Getpid())
addr := net.UnixAddr{Name: "./myipcs", Net: "unix"}
lner, err := net.ListenUnix("unix", &addr)
if err != nil {
// errore listen
fmt.Println(err)
return
}

// prenoto la chiusura del listener e rimuovo (eventualmente) il file di scambio
defer lner.Close()
defer os.Remove("./myipcs")

// accetta connessioni da un writer entrante
conn, err := lner.AcceptUnix()
if err != nil {
// errore accept
fmt.Println(err)
return
}

// set time di partenza per calcolare il tempo impiegato
start := time.Now()

// loop di lettura messaggi dal writer
n_msg := 0
var message Message
for {
// set decoder e ricezione dal decoder
decoder := gob.NewDecoder(conn)
decoder.Decode(&message)

// test numero messaggi per forzare l'uscita
n_msg++
if n_msg == 2000000 {
// il processo chiude la connessione ed esce per numero raggiunto
fmt.Printf("reader: ultimo messaggio ricevuto: %s\n", message.Text)
fmt.Printf("reader: processo %d terminato (messaggi=%d tempo totale:%s)\n",
os.Getpid(), n_msg,
time.Since(start).Truncate(time.Millisecond).String())
conn.Close()
return
}
}
}
// writer.go - main processo figlio: è un writer (un client) su IPC socket
package main

import (
"encoding/gob"
"fmt"
"net"
"os"
"time"
)

// struttura Message per i messaggi
type Message struct {
Index int // indice dei dati
Text string // testo dei dati
}

// funzione main
func main() {

// mi assicuro che il writer parta dopo il reader
fmt.Printf("processo %d partito (writer)\n", os.Getpid())
time.Sleep(100 * time.Millisecond)

// connessione al server remoto sul file di scambio "myipcs"
addr := net.UnixAddr{Name: "./myipcs", Net: "unix"}
conn, err := net.DialUnix("unix", nil, &addr)
if err != nil {
// errore dial
fmt.Println(err)
return
}

// loop di scrittura messaggi per il reader
var message Message
message.Index = 0
for {
// test index per forzare l'uscita
if message.Index == 2000000 {
// il processo chiude la connessione ed esce per indice raggiunto
fmt.Printf("writer: processo %d terminato (text=%s messaggi=%d)\n",
os.Getpid(), message.Text, message.Index)
conn.Close()
return
}

// compongo il messaggio e lo invio
message.Index++
message.Text = fmt.Sprintf("un-messaggio-di-test:%d", message.Index)

// set encoder e spedizione dall'encoder
encoder := gob.NewEncoder(conn)
err = encoder.Encode(message)
if err != nil {
fmt.Println("errore di codifica: ", err)
return
}
}
}

Come avrete notato dalla descrizione nella prima linea (e anche dal codice, spero!) ho usato per il test gli IPC socket (UNIX domain socket). Poi ho ripetuto anche con i Network Socket, ma non mostrerò il codice perché è quasi identico. Effettivamente, per la magia del Go, il codice è semplicissimo rispetto alla analoga versione in C citata (che in questo caso era la versione "fast").

Però la complessità dell'operazione di spedire strutture complesse è mascherata dall'uso di un libreria specializzata, la encoding/gob, senza la quale il codice sarebbe molto più complesso (ebbene si, una libreria specializzata per una operazione semplice per il C ma complicata per il Go). E, come vedremo tra poco, le prestazioni non sono eccellenti come ci si aspetterebbe (spoiler: per colpa della encoding/gob). Comunque il codice è stra-commentato, e credo che possa essere facilmente compreso anche da chi non conosce il Go, per cui non mi dilungherò in spiegazioni superflue.

E vabbé, so che siete curiosi, è ora di passare ai risultati! Di seguito i risultati del benchmark in Go e, per comparazione, vi riporto anche i risultati della versione C:

aldo@Linux $ cd go-fastipcsocket/
aldo@Linux $ ./processes
sono il padre (18903): attendo la terminazione dei figli
sono il figlio 1 (18904): eseguo il nuovo processo
sono il figlio 2 (18905): eseguo il nuovo processo
processo 18905 partito (writer)
processo 18904 partito (reader)
writer: processo 18905 terminato (text=un-messaggio-di-test:2000000 messaggi=2000000)
sono il padre (18903): figlio 18905 terminato (0)
reader: ultimo messaggio ricevuto: un-messaggio-di-test:2000000
reader: processo 18904 terminato (messaggi=2000000 tempo totale:13.081s)
sono il padre (18903): figlio 18904 terminato (0)
./processes: processi terminati
aldo@Linux $ cd fastipcsocket/
aldo@Linux $ ./processes
sono il padre (14990): attendo la terminazione dei figli
sono il figlio 1 (14991): eseguo il nuovo processo
sono il figlio 2 (14992): eseguo il nuovo processo
processo 14991 partito (reader)
processo 14992 partito (writer)
writer: processo 14992 terminato (text=un-messaggio-di-test:2000000 messaggi=2000000)
sono il padre (14990): figlio 14992 terminato (0)
reader: ultimo messaggio ricevuto: un-messaggio-di-test:2000000
reader: processo 14991 terminato (messaggi=2000000 tempo CPU: 3.309 - tempo totale:3.309s)
sono il padre (14990): figlio 14991 terminato (0)
./processes: processi terminati

Ebbene si, per trattare 2000000 (!) di messaggi la versione C ha bisogno di 10 secondi in meno! (13.081s vs 3.309s). Però, a questo punto, bisogna fare qualche considerazione:

  1. Come versione di riferimento in C ho usato quella "fast", visto che il meccanismo della versione Go è a size variabile ed è, quindi, somigliante. Comunque anche usando la versione C "normal" la differenza è alta: 8 secondi (13.081s vs 4.823s). (Ho scritto un sacco di benchmark... ma non ve li mostro tutti per non farvi addormentare, ah ah ah).
  2. Vi riporto, per curiosità, i risultati delle versioni con i Network Socket: 26.794s per il Go e 3.88s per il C. Questo era previsto, gli IPC socket essendo "locali" sono mediamente più veloci dei Network Socket, anche se il peggioramento della differenza Go vs C un po' sorprende.
  3. Comunque, non fatevi ingannare dalle prestazioni: in termini assoluti 13.081s (e 26.794s) per 2000000 di messaggi sono, comunque, pochi! Il Go è un linguaggio veloce!

E, riguardo al punto 3 appena mostrato qui sopra, vi cito lo spoiler accennato poco fa (...per colpa della "encoding/gob"...): nella seconda parte dell'articolo (in arrivo prossimamente su questi schermi) vi faro vedere di che cosa è capace il Go quando maneggia solo testi.

Ok, per oggi può bastare: per il momento vi saluto, e vi raccomando, come sempre, di non trattenere il respiro in attesa della seconda parte (potrebbe nuocere gravemente alla vostra salute, ah ah ah).

Ciao, e al prossimo post!

martedì 21 maggio 2024

The Fastsocket
come velocizzare le POSIX IPC in C - pt.2

Enigmista: Enigma numero due! Se sei giustizia, non mentire è tuo dovere. Qual è il tuo prezzo per fingere di non vedere?
Colson: "Il prezzo"?
Batman: Le mazzette. Le ha chiesto quanto ha preso per chiudere un occhio.

E rieccoci qui, siamo di nuovo sul pezzo. Devo dirlo, non avevo previsto una seconda parte per l'articolo sulle Fastpipe; però la chiusura che avevo scritto, quella sulla (presunta) complessità si usare il meccanismo dei messaggi a lunghezza variabile su altre POSIX IPC (oltre alle Named Pipe) mi ha fatto sentire un po' in colpa, e quindi ho deciso di estendere un il discorso (prima che qualcuno mi facesse notare che avevo esagerato sulle difficoltà, ah ah ah). E quindi rispolveriamo il nostro film ispiratore della prima parte, il bel bel The Batman, del bravo Matt Reeves. La frase citata sopra è molto attuale (e vabbè, più che attuale direi che è, ahimè, quasi una costante della storia...) ma, per quanto riguarda l'articolo precedente, vi assicuro che nessuno mi ha pagato per dire che era complicata una cosa che non lo è: è stata solo una svista (o uno scherzetto da Enigmista, fate voi, ah ah ah).

...tu non hai idea di quanto sia veloce un Fastsocket...

Ok: per riprendere il discorso ho deciso di mostrare come si può applicare il meccanismo dei messaggi a lunghezza variabile sui socket: visto che si parla di POSIX IPC l'ho fatto con gli IPC socket (UNIX domain socket, per gli amici), anche se sarebbe stato più comodo farlo direttamente con i Network socket (Internet domain socket, per gli amici), tanto sono praticamente la stessa cosa (si, non storcete il naso, è così).

Vi ricordo quale era il problema dei socket che rendeva impossibile al primo colpo l'implementazione usata con le pipe: la frammentazione dei messaggi tipica dei protocolli usati (il TCP/IP per i Network socket, e un protocollo simile per gli IPC socket che usano i Kernel socket  invece dei classici socket di rete). Questo problemino si può risolvere, in realtà, abbastanza semplicemente (e spoiler: anche in modo ultra-semplice, ma questo alla fine dell'articolo. Aspettate senza trattenere il fiato, mi raccomando!).

Bene, proseguiamo. L'obbiettivo è, quindi, fare un nuovo benchmark per confrontare le prestazioni degli IPC socket in modo "classic" con quelle del modo "fast". Vi mostrerò, direttamente, la versione "fast",  visto che la versione "classic" si differenzia solo nell'uso delle funzioni send(2) e recv(2) al posto, rispettivamente, delle nostre fastWrite() e fastRead(), più l'incapsulamento dei dati nel tipo Message (insomma, le differenze sono le stesse che ci sono tra pipe e Fastpipe  come visto nell'ultimo articolo). Vai col codice!

// processes.c - main processo padre
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/wait.h>
#include "data.h"

// funzione main()
int main(int argc, char* argv[])
{
// crea i processi figli
pid_t pid1, pid2;
(pid1 = fork()) && (pid2 = fork());

// test pid processi
if (pid1 == 0) {
// sono il figlio 1
printf("sono il figlio 1 (%d): eseguo il nuovo processo\n", getpid());
char *pathname = "reader";
char *newargv[] = { pathname, NULL };
execv(pathname, newargv);
exit(EXIT_FAILURE); // exec non ritorna mai
}
else if (pid2 == 0) {
// sono il figlio 2
printf("sono il figlio 2 (%d): eseguo il nuovo processo\n", getpid());
char *pathname = "writer";
char *newargv[] = { pathname, NULL };
execv(pathname, newargv);
exit(EXIT_FAILURE); // exec non ritorna mai
}
else if (pid1 > 0 && pid2 > 0) {
// sono il padre
printf("sono il padre (%d): attendo la terminazione dei figli\n", getpid());
int status;
pid_t wpid;
while ((wpid = wait(&status)) > 0)
printf("sono il padre (%d): figlio %d terminato (%d)\n", getpid(),
(int)wpid, status);

// esco
printf("%s: processi terminati\n", argv[0]);
exit(EXIT_SUCCESS);
}
else {
// errore nella fork(): esco
printf("%s: fork error (%s)\n", argv[0], strerror(errno));
exit(EXIT_FAILURE);
}
}
// writer.c - main processo figlio
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include "data.h"
#include "message.h"

// funzione main()
int main(int argc, char *argv[])
{
// creo il socket in modo IPC e Stream
printf("processo %d partito (writer)\n", getpid());
int sock;
if ((sock = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
// errore di creazione
printf("%s: non posso creare il socket (%s)\n", argv[0], strerror(errno));
exit(EXIT_FAILURE);
}

// prepara la struttura sockaddr_un per il reader (è un server) remoto
struct sockaddr_un reader;
memset(&reader, 0, sizeof(reader));
reader.sun_family = AF_UNIX;
strcpy(reader.sun_path, IPCS_PATH);

// connessione al server remoto
if (connect(sock, (struct sockaddr *)&reader, sizeof(reader)) < 0) {
// errore connect
fprintf(stderr, "%s: errore connect (%s)\n", argv[0], strerror(errno));
close(sock);
return EXIT_FAILURE;
}

// loop di scrittura messaggi per il reader
Message message;
Data *my_data = &message.data;
my_data->index = 0;
do {
// test index per forzare l'uscita
if (my_data->index == N_MESSAGES) {
// il processo chiude il socket ed esce per indice raggiunto
printf("processo %d terminato (text=%s messaggi=%ld)\n",
getpid(), my_data->text, my_data->index);
close(sock);
exit(EXIT_SUCCESS);
}

// compongo il messaggio e lo invio
my_data->index++;
snprintf(my_data->text, sizeof(my_data->text), "un-messaggio-di-test:%ld",
my_data->index);
} while (fastWrite(sock, &message) != -1);

// il processo chiude il socket ed esce per altro motivo (errore)
printf("processo %d terminato con errore (%s)\n", getpid(), strerror(errno));
close(sock);
exit(EXIT_FAILURE);
}
// reader.c - main processo figlio
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <time.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <sys/un.h>
#include "data.h"
#include "message.h"

#define BACKLOG 10 // per listen()

// funzione main()
int main(int argc, char *argv[])
{
// creo il socket in modo IPC e Stream
printf("processo %d partito (reader)\n", getpid());
int sock;
if ((sock = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
// errore di creazione
printf("%s: non posso creare il socket (%s)\n", argv[0], strerror(errno));
exit(EXIT_FAILURE);
}

// prepara la struttura sockaddr_un per questo reader (è un server)
struct sockaddr_un reader;
memset(&reader, 0, sizeof(reader));
reader.sun_family = AF_UNIX;
strcpy(reader.sun_path, IPCS_PATH);

// associa l'indirizzo del reader al socket (questo crea il file IPCS_PATH)
unlink(IPCS_PATH); // rimuovo un eventuale file già creato
if (bind(sock, (struct sockaddr *)&reader, sizeof(reader)) == -1) {
// errore bind
printf("%s: errore bind (%s)\n", argv[0], strerror(errno));
exit(EXIT_FAILURE);
}

// start ascolto con una coda di max BACKLOG connessioni
if (listen(sock, BACKLOG) < 0) {
// errore listen
printf("%s: errore listen (%s)\n", argv[0], strerror(errno));
close(sock);
exit(EXIT_FAILURE);
}

// accetta connessioni da un writer entrante
socklen_t socksize = sizeof(struct sockaddr_un);
struct sockaddr_un writer; // (remote) writer socket info (è un client)
int writer_sock;
if ((writer_sock = accept(sock, (struct sockaddr *)&writer, &socksize)) < 0) {
// errore accept
printf("%s: errore accept (%s)\n", argv[0], strerror(errno));
close(sock);
exit(EXIT_FAILURE);
}

// chiude il socket non più in uso
close(sock);

// set clock e time per calcolare il tempo di CPU e il tempo di sistema
clock_t t_start = clock();
struct timeval tv_start;
gettimeofday(&tv_start, NULL);

// loop di lettura messaggi dal writer
int n_msg = 0;
Message message;
Data *my_data = &message.data;
while (fastRead(writer_sock, &message) != -1) {
// test numero messaggi per forzare l'uscita
if (++n_msg == N_MESSAGES) {
// get clock e time per calcolare il tempo di CPU e il tempo di sistema
clock_t t_end = clock();
double t_passed = ((double)(t_end - t_start)) / CLOCKS_PER_SEC;
struct timeval tv_end, tv_elapsed;
gettimeofday(&tv_end, NULL);
timersub(&tv_end, &tv_start, &tv_elapsed);

// il processo chiude il socket, cancella il file ed esce per numero raggiunto
printf("reader: ultimo messaggio ricevuto: %s\n", my_data->text);
printf("reader: processo %d terminato (messaggi=%d tempo CPU: %.3f - "
"tempo totale:%ld.%ld)\n",
getpid(), n_msg, t_passed, tv_elapsed.tv_sec,
tv_elapsed.tv_usec / 1000);
close(writer_sock);
unlink(IPCS_PATH);
exit(EXIT_SUCCESS);
}
}

// il processo chiude il socket, cancella il file ed esce per altro motivo (errore)
printf("processo %d terminato con errore (%s)\n", getpid(), strerror(errno));
close(writer_sock);
unlink(IPCS_PATH);
exit(EXIT_FAILURE);
}
// data.h - header per dati per mini-libreria IPC con IPC socket
#ifndef DATA_H
#define DATA_H

// path del file per ipc socket
#define IPCS_PATH "myipcs"

// numero di messaggi da scambiare per il benchmark
#define N_MESSAGES 2000000

// struttura Data per i messaggi
typedef struct {
unsigned long index; // indice dei dati
char text[16384]; // testo dei dati
} Data;

#endif // DATA_H
// message.c - implementazione per read/write per mini-libreria IPC con IPC socket
#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/socket.h>
#include "data.h"
#include "message.h"

// prototipi locali
ssize_t myRecv(int sockfd, void *buf, size_t len, int flags);

// fastwrite - scrittura con size
ssize_t fastWrite(
int fd, // il socket descriptor per la send() della libc
Message *buf) // il buffer che contiene i byte da trasmettere
{
// set size reale (somma di tutte le dimensioni dei membri del
// tipo Message (eccetto il membro size))
buf->size = sizeof(buf->data.index) + strlen(buf->data.text);

// invio il messaggio completo: size + real-size (size + somma di
// tutte le dimensioni dei membri del tipo Message (eccetto il membro size))
return send(fd, buf, SIZEOFP(buf), 0);
}

// fastread - lettura con size
ssize_t fastRead(
int fd, // il socket descriptor per la recv() della libc
Message *buf) // il buffer su cui scrivere i byte ricevuti
{
// legge il size da usare nella successiva read
ssize_t size_rcvd;
if ((size_rcvd = myRecv(fd, &buf->size, sizeof(size_t), 0)) > 0) {
// return la read successiva
return myRecv(fd, &buf->data, buf->size, 0);
}

// ritorna nessun byte letto o errore
return size_rcvd;
}

// myRecv - una recv() speciale per pacchetti frammentati
ssize_t myRecv(
int sockfd, // il socket descriptor per la recv() della libc
void *buf, // il buffer su cui scivere i byte ricevuti
size_t len, // la quantità di byte da ricevere
int flags) // i flag per la recv() della libc
{
// loop per ricevere completamente un messaggio (forse) spezzettato
int bytes_recvd = 0; // byte totali ricevuti
int bytes_pending = len; // byte totali mancanti
char* address = buf;
while (bytes_pending > 0) {
// ricevo un buffer (normalmente il primo e unico buffer che contiene
// il messaggio intero)
address += bytes_recvd;
int tmp_recvd; // byte ricevuti
if ((tmp_recvd = recv(sockfd, address, bytes_pending, flags)) > 0) {
// aggiorno i contatori
bytes_recvd += tmp_recvd;
bytes_pending -= tmp_recvd;
}
}

return bytes_recvd;
}
// message.h - header per read/write per mini-libreria IPC con IPC socket
#ifndef MESSAGE_H
#define MESSAGE_H

#include "data.h"

// struttura Data per i messaggi
typedef struct {
size_t size; // size reale (somma di tutte le dimensioni dei membri del
// tipo Message (eccetto il membro size))
Data data; // campo dati del messaggio
} Message;

// size reale del messaggio
#define SIZEOFP(X) (sizeof(size_t) + sizeof(X->data.index) + strlen(X->data.text))

// prototipi globali
ssize_t fastRead(int fd, Message *buf);
ssize_t fastWrite(int fd, Message *buf);

#endif // MESSAGE_H

Ok, il nuovo codice è ampiamente commentato (come sempre), ma comunque è il caso di aggiungere qualche dettaglio. È formato da sei file, quindi due in più di quello di riferimento, per cui la struttura è questa:

  1. Il main di un processo padre: processes.c. Crea ed esegue due processi figli con fork + exec. I due processi figli si chiameranno writer e reader.
  2. Il main del processo writer: writer.c.
  3. Il main del processo reader: reader.c.
  4. Un header file per reader e writer: data.h.
  5. Un nuovo sorgente che serve a gestire la trasmissione di messaggi con lunghezza variabile: message.c.
  6. Un nuovo header file per il file message.c: message.h.

A questo punto vi risparmio ulteriori descrizioni che sarebbero identiche a quelle dello scorso articolo (e i più attenti avranno notato che qui sopra ho fatto un copia-e-incolla di intere frasi, ah ah ah).

È il caso, invece, di soffermarsi sull'unica differenza reale rispetto alla implementazione delle Fastpipe: nel file message.c la funzione fastRead() invece di chiamare internamente la recv(2) della libc chiama una nuova funzione locale, la myRecv(), delegando a quest'ultima le chiamate alla recv(2). E cosa fa di speciale questa funzione? Semplice, visto che il problema da risolvere è lo "spezzettamento" (o frammentazione, come preferite) dei messaggi in arrivo, la myRecv() si occupa di ricevere tutti i frammenti di un messaggio e di restituire al chiamante il messaggio intero... problema risolto!

E come lavora la myRecv()? Direi che il codice è sufficientemente compatto e lineare da rendere l'idea a prima vista: viene fatto un loop di recv(2) e, di volta in volta vengono aggiornati dei contatori di byte ricevuti e di byte mancanti alla ricezione completa del messaggio (evidentemente la ricezione è completa quando si ricevono <len> byte). Nella migliore delle ipotesi il loop interno non farà nulla: alla prima ricezione si ottiene già il messaggio intero e lo si ritorna al chiamante, però in alcuni casi (magari frequenti) il messaggio verrà costruito chiamando più volta la recv(2) nel loop. Semplice, no?

E i risultati del test IPC socket vs IPC Fastsocket quali sono? Anche questa volta (come per le pipe) erano abbastanza scontati, ma è, comunque, il caso di mostrarli:

aldo@Linux $ cd ipcsocket/
aldo@Linux $ ./processes
sono il padre (17317): attendo la terminazione dei figli
sono il figlio 1 (17318): eseguo il nuovo processo
sono il figlio 2 (17319): eseguo il nuovo processo
processo 17318 partito (reader)
processo 17319 partito (writer)
processo 17319 terminato (text=un-messaggio-di-test:2000000 messaggi=2000000)
reader: ultimo messaggio ricevuto: un-messaggio-di-test:2000000
reader: processo 17318 terminato (messaggi=2000000 tempo CPU: 4.803 - tempo totale:4.823)
sono il padre (17317): figlio 17318 terminato (0)
sono il padre (17317): figlio 17319 terminato (0)
./processes: processi terminati

aldo@Linux $ cd ../fastipcsocket/
aldo@Linux $ ./processes
sono il figlio 1 (17325): eseguo il nuovo processo
sono il padre (17324): attendo la terminazione dei figli
sono il figlio 2 (17326): eseguo il nuovo processo
processo 17325 partito (reader)
processo 17326 partito (writer)
processo 17326 terminato (text=un-messaggio-di-test:2000000 messaggi=2000000)
reader: ultimo messaggio ricevuto: un-messaggio-di-test:2000000
reader: processo 17325 terminato (messaggi=2000000 tempo CPU: 3.456 - tempo totale:3.456)
sono il padre (17324): figlio 17326 terminato (0)
sono il padre (17324): figlio 17325 terminato (0)
./processes: processi terminati

Il miglioramento nell'invio di 2000000 messaggi è buono, 3.456 secondi invece di 4.823 secondi, ed era, come detto sopra, scontato, visto che invece di trattare pacchetti (a lunghezza fissa) di 16 KB trattiamo pacchetti di qualche decina di byte; magari ci si poteva aspettare qualcosa di più, ma è evidente che in un benchmark di questo tipo anche solo l'avvio di una operazione di read o write ha il suo peso, e contribuisce al tempo totale a prescindere dalla quantità di dati trattati (oops... un altro copia-e-incolla, ah ah ah).

E aggiungo, solo come curiosità: ho fatto la stessa operazione anche coi Network socket (vi risparmio il codice, non voglio dilungarmi troppo) e il miglioramento "fast" vs "classic" nell'invio di 2000000 messaggi è superiore a quello ottenuto con gli IPC Socket: 3.114 secondi invece di 6.899 secondi.

Siamo arrivati alla fine dell'articolo. Manca qualcosa? Ah, si, dimenticavo, avevo spoilerato un trucco finale! Ecco, dovete sapere che la fastRead() in realtà si può scrivere in maniera decisamente più semplice, rinunciando addirittura all'uso della myRead(), vediamo come:

// fastread - lettura con size
ssize_t fastRead(
int fd, // il socket descriptor per la recv() della libc
Message *buf) // il buffer su cui scrivere i byte ricevuti
{
// legge il size da usare nella successiva read
ssize_t size_rcvd;
if ((size_rcvd = recv(fd, &buf->size, sizeof(size_t), MSG_WAITALL)) > 0) {
// return la read successiva
return recv(fd, &buf->data, buf->size, MSG_WAITALL);
}

// ritorna nessun byte letto o errore
return size_rcvd;
}

Come potete ben vedere, questa nuova versione della fastRead() è ultra semplice, ed è quasi identica a quella usata con la Fastpipe. Sfrutta il flag MSG_WAITALL della recv(2) (da passare nell'apposito campo <flags> della funzione; con questo flag attivo la ricezione si blocca fino a quando non sono arrivati tutti i blocchi che compongono il messaggio: fa esattamente il lavoro della myRead() vista sopra, però lo fa già a livello interno di libreria, quindi è sicuramente il metodo preferente. Comunque ho implementato la myRead() non per perdere tempo, ma per mostrare come funzionano internamente i misteriosi flag delle funzioni di libreria, per cui scriverla è stato un utile esercizio (ma nel Software di produzione usate MSG_WAITALL, mi raccomando!).

E questo ultimo trucchetto ci porta indirettamente a un altro quesito: e se volessimo implementare Fastsocket usando Datagram (ossia aprendo il socket in modo SOCK_DGRAM invece che SOCK_STREAM)? Ecco, in questo caso, come per le POSIX Message Queue citate nell'ultimo articolo, l'esercizio non è possibile: per le MQ il problema era il meccanismo di base che è a lunghezza fissa, mentre per i Datagram (o modo UDP, per i Network socket) il meccanismo di lettura in due passi (prima la lunghezza del buffer e dopo il buffer stesso) non può funzionare, perché con i Datagram non viene garantito né l'arrivo né l'ordine di arrivo dei messaggi (quindi funziona solo se siamo molto ma molto fortunati, ah ah ah). Ah, guarda caso, come dice il manuale della recv(2), il flag MSG_WAITALL non è disponibile per i Datagram socket (oh, che sorpresa!).

E per oggi può bastare, abbiamo dimostrato che il metodo "fast" si può applicare anche agli IPC socket e ai Network socket (ma non coi Datagram, occhio!). Con questa seconda parte dichiaro concluso, almeno momentaneamente, l'argomento Fast IPC. Cosa ci riserverà il futuro? Boh, non lo so ancora, ma vi garantisco che sarà interessante!

Ciao, e al prossimo post!