#include "first.h" /** * the network chunk-API * * */ #include "chunk.h" #include "fdevent.h" #include "log.h" #include #include #include "sys-mmap.h" #include #include #include #include #include /* default 1MB, upper limit 128MB */ #define DEFAULT_TEMPFILE_SIZE (1 * 1024 * 1024) #define MAX_TEMPFILE_SIZE (128 * 1024 * 1024) static size_t chunk_buf_sz = 4096; static chunk *chunks; static chunk *chunk_buffers; static array *chunkqueue_default_tempdirs = NULL; static unsigned int chunkqueue_default_tempfile_size = DEFAULT_TEMPFILE_SIZE; void chunkqueue_set_chunk_size (size_t sz) { chunk_buf_sz = sz > 0 ? ((sz + 1023) & ~1023uL) : 4096; } void chunkqueue_set_tempdirs_default_reset (void) { chunkqueue_default_tempdirs = NULL; chunkqueue_default_tempfile_size = DEFAULT_TEMPFILE_SIZE; } chunkqueue *chunkqueue_init(void) { chunkqueue *cq; cq = calloc(1, sizeof(*cq)); force_assert(NULL != cq); cq->first = NULL; cq->last = NULL; cq->tempdirs = chunkqueue_default_tempdirs; cq->upload_temp_file_size = chunkqueue_default_tempfile_size; return cq; } static chunk *chunk_init(size_t sz) { chunk *c; c = calloc(1, sizeof(*c)); force_assert(NULL != c); c->type = MEM_CHUNK; c->mem = buffer_init(); c->file.start = c->file.length = c->file.mmap.offset = 0; c->file.fd = -1; c->file.mmap.start = MAP_FAILED; c->file.mmap.length = 0; c->file.is_temp = 0; c->offset = 0; c->next = NULL; buffer_string_prepare_copy(c->mem, sz-1); return c; } static void chunk_reset_file_chunk(chunk *c) { if (c->file.is_temp && !buffer_string_is_empty(c->mem)) { unlink(c->mem->ptr); } if (c->file.fd != -1) { close(c->file.fd); c->file.fd = -1; } if (MAP_FAILED != c->file.mmap.start) { munmap(c->file.mmap.start, c->file.mmap.length); c->file.mmap.start = MAP_FAILED; } c->file.start = c->file.length = c->file.mmap.offset = 0; c->file.mmap.length = 0; c->file.is_temp = 0; c->type = MEM_CHUNK; } static void chunk_reset(chunk *c) { if (c->type == FILE_CHUNK) chunk_reset_file_chunk(c); buffer_clear(c->mem); c->offset = 0; } static void chunk_free(chunk *c) { if (c->type == FILE_CHUNK) chunk_reset_file_chunk(c); buffer_free(c->mem); free(c); } buffer * chunk_buffer_acquire(void) { chunk *c; buffer *b; if (chunks) { c = chunks; chunks = c->next; } else { c = chunk_init(chunk_buf_sz); } c->next = chunk_buffers; chunk_buffers = c; b = c->mem; c->mem = NULL; return b; } void chunk_buffer_release(buffer *b) { if (NULL == b) return; if (b->size >= chunk_buf_sz && chunk_buffers) { chunk *c = chunk_buffers; chunk_buffers = c->next; c->mem = b; c->next = chunks; chunks = c; buffer_clear(b); } else { buffer_free(b); } } static chunk * chunk_acquire(void) { if (chunks) { chunk *c = chunks; chunks = c->next; return c; } else { return chunk_init(chunk_buf_sz); } } static void chunk_release(chunk *c) { if (c->mem->size >= chunk_buf_sz) { chunk_reset(c); c->next = chunks; chunks = c; } else { chunk_free(c); } } void chunkqueue_chunk_pool_clear(void) { for (chunk *next, *c = chunks; c; c = next) { next = c->next; chunk_free(c); } chunks = NULL; } void chunkqueue_chunk_pool_free(void) { chunkqueue_chunk_pool_clear(); for (chunk *next, *c = chunk_buffers; c; c = next) { next = c->next; c->mem = buffer_init(); /*(chunk_reset() expects c->mem != NULL)*/ chunk_free(c); } chunk_buffers = NULL; } static off_t chunk_remaining_length(const chunk *c) { off_t len = 0; switch (c->type) { case MEM_CHUNK: len = buffer_string_length(c->mem); break; case FILE_CHUNK: len = c->file.length; break; default: force_assert(c->type == MEM_CHUNK || c->type == FILE_CHUNK); break; } force_assert(c->offset <= len); return len - c->offset; } void chunkqueue_free(chunkqueue *cq) { chunk *c, *pc; if (NULL == cq) return; for (c = cq->first; c; ) { pc = c; c = c->next; chunk_release(pc); } free(cq); } static void chunkqueue_prepend_chunk(chunkqueue *cq, chunk *c) { c->next = cq->first; cq->first = c; if (NULL == cq->last) { cq->last = c; } } static void chunkqueue_append_chunk(chunkqueue *cq, chunk *c) { c->next = NULL; if (cq->last) { cq->last->next = c; } cq->last = c; if (NULL == cq->first) { cq->first = c; } } static chunk * chunkqueue_prepend_mem_chunk(chunkqueue *cq) { chunk *c = chunk_acquire(); chunkqueue_prepend_chunk(cq, c); return c; } static chunk * chunkqueue_append_mem_chunk(chunkqueue *cq) { chunk *c = chunk_acquire(); chunkqueue_append_chunk(cq, c); return c; } static chunk * chunkqueue_append_file_chunk(chunkqueue *cq, buffer *fn, off_t offset, off_t len) { chunk *c = chunk_acquire(); chunkqueue_append_chunk(cq, c); c->type = FILE_CHUNK; c->file.start = offset; c->file.length = len; cq->bytes_in += len; buffer_copy_buffer(c->mem, fn); return c; } void chunkqueue_reset(chunkqueue *cq) { chunk *cur = cq->first; cq->first = cq->last = NULL; while (NULL != cur) { chunk *next = cur->next; chunk_release(cur); cur = next; } cq->bytes_in = 0; cq->bytes_out = 0; cq->tempdir_idx = 0; } void chunkqueue_append_file_fd(chunkqueue *cq, buffer *fn, int fd, off_t offset, off_t len) { if (len > 0) { (chunkqueue_append_file_chunk(cq, fn, offset, len))->file.fd = fd; } else { close(fd); } } void chunkqueue_append_file(chunkqueue *cq, buffer *fn, off_t offset, off_t len) { if (len > 0) { chunkqueue_append_file_chunk(cq, fn, offset, len); } } static int chunkqueue_append_mem_extend_chunk(chunkqueue *cq, const char *mem, size_t len) { chunk *c = cq->last; if (0 == len) return 1; if (c != NULL && c->type == MEM_CHUNK && buffer_string_space(c->mem) >= len) { buffer_append_string_len(c->mem, mem, len); cq->bytes_in += len; return 1; } return 0; } void chunkqueue_append_buffer(chunkqueue *cq, buffer *mem) { chunk *c; size_t len = buffer_string_length(mem); if (len < 256 && chunkqueue_append_mem_extend_chunk(cq, mem->ptr, len)) return; c = chunkqueue_append_mem_chunk(cq); cq->bytes_in += len; buffer_move(c->mem, mem); } void chunkqueue_append_mem(chunkqueue *cq, const char * mem, size_t len) { chunk *c; if (len < chunk_buf_sz && chunkqueue_append_mem_extend_chunk(cq, mem, len)) return; c = chunkqueue_append_mem_chunk(cq); cq->bytes_in += len; buffer_copy_string_len(c->mem, mem, len); } void chunkqueue_append_mem_min(chunkqueue *cq, const char * mem, size_t len) { chunk *c; if (len < chunk_buf_sz && chunkqueue_append_mem_extend_chunk(cq, mem, len)) return; c = chunk_init(len+1); chunkqueue_append_chunk(cq, c); cq->bytes_in += len; buffer_copy_string_len(c->mem, mem, len); } void chunkqueue_append_chunkqueue(chunkqueue *cq, chunkqueue *src) { if (src == NULL || NULL == src->first) return; if (NULL == cq->first) { cq->first = src->first; } else { cq->last->next = src->first; } cq->last = src->last; cq->bytes_in += (src->bytes_in - src->bytes_out); src->first = NULL; src->last = NULL; src->bytes_out = src->bytes_in; } __attribute_cold__ static void chunkqueue_buffer_open_resize(chunk *c, size_t sz) { chunk * const n = chunk_init((sz + 4095) & ~4095uL); buffer * const b = c->mem; c->mem = n->mem; n->mem = b; chunk_release(n); } buffer * chunkqueue_prepend_buffer_open_sz(chunkqueue *cq, size_t sz) { chunk * const c = chunkqueue_prepend_mem_chunk(cq); if (buffer_string_space(c->mem) < sz) { chunkqueue_buffer_open_resize(c, sz); } return c->mem; } buffer * chunkqueue_prepend_buffer_open(chunkqueue *cq) { chunk *c = chunkqueue_prepend_mem_chunk(cq); return c->mem; } void chunkqueue_prepend_buffer_commit(chunkqueue *cq) { cq->bytes_in += buffer_string_length(cq->first->mem); } buffer * chunkqueue_append_buffer_open_sz(chunkqueue *cq, size_t sz) { chunk * const c = chunkqueue_append_mem_chunk(cq); if (buffer_string_space(c->mem) < sz) { chunkqueue_buffer_open_resize(c, sz); } return c->mem; } buffer * chunkqueue_append_buffer_open(chunkqueue *cq) { chunk *c = chunkqueue_append_mem_chunk(cq); return c->mem; } void chunkqueue_append_buffer_commit(chunkqueue *cq) { cq->bytes_in += buffer_string_length(cq->last->mem); } char * chunkqueue_get_memory(chunkqueue *cq, size_t *len) { size_t sz = *len ? *len : (chunk_buf_sz >> 1); buffer *b; chunk *c = cq->last; if (NULL != c && MEM_CHUNK == c->type) { /* return pointer into existing buffer if large enough */ size_t avail = buffer_string_space(c->mem); if (avail >= sz) { *len = avail; b = c->mem; return b->ptr + buffer_string_length(b); } } /* allocate new chunk */ b = chunkqueue_append_buffer_open_sz(cq, sz); *len = buffer_string_space(b); return b->ptr; } void chunkqueue_use_memory(chunkqueue *cq, size_t len) { buffer *b; force_assert(NULL != cq); force_assert(NULL != cq->last && MEM_CHUNK == cq->last->type); b = cq->last->mem; if (len > 0) { buffer_commit(b, len); cq->bytes_in += len; } else if (buffer_string_is_empty(b)) { /* unused buffer: can't remove chunk easily from * end of list, so just reset the buffer */ buffer_clear(b); } } void chunkqueue_set_tempdirs_default (array *tempdirs, unsigned int upload_temp_file_size) { chunkqueue_default_tempdirs = tempdirs; chunkqueue_default_tempfile_size = (0 == upload_temp_file_size) ? DEFAULT_TEMPFILE_SIZE : (upload_temp_file_size > MAX_TEMPFILE_SIZE) ? MAX_TEMPFILE_SIZE : upload_temp_file_size; } #if 0 void chunkqueue_set_tempdirs(chunkqueue *cq, array *tempdirs, unsigned int upload_temp_file_size) { force_assert(NULL != cq); cq->tempdirs = tempdirs; cq->upload_temp_file_size = (0 == upload_temp_file_size) ? DEFAULT_TEMPFILE_SIZE : (upload_temp_file_size > MAX_TEMPFILE_SIZE) ? MAX_TEMPFILE_SIZE : upload_temp_file_size; cq->tempdir_idx = 0; } #endif void chunkqueue_steal(chunkqueue *dest, chunkqueue *src, off_t len) { while (len > 0) { chunk *c = src->first; off_t clen = 0, use; if (NULL == c) break; clen = chunk_remaining_length(c); if (0 == clen) { /* drop empty chunk */ src->first = c->next; if (c == src->last) src->last = NULL; chunk_release(c); continue; } use = len >= clen ? clen : len; len -= use; if (use == clen) { /* move complete chunk */ src->first = c->next; if (c == src->last) src->last = NULL; chunkqueue_append_chunk(dest, c); dest->bytes_in += use; } else { /* partial chunk with length "use" */ switch (c->type) { case MEM_CHUNK: chunkqueue_append_mem(dest, c->mem->ptr + c->offset, use); break; case FILE_CHUNK: /* tempfile flag is in "last" chunk after the split */ chunkqueue_append_file(dest, c->mem, c->file.start + c->offset, use); break; } c->offset += use; force_assert(0 == len); } src->bytes_out += use; } } static chunk *chunkqueue_get_append_tempfile(server *srv, chunkqueue *cq) { chunk *c; buffer *template = buffer_init_string("/var/tmp/lighttpd-upload-XXXXXX"); int fd = -1; if (cq->tempdirs && cq->tempdirs->used) { /* we have several tempdirs, only if all of them fail we jump out */ for (errno = EIO; cq->tempdir_idx < cq->tempdirs->used; ++cq->tempdir_idx) { data_string *ds = (data_string *)cq->tempdirs->data[cq->tempdir_idx]; buffer_copy_buffer(template, ds->value); buffer_append_path_len(template, CONST_STR_LEN("lighttpd-upload-XXXXXX")); #ifdef __COVERITY__ /* POSIX-2008 requires mkstemp create file with 0600 perms */ umask(0600); #endif /* coverity[secure_temp : FALSE] */ if (-1 != (fd = mkstemp(template->ptr))) break; } } else { #ifdef __COVERITY__ /* POSIX-2008 requires mkstemp create file with 0600 perms */ umask(0600); #endif /* coverity[secure_temp : FALSE] */ fd = mkstemp(template->ptr); } if (fd < 0) { /* (report only the last error to mkstemp() * if multiple temp dirs attempted) */ log_error_write(srv, __FILE__, __LINE__, "sbs", "opening temp-file failed:", template, strerror(errno)); buffer_free(template); return NULL; } if (0 != fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_APPEND)) { /* (should not happen; fd is regular file) */ log_error_write(srv, __FILE__, __LINE__, "sbs", "fcntl():", template, strerror(errno)); close(fd); buffer_free(template); return NULL; } fdevent_setfd_cloexec(fd); c = chunkqueue_append_file_chunk(cq, template, 0, 0); c->file.fd = fd; c->file.is_temp = 1; buffer_free(template); return c; } static void chunkqueue_remove_empty_chunks(chunkqueue *cq); int chunkqueue_append_mem_to_tempfile(server *srv, chunkqueue *dest, const char *mem, size_t len) { chunk *dst_c; ssize_t written; do { /* * if the last chunk is * - smaller than dest->upload_temp_file_size * - not read yet (offset == 0) * -> append to it (so it might actually become larger than dest->upload_temp_file_size) * otherwise * -> create a new chunk * * */ dst_c = dest->last; if (NULL != dst_c && FILE_CHUNK == dst_c->type && dst_c->file.is_temp && dst_c->file.fd >= 0 && 0 == dst_c->offset) { /* ok, take the last chunk for our job */ if (dst_c->file.length >= (off_t)dest->upload_temp_file_size) { /* the chunk is too large now, close it */ int rc = close(dst_c->file.fd); dst_c->file.fd = -1; if (0 != rc) { log_error_write(srv, __FILE__, __LINE__, "sbss", "close() temp-file", dst_c->mem, "failed:", strerror(errno)); return -1; } dst_c = NULL; } } else { dst_c = NULL; } if (NULL == dst_c && NULL == (dst_c = chunkqueue_get_append_tempfile(srv, dest))) { return -1; } #ifdef __COVERITY__ if (dst_c->file.fd < 0) return -1; #endif /* (dst_c->file.fd >= 0) */ /* coverity[negative_returns : FALSE] */ written = write(dst_c->file.fd, mem, len); if ((size_t) written == len) { dst_c->file.length += len; dest->bytes_in += len; return 0; } else if (written >= 0) { /*(assume EINTR if partial write and retry write(); * retry write() might fail with ENOSPC if no more space on volume)*/ dest->bytes_in += written; mem += written; len -= (size_t)written; dst_c->file.length += (size_t)written; /* continue; retry */ } else if (errno == EINTR) { /* continue; retry */ } else { int retry = (errno == ENOSPC && dest->tempdirs && ++dest->tempdir_idx < dest->tempdirs->used); if (!retry) { log_error_write(srv, __FILE__, __LINE__, "sbs", "write() temp-file", dst_c->mem, "failed:", strerror(errno)); } if (0 == chunk_remaining_length(dst_c)) { /*(remove empty chunk and unlink tempfile)*/ chunkqueue_remove_empty_chunks(dest); } else {/*(close tempfile; avoid later attempts to append)*/ int rc = close(dst_c->file.fd); dst_c->file.fd = -1; if (0 != rc) { log_error_write(srv, __FILE__, __LINE__, "sbss", "close() temp-file", dst_c->mem, "failed:", strerror(errno)); return -1; } } if (!retry) break; /* return -1; */ /* continue; retry */ } } while (dst_c); return -1; } int chunkqueue_steal_with_tempfiles(server *srv, chunkqueue *dest, chunkqueue *src, off_t len) { while (len > 0) { chunk *c = src->first; off_t clen = 0, use; if (NULL == c) break; clen = chunk_remaining_length(c); if (0 == clen) { /* drop empty chunk */ src->first = c->next; if (c == src->last) src->last = NULL; chunk_release(c); continue; } use = (len >= clen) ? clen : len; len -= use; switch (c->type) { case FILE_CHUNK: if (use == clen) { /* move complete chunk */ src->first = c->next; if (c == src->last) src->last = NULL; chunkqueue_append_chunk(dest, c); dest->bytes_in += use; } else { /* partial chunk with length "use" */ /* tempfile flag is in "last" chunk after the split */ chunkqueue_append_file(dest, c->mem, c->file.start + c->offset, use); c->offset += use; force_assert(0 == len); } break; case MEM_CHUNK: /* store "use" bytes from memory chunk in tempfile */ if (0 != chunkqueue_append_mem_to_tempfile(srv, dest, c->mem->ptr + c->offset, use)) { return -1; } if (use == clen) { /* finished chunk */ src->first = c->next; if (c == src->last) src->last = NULL; chunk_release(c); } else { /* partial chunk */ c->offset += use; force_assert(0 == len); } break; } src->bytes_out += use; } return 0; } off_t chunkqueue_length(chunkqueue *cq) { off_t len = 0; chunk *c; for (c = cq->first; c; c = c->next) { len += chunk_remaining_length(c); } return len; } void chunkqueue_mark_written(chunkqueue *cq, off_t len) { off_t written = len; chunk *c; force_assert(len >= 0); for (c = cq->first; NULL != c; c = cq->first) { off_t c_len = chunk_remaining_length(c); if (0 == written && 0 != c_len) break; /* no more finished chunks */ if (written >= c_len) { /* chunk got finished */ c->offset += c_len; written -= c_len; cq->first = c->next; if (c == cq->last) cq->last = NULL; chunk_release(c); } else { /* partial chunk */ c->offset += written; written = 0; break; /* chunk not finished */ } } force_assert(0 == written); cq->bytes_out += len; } void chunkqueue_remove_finished_chunks(chunkqueue *cq) { chunk *c; for (c = cq->first; c; c = cq->first) { if (0 != chunk_remaining_length(c)) break; /* not finished yet */ cq->first = c->next; if (c == cq->last) cq->last = NULL; chunk_release(c); } } static void chunkqueue_remove_empty_chunks(chunkqueue *cq) { chunk *c; chunkqueue_remove_finished_chunks(cq); if (chunkqueue_is_empty(cq)) return; for (c = cq->first; c && c->next; c = c->next) { if (0 == chunk_remaining_length(c->next)) { chunk *empty = c->next; c->next = empty->next; if (empty == cq->last) cq->last = c; chunk_release(empty); } } } int chunkqueue_open_file_chunk(server *srv, chunkqueue *cq) { chunk* const c = cq->first; off_t offset, toSend; struct stat st; force_assert(NULL != c); force_assert(FILE_CHUNK == c->type); force_assert(c->offset >= 0 && c->offset <= c->file.length); offset = c->file.start + c->offset; toSend = c->file.length - c->offset; if (-1 == c->file.fd) { if (-1 == (c->file.fd = fdevent_open_cloexec(c->mem->ptr, O_RDONLY, 0))) { log_error_write(srv, __FILE__, __LINE__, "ssb", "open failed:", strerror(errno), c->mem); return -1; } } /*(skip file size checks if file is temp file created by lighttpd)*/ if (c->file.is_temp) return 0; if (-1 == fstat(c->file.fd, &st)) { log_error_write(srv, __FILE__, __LINE__, "ss", "fstat failed:", strerror(errno)); return -1; } if (offset > st.st_size || toSend > st.st_size || offset > st.st_size - toSend) { log_error_write(srv, __FILE__, __LINE__, "sb", "file shrunk:", c->mem); return -1; } return 0; }