/*
* crew.c
*
* Demonstrate a work crew implementing a simple parallel search
* through a directory tree.
*
* Special notes: On a Solaris 2.5 uniprocessor, this test will
* not produce interleaved output unless extra LWPs are created
* by calling thr_setconcurrency(), because threads are not
* timesliced.
*/
#include <sys/types.h>
#include <pthread.h>
#include <sys/stat.h>
#include <dirent.h>
#include "errors.h"
/* Number of worker_t slots in the crew[] array of each crew_t. */
#define CREW_SIZE 4
/*
 * A single unit of queued work: one filesystem path to examine plus
 * the string being searched for. Items are chained into a singly
 * linked FIFO. crew_start queues the first item, and workers append
 * further items as they walk directories.
 */
struct work_tag {
    struct work_tag *next;      /* Following item in the queue, or NULL */
    char            *path;      /* Path of the directory or file to process */
    char            *string;    /* Text being searched for */
};
typedef struct work_tag work_t;
typedef struct work_tag *work_p;
/*
 * Per-thread identity record. One exists for every worker in the
 * crew, and a worker receives a pointer to its own record as its
 * thread start argument.
 */
struct worker_tag {
    int              index;     /* This worker's index; used to label its output */
    pthread_t        thread;    /* Thread identifier for this worker */
    struct crew_tag *crew;      /* Back-pointer to the crew this worker belongs to */
};
typedef struct worker_tag worker_t;
typedef struct worker_tag *worker_p;
/*
 * Handle for an entire work crew: the per-worker records, the queue
 * of pending work items, and the synchronization objects that
 * coordinate access to them.
 */
struct crew_tag {
    int             crew_size;          /* Size of the crew array */
    worker_t        crew[CREW_SIZE];    /* Identity records of the crew members */
    long            work_count;         /* Count of work items; incremented on each enqueue */
    work_t         *first;              /* Head of the work queue, NULL when empty */
    work_t         *last;               /* Tail of the work queue, NULL when empty */
    pthread_mutex_t mutex;              /* Protects the crew data */
    pthread_cond_t  done;               /* Condition: wait for the crew to be done */
    pthread_cond_t  go;                 /* Condition: workers wait here for work */
};
typedef struct crew_tag crew_t;
typedef struct crew_tag *crew_p;
/*
 * Buffer-sizing limits shared by every worker. The code visible in
 * this section only reads them; they are assigned elsewhere.
 * NOTE(review): worker_routine builds child paths with strcpy/strcat
 * into a path_max-byte buffer without a length check -- confirm that
 * path_max covers the longest path plus its terminator.
 */
size_t path_max; /* Bytes allocated for each queued work item's path */
size_t name_max; /* Extra bytes added to sizeof (struct dirent) for the readdir_r buffer */
/*
* The thread start routine for crew threads. Waits until "go"
* command, processes work items until requested to shut down.
*/
void *worker_routine (void *arg)
{
worker_p mine = (worker_t*)arg;
crew_p crew = mine->crew;
work_p work, new_work;
struct stat filestat;
struct dirent *entry;
int status;
/*
* "struct dirent" is funny, because POSIX doesn't require
* the definition to be more than a header for a variable
* buffer. Thus, allocate a "big chunk" of memory, and use
* it as a buffer.
*/
entry = (struct dirent*)malloc (
sizeof (struct dirent) + name_max);
if (entry == NULL)
errno_abort ("Allocating dirent");
status = pthread_mutex_lock (&crew->mutex);
if (status != 0)
err_abort (status, "Lock crew mutex");
/*
* There won't be any work when the crew is created, so wait
* until something's put on the queue.
*/
while (crew->work_count == 0) {
status = pthread_cond_wait (&crew->go, &crew->mutex);
if (status != 0)
err_abort (status, "Wait for go");
}
status = pthread_mutex_unlock (&crew->mutex);
if (status != 0)
err_abort (status, "Unlock mutex");
DPRINTF (("Crew %d starting\n", mine->index));
/*
* Now, as long as there's work, keep doing it.
*/
while (1) {
/*
* Wait while there is nothing to do, and
* the hope of something coming along later. If
* crew->first is NULL, there's no work. But if
* crew->work_count goes to zero, we're done.
*/
status = pthread_mutex_lock (&crew->mutex);
if (status != 0)
err_abort (status, "Lock crew mutex");
DPRINTF (("Crew %d top: first is %#lx, count is %d\n",
mine->index, crew->first, crew->work_count));
while (crew->first == NULL) {
status = pthread_cond_wait (&crew->go, &crew->mutex);
if (status != 0)
err_abort (status, "Wait for work");
}
DPRINTF (("Crew %d woke: %#lx, %d\n",
mine->index, crew->first, crew->work_count));
/*
* Remove and process a work item
*/
work = crew->first;
crew->first = work->next;
if (crew->first == NULL)
crew->last = NULL;
DPRINTF (("Crew %d took %#lx, leaves first %#lx, last %#lx\n",
mine->index, work, crew->first, crew->last));
status = pthread_mutex_unlock (&crew->mutex);
if (status != 0)
err_abort (status, "Unlock mutex");
/*
* We have a work item. Process it, which may involve
* queuing new work items.
*/
status = lstat (work->path, &filestat);
if (S_ISLNK (filestat.st_mode))
printf (
"Thread %d: %s is a link, skipping.\n",
mine->index,
work->path);
else if (S_ISDIR (filestat.st_mode)) {
DIR *directory;
struct dirent *result;
/*
* If the file is a directory, search it and place
* all files onto the queue as new work items.
*/
directory = opendir (work->path);
if (directory == NULL) {
fprintf (
stderr, "Unable to open directory %s: %d (%s)\n",
work->path,
errno, strerror (errno));
continue;
}
while (1) {
status = readdir_r (directory, entry, &result);
if (status != 0) {
fprintf (
stderr,
"Unable to read directory %s: %d (%s)\n",
work->path,
status, strerror (status));
break;
}
if (result == NULL)
break; /* End of directory */
/*
* Ignore "." and ".." entries.
*/
if (strcmp (entry->d_name, ".") == 0)
continue;
if (strcmp (entry->d_name, "..") == 0)
continue;
new_work = (work_p)malloc (sizeof (work_t));
if (new_work == NULL)
errno_abort ("Unable to allocate space");
new_work->path = (char*)malloc (path_max);
if (new_work->path == NULL)
errno_abort ("Unable to allocate path");
strcpy (new_work->path, work->path);
strcat (new_work->path, "/");
strcat (new_work->path, entry->d_name);
new_work->string = work->string;
new_work->next = NULL;
status = pthread_mutex_lock (&crew->mutex);
if (status != 0)
err_abort (status, "Lock mutex");
if (crew->first == NULL) {
crew->first = new_work;
crew->last = new_work;
} else {
crew->last->next = new_work;
crew->last = new_work;
}
crew->work_count++;
DPRINTF ((
"Crew %d: add work %#lx, first %#lx, last %#lx, %d\n",
mine->index, new_work, crew->first,
crew->last, crew->work_count));
status = pthread_cond_signal (&crew->go);
status = pthread_mutex_unlock (&crew->mutex);
if (status != 0)
err_abort (status, "Unlock mutex");
}
closedir (directory);
} else if (S_ISREG (filestat.st_mode)) {
FILE *search;
char buffer[256], *bufptr, *search_ptr;
/*
* If this is a file, not a directory, then search
* it for the string.
*/
search = fopen (work->path, "r");
if (s
- 1
- 2
前往页