Stop using ftw.

This commit is contained in:
Érico Nogueira 2021-09-18 11:23:56 -03:00
parent e24aef5395
commit 7c6fe16e7b
3 changed files with 190 additions and 37 deletions

13
erm.c
View File

@ -40,14 +40,8 @@ int main(int argc, char **argv)
usage(1);
}
if (recursive) {
if (!malloc_task_list(argc)) {
perror("malloc");
return 5;
}
}
file_action action = recursive ? recurse_into : single_file;
file_action callback = recursive ? join_thread : NULL;
file_action callback = recursive ? (void*)1 : NULL;
const char *err_fmt = recursive ?
"failed to delve into '%s': %s\n" : "failed to remove '%s': %s\n";
@ -64,13 +58,14 @@ int main(int argc, char **argv)
}
}
if (callback) {
for (int i = 0; i < argc; i++) {
/*for (int i = 0; i < argc; i++) {
const char *path = argv[i];
if (callback(path, i)) {
fprintf(stderr, err_fmt, path, strerror(errno));
rv = 1;
}
}
}*/
run_queue();
}
return rv;

2
erm.h
View File

@ -2,7 +2,7 @@ typedef int (*file_action)(const char *path, int position);
struct task;
/* remove.c */
struct task *malloc_task_list(size_t);
int recurse_into(const char *, int);
int join_thread(const char *, int);
int single_file(const char *, int);
int run_queue(void);

212
remove.c
View File

@ -1,56 +1,214 @@
#define _XOPEN_SOURCE 500 /* nftw */
#include <dirent.h>
#include <errno.h>
#include <ftw.h>
#include <limits.h>
#include <pthread.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "erm.h"
struct task {
pthread_t thread;
const char *path;
char *path;
int rv;
int created;
} *task_list = NULL;
/* include priority so it doesn't deadlock */
int priority;
};
struct task *malloc_task_list(size_t n)
struct queue {
pthread_mutex_t mtx;
size_t len, size;
struct task *tasks;
/* add a counter to be decremented by each thread that can't add more stuff until we know we can stop searching? */
};
static struct queue queue = {.mtx = PTHREAD_MUTEX_INITIALIZER};
int queue_add(struct queue *q, const char *path, int priority)
{
return task_list = calloc(sizeof(struct task), n);
int rv = 0;
pthread_mutex_lock(&q->mtx);
if (q->len + 1 > q->size) {
q->size *= 2;
if (q->size == 0) q->size = 32;
void *t = realloc(q->tasks, q->size * sizeof(struct task));
if (!t) {
rv = -1;
goto error;
}
q->tasks = t;
}
char *p = strdup(path);
if (!p) {
rv = -1;
goto error;
}
struct task t = {.path = p, .priority = priority};
q->tasks[q->len++] = t;
error:
pthread_mutex_unlock(&q->mtx);
return rv;
}
static int ftw_cb(const char *fpath, const struct stat *sb,
int typeflag, struct FTW *ftwbuf)
static long nproc;
int queue_remove(struct queue *q, struct task *t)
{
if (typeflag == FTW_D || typeflag == FTW_DP) return rmdir(fpath);
else return unlink(fpath);
}
static void *run_ftw(void *arg)
{
struct task *t = arg;
t->rv = nftw(t->path, ftw_cb, 20, FTW_DEPTH|FTW_PHYS) ? errno : 0;
return NULL;
int rv = 0;
pthread_mutex_lock(&q->mtx);
if (q->len == 0) {
errno = EAGAIN;
rv = -1;
goto error;
}
puts("begin========================");
for (size_t i=0; i < q->len; i++) {
printf("item %010zu: %04d '%s'\n", i, q->tasks[i].priority, q->tasks[i].path);
}
puts("end==========================");
size_t pos = q->len-1;
/* the last position being a 0 is the best case */
if (q->tasks[pos].priority == 0) {
*t = q->tasks[pos];
q->len=pos;
} else {
int min_priority = INT_MAX;
for (size_t i=0; i < q->len; i++) {
int priority = q->tasks[i].priority;
/* if it's a zero we don't have to scan the rest */
if (priority == 0) {
pos = i;
break;
} else if (priority < min_priority) {
pos = i;
min_priority = priority;
}
}
*t = q->tasks[pos];
memmove(q->tasks+pos, q->tasks+pos+1, (q->len-pos-1)*sizeof(struct task));
q->len--;
}
/* the caller owns the path buffer now */
error:
pthread_mutex_unlock(&q->mtx);
return rv;
}
int recurse_into(const char *path, int c)
{
task_list[c].path = path;
int rv = pthread_create(&task_list[c].thread, NULL, run_ftw, &task_list[c]);
if (!rv) task_list[c].created = 1;
return rv;
(void)c;
return queue_add(&queue, path, 0);
}
int join_thread(const char *path, int c)
static void *process_queue_item(void *arg)
{
if (!task_list[c].created) {
errno = EINVAL;
return -1;
struct queue *q = arg;
struct task t;
while (1) {
int rv = queue_remove(q, &t);
if (rv) {
if (errno == EAGAIN) {
sched_yield();
continue;
}
/* doesn't happen yet, can be used to signal nothing is needed anymore? */
else break;
}
if (unlink(t.path)) {
if (errno == EISDIR) {
if (rmdir(t.path)) {
printf("rmdir failed '%s': %m\n", t.path);
if (t.priority > 0) {
/* we have already scanned the directory */
goto end;
}
/* fall through to opening directory */
} else {
/* missing error checking here and in opendir below */
printf("rmdir succeeded '%s'\n", t.path);
goto end;
}
} else {
/* break ? */
printf("unlink failed '%s': %m\n", t.path);
goto end;
}
} else {
printf("unlink succeeded '%s'\n", t.path);
goto end;
}
DIR *d;
while (!(d = opendir(t.path))) {
if (errno == ENFILE) {
/* sleep waiting for a closedir elsewhere */
sched_yield();
continue;
} else {
break;
}
}
if (d) {
struct dirent *entry;
while ((entry = readdir(d))) {
const char *name = entry->d_name;
if (!strcmp(".", name) || !strcmp("..", name)) continue;
char buf[PATH_MAX];
/* deal with too large path? */
snprintf(buf, PATH_MAX, "%s/%s", t.path, name);
queue_add(q, buf, 0);
printf("adding to queue'%s'\n", buf);
}
}
/* try self again - but the other entries are more important */
queue_add(q, t.path, t.priority+1);
end:
/* we took ownership of the buffer */
free(t.path);
}
pthread_join(task_list[c].thread, NULL);
errno = task_list[c].rv;
return errno ? -1 : 0;
return NULL;
}
int run_queue(void)
{
nproc = sysconf(_SC_NPROCESSORS_ONLN);
if (nproc < 1) nproc = 1;
if (nproc > 64) nproc = 64;
//nproc = 1;
/* warning with sizeof(*threads) ? */
pthread_t *threads = calloc(sizeof(pthread_t), nproc);
if (!threads) return -1;
int i, j = 0;
for (i = 0; i < nproc; i++) {
if (pthread_create(threads+i, NULL, process_queue_item, &queue)) {
j = 1;
break;
}
}
/* if creating threads fails, cancell all the already created ones */
if (j) for (j = 0; j < i; j++) {
pthread_cancel(threads[j]);
}
for (j = 0; j < i; j++) {
pthread_join(threads[j], NULL);
}
return 0;
}
int single_file(const char *path, int c)