diff --git a/remove.c b/remove.c index 8738851..2163802 100644 --- a/remove.c +++ b/remove.c @@ -5,9 +5,11 @@ #include #include #include +#include #include #include #include +#include #include #include @@ -25,6 +27,8 @@ struct task { struct task *parent; /* reference counting */ unsigned files; + /* stores the dirfd for this path or -1 */ + int dfd; atomic_uint removed_count; }; @@ -41,6 +45,10 @@ static pthread_mutex_t fd_mtx = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t fd_cond = PTHREAD_COND_INITIALIZER; static unsigned nproc; +/* is true if number of available fds is smaller than number of threads being used */ +static bool limited_fds; +/* stores amount of additional fds that can be used beyond the one per thread */ +static atomic_int dfd_max; /* p_old is a struct task cache; this means each thread can leak one struct task in total */ static thread_local struct task *p_old = NULL; @@ -98,6 +106,19 @@ static inline void queue_remove(struct queue *q, struct task *t) pthread_mutex_unlock(&q->mtx); } +static void close_dfd(const struct task *t) +{ + if (t->dfd == -1) return; + close(t->dfd); + atomic_fetch_add_explicit(&dfd_max, 1, memory_order_relaxed); +} + +static int rmdir_parent(const struct task *t) +{ + int rfd = (t->parent && t->parent->dfd != -1) ? t->parent->dfd : AT_FDCWD; + return unlinkat(rfd, t->path, AT_REMOVEDIR); +} + static inline void recurse_into_parents(struct task *t) { struct task *recurse = t, *free_list = NULL; @@ -110,7 +131,7 @@ static inline void recurse_into_parents(struct task *t) printf("parent: removed %04d total %04d '%s'\n", rc, recurse->files, recurse->path); if (rc == recurse->files) { /* we have removed all files in the directory */ - if (rmdir(recurse->path)) { + if (rmdir_parent(recurse)) { fprintf(stderr, "rec rmdir failed '%s': %m\n", recurse->path); } else { printf("rec rmdir succeeded '%s'\n", recurse->path); @@ -118,6 +139,7 @@ static inline void recurse_into_parents(struct task *t) free(recurse->path); /* can't free now because the while condition uses it */ + close_dfd(recurse); if (p_old) free_list = recurse; else p_old = recurse; } else { @@ -138,8 +160,9 @@ static void *process_queue_item(void *arg) queue_remove(q, &t); int dfd; - while ((dfd = open(t.path, O_RDONLY|O_DIRECTORY|O_NOFOLLOW|O_CLOEXEC)) < 0) { - if (errno == EMFILE) { + int rfd = (t.parent && t.parent->dfd != -1) ? t.parent->dfd : AT_FDCWD; + while ((dfd = openat(rfd, t.path, O_RDONLY|O_DIRECTORY|O_NOFOLLOW|O_CLOEXEC)) < 0) { + if (limited_fds && errno == EMFILE) { pthread_mutex_lock(&fd_mtx); pthread_cond_wait(&fd_cond, &fd_mtx); pthread_mutex_unlock(&fd_mtx); @@ -187,36 +210,54 @@ fast_path_dir: puts("didn't use p_old"); } *p = t; + p->dfd = -1; /* access happens only after mutex lock and release */ atomic_store_explicit(&p->removed_count, ACQUIRED, memory_order_relaxed); - plen = strlen(t.path); + if (!limited_fds) { + if (atomic_fetch_sub_explicit(&dfd_max, 1, memory_order_relaxed) > 0) { + /* we need to duplicate the fd due to calling closedir() below */ + p->dfd = dup(dfd); + } else { + atomic_fetch_add_explicit(&dfd_max, 1, memory_order_relaxed); + /* we only use plen for absolute paths */ + plen = strlen(t.path); + } + } } - size_t nlen = strlen(entry->d_name); - char *buf = malloc(plen + nlen + 2); - memcpy(buf, p->path, plen); - buf[plen] = '/'; - memcpy(buf+plen+1, entry->d_name, nlen); - buf[plen+nlen+1] = '\0'; + char *buf; + if (p->dfd == -1) { + size_t nlen = strlen(entry->d_name); + buf = malloc(plen + nlen + 2); + memcpy(buf, p->path, plen); + buf[plen] = '/'; + memcpy(buf+plen+1, entry->d_name, nlen); + buf[plen+nlen+1] = '\0'; + } else { + buf = strdup(entry->d_name); + } printf("adding to queue'%s'\n", buf); queue_add(q, buf, p); } closedir(d); - pthread_mutex_lock(&fd_mtx); - pthread_cond_signal(&fd_cond); - pthread_mutex_unlock(&fd_mtx); + if (limited_fds) { + pthread_mutex_lock(&fd_mtx); + pthread_cond_signal(&fd_cond); + pthread_mutex_unlock(&fd_mtx); + } if (p) { p->files = n-1; /* other thread will compare against removed_count-1 */ unsigned rc = atomic_fetch_and_explicit(&p->removed_count, ~ACQUIRED, memory_order_release); if (rc == (n|ACQUIRED)) { /* this branch is taken when other threads have already removed all of p's children */ + close_dfd(p); if (p_old) free(p); else p_old = p; - if (rmdir(t.path)) { + if (rmdir_parent(&t)) { fprintf(stderr, "atomic rmdir failed '%s': %m\n", t.path); } else { printf("atomic rmdir succeeded '%s'\n", t.path); @@ -227,7 +268,7 @@ fast_path_dir: } } else { /* p wasn't set because we could delete everything inside it */ - if (rmdir(t.path)) { + if (rmdir_parent(&t)) { fprintf(stderr, "fast path rmdir failed '%s': %m\n", t.path); } else { printf("fast path rmdir succeeded '%s'\n", t.path); @@ -255,10 +296,14 @@ void run_queue(void) if (nproc_l > 64) nproc_l = 64; nproc = nproc_l; + struct rlimit rl; + if (getrlimit(RLIMIT_NOFILE, &rl)) exit_init(); + /* soft limit minus open std streams and minus directory fds from each thread */ + if (rl.rlim_cur < nproc + 2) limited_fds = true; + else atomic_store_explicit(&dfd_max, rl.rlim_cur - 2 - nproc, memory_order_relaxed); /* main thread will also be a task */ unsigned nproc1 = nproc - 1; - if (nproc1) { pthread_attr_t pattr; if (pthread_attr_init(&pattr)) exit_init();