Saturday, February 18, 2017

Pipelines in Rust

A classic Operating Systems assignment is to implement a Unix shell. It's a great assignment for several reasons. Students have to:
  • Get a solid understanding of the concept of a process.
  • Master some of the more subtle aspects of the file system.
  • Make direct system calls with subtle semantics.
Since I am teaching my course in Rust, I needed to figure out how I wanted the students to implement pipelines and I/O redirection. The options are:
  • Rely entirely on the standard library.
  • Interact directly with libc.
The benefit of relying on the standard library is, first of all, that the resulting code is portable across platforms. Second, there is no need for unsafe blocks.  The problem is that the pipe from the standard library only connects a parent and child. The Command object is great, but it doesn't allow for the separation between fork() and execv() that is essential for implementing proper I/O redirection. 

The trouble with the standard libc crate is that all of the function calls are unsafe. As far as I am concerned, this is bad news pedagogically because I do not want my students using unsafe blocks unless absolutely necessary. They are still learning the language, and I want them to learn to operate within the safety parameters it gives. Unsafe blocks, while necessary, are for experts, and my students will not be experts after one semester.

Fortunately, I recently discovered the nix crate. (Update: Starting in version 0.19.0, fork() is now marked unsafe. The code below works unmodified up to version 0.18.0. The nature of POSIX requirements for these functions makes them intrinsically unsafe in Rust.) It is a safe alternative to libc that provides everyone's favorite system calls in a Rust-friendly form.  The program below executes a simple three-command pipeline.  Here are the key concepts for understanding the program:

  • The fork() system call creates a new process. The new process is a perfect duplicate of the original process. The only difference between them is that fork() returns a different value if it is in the child process. If it is in the parent process, it returns the child's PID. 
  • A parent and child process are connected using a pipe. A pipe is accessed using two file descriptors: one for the pipe's input, one for the pipe's output. The output of the pipe serves as an input stream for a process, and likewise the input of the pipe serves as an output stream for a process.
  • When a parent and child are connected by a pipe, the parent will typically be receiving data from the child. Therefore, the parent's input will be the pipe's output, and the child's output will be the pipe's input. 
  • When the child is finished generating output, it exits. Then, the parent can finish processing the data it received from the child before it in turn exits as well. It is essential that the child exit first; if the parent exits first, unless other provisions are made both processes end.
  • To redirect standard input from the pipe instead of from the keyboard, we need to close the file descriptor for the keyboard (always file descriptor 0) and replace it with the file descriptor for the output of the pipe. The dup2() system call accomplishes this.
  • Similarly, to redirect standard output into the pipe instead of to the monitor, we need to close the monitor's file descriptor (always file descriptor 1) and replace it with the file descriptor for the input of the pipe. Again, we use dup2() for this purpose.
  • Remember that fork() completely duplicates a process. This includes its open file descriptors. In our case, that includes both ends of the pipe. We need to explicitly close the end of the pipe that the process is not using.
  • The execvp() system call loads a program and executes it. In doing so, the data for the current process is overwritten completely (except for the open file descriptors). The first argument is the name of the program. The PATH environment variable is searched for that program. The second argument is the complete list of command-line arguments, including the program name.
  • These parameters to execvp() need to be formatted as fixed-size arrays of C-style strings (which are very different from Rust strings). The program below shows how this conversion can be done.

If you'd like an alternative explanation of how the program works, feel free to view a 15-minute video in which I explain the details of this program.

use nix::unistd::{pipe, fork, ForkResult, close, dup2, execvp};
use std::ffi::CString;
use nix::sys::wait::waitpid;

// Add dependency nix = "0.19" to Cargo.toml

fn main() {
    println!("Executes ls -l | grep Cargo | wc");

    let ls:   Vec<CString> = vec![CString::new("ls").unwrap(),
                         CString::new("-l").unwrap()];
    let grep: Vec<CString> = vec![CString::new("grep").unwrap(),
                         CString::new("Cargo").unwrap()];
    let wc:   Vec<CString> = vec![CString::new("wc").unwrap()];

    match unsafe {fork()}.unwrap() {
        ForkResult::Parent{child} => {
            println!("wc pid is {}", child);
            waitpid(child, Option::None).unwrap();
            println!("Finished! Exiting...");
        },

        ForkResult::Child => {
            let (wc_in, grep_out) = pipe().unwrap();

            match unsafe {fork()}.unwrap() {
                ForkResult::Parent{child} => {
                    close(grep_out).unwrap();

                    println!("grep pid is {}", child);
                    dup2(wc_in, 0).unwrap();
                    let array = wc.into_boxed_slice();
                    execvp(&array[0], &*array).unwrap();
                },

                ForkResult::Child => {
                    close(wc_in).unwrap();

                    let (grep_in, ls_out) = pipe().unwrap();

                    match unsafe {fork()}.unwrap() {
                        ForkResult::Parent {child} => {
                            close(ls_out).unwrap();

                            println!("ls pid is {}", child);
                            dup2(grep_out, 1).unwrap();
                            dup2(grep_in, 0).unwrap();
                            let array = grep.into_boxed_slice();
                            execvp(&array[0], &*array).unwrap();
                        },

                        ForkResult::Child => {
                            close(grep_in).unwrap();

                            dup2(ls_out, 1).unwrap();
                            let array = ls.into_boxed_slice();
                            execvp(&array[0], &*array).unwrap();
                        }
                    }
                }
            }
        }
    }
}

No comments:

Post a Comment