#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "erm.h" #ifndef DEBUG #define printf(...) #define puts(s) #endif #define ACQUIRED (1U<<31) struct task { char *path; struct task *parent; /* reference counting */ unsigned files; /* stores the dirfd for this path or -1 */ int dfd; atomic_uint removed_count; }; static struct queue { pthread_mutex_t mtx; pthread_cond_t cond; struct task *tasks; size_t len, size; /* number of free threads */ unsigned free; } queue = {.mtx = PTHREAD_MUTEX_INITIALIZER, .cond = PTHREAD_COND_INITIALIZER}; static pthread_mutex_t fd_mtx = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t fd_cond = PTHREAD_COND_INITIALIZER; static unsigned nproc; /* is true if number of available fds is smaller than number of threads being used */ static bool limited_fds; /* stores amount of additional fds that can be used beyond the one per thread */ static atomic_int dfd_max; /* p_old is a struct task cache; this means each thread can leak one struct task in total */ static thread_local struct task *p_old = NULL; static inline void queue_print(struct queue *q) { #ifdef DEBUGP puts("begin========================"); for (size_t i=0; i < q->len; i++) { printf("item %010zu: '%s'\n", i, q->tasks[i].path); } puts("end=========================="); #else (void)q; #endif } static inline void queue_add(struct queue *q, char *path, struct task *parent) { pthread_mutex_lock(&q->mtx); if (q->len + 1 > q->size) { q->size *= 2; if (q->size == 0) q->size = 32; q->tasks = realloc(q->tasks, q->size * sizeof(struct task)); if (!q->tasks) { fprintf(stderr, "queue memory exhaustion: %m\n"); exit(1); } } q->tasks[q->len++] = (struct task){.path = path, .parent = parent}; pthread_cond_signal(&q->cond); pthread_mutex_unlock(&q->mtx); } static inline void queue_remove(struct queue *q, struct task *t) { pthread_mutex_lock(&q->mtx); while (q->len == 0) { if (q->free == nproc - 1) { /* we are done removing things */ free(q->tasks); exit(0); } q->free++; pthread_cond_wait(&q->cond, &q->mtx); q->free--; } queue_print(q); *t = q->tasks[--(q->len)]; /* the caller owns the path buffer now */ pthread_mutex_unlock(&q->mtx); } static void close_dfd(const struct task *t) { if (t->dfd == -1) return; close(t->dfd); atomic_fetch_add_explicit(&dfd_max, 1, memory_order_relaxed); } static int rmdir_parent(const struct task *t) { int rfd = (t->parent && t->parent->dfd != -1) ? t->parent->dfd : AT_FDCWD; return unlinkat(rfd, t->path, AT_REMOVEDIR); } static inline void recurse_into_parents(struct task *t) { struct task *recurse = t, *free_list = NULL; while ((recurse = recurse->parent)) { free(free_list); free_list = NULL; unsigned rc = atomic_fetch_add_explicit(&recurse->removed_count, 1, memory_order_acquire); if (rc & ACQUIRED) break; printf("parent: removed %04d total %04d '%s'\n", rc, recurse->files, recurse->path); if (rc == recurse->files) { /* we have removed all files in the directory */ if (rmdir_parent(recurse)) { fprintf(stderr, "rec rmdir failed '%s': %m\n", recurse->path); } else { printf("rec rmdir succeeded '%s'\n", recurse->path); } free(recurse->path); /* can't free now because the while condition uses it */ close_dfd(recurse); if (p_old) free_list = recurse; else p_old = recurse; } else { /* if we haven't removed this directory yet, * there's no reason to recurse further */ break; } } /* catch any stragglers, in case the loop doesn't iterate once more */ free(free_list); } static void *process_queue_item(void *arg) { struct queue *q = arg; struct task t; while (1) { queue_remove(q, &t); int dfd; int rfd = (t.parent && t.parent->dfd != -1) ? t.parent->dfd : AT_FDCWD; while ((dfd = openat(rfd, t.path, O_RDONLY|O_DIRECTORY|O_NOFOLLOW|O_CLOEXEC)) < 0) { if (limited_fds && errno == EMFILE) { pthread_mutex_lock(&fd_mtx); pthread_cond_wait(&fd_cond, &fd_mtx); pthread_mutex_unlock(&fd_mtx); continue; } else { fprintf(stderr, "couldn't open '%s': %m\n", t.path); exit(1); } } DIR *d = fdopendir(dfd); if (!d) { fprintf(stderr, "couldn't create directory stream: %m\n"); exit(1); } struct task *p = NULL; unsigned n = 0; size_t plen; struct dirent *entry; while ((entry = readdir(d))) { if (entry->d_name[0] == '.' && (entry->d_name[1] == '\0' || (entry->d_name[1] == '.' && entry->d_name[2] == '\0'))) continue; /* fast path to avoid allocations */ int trv; if (entry->d_type == DT_DIR) goto fast_path_dir; if ((trv = unlinkat(dfd, entry->d_name, 0)) && errno == EISDIR) { fast_path_dir: trv = unlinkat(dfd, entry->d_name, AT_REMOVEDIR); } if (!trv) continue; n++; /* lazy allocation of p and other operations */ if (!p) { if (p_old) { p = p_old; p_old = NULL; puts("used p_old"); } else { p = malloc(sizeof *p); puts("didn't use p_old"); } *p = t; p->dfd = -1; /* access happens only after mutex lock and release */ atomic_store_explicit(&p->removed_count, ACQUIRED, memory_order_relaxed); if (!limited_fds) { if (atomic_fetch_sub_explicit(&dfd_max, 1, memory_order_relaxed) > 0) { /* we need to duplicate the fd due to calling closedir() below */ p->dfd = dup(dfd); } else { atomic_fetch_add_explicit(&dfd_max, 1, memory_order_relaxed); /* we only use plen for absolute paths */ plen = strlen(t.path); } } } char *buf; if (p->dfd == -1) { size_t nlen = strlen(entry->d_name); buf = malloc(plen + nlen + 2); memcpy(buf, p->path, plen); buf[plen] = '/'; memcpy(buf+plen+1, entry->d_name, nlen); buf[plen+nlen+1] = '\0'; } else { buf = strdup(entry->d_name); } printf("adding to queue'%s'\n", buf); queue_add(q, buf, p); } closedir(d); if (limited_fds) { pthread_mutex_lock(&fd_mtx); pthread_cond_signal(&fd_cond); pthread_mutex_unlock(&fd_mtx); } if (p) { p->files = n-1; /* other thread will compare against removed_count-1 */ unsigned rc = atomic_fetch_and_explicit(&p->removed_count, ~ACQUIRED, memory_order_release); if (rc == (n|ACQUIRED)) { /* this branch is taken when other threads have already removed all of p's children */ close_dfd(p); if (p_old) free(p); else p_old = p; if (rmdir_parent(&t)) { fprintf(stderr, "atomic rmdir failed '%s': %m\n", t.path); } else { printf("atomic rmdir succeeded '%s'\n", t.path); } } else { /* we can't recurse into p's parent if p still has children that need to be removed */ continue; } } else { /* p wasn't set because we could delete everything inside it */ if (rmdir_parent(&t)) { fprintf(stderr, "fast path rmdir failed '%s': %m\n", t.path); } else { printf("fast path rmdir succeeded '%s'\n", t.path); } } if (t.parent) recurse_into_parents(&t); /* we took ownership of the buffer */ free(t.path); } return NULL; } static void exit_init(void) { fprintf(stderr, "thread initialization failed: %m\n"); exit(1); } void run_queue(void) { long nproc_l = sysconf(_SC_NPROCESSORS_ONLN); if (nproc_l < 1) nproc_l = 1; if (nproc_l > 64) nproc_l = 64; nproc = nproc_l; struct rlimit rl; if (getrlimit(RLIMIT_NOFILE, &rl)) exit_init(); /* soft limit minus open std streams and minus directory fds from each thread */ if (rl.rlim_cur < nproc + 2) limited_fds = true; else atomic_store_explicit(&dfd_max, rl.rlim_cur - 2 - nproc, memory_order_relaxed); /* main thread will also be a task */ unsigned nproc1 = nproc - 1; if (nproc1) { pthread_attr_t pattr; if (pthread_attr_init(&pattr)) exit_init(); #if defined(PTHREAD_STACK_MIN) if (pthread_attr_setstacksize(&pattr, PTHREAD_STACK_MIN) || pthread_attr_setguardsize(&pattr, 1)) exit_init(); #endif for (unsigned i = 0; i < nproc1; i++) { pthread_t thread; if (pthread_create(&thread, &pattr, process_queue_item, &queue)) exit_init(); pthread_detach(thread); } pthread_attr_destroy(&pattr); } /* become one of the worker threads */ process_queue_item(&queue); } static void fail_single_file(const char *path) { fprintf(stderr, "failed to remove '%s': %m\n", path); } int single_file(const char *path) { int rv = remove(path); if (rv) fail_single_file(path); return rv; } void recurse_into(const char *path, int stop_at_error) { if (!remove(path)) { return; } else if (errno == ENOTEMPTY) { queue_add(&queue, strdup(path), NULL); return; } else { fail_single_file(path); if (stop_at_error) exit(1); } }