Multiprocessing and Pipes in C

11,128

Solution 1

Try this code for size. It uses a fixed number of child processes, but you can change that number by adjusting the enum constant NUM_KIDS in main() (it was mostly tested with that set at 3; I later changed it to 5 just to make sure).

#include <signal.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/wait.h>
#include <unistd.h>

/* Parent-side bookkeeping for one child process. */
typedef struct Child
{
    FILE *fp_to;    /* Stream the parent writes to; it feeds the child's standard input */
    FILE *fp_from;  /* Stream the parent reads from; it carries the child's standard output */
    pid_t pid;      /* Value returned by fork(); wait_for_kids() sets it to -1 once reaped */
} Child;

/* Indexes into the int[2] array filled in by pipe() */
enum { P_READ, P_WRITE };   /* Read, write descriptor of a pipe */
/* Size of every line buffer handed to fgets() in this program */
enum { MAX_LINE = 4096 };

/* Forward declarations for functions that are used before they are defined */
static void be_childish(void);
static void distribute(size_t nkids, Child *kids);
static void err_exit(const char *fmt, ...);
static void merge(size_t nkids, Child *kids);
static void wait_for_kids(size_t nkids, Child *kids);

/*
 * Fork one child and connect it to the parent with two pipes.
 *
 * In the child, standard input is redirected to read what the parent
 * writes and standard output is redirected to write back to the parent;
 * the child then runs be_childish() and never returns from this function.
 *
 * In the parent, kid->pid receives the child's process ID, kid->fp_to is
 * a write stream to the child and kid->fp_from is a read stream from it.
 *
 * Returns 0 on success, or -1 on failure with every descriptor created by
 * this call closed again.
 */
static int make_kid(Child *kid)
{
    /*
     * Parent-side pipe descriptors handed out by earlier successful calls.
     * A child created later inherits copies of them across fork().  Unless
     * it closes those copies, an earlier sibling cannot see end-of-file on
     * its input until the later child has exited, and the parent can then
     * deadlock against a child that is blocked writing its output.
     */
    static int   *parent_fds = NULL;
    static size_t num_parent_fds = 0;

    int pipe1[2];   /* From parent to child */
    int pipe2[2];   /* From child to parent */

    /* Reserve registry space before forking so nothing can fail afterwards */
    int *grown = realloc(parent_fds, (num_parent_fds + 2) * sizeof(*grown));
    if (grown == NULL)
        return -1;
    parent_fds = grown;

    if (pipe(pipe1) != 0)
        return -1;
    if (pipe(pipe2) != 0)
    {
        close(pipe1[P_READ]);
        close(pipe1[P_WRITE]);
        return -1;
    }
    if ((kid->pid = fork()) < 0)
    {
        close(pipe1[P_READ]);
        close(pipe1[P_WRITE]);
        close(pipe2[P_READ]);
        close(pipe2[P_WRITE]);
        return -1;
    }
    else if (kid->pid == 0)
    {
        /* Child: without working redirections there is nothing useful to do */
        if (dup2(pipe1[P_READ], STDIN_FILENO) < 0 ||
            dup2(pipe2[P_WRITE], STDOUT_FILENO) < 0)
            _exit(1);
        close(pipe1[P_READ]);
        close(pipe1[P_WRITE]);
        close(pipe2[P_READ]);
        close(pipe2[P_WRITE]);
        /* Drop the inherited copies of the pipes that belong to earlier kids */
        for (size_t i = 0; i < num_parent_fds; i++)
        {
            if (parent_fds[i] > STDERR_FILENO)
                close(parent_fds[i]);
        }
        /* Reads standard input from parent; writes standard output to parent */
        be_childish();
        exit(0);
    }
    else
    {
        /* Parent: keep only the write end of pipe1 and the read end of pipe2 */
        close(pipe1[P_READ]);
        close(pipe2[P_WRITE]);
        kid->fp_to   = fdopen(pipe1[P_WRITE], "w");
        kid->fp_from = fdopen(pipe2[P_READ], "r");
        if (kid->fp_to == NULL || kid->fp_from == NULL)
        {
            /* Release whichever of stream or raw descriptor currently owns each end */
            if (kid->fp_to != NULL)
                fclose(kid->fp_to);
            else
                close(pipe1[P_WRITE]);
            if (kid->fp_from != NULL)
                fclose(kid->fp_from);
            else
                close(pipe2[P_READ]);
            kid->fp_to   = NULL;
            kid->fp_from = NULL;
            /* The child now sees end-of-file on its input and exits; reap it */
            waitpid(kid->pid, NULL, 0);
            kid->pid = -1;
            return -1;
        }
        parent_fds[num_parent_fds++] = pipe1[P_WRITE];
        parent_fds[num_parent_fds++] = pipe2[P_READ];
        return 0;
    }
}

int main(void)
{
    enum { NUM_KIDS = 5 };
    Child kids[NUM_KIDS];
    struct sigaction act;

    sigfillset(&act.sa_mask);
    act.sa_flags   = 0;
    act.sa_handler = SIG_DFL;
    sigaction(SIGCHLD, &act, 0);

    for (int i = 0; i < NUM_KIDS; i++)
    {
        if (make_kid(&kids[i]) != 0)
            err_exit("Fault starting child %d\n", i);
    }

    distribute(NUM_KIDS, kids);
    merge(NUM_KIDS, kids);

    wait_for_kids(NUM_KIDS, kids);
    return(0);
}

/*
 * Print a printf-style message on standard error and terminate the
 * process with exit status 1.  This function does not return.
 */
static void err_exit(const char *fmt, ...)
{
    va_list ap;

    va_start(ap, fmt);
    (void)vfprintf(stderr, fmt, ap);
    va_end(ap);

    exit(1);
}

/*
 * qsort() comparator for an array of C-string pointers.
 *
 * v1 and v2 each point at an array element (a pointer to a string), so
 * they are dereferenced once before the strings are compared.  The casts
 * keep the const qualification that qsort() supplies instead of
 * discarding it.
 *
 * Returns the strcmp() result for the two strings.
 */
static int qs_compare(const void *v1, const void *v2)
{
    const char *s1 = *(const char *const *)v1;
    const char *s2 = *(const char *const *)v2;
    return strcmp(s1, s2);
}

/*
 * Return a freshly allocated copy of str.  If the allocation fails the
 * program is terminated through err_exit(), so the result is never null.
 */
static char *estrdup(const char *str)
{
    const size_t nbytes = strlen(str) + 1;   /* include the terminating NUL */
    char *copy = malloc(nbytes);

    if (copy == 0)
        err_exit("Out of memory!\n");

    /* Source and destination are distinct objects; memcpy returns copy */
    return memcpy(copy, str, nbytes);
}

/*
 * Body of a child process: read every line from standard input, sort the
 * lines with strcmp() ordering, write them to standard output and exit.
 *
 * Lines are read with fgets() into a MAX_LINE buffer, so a line longer
 * than MAX_LINE - 1 bytes arrives in several pieces and each piece is
 * stored and sorted as a separate entry.
 *
 * Never returns: exits with status 0 on success, or through err_exit()
 * on allocation or write failure.
 */
static void be_childish(void)
{
    char **lines = 0;
    size_t num_lines = 0;
    size_t max_lines = 0;
    char input[MAX_LINE];

    while (fgets(input, sizeof(input), stdin) != 0)
    {
        if (num_lines >= max_lines)
        {
            /* Grow geometrically; the +2 gets the array started from empty */
            size_t n = (2 * max_lines + 2);
            void *space = realloc(lines, n * sizeof(*lines));
            if (space == 0)
                err_exit("Out of memory!\n");
            lines = space;
            max_lines = n;
        }
        lines[num_lines++] = estrdup(input);
    }

    /*
     * With no input, lines is still a null pointer, which must not be
     * handed to qsort(); zero or one entries need no sorting anyway.
     */
    if (num_lines > 1)
        qsort(lines, num_lines, sizeof(*lines), qs_compare);

    for (size_t i = 0; i < num_lines; i++)
    {
        if (fputs(lines[i], stdout) == EOF)
            err_exit("Short write to parent from %d\n", (int)getpid());
    }

    /* Output is buffered: flush explicitly so a failed write is noticed */
    if (fflush(stdout) != 0)
        err_exit("Short write to parent from %d\n", (int)getpid());

    exit(0);
}

/*
 * Deal the lines of standard input out to the children in round-robin
 * order, then close the streams to the children so that each one sees
 * end-of-file and can start sorting.
 *
 * nkids - number of entries in kids
 * kids  - children whose fp_to streams are written and then closed;
 *         each fp_to is set to null once closed
 *
 * Terminates the program through err_exit() if a write to a child fails.
 */
static void distribute(size_t nkids, Child *kids)
{
    char   input[MAX_LINE];
    size_t n = 0;
    size_t last = 0;            /* Kid that received the most recent chunk */
    int    unterminated = 0;    /* Most recent chunk did not end in a newline */

    if (nkids == 0)
        return;                 /* Nobody to write to */

    while (fgets(input, sizeof(input), stdin) != 0)
    {
        size_t len = strlen(input);

        if (fputs(input, kids[n].fp_to) == EOF)
            err_exit("Short write to child %d\n", (int)kids[n].pid);
        unterminated = (len > 0 && input[len - 1] != '\n');
        last = n;
        if (++n >= nkids)
            n = 0;
    }

    /*
     * The children write their sorted lines back to back.  If the input
     * ended without a newline, that last line would be glued to whichever
     * line the child outputs after it, so terminate it here.
     */
    if (unterminated && fputc('\n', kids[last].fp_to) == EOF)
        err_exit("Short write to child %d\n", (int)kids[last].pid);

    /* Close pipes to children - lets them get on with sorting */
    for (size_t i = 0; i < nkids; i++)
    {
        /* fclose() performs the final flush, so its result matters */
        if (fclose(kids[i].fp_to) != 0)
            err_exit("Short write to child %d\n", (int)kids[i].pid);
        kids[i].fp_to = 0;
    }
}

/*
 * Fetch the next line that a child has sent back.
 *
 * kid    - child to read from; its fp_from stream is closed and set to
 *          null once fgets() reports end-of-file or an error
 * buffer - receives the line, or an empty string when nothing was read
 * maxlen - size of buffer in bytes
 * length - receives the length of the line, or -1 when nothing was read
 */
static void read_line(Child *kid, char *buffer, size_t maxlen, int *length)
{
    if (fgets(buffer, maxlen, kid->fp_from) == 0)
    {
        /* Nothing more will come from this child */
        buffer[0] = '\0';
        *length = -1;
        fclose(kid->fp_from);
        kid->fp_from = 0;
        return;
    }

    /* fgets() has already null-terminated the buffer */
    *length = strlen(buffer);
}

/*
 * Report whether any of the nkids entries in lengths is positive.
 * Returns 1 if at least one entry is greater than zero, otherwise 0.
 */
static int not_all_done(size_t nkids, int *lengths)
{
    size_t idx = 0;

    /* Skip over entries that are zero or negative */
    while (idx < nkids && lengths[idx] <= 0)
        idx++;

    /* Stopping before the end means a positive entry was found */
    return idx < nkids;
}

/*
 * Copy the smallest pending line (by strcmp() ordering) into output and
 * replace it with the next line from the child that supplied it.
 *
 * nkids  - number of children
 * len    - per-child length of the pending line; entries <= 0 are skipped
 * lines  - per-child buffer holding the pending line
 * maxlen - size of each buffer in lines
 * kids   - the children, used to read the replacement line
 * output - receives the selected line; must be able to hold maxlen bytes
 *
 * If no child has a pending line, output is set to the empty string and
 * nothing is read.
 */
static void min_line(size_t nkids, int *len, char **lines, size_t maxlen,
                     Child *kids, char *output)
{
    size_t  min_kid = 0;
    char   *min_str = 0;
    for (size_t i = 0; i < nkids; i++)
    {
        if (len[i] <= 0)
            continue;
        if (min_str == 0 || strcmp(min_str, lines[i]) > 0)
        {
            min_str = lines[i];
            min_kid = i;
        }
    }

    /* Nothing pending anywhere: avoid copying from a null pointer */
    if (min_str == 0)
    {
        output[0] = '\0';
        return;
    }

    strcpy(output, min_str);
    /* Refill the slot just consumed from the same child */
    read_line(&kids[min_kid], lines[min_kid], maxlen, &len[min_kid]);
}

/*
 * Merge the output streams of all children onto standard output.
 *
 * One pending line is held per child; on every step the smallest pending
 * line is written out and replaced by the next line from the same child.
 *
 * nkids - number of children
 * kids  - the children whose fp_from streams are read
 *
 * Terminates the program through err_exit() if standard output cannot be
 * written.
 */
static void merge(size_t nkids, Child *kids)
{
    /* The arrays below are sized by nkids, and a zero-length VLA is invalid */
    if (nkids == 0)
        return;

    char line_data[nkids][MAX_LINE];
    char *lines[nkids];
    int  len[nkids];
    char output[MAX_LINE];

    for (size_t i = 0; i < nkids; i++)
        lines[i] = line_data[i];

    /* Preload first line from each kid */
    for (size_t i = 0; i < nkids; i++)
        read_line(&kids[i], lines[i], MAX_LINE, &len[i]);

    while (not_all_done(nkids, len))
    {
        min_line(nkids, len, lines, MAX_LINE, kids, output);
        if (fputs(output, stdout) == EOF)
            err_exit("Short write to standard output\n");
    }

    /* Output is buffered: flush explicitly so a failed write is noticed */
    if (fflush(stdout) != 0)
        err_exit("Short write to standard output\n");
}

/*
 * Reap every child process and confirm that each entry in kids was seen.
 *
 * nkids - number of entries in kids
 * kids  - the children; the pid of each reaped child is set to -1
 *
 * Terminates the program through err_exit() if waitpid() stops returning
 * children while some entry still holds a process ID.
 */
static void wait_for_kids(size_t nkids, Child *kids)
{
    pid_t pid;

    /* The exit status is not examined, so none is requested */
    while ((pid = waitpid(-1, NULL, 0)) != -1)
    {
        for (size_t i = 0; i < nkids; i++)
        {
            if (pid == kids[i].pid)
            {
                kids[i].pid = -1;
                break;      /* Process IDs are unique; stop searching */
            }
        }
    }

    /* This check loop is not really necessary */
    for (size_t i = 0; i < nkids; i++)
    {
        if (kids[i].pid != -1)
            err_exit("Child %d died without being tracked\n", (int)kids[i].pid);
    }
}

Solution 2

The overall picture is usually:

/* Sketch: create one pipe per child, then fork the child that reads from it */
pid_t pids[3];
int fd[3][2];

int i;
for (i = 0; i < 3; ++i) {
    /* create the pipe */
    if (pipe(fd[i]) < 0) {
        perror("pipe error");
        exit(1);
    }

    /* fork the child */
    pids[i] = fork();
    if (pids[i] < 0) {
        perror("fork error");
        exit(1);
    } else if (pids[i] > 0) {
        /* in parent process */
        /* close reading end */
        close(fd[i][0]);
    } else {
        /* in child process */
        /* close writing end */
        close(fd[i][1]);
        /* read from parent */
        read(fd[i][0], line, max);
        ...
        /* finish here so the child does not carry on round the fork loop */
        exit(0);
    }
}

/* in parent process: deal the words out to the children in turn */
char words[100][10] = {...};
int j;
int child = 0;
for (j = 0; j < 100; ++j) {
    /* hand this word to the current child */
    write(fd[child][1], words[j], strlen(words[j]));
    ...
    /* move on to the next child, wrapping back to the first */
    child = (child + 1) % 3;
}

Duplicate the pipe part for communicating back from child to parent. Be careful not to deadlock when parent and child are trying to communicate in both directions simultaneously.

Solution 3

There's nothing magical about pipes - they are just a communication medium with two endpoints. The logic goes approximately:

Create 3 pipes and hold onto one endpoint. Fork three times, and get each of those forked children to hold onto the other end of the pipe. The child then goes into a read loop, waiting for input and writing back output. The parent can round-robin all outputs, then do a round-robin read of inputs. This isn't the nicest strategy, but it's by far the simplest. i.e.,

while there is work left to do:
   for i in 1..3
       write current work unit to pipe[i]

   for i in 1..3
       read back response from pipe[i]

A given child just looks like this:

while(input = read from pipe)
    result = do work on input
    write result to pipe

The next step would be doing your read-back in the parent process in an asynchronous, non-blocking manner (possibly using select, or just a busy-wait polling loop). This requires the children to report back which task they are returning a result for, because the ordering could get messy (i.e., you can no longer rely on the first work unit you send being the first response you get). Welcome to the fun world of concurrency bugs.

Given the somewhat underspecified nature of your question, I hope this is somehow useful.

Share:
11,128
PoweredByOrange
Author by

PoweredByOrange

Updated on June 23, 2022

Comments

  • PoweredByOrange
    PoweredByOrange almost 2 years

    I'm trying to learn how to work with fork() to create new processes and pipes to communicate with each process. Let's say I have a list that contains 20 words, and I create 3 processes. Now, I need to distribute the words between the processes using pipes, and then each process will sort the list of words it receives. The way I want to achieve this is like this:

    Word1 => Process1
    Word2 => Process2
    Word3 => Process3
    Word4 => Process1
    Word5 => Process2
    Word6 => Process3
    .
    .
    .
    

    So each process will have a list of words to sort, and eventually I'll use MergeSort to merge all the sorted lists returned by each process. I'm not sure how to use pipes to communicate with each process (e.g. feed each process with a word). Any help that would put me on the right track would be appreciated.