Enigmista: Enigma numero due! Se sei giustizia, non mentire è tuo dovere. Qual è il tuo prezzo per fingere di non vedere?
Colson: "Il prezzo"?
Batman: Le mazzette. Le ha chiesto quanto ha preso per chiudere un occhio.
E rieccoci qui, siamo di nuovo sul pezzo. Devo dirlo, non avevo previsto una seconda parte per l'articolo sulle Fastpipe; però la chiusura che avevo scritto, quella sulla (presunta) complessità si usare il meccanismo dei messaggi a lunghezza variabile su altre POSIX IPC (oltre alle Named Pipe) mi ha fatto sentire un po' in colpa, e quindi ho deciso di estendere un il discorso (prima che qualcuno mi facesse notare che avevo esagerato sulle difficoltà, ah ah ah). E quindi rispolveriamo il nostro film ispiratore della prima parte, il bel bel The Batman, del bravo Matt Reeves. La frase citata sopra è molto attuale (e vabbè, più che attuale direi che è, ahimè, quasi una costante della storia...) ma, per quanto riguarda l'articolo precedente, vi assicuro che nessuno mi ha pagato per dire che era complicata una cosa che non lo è: è stata solo una svista (o uno scherzetto da Enigmista, fate voi, ah ah ah).
...tu non hai idea di quanto sia veloce un Fastsocket... |
Ok: per riprendere il discorso ho deciso di mostrare come si può applicare il meccanismo dei messaggi a lunghezza variabile sui socket: visto che si parla di POSIX IPC l'ho fatto con gli IPC socket (UNIX domain socket, per gli amici), anche se sarebbe stato più comodo farlo direttamente con i Network socket (Internet domain socket, per gli amici), tanto sono praticamente la stessa cosa (si, non storcete il naso, è così).
Vi ricordo quale era il problema dei socket che rendeva impossibile al primo colpo l'implementazione usata con le pipe: la frammentazione dei messaggi tipica dei protocolli usati (il TCP/IP per i Network socket, e un protocollo simile per gli IPC socket che usano i Kernel socket invece dei classici socket di rete). Questo problemino si può risolvere, in realtà, abbastanza semplicemente (e spoiler: anche in modo ultra-semplice, ma questo alla fine dell'articolo. Aspettate senza trattenere il fiato, mi raccomando!).
Bene, proseguiamo. L'obbiettivo è, quindi, fare un nuovo benchmark per confrontare le prestazioni degli IPC socket in modo "classic" con quelle del modo "fast". Vi mostrerò, direttamente, la versione "fast", visto che la versione "classic" si differenzia solo nell'uso delle funzioni send(2) e recv(2) al posto, rispettivamente, delle nostre fastWrite() e fastRead(), più l'incapsulamento dei dati nel tipo Message (insomma, le differenze sono le stesse che ci sono tra pipe e Fastpipe come visto nell'ultimo articolo). Vai col codice!
// processes.c - main processo padre#include <stdio.h>#include <stdlib.h>#include <string.h>#include <errno.h>#include <unistd.h>#include <sys/wait.h>#include "data.h"// funzione main()int main(int argc, char* argv[]){// crea i processi figlipid_t pid1, pid2;(pid1 = fork()) && (pid2 = fork());// test pid processiif (pid1 == 0) {// sono il figlio 1printf("sono il figlio 1 (%d): eseguo il nuovo processo\n", getpid());char *pathname = "reader";char *newargv[] = { pathname, NULL };execv(pathname, newargv);exit(EXIT_FAILURE); // exec non ritorna mai}else if (pid2 == 0) {// sono il figlio 2printf("sono il figlio 2 (%d): eseguo il nuovo processo\n", getpid());char *pathname = "writer";char *newargv[] = { pathname, NULL };execv(pathname, newargv);exit(EXIT_FAILURE); // exec non ritorna mai}else if (pid1 > 0 && pid2 > 0) {// sono il padreprintf("sono il padre (%d): attendo la terminazione dei figli\n", getpid());int status;pid_t wpid;while ((wpid = wait(&status)) > 0)printf("sono il padre (%d): figlio %d terminato (%d)\n", getpid(),(int)wpid, status);// escoprintf("%s: processi terminati\n", argv[0]);exit(EXIT_SUCCESS);}else {// errore nella fork(): escoprintf("%s: fork error (%s)\n", argv[0], strerror(errno));exit(EXIT_FAILURE);}}
// writer.c - main processo figlio#include <stdio.h>#include <stdlib.h>#include <string.h>#include <errno.h>#include <unistd.h>#include <sys/socket.h>#include <sys/un.h>#include "data.h"#include "message.h"// funzione main()int main(int argc, char *argv[]){// creo il socket in modo IPC e Streamprintf("processo %d partito (writer)\n", getpid());int sock;if ((sock = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {// errore di creazioneprintf("%s: non posso creare il socket (%s)\n", argv[0], strerror(errno));exit(EXIT_FAILURE);}// prepara la struttura sockaddr_un per il reader (è un server) remotostruct sockaddr_un reader;memset(&reader, 0, sizeof(reader));reader.sun_family = AF_UNIX;strcpy(reader.sun_path, IPCS_PATH);// connessione al server remotoif (connect(sock, (struct sockaddr *)&reader, sizeof(reader)) < 0) {// errore connectfprintf(stderr, "%s: errore connect (%s)\n", argv[0], strerror(errno));close(sock);return EXIT_FAILURE;}// loop di scrittura messaggi per il readerMessage message;Data *my_data = &message.data;my_data->index = 0;do {// test index per forzare l'uscitaif (my_data->index == N_MESSAGES) {// il processo chiude il socket ed esce per indice raggiuntoprintf("processo %d terminato (text=%s messaggi=%ld)\n",getpid(), my_data->text, my_data->index);close(sock);exit(EXIT_SUCCESS);}// compongo il messaggio e lo inviomy_data->index++;snprintf(my_data->text, sizeof(my_data->text), "un-messaggio-di-test:%ld",my_data->index);} while (fastWrite(sock, &message) != -1);// il processo chiude il socket ed esce per altro motivo (errore)printf("processo %d terminato con errore (%s)\n", getpid(), strerror(errno));close(sock);exit(EXIT_FAILURE);}
// reader.c - main processo figlio#include <stdio.h>#include <stdlib.h>#include <string.h>#include <errno.h>#include <unistd.h>#include <time.h>#include <sys/time.h>#include <sys/socket.h>#include <sys/un.h>#include "data.h"#include "message.h"#define BACKLOG 10 // per listen()// funzione main()int main(int argc, char *argv[]){// creo il socket in modo IPC e Streamprintf("processo %d partito (reader)\n", getpid());int sock;if ((sock = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {// errore di creazioneprintf("%s: non posso creare il socket (%s)\n", argv[0], strerror(errno));exit(EXIT_FAILURE);}// prepara la struttura sockaddr_un per questo reader (è un server)struct sockaddr_un reader;memset(&reader, 0, sizeof(reader));reader.sun_family = AF_UNIX;strcpy(reader.sun_path, IPCS_PATH);// associa l'indirizzo del reader al socket (questo crea il file IPCS_PATH)unlink(IPCS_PATH); // rimuovo un eventuale file già creatoif (bind(sock, (struct sockaddr *)&reader, sizeof(reader)) == -1) {// errore bindprintf("%s: errore bind (%s)\n", argv[0], strerror(errno));exit(EXIT_FAILURE);}// start ascolto con una coda di max BACKLOG connessioniif (listen(sock, BACKLOG) < 0) {// errore listenprintf("%s: errore listen (%s)\n", argv[0], strerror(errno));close(sock);exit(EXIT_FAILURE);}// accetta connessioni da un writer entrantesocklen_t socksize = sizeof(struct sockaddr_un);struct sockaddr_un writer; // (remote) writer socket info (è un client)int writer_sock;if ((writer_sock = accept(sock, (struct sockaddr *)&writer, &socksize)) < 0) {// errore acceptprintf("%s: errore accept (%s)\n", argv[0], strerror(errno));close(sock);exit(EXIT_FAILURE);}// chiude il socket non più in usoclose(sock);// set clock e time per calcolare il tempo di CPU e il tempo di sistemaclock_t t_start = clock();struct timeval tv_start;gettimeofday(&tv_start, NULL);// loop di lettura messaggi dal writerint n_msg = 0;Message message;Data *my_data = &message.data;while (fastRead(writer_sock, &message) != -1) {// test numero messaggi per forzare l'uscitaif (++n_msg == N_MESSAGES) {// get clock e time per calcolare il tempo di CPU e il tempo di sistemaclock_t t_end = clock();double t_passed = ((double)(t_end - t_start)) / CLOCKS_PER_SEC;struct timeval tv_end, tv_elapsed;gettimeofday(&tv_end, NULL);timersub(&tv_end, &tv_start, &tv_elapsed);// il processo chiude il socket, cancella il file ed esce per numero raggiuntoprintf("reader: ultimo messaggio ricevuto: %s\n", my_data->text);printf("reader: processo %d terminato (messaggi=%d tempo CPU: %.3f - ""tempo totale:%ld.%ld)\n",getpid(), n_msg, t_passed, tv_elapsed.tv_sec,tv_elapsed.tv_usec / 1000);close(writer_sock);unlink(IPCS_PATH);exit(EXIT_SUCCESS);}}// il processo chiude il socket, cancella il file ed esce per altro motivo (errore)printf("processo %d terminato con errore (%s)\n", getpid(), strerror(errno));close(writer_sock);unlink(IPCS_PATH);exit(EXIT_FAILURE);}
// data.h - header per dati per mini-libreria IPC con IPC socket#ifndef DATA_H#define DATA_H// path del file per ipc socket#define IPCS_PATH "myipcs"// numero di messaggi da scambiare per il benchmark#define N_MESSAGES 2000000// struttura Data per i messaggitypedef struct {unsigned long index; // indice dei datichar text[16384]; // testo dei dati} Data;#endif // DATA_H
// message.c - implementazione per read/write per mini-libreria IPC con IPC socket#include <stdio.h>#include <string.h>#include <fcntl.h>#include <unistd.h>#include <sys/socket.h>#include "data.h"#include "message.h"// prototipi localissize_t myRecv(int sockfd, void *buf, size_t len, int flags);// fastwrite - scrittura con sizessize_t fastWrite(int fd, // il socket descriptor per la send() della libcMessage *buf) // il buffer che contiene i byte da trasmettere{// set size reale (somma di tutte le dimensioni dei membri del// tipo Message (eccetto il membro size))buf->size = sizeof(buf->data.index) + strlen(buf->data.text);// invio il messaggio completo: size + real-size (size + somma di// tutte le dimensioni dei membri del tipo Message (eccetto il membro size))return send(fd, buf, SIZEOFP(buf), 0);}// fastread - lettura con sizessize_t fastRead(int fd, // il socket descriptor per la recv() della libcMessage *buf) // il buffer su cui scrivere i byte ricevuti{// legge il size da usare nella successiva readssize_t size_rcvd;if ((size_rcvd = myRecv(fd, &buf->size, sizeof(size_t), 0)) > 0) {// return la read successivareturn myRecv(fd, &buf->data, buf->size, 0);}// ritorna nessun byte letto o errorereturn size_rcvd;}// myRecv - una recv() speciale per pacchetti frammentatissize_t myRecv(int sockfd, // il socket descriptor per la recv() della libcvoid *buf, // il buffer su cui scivere i byte ricevutisize_t len, // la quantità di byte da ricevereint flags) // i flag per la recv() della libc{// loop per ricevere completamente un messaggio (forse) spezzettatoint bytes_recvd = 0; // byte totali ricevutiint bytes_pending = len; // byte totali mancantichar* address = buf;while (bytes_pending > 0) {// ricevo un buffer (normalmente il primo e unico buffer che contiene// il messaggio intero)address += bytes_recvd;int tmp_recvd; // byte ricevutiif ((tmp_recvd = recv(sockfd, address, bytes_pending, flags)) > 0) {// aggiorno i contatoribytes_recvd += tmp_recvd;bytes_pending -= tmp_recvd;}}return bytes_recvd;}
// message.h - header per read/write per mini-libreria IPC con IPC socket#ifndef MESSAGE_H#define MESSAGE_H#include "data.h"// struttura Data per i messaggitypedef struct {size_t size; // size reale (somma di tutte le dimensioni dei membri del// tipo Message (eccetto il membro size))Data data; // campo dati del messaggio} Message;// size reale del messaggio#define SIZEOFP(X) (sizeof(size_t) + sizeof(X->data.index) + strlen(X->data.text))// prototipi globalissize_t fastRead(int fd, Message *buf);ssize_t fastWrite(int fd, Message *buf);#endif // MESSAGE_H
Ok, il nuovo codice è ampiamente commentato (come sempre), ma comunque è il caso di aggiungere qualche dettaglio. È formato da sei file, quindi due in più di quello di riferimento, per cui la struttura è questa:
- Il main di un processo padre: processes.c. Crea ed esegue due processi figli con fork + exec. I due processi figli si chiameranno writer e reader.
- Il main del processo writer: writer.c.
- Il main del processo reader: reader.c.
- Un header file per reader e writer: data.h.
- Un nuovo sorgente che serve a gestire la trasmissione di messaggi con lunghezza variabile: message.c.
- Un nuovo header file per il file message.c: message.h.
A questo punto vi risparmio ulteriori descrizioni che sarebbero identiche a quelle dello scorso articolo (e i più attenti avranno notato che qui sopra ho fatto un copia-e-incolla di intere frasi, ah ah ah).
È il caso, invece, di soffermarsi sull'unica differenza reale rispetto alla implementazione delle Fastpipe: nel file message.c la funzione fastRead() invece di chiamare internamente la recv(2) della libc chiama una nuova funzione locale, la myRecv(), delegando a quest'ultima le chiamate alla recv(2). E cosa fa di speciale questa funzione? Semplice, visto che il problema da risolvere è lo "spezzettamento" (o frammentazione, come preferite) dei messaggi in arrivo, la myRecv() si occupa di ricevere tutti i frammenti di un messaggio e di restituire al chiamante il messaggio intero... problema risolto!
E come lavora la myRecv()? Direi che il codice è sufficientemente compatto e lineare da rendere l'idea a prima vista: viene fatto un loop di recv(2) e, di volta in volta vengono aggiornati dei contatori di byte ricevuti e di byte mancanti alla ricezione completa del messaggio (evidentemente la ricezione è completa quando si ricevono <len> byte). Nella migliore delle ipotesi il loop interno non farà nulla: alla prima ricezione si ottiene già il messaggio intero e lo si ritorna al chiamante, però in alcuni casi (magari frequenti) il messaggio verrà costruito chiamando più volta la recv(2) nel loop. Semplice, no?
E i risultati del test IPC socket vs IPC Fastsocket quali sono? Anche questa volta (come per le pipe) erano abbastanza scontati, ma è, comunque, il caso di mostrarli:
aldo@Linux $ cd ipcsocket/aldo@Linux $ ./processessono il padre (17317): attendo la terminazione dei figlisono il figlio 1 (17318): eseguo il nuovo processosono il figlio 2 (17319): eseguo il nuovo processoprocesso 17318 partito (reader)processo 17319 partito (writer)processo 17319 terminato (text=un-messaggio-di-test:2000000 messaggi=2000000)reader: ultimo messaggio ricevuto: un-messaggio-di-test:2000000reader: processo 17318 terminato (messaggi=2000000 tempo CPU: 4.803 - tempo totale:4.823)sono il padre (17317): figlio 17318 terminato (0)sono il padre (17317): figlio 17319 terminato (0)./processes: processi terminatialdo@Linux $ cd ../fastipcsocket/aldo@Linux $ ./processessono il figlio 1 (17325): eseguo il nuovo processosono il padre (17324): attendo la terminazione dei figlisono il figlio 2 (17326): eseguo il nuovo processoprocesso 17325 partito (reader)processo 17326 partito (writer)processo 17326 terminato (text=un-messaggio-di-test:2000000 messaggi=2000000)reader: ultimo messaggio ricevuto: un-messaggio-di-test:2000000reader: processo 17325 terminato (messaggi=2000000 tempo CPU: 3.456 - tempo totale:3.456)sono il padre (17324): figlio 17326 terminato (0)sono il padre (17324): figlio 17325 terminato (0)./processes: processi terminati
Il miglioramento nell'invio di 2000000 messaggi è buono, 3.456 secondi invece di 4.823 secondi, ed era, come detto sopra, scontato, visto che invece di trattare pacchetti (a lunghezza fissa) di 16 KB trattiamo pacchetti di qualche decina di byte; magari ci si poteva aspettare qualcosa di più, ma è evidente che in un benchmark di questo tipo anche solo l'avvio di una operazione di read o write ha il suo peso, e contribuisce al tempo totale a prescindere dalla quantità di dati trattati (oops... un altro copia-e-incolla, ah ah ah).
E aggiungo, solo come curiosità: ho fatto la stessa operazione anche coi Network socket (vi risparmio il codice, non voglio dilungarmi troppo) e il miglioramento "fast" vs "classic" nell'invio di 2000000 messaggi è superiore a quello ottenuto con gli IPC Socket: 3.114 secondi invece di 6.899 secondi.
Siamo arrivati alla fine dell'articolo. Manca qualcosa? Ah, si, dimenticavo, avevo spoilerato un trucco finale! Ecco, dovete sapere che la fastRead() in realtà si può scrivere in maniera decisamente più semplice, rinunciando addirittura all'uso della myRead(), vediamo come:
// fastread - lettura con sizessize_t fastRead(int fd, // il socket descriptor per la recv() della libcMessage *buf) // il buffer su cui scrivere i byte ricevuti{// legge il size da usare nella successiva readssize_t size_rcvd;if ((size_rcvd = recv(fd, &buf->size, sizeof(size_t), MSG_WAITALL)) > 0) {// return la read successivareturn recv(fd, &buf->data, buf->size, MSG_WAITALL);}// ritorna nessun byte letto o errorereturn size_rcvd;}
Come potete ben vedere, questa nuova versione della fastRead() è ultra semplice, ed è quasi identica a quella usata con la Fastpipe. Sfrutta il flag MSG_WAITALL della recv(2) (da passare nell'apposito campo <flags> della funzione; con questo flag attivo la ricezione si blocca fino a quando non sono arrivati tutti i blocchi che compongono il messaggio: fa esattamente il lavoro della myRead() vista sopra, però lo fa già a livello interno di libreria, quindi è sicuramente il metodo preferente. Comunque ho implementato la myRead() non per perdere tempo, ma per mostrare come funzionano internamente i misteriosi flag delle funzioni di libreria, per cui scriverla è stato un utile esercizio (ma nel Software di produzione usate MSG_WAITALL, mi raccomando!).
E questo ultimo trucchetto ci porta indirettamente a un altro quesito: e se volessimo implementare Fastsocket usando Datagram (ossia aprendo il socket in modo SOCK_DGRAM invece che SOCK_STREAM)? Ecco, in questo caso, come per le POSIX Message Queue citate nell'ultimo articolo, l'esercizio non è possibile: per le MQ il problema era il meccanismo di base che è a lunghezza fissa, mentre per i Datagram (o modo UDP, per i Network socket) il meccanismo di lettura in due passi (prima la lunghezza del buffer e dopo il buffer stesso) non può funzionare, perché con i Datagram non viene garantito né l'arrivo né l'ordine di arrivo dei messaggi (quindi funziona solo se siamo molto ma molto fortunati, ah ah ah). Ah, guarda caso, come dice il manuale della recv(2), il flag MSG_WAITALL non è disponibile per i Datagram socket (oh, che sorpresa!).
E per oggi può bastare, abbiamo dimostrato che il metodo "fast" si può applicare anche agli IPC socket e ai Network socket (ma non coi Datagram, occhio!). Con questa seconda parte dichiaro concluso, almeno momentaneamente, l'argomento Fast IPC. Cosa ci riserverà il futuro? Boh, non lo so ancora, ma vi garantisco che sarà interessante!
Ciao, e al prossimo post!
Nessun commento:
Posta un commento