mirror of https://github.com/ericonr/erm.git
Add directory file descriptor caching as well.
Now a parent can store its own fd, which will live on for as long as the parent does, allowing us to use relative *at functions (that might be faster on the kernel side?) and store smaller path buffers. Since we were playing with soft limit detection, also make the EMFILE operations conditional on limited_fds, avoiding potentially expensive mutex operations when possible. When checking the soft limit, we use 2 for the number of standard file descriptors, because we closed stdin in main().
This commit is contained in:
parent
3db1db5cf4
commit
e56abc9ff9
77
remove.c
77
remove.c
|
@ -5,9 +5,11 @@
|
|||
#include <pthread.h>
|
||||
#include <sched.h>
|
||||
#include <stdatomic.h>
|
||||
#include <stdbool.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <sys/resource.h>
|
||||
#include <threads.h>
|
||||
#include <unistd.h>
|
||||
|
||||
|
@ -25,6 +27,8 @@ struct task {
|
|||
struct task *parent;
|
||||
/* reference counting */
|
||||
unsigned files;
|
||||
/* stores the dirfd for this path or -1 */
|
||||
int dfd;
|
||||
atomic_uint removed_count;
|
||||
};
|
||||
|
||||
|
@ -41,6 +45,10 @@ static pthread_mutex_t fd_mtx = PTHREAD_MUTEX_INITIALIZER;
|
|||
static pthread_cond_t fd_cond = PTHREAD_COND_INITIALIZER;
|
||||
|
||||
static unsigned nproc;
|
||||
/* is true if number of available fds is smaller than number of threads being used */
|
||||
static bool limited_fds;
|
||||
/* stores amount of additional fds that can be used beyond the one per thread */
|
||||
static atomic_int dfd_max;
|
||||
|
||||
/* p_old is a struct task cache; this means each thread can leak one struct task in total */
|
||||
static thread_local struct task *p_old = NULL;
|
||||
|
@ -98,6 +106,19 @@ static inline void queue_remove(struct queue *q, struct task *t)
|
|||
pthread_mutex_unlock(&q->mtx);
|
||||
}
|
||||
|
||||
static void close_dfd(const struct task *t)
|
||||
{
|
||||
if (t->dfd == -1) return;
|
||||
close(t->dfd);
|
||||
atomic_fetch_add_explicit(&dfd_max, 1, memory_order_relaxed);
|
||||
}
|
||||
|
||||
static int rmdir_parent(const struct task *t)
|
||||
{
|
||||
int rfd = (t->parent && t->parent->dfd != -1) ? t->parent->dfd : AT_FDCWD;
|
||||
return unlinkat(rfd, t->path, AT_REMOVEDIR);
|
||||
}
|
||||
|
||||
static inline void recurse_into_parents(struct task *t)
|
||||
{
|
||||
struct task *recurse = t, *free_list = NULL;
|
||||
|
@ -110,7 +131,7 @@ static inline void recurse_into_parents(struct task *t)
|
|||
printf("parent: removed %04d total %04d '%s'\n", rc, recurse->files, recurse->path);
|
||||
if (rc == recurse->files) {
|
||||
/* we have removed all files in the directory */
|
||||
if (rmdir(recurse->path)) {
|
||||
if (rmdir_parent(recurse)) {
|
||||
fprintf(stderr, "rec rmdir failed '%s': %m\n", recurse->path);
|
||||
} else {
|
||||
printf("rec rmdir succeeded '%s'\n", recurse->path);
|
||||
|
@ -118,6 +139,7 @@ static inline void recurse_into_parents(struct task *t)
|
|||
free(recurse->path);
|
||||
|
||||
/* can't free now because the while condition uses it */
|
||||
close_dfd(recurse);
|
||||
if (p_old) free_list = recurse;
|
||||
else p_old = recurse;
|
||||
} else {
|
||||
|
@ -138,8 +160,9 @@ static void *process_queue_item(void *arg)
|
|||
queue_remove(q, &t);
|
||||
|
||||
int dfd;
|
||||
while ((dfd = open(t.path, O_RDONLY|O_DIRECTORY|O_NOFOLLOW|O_CLOEXEC)) < 0) {
|
||||
if (errno == EMFILE) {
|
||||
int rfd = (t.parent && t.parent->dfd != -1) ? t.parent->dfd : AT_FDCWD;
|
||||
while ((dfd = openat(rfd, t.path, O_RDONLY|O_DIRECTORY|O_NOFOLLOW|O_CLOEXEC)) < 0) {
|
||||
if (limited_fds && errno == EMFILE) {
|
||||
pthread_mutex_lock(&fd_mtx);
|
||||
pthread_cond_wait(&fd_cond, &fd_mtx);
|
||||
pthread_mutex_unlock(&fd_mtx);
|
||||
|
@ -187,36 +210,54 @@ fast_path_dir:
|
|||
puts("didn't use p_old");
|
||||
}
|
||||
*p = t;
|
||||
p->dfd = -1;
|
||||
/* access happens only after mutex lock and release */
|
||||
atomic_store_explicit(&p->removed_count, ACQUIRED, memory_order_relaxed);
|
||||
|
||||
plen = strlen(t.path);
|
||||
if (!limited_fds) {
|
||||
if (atomic_fetch_sub_explicit(&dfd_max, 1, memory_order_relaxed) > 0) {
|
||||
/* we need to duplicate the fd due to calling closedir() below */
|
||||
p->dfd = dup(dfd);
|
||||
} else {
|
||||
atomic_fetch_add_explicit(&dfd_max, 1, memory_order_relaxed);
|
||||
/* we only use plen for absolute paths */
|
||||
plen = strlen(t.path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
size_t nlen = strlen(entry->d_name);
|
||||
char *buf = malloc(plen + nlen + 2);
|
||||
memcpy(buf, p->path, plen);
|
||||
buf[plen] = '/';
|
||||
memcpy(buf+plen+1, entry->d_name, nlen);
|
||||
buf[plen+nlen+1] = '\0';
|
||||
char *buf;
|
||||
if (p->dfd == -1) {
|
||||
size_t nlen = strlen(entry->d_name);
|
||||
buf = malloc(plen + nlen + 2);
|
||||
memcpy(buf, p->path, plen);
|
||||
buf[plen] = '/';
|
||||
memcpy(buf+plen+1, entry->d_name, nlen);
|
||||
buf[plen+nlen+1] = '\0';
|
||||
} else {
|
||||
buf = strdup(entry->d_name);
|
||||
}
|
||||
|
||||
printf("adding to queue'%s'\n", buf);
|
||||
queue_add(q, buf, p);
|
||||
}
|
||||
closedir(d);
|
||||
pthread_mutex_lock(&fd_mtx);
|
||||
pthread_cond_signal(&fd_cond);
|
||||
pthread_mutex_unlock(&fd_mtx);
|
||||
if (limited_fds) {
|
||||
pthread_mutex_lock(&fd_mtx);
|
||||
pthread_cond_signal(&fd_cond);
|
||||
pthread_mutex_unlock(&fd_mtx);
|
||||
}
|
||||
|
||||
if (p) {
|
||||
p->files = n-1; /* other thread will compare against removed_count-1 */
|
||||
unsigned rc = atomic_fetch_and_explicit(&p->removed_count, ~ACQUIRED, memory_order_release);
|
||||
if (rc == (n|ACQUIRED)) {
|
||||
/* this branch is taken when other threads have already removed all of p's children */
|
||||
close_dfd(p);
|
||||
if (p_old) free(p);
|
||||
else p_old = p;
|
||||
|
||||
if (rmdir(t.path)) {
|
||||
if (rmdir_parent(&t)) {
|
||||
fprintf(stderr, "atomic rmdir failed '%s': %m\n", t.path);
|
||||
} else {
|
||||
printf("atomic rmdir succeeded '%s'\n", t.path);
|
||||
|
@ -227,7 +268,7 @@ fast_path_dir:
|
|||
}
|
||||
} else {
|
||||
/* p wasn't set because we could delete everything inside it */
|
||||
if (rmdir(t.path)) {
|
||||
if (rmdir_parent(&t)) {
|
||||
fprintf(stderr, "fast path rmdir failed '%s': %m\n", t.path);
|
||||
} else {
|
||||
printf("fast path rmdir succeeded '%s'\n", t.path);
|
||||
|
@ -255,10 +296,14 @@ void run_queue(void)
|
|||
if (nproc_l > 64) nproc_l = 64;
|
||||
nproc = nproc_l;
|
||||
|
||||
struct rlimit rl;
|
||||
if (getrlimit(RLIMIT_NOFILE, &rl)) exit_init();
|
||||
/* soft limit minus open std streams and minus directory fds from each thread */
|
||||
if (rl.rlim_cur < nproc + 2) limited_fds = true;
|
||||
else atomic_store_explicit(&dfd_max, rl.rlim_cur - 2 - nproc, memory_order_relaxed);
|
||||
|
||||
/* main thread will also be a task */
|
||||
unsigned nproc1 = nproc - 1;
|
||||
|
||||
if (nproc1) {
|
||||
pthread_attr_t pattr;
|
||||
if (pthread_attr_init(&pattr)) exit_init();
|
||||
|
|
Loading…
Reference in New Issue