Add fast path removal to avoid allocations.

Instead of always adding files to the queue, we can try to remove them
in the readdir loop. This allows us to:

- add fewer items to the queue
- skip allocating and copying the path, since with the dir stream open
  we can use unlinkat(2)
- allocate parent task lazily, since it might not be needed
- stop using a recursive mutex, which can be slightly more expensive

Doing this in a naive way led to a slowdown, since we were holding the
queue mutex during the entirety of the operation. Instead, it was
necessary to change the loop structure a lot in order to be able to add
items to the queue without knowing the number of entries in the
directory. It could have been calculated with a readdir(3) loop +
rewinddir(3), but that would have added a lot of syscalls. In order to
work around that, we changed the purpose of the atomic int in struct
task.

Now, removed_count holds how many entries from the directory were
removed and a flag in its most significant bit to signal whether we are
still adding entries to the queue that refer to it or not. This flag,
plus some fancy atomic operations, allows us to control whether the
directory cleanup should happen in the thread that was adding its
entries to the queue or in the thread that removes the last item from
the queue.

We consider it safe to use the most significant bit of the unsigned int
as a flag because scandir(3) returns a signed int for the number of
entries in a directory.
This commit is contained in:
Érico Nogueira 2021-10-03 01:08:18 -03:00
parent be074ea79e
commit a00591c9a2
1 changed files with 80 additions and 48 deletions

128
remove.c
View File

@ -1,5 +1,6 @@
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <pthread.h>
#include <sched.h>
@ -16,11 +17,14 @@
#define puts(s)
#endif
#define ACQUIRED (1U<<31)
/* One unit of work for the removal queue: a path to delete plus the
 * bookkeeping needed to clean up its parent directory afterwards. */
struct task {
/* heap-allocated path of the entry; freed by whichever thread finishes with the task */
char *path;
/* task of the containing directory, or NULL when there is none to notify */
struct task *parent;
/* reference counting */
atomic_int rc;
/* number of entries queued for this directory minus one; written once the
 * directory scan is over and compared against the pre-increment value of
 * removed_count */
unsigned files;
/* how many queued entries have been dealt with so far; the ACQUIRED bit is
 * kept set while the scanning thread is still adding entries to the queue */
atomic_uint removed_count;
/* file type as reported by readdir() */
unsigned char type;
};
@ -105,6 +109,37 @@ error:
return rv;
}
/*
 * Report to the ancestors of task t that one of their queued entries has
 * been dealt with, removing every ancestor directory that becomes empty.
 *
 * For each ancestor, removed_count is atomically incremented:
 *  - if the ACQUIRED bit was set, another thread is still adding entries
 *    for that directory and will take care of the cleanup itself, so the
 *    walk stops;
 *  - if the previous count equals files (stored as "entries queued - 1"
 *    by the scanning thread), this was the last outstanding entry: the
 *    directory is rmdir()ed, its task is released, and the walk continues
 *    with the next ancestor;
 *  - otherwise other entries are still pending and the walk stops.
 *
 * t itself is neither modified nor freed; only its parent chain is touched.
 */
static inline void recurse_into_parents(struct task *t)
{
	struct task *dir = t;
	/* finished task whose release is postponed until the loop condition
	 * has read its parent pointer */
	struct task *pending_free = NULL;

	while ((dir = dir->parent)) {
		free(pending_free);
		pending_free = NULL;

		/* fetch_add yields the value *before* the increment */
		unsigned prev = atomic_fetch_add_explicit(&dir->removed_count, 1, memory_order_acq_rel);
		if (prev & ACQUIRED) {
			/* directory is still being scanned; its scanner owns the cleanup */
			break;
		}

		/* both values are unsigned, so they must be printed with %u */
		printf("parent: removed %04u total %04u '%s'\n", prev, dir->files, dir->path);

		if (prev != dir->files) {
			/* if we haven't removed this directory yet,
			 * there's no reason to recurse further */
			break;
		}

		/* we have removed all files in the directory */
		if (rmdir(dir->path)) {
			fprintf(stderr, "rec rmdir failed '%s': %m\n", dir->path);
		} else {
			printf("rec rmdir succeeded '%s'\n", dir->path);
		}
		free(dir->path);
		/* can't free now because the while condition uses it */
		pending_free = dir;
	}

	/* catch any stragglers, in case the loop doesn't iterate once more */
	free(pending_free);
}
static void *process_queue_item(void *arg)
{
struct queue *q = arg;
@ -145,19 +180,34 @@ remove_dir:
break;
}
}
struct task *p = malloc(sizeof *p);
*p = t;
t.path = NULL;
int dfd = dirfd(d);
int n = 0;
size_t plen = strlen(p->path);
struct task *p = NULL;
unsigned n = 0;
size_t plen = strlen(t.path);
struct dirent *entry;
pthread_mutex_lock(&q->mtx);
while ((entry = readdir(d))) {
if (strcmp(".", entry->d_name)==0 || strcmp("..", entry->d_name)==0) continue;
/* fast path to avoid allocations */
int trv;
if (entry->d_type == DT_DIR) goto fast_path_dir;
if ((trv = unlinkat(dfd, entry->d_name, 0)) && errno == EISDIR) {
fast_path_dir:
trv = unlinkat(dfd, entry->d_name, AT_REMOVEDIR);
}
if (!trv) continue;
n++;
/* lazy allocation of p */
if (!p) {
p = malloc(sizeof *p);
*p = t;
/* access happens only after mutex lock and release */
atomic_store_explicit(&p->removed_count, ACQUIRED, memory_order_relaxed);
}
size_t nlen = strlen(entry->d_name);
char *buf = malloc(plen + nlen + 2);
memcpy(buf, p->path, plen);
@ -168,48 +218,35 @@ remove_dir:
printf("adding to queue'%s'\n", buf);
queue_add(q, buf, entry->d_type, p);
}
/* this store doesn't need to be atomic, since we release the mutex below */
atomic_store_explicit(&p->rc, n, memory_order_relaxed);
pthread_mutex_unlock(&q->mtx);
closedir(d);
pthread_mutex_lock(&fd_mtx);
pthread_cond_signal(&fd_cond);
pthread_mutex_unlock(&fd_mtx);
continue;
if (p) {
p->files = n-1; /* other thread will compare against removed_count-1 */
unsigned rc = atomic_fetch_and_explicit(&p->removed_count, ~ACQUIRED, memory_order_acq_rel);
if (rc == (n|ACQUIRED)) {
free(p);
if (rmdir(t.path)) {
fprintf(stderr, "atomic rmdir failed '%s': %m\n", t.path);
} else {
printf("atomic rmdir succeeded '%s'\n", t.path);
}
} else {
continue;
}
} else {
/* p wasn't set because we could delete everything inside it */
if (rmdir(t.path)) {
fprintf(stderr, "fast path rmdir failed '%s': %m\n", t.path);
} else {
printf("fast path rmdir succeeded '%s'\n", t.path);
}
}
end:
if (t.parent) {
struct task *recurse = &t;
void *free_list = NULL;
while ((recurse = recurse->parent)) {
free(free_list); free_list = NULL;
int rc = atomic_fetch_sub(&recurse->rc, 1);
printf("parent: %04d '%s'\n", rc, recurse->path);
if (rc < 1) {
fprintf(stderr, "bad parent: %04d '%s'\n", rc, recurse->path);
abort();
}
if (rc == 1) {
/* reference counting fell down to 0 */
if (rmdir(recurse->path)) {
printf("rec rmdir failed '%s': %m\n", recurse->path);
} else {
printf("rec rmdir succeeded '%s'\n", recurse->path);
}
free(recurse->path);
/* can't free now because the while condition uses it */
free_list = recurse;
} else {
/* if we haven't removed this directory yet,
* there's no reason to recurse further */
break;
}
}
/* catch any stragglers, in case the loop doesn't iterate once more */
free(free_list);
}
if (t.parent) recurse_into_parents(&t);
/* we took ownership of the buffer */
free(t.path);
}
@ -223,12 +260,7 @@ int run_queue(void)
if (nproc < 1) nproc = 1;
if (nproc > 64) nproc = 64;
pthread_mutexattr_t mattr;
if (pthread_mutexattr_init(&mattr)) return -1;
if (pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_RECURSIVE)) return -1;
if (pthread_mutex_init(&queue.mtx, &mattr)) return -1;
pthread_mutexattr_destroy(&mattr);
if (pthread_mutex_init(&queue.mtx, NULL)) return -1;
if (pthread_cond_init(&queue.cond, NULL)) return -1;
pthread_attr_t pattr;