Problème de performance en Rust

Server HTTP et gros fichiers

L’auteur de ce sujet a trouvé une solution à son problème.
Auteur du sujet

Bonjour,

J’ai un petit problème en Rust. Je cherche à obtenir les même performances que j’ai avec un programme en Go avec un nouveau programme écrit en Rust.
Le problème, c’est que c’est assez difficile.

Le programme est un serveur Web qui distribue de gros fichiers (ici 6.6Gio). J’utilise tokio et hyper.

Mon premier code ressemble à ça :

fn main() {
    let addr = "[::1]:1337".parse().unwrap();

    let service = || {
        service_fn_ok( |_| {
           let task = tokio::fs::File::open("big")
               .and_then( |file|  {
                  Response::new(Body::wrap_stream(TokioStream{ file: file,
                       buf: vec!(0; BUF_SIZE)
                   }))
               })
            match task.wait() {
                Err(_) => Response::new(Body::from("Please (╥_╥)")),
                Ok(s) => Response::new(Body::wrap_stream(s)),
            }
        })
    };

    let server = Server::bind(&addr)
        .serve(service)
        .map_err(|e| eprintln!("Error: {}", e));

    hyper::rt::run(server);
}

Les performances étaient correctes. Mais bien en deçà de ce qu’offrait le programme Go.
J’ai un petit PC et les performances étaient de 230MB/s pour Rust contre ~420MB/s pour le programme en Go.

Après avoir cherché beaucoup d’amélioration, j’ai fini par trouver que sur le PC où je suis 420MB/s est à peu près la vitesse maximale obtenue à partir du disque dur (450MB/s) alors que le PC est capable d’envoyer virtuellement sur l’interface loopback @600MB/s.

J’ai codé un programme capable de faire les deux, lire à la vitesse maximale et envoyer @600MB/s.

J’aimerais idéalement que les deux arrivent à se synchroniser sans pour autant perdre en performance. Voilà le programme qui benchmark un peu les différentes ressources systèmes.

const BUF_SIZE : usize = 1<<19;
static mut BUF : [u8; BUF_SIZE] = [0; BUF_SIZE];

struct TokioStream {
}
impl Stream for TokioStream {
    type Item = &'static[u8];
    type Error = Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        unsafe {
            Ok(Async::Ready(Some(&BUF[..])))
        }
    }
}

static mut a :i64 = 0;
fn main() {
    let start = PreciseTime::now();
    let task = tokio::fs::File::open("big")
        .and_then(|mut file| unsafe {
            loop {
                let r = file.poll_read(&mut BUF);
                match r {
                    Ok(Async::Ready(0)) => {
                        break r
                    },
                    Ok(Async::Ready(size)) => {
                        a += size as i64;
                        //Ok(Async::Ready(Some(&BUF[..])));
                        continue
                    },
                    Ok(Async::NotReady) => {
                        break r
                    },
                    Err(_) => {
                        break r
                    },
                }
            }
        }
        ).map(|_| {
            ;
        }).map_err(|err| eprintln!("IO error: {:?}", err));

    tokio::run(task);
    let end = PreciseTime::now();
    unsafe {
        let dur : f64 = start.to(end).num_milliseconds() as f64 / 1000.;
        let size : f64 = a as f64;
        println!("{:.3}MB/s", (size / dur)/(1024.*1024.));
    }

    let addr = "[::1]:1337".parse().unwrap();

    let service = || {
        service_fn( |_| {
            tokio::fs::File::open("big")
                .and_then( |_|  {
                    let fs = TokioStream{};
                    Ok(Response::new(Body::wrap_stream(fs)))
                })
        })
    };

    let server = Server::bind(&addr)
        .serve(service)
        .map_err(|e| eprintln!("Error: {}", e));

    hyper::rt::run(server);
}

Je ne sais pas comment faire pour arriver à conserver les performances de chaque composant de manière à être limité par le moins rapide. Plusieurs buffers avec des Mutex le tous séparer dans des threads différents ? C’est basique, mais comment faire ? Il n’y a rien de prévu pour ça déjà ?

Merci et à bientôt \o

PS: Il est question d’un PC peu performant mais le disque dur est un SSD donc les performances de lecture sont tout à fait acceptable. Cependant, pour ce qui est du réseau, généralement en loopback, on trouvera plus souvent du GB/s. Un PC moderne et performant pourrait également obtenir des performances de l’ordre de 10GB/s si des phénomènes de cache interviennent.

Édité par ache

ache.one                 🦹         👾                                🦊

+0 -0
Auteur du sujet

Bon …

Après de très nombreuses tentatives. Je me suis décidé à utiliser des Mutex et un canal.
J’obtiens des performances assez décevantes de 300MB/s. Avec tous le travail investi je m’attendais plus à du 400MB/s. Je suppose que ça reste une base pas trop mauvaise à améliorer.
N’empêche que pour l’instant, c’est encore lent …

Voici mon code :

L’idée c’est que un worker va envoyer les Chunks par messages au Stream, en permanence dans un nouveau thread. Il se bloque si le canal est plein.
De l’autre coté, dès que le Stream poll, il va retirer un message du canal.

extern crate tokio;
extern crate futures;
extern crate hyper;

use std::thread;
use std::sync::mpsc;
use std::sync::Mutex;
use std::io::Read;

use futures::{Poll, Async, Future};
use futures::stream::Stream;
use std::io::Error;

use hyper::{Body, Response, Server, service::service_fn_ok};

const BUF_SIZE : usize = 1<<21;


struct TokioWorker {
    file: Mutex<std::fs::File>,
    buf: Mutex<Vec<Vec<u8>>>,
}
impl TokioWorker {
    fn new(file: std::fs::File, buf: Vec<Vec<u8>> ) -> TokioWorker {
        TokioWorker {
            file: Mutex::new(file),
            buf: Mutex::new(buf),
        }
    }
    fn read(&self, i: usize) -> Result<usize, Error> {
        self.file.lock().unwrap().read(&mut self.buf.lock().unwrap()[i])
    }
}
struct TokioStream {
    rx: mpsc::Receiver<Vec<u8>>,
}
impl TokioStream {
    fn new(file: std::fs::File, buf: Vec<Vec<u8>>) -> TokioStream {
        let (tx, rx) = mpsc::sync_channel(buf.len());

        let worker = TokioWorker::new(file, buf);
        thread::spawn(move || {
            let c = 0 as usize;
            loop {
                let r = worker.read(c);
                match r {
                    Ok(0) => break,
                    Ok(r) => tx.send(worker.buf.lock().unwrap()[c][..r].to_owned()).unwrap(),
                    Err(_) => break,
                }
            }
        });
        TokioStream{ rx: rx }
    }
}
impl Stream for TokioStream {
    type Item = Vec<u8>;
    type Error = Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        match self.rx.recv() {
            Ok(j) => Ok(Async::Ready(Some(j))),
            Err(_) => Ok(Async::Ready(None)),
        }
    }
}

fn main() {
    let addr = "[::1]:1337".parse().unwrap();
    let service = || {
        service_fn_ok( |_| {
            let file = std::fs::File::open("big");
            let buf = vec![
                vec![0u8; BUF_SIZE],
                vec![0u8; BUF_SIZE],
                vec![0u8; BUF_SIZE],
                vec![0u8; BUF_SIZE],
                vec![0u8; BUF_SIZE],
                vec![0u8; BUF_SIZE],
                vec![0u8; BUF_SIZE],
            ];
            let stream = TokioStream::new(file.unwrap(), buf);
            Response::new(Body::wrap_stream(stream))
        })
    };

    let server = Server::bind(&addr)
        .serve(service)
        .map_err(|e| eprintln!("Error: {}", e));

    hyper::rt::run(server);
}

J’en ai profiter pour tester des frameworks comme warp. La promesse de simplicité est au prix d’énorme perte de performance ! Je tombe @75MB/s. C’est ridiculement faible.

J’en conclue que Rust n’est pas encore près pour le Web même si il est en bonne voie.

Édité par ache

ache.one                 🦹         👾                                🦊

+0 -0
Auteur du sujet

Bon après étude du problème de manière approfondie. Pour obtenir des performances de Go (~420MB/s) il faut utiliser sendfile qui est une optimisation surtout fonctionnel sous les système Unix.

Elle est présente dans la libc de Rust mais pas adapté à Hyper. Je vais voir si je peux pas faire quelque chose mais je pense que non.

ache.one                 🦹         👾                                🦊

+2 -0
Vous devez être connecté pour pouvoir poster un message.
Connexion

Pas encore inscrit ?

Créez un compte en une minute pour profiter pleinement de toutes les fonctionnalités de Zeste de Savoir. Ici, tout est gratuit et sans publicité.
Créer un compte