mirror of https://github.com/ericonr/erm.git
Instead of using priorities, use a parent task.
This simplifies the code and avoids wasting syscalls, though it does require atomics.
This commit is contained in:
parent
7c6fe16e7b
commit
fafb351498
142
remove.c
142
remove.c
|
@ -3,6 +3,7 @@
|
|||
#include <limits.h>
|
||||
#include <pthread.h>
|
||||
#include <sched.h>
|
||||
#include <stdatomic.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
|
@ -10,13 +11,19 @@
|
|||
|
||||
#include "erm.h"
|
||||
|
||||
/* Unless DEBUG is defined, printf() and puts() expand to nothing, so the
 * tracing calls in this file emit no output and their arguments are never
 * evaluated. */
#ifndef DEBUG
|
||||
#define printf(...)
|
||||
#define puts(s)
|
||||
#endif
|
||||
|
||||
/* One unit of removal work: a single path that a worker tries to unlink()
 * or rmdir().
 * NOTE(review): this rendering interleaves the removed and added lines of
 * the diff, so both the old `priority` field and the new `parent`/`rc`/`type`
 * fields appear below. */
struct task {
|
||||
/* not referenced in the visible hunks */
pthread_t thread;
|
||||
/* path to remove; the worker that dequeues the task owns this buffer and
 * frees it */
char *path;
|
||||
/* not referenced in the visible hunks */
int rv;
|
||||
/* not referenced in the visible hunks */
int created;
|
||||
/* include priority so it doesn't deadlock
 * NOTE(review): presumably the pre-change field; the commit description says
 * priorities are replaced by the parent pointer */
|
||||
int priority;
|
||||
/* task of the containing directory, or NULL for a path queued through
 * recurse_into() */
struct task *parent;
|
||||
/* reference counting: set to the number of directory entries queued as
 * children and decremented atomically as each child finishes; the worker
 * that takes it from 1 to 0 rmdir()s this task's path */
|
||||
atomic_int rc;
|
||||
/* dirent d_type of the entry (DT_UNKNOWN for top-level paths), kept to save
 * on syscalls when possible
|
||||
* TODO: actually use */
|
||||
unsigned char type;
|
||||
};
|
||||
|
||||
struct queue {
|
||||
|
@ -27,7 +34,7 @@ struct queue {
|
|||
};
|
||||
static struct queue queue = {.mtx = PTHREAD_MUTEX_INITIALIZER};
|
||||
|
||||
int queue_add(struct queue *q, const char *path, int priority)
|
||||
int queue_add(struct queue *q, const char *path, unsigned char type, struct task *parent)
|
||||
{
|
||||
int rv = 0;
|
||||
|
||||
|
@ -48,7 +55,7 @@ int queue_add(struct queue *q, const char *path, int priority)
|
|||
rv = -1;
|
||||
goto error;
|
||||
}
|
||||
struct task t = {.path = p, .priority = priority};
|
||||
struct task t = {.path = p, .type = type, .parent = parent};
|
||||
q->tasks[q->len++] = t;
|
||||
|
||||
error:
|
||||
|
@ -63,37 +70,15 @@ int queue_remove(struct queue *q, struct task *t)
|
|||
int rv = 0;
|
||||
pthread_mutex_lock(&q->mtx);
|
||||
if (q->len == 0) {
|
||||
errno = EAGAIN;
|
||||
rv = -1;
|
||||
rv = EAGAIN;
|
||||
goto error;
|
||||
}
|
||||
puts("begin========================");
|
||||
for (size_t i=0; i < q->len; i++) {
|
||||
printf("item %010zu: %04d '%s'\n", i, q->tasks[i].priority, q->tasks[i].path);
|
||||
printf("item %010zu: '%s'\n", i, q->tasks[i].path);
|
||||
}
|
||||
puts("end==========================");
|
||||
size_t pos = q->len-1;
|
||||
/* the last position being a 0 is the best case */
|
||||
if (q->tasks[pos].priority == 0) {
|
||||
*t = q->tasks[pos];
|
||||
q->len=pos;
|
||||
} else {
|
||||
int min_priority = INT_MAX;
|
||||
for (size_t i=0; i < q->len; i++) {
|
||||
int priority = q->tasks[i].priority;
|
||||
/* if it's a zero we don't have to scan the rest */
|
||||
if (priority == 0) {
|
||||
pos = i;
|
||||
break;
|
||||
} else if (priority < min_priority) {
|
||||
pos = i;
|
||||
min_priority = priority;
|
||||
}
|
||||
}
|
||||
*t = q->tasks[pos];
|
||||
memmove(q->tasks+pos, q->tasks+pos+1, (q->len-pos-1)*sizeof(struct task));
|
||||
q->len--;
|
||||
}
|
||||
*t = q->tasks[--(q->len)];
|
||||
|
||||
/* the caller owns the path buffer now */
|
||||
error:
|
||||
|
@ -104,7 +89,12 @@ error:
|
|||
/* Queue `path` on the file-global `queue` as a removal task.
 * `c` is accepted but ignored. Returns whatever queue_add() returns.
 * NOTE(review): both return statements of the diff are shown here; the
 * three-argument call is presumably the removed line and the four-argument
 * call (type DT_UNKNOWN, no parent task) the added one. */
int recurse_into(const char *path, int c)
|
||||
{
|
||||
(void)c;
|
||||
return queue_add(&queue, path, 0);
|
||||
return queue_add(&queue, path, DT_UNKNOWN, NULL);
|
||||
}
|
||||
|
||||
/* scandir() filter: reject the "." and ".." entries, accept everything else.
 * Returns 0 to drop the entry and 1 to keep it. */
static int filter_dir(const struct dirent *d)
{
	const char *name = d->d_name;

	if (strcmp(name, ".") == 0) {
		return 0;
	}
	if (strcmp(name, "..") == 0) {
		return 0;
	}
	return 1;
}
|
||||
|
||||
static void *process_queue_item(void *arg)
|
||||
|
@ -113,23 +103,17 @@ static void *process_queue_item(void *arg)
|
|||
struct task t;
|
||||
while (1) {
|
||||
int rv = queue_remove(q, &t);
|
||||
if (rv) {
|
||||
if (errno == EAGAIN) {
|
||||
sched_yield();
|
||||
continue;
|
||||
}
|
||||
/* doesn't happen yet, can be used to signal nothing is needed anymore? */
|
||||
else break;
|
||||
if (rv == EAGAIN) {
|
||||
puts("yield");
|
||||
sched_yield();
|
||||
continue;
|
||||
/* no other errors yet, can be used to signal nothing is needed anymore? */
|
||||
}
|
||||
|
||||
if (unlink(t.path)) {
|
||||
if (errno == EISDIR) {
|
||||
if (rmdir(t.path)) {
|
||||
printf("rmdir failed '%s': %m\n", t.path);
|
||||
if (t.priority > 0) {
|
||||
/* we have already scanned the directory */
|
||||
goto end;
|
||||
}
|
||||
/* fall through to opening directory */
|
||||
} else {
|
||||
/* missing error checking here and in opendir below */
|
||||
|
@ -146,35 +130,69 @@ static void *process_queue_item(void *arg)
|
|||
goto end;
|
||||
}
|
||||
|
||||
DIR *d;
|
||||
while (!(d = opendir(t.path))) {
|
||||
struct dirent **entries;
|
||||
int n;
|
||||
while ((n = scandir(t.path, &entries, filter_dir, alphasort)) == -1) {
|
||||
if (errno == ENFILE) {
|
||||
/* sleep waiting for a closedir elsewhere */
|
||||
sched_yield();
|
||||
continue;
|
||||
} else {
|
||||
break;
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
struct task *p = malloc(sizeof *p);
|
||||
*p = t;
|
||||
t.path = NULL;
|
||||
p->rc = n;
|
||||
|
||||
if (d) {
|
||||
struct dirent *entry;
|
||||
while ((entry = readdir(d))) {
|
||||
const char *name = entry->d_name;
|
||||
if (!strcmp(".", name) || !strcmp("..", name)) continue;
|
||||
char buf[PATH_MAX];
|
||||
/* deal with too large path? */
|
||||
snprintf(buf, PATH_MAX, "%s/%s", t.path, name);
|
||||
/* TODO: use recursive mutex, lock queue twice, use readdir only */
|
||||
while (n--) {
|
||||
struct dirent *entry = entries[n];
|
||||
const char *name = entry->d_name;
|
||||
char buf[PATH_MAX];
|
||||
/* deal with too large path? */
|
||||
snprintf(buf, PATH_MAX, "%s/%s", p->path, name);
|
||||
|
||||
queue_add(q, buf, 0);
|
||||
printf("adding to queue'%s'\n", buf);
|
||||
}
|
||||
queue_add(q, buf, entry->d_type, p);
|
||||
printf("adding to queue'%s'\n", buf);
|
||||
free(entry);
|
||||
}
|
||||
|
||||
/* try self again - but the other entries are more important */
|
||||
queue_add(q, t.path, t.priority+1);
|
||||
free(entries);
|
||||
continue;
|
||||
|
||||
end:
|
||||
if (t.parent) {
|
||||
struct task *recurse = &t;
|
||||
void *free_list = NULL;
|
||||
while ((recurse = recurse->parent)) {
|
||||
free(free_list); free_list = NULL;
|
||||
|
||||
int rc = atomic_fetch_sub(&recurse->rc, 1);
|
||||
printf("parent: %04d '%s'\n", rc, recurse->path);
|
||||
if (rc < 1) {
|
||||
printf("bad parent: %04d '%s'\n", rc, recurse->path);
|
||||
abort();
|
||||
}
|
||||
if (rc == 1) {
|
||||
/* reference counting fell down to 0 */
|
||||
if (rmdir(recurse->path)) {
|
||||
printf("rec rmdir failed '%s': %m\n", recurse->path);
|
||||
} else {
|
||||
printf("rec rmdir succeeded '%s'\n", recurse->path);
|
||||
}
|
||||
free(recurse->path);
|
||||
/* can't free now because the while condition uses it */
|
||||
free_list = recurse;
|
||||
} else {
|
||||
/* if we haven't removed this directory yet,
|
||||
* there's no reason to recurse further */
|
||||
break;
|
||||
}
|
||||
}
|
||||
/* catch any stragglers, in case the loop doesn't iterate once more */
|
||||
free(free_list);
|
||||
}
|
||||
/* we took ownership of the buffer */
|
||||
free(t.path);
|
||||
}
|
||||
|
@ -187,9 +205,7 @@ int run_queue(void)
|
|||
nproc = sysconf(_SC_NPROCESSORS_ONLN);
|
||||
if (nproc < 1) nproc = 1;
|
||||
if (nproc > 64) nproc = 64;
|
||||
//nproc = 1;
|
||||
|
||||
/* warning with sizeof(*threads) ? */
|
||||
pthread_t *threads = calloc(sizeof(pthread_t), nproc);
|
||||
if (!threads) return -1;
|
||||
|
||||
|
|
Loading…
Reference in New Issue