#endif
#define PROTOCOLS (CURLPROTO_HTTP | CURLPROTO_HTTPS | \
- CURLPROTO_FTP | CURLPROTO_FTPS | \
- CURLPROTO_TFTP)
+ CURLPROTO_FTP | CURLPROTO_FTPS)
#define CURL_NUM_STATES 8
#define CURL_NUM_ACB 8
-#define SECTOR_SIZE 512
#define READ_AHEAD_DEFAULT (256 * 1024)
#define CURL_TIMEOUT_DEFAULT 5
#define CURL_TIMEOUT_MAX 10000
typedef struct CURLAIOCB {
BlockAIOCB common;
- QEMUBH *bh;
QEMUIOVector *qiov;
int64_t sector_num;
size_t end;
} CURLAIOCB;
+typedef struct CURLSocket { /* One socket descriptor tracked for a CURLState. */
+ int fd; /* descriptor value; compared against the fd given to the socket callback */
+ QLIST_ENTRY(CURLSocket) next; /* linkage in the owning CURLState's `sockets` list */
+} CURLSocket;
+
typedef struct CURLState
{
struct BDRVCURLState *s;
CURLAIOCB *acb[CURL_NUM_ACB];
CURL *curl;
- curl_socket_t sock_fd;
+ QLIST_HEAD(, CURLSocket) sockets;
char *orig_buf;
size_t buf_start;
size_t buf_off;
char *cookie;
bool accept_range;
AioContext *aio_context;
+ QemuMutex mutex;
char *username;
char *password;
char *proxyusername;
{
BDRVCURLState *s;
CURLState *state = NULL;
+ CURLSocket *socket;
+
curl_easy_getinfo(curl, CURLINFO_PRIVATE, (char **)&state); /* recover the CURLState stored as the handle's private pointer */
- state->sock_fd = fd;
s = state->s;
+ QLIST_FOREACH(socket, &state->sockets, next) { /* search the tracked sockets for fd */
+ if (socket->fd == fd) {
+ if (action == CURL_POLL_REMOVE) { /* curl is done with fd: stop tracking it */
+ QLIST_REMOVE(socket, next);
+ g_free(socket);
+ }
+ break; /* socket is left non-NULL here (even if it was just freed),
+ * which is what makes the insertion below get skipped */
+ }
+ }
+ if (!socket) { /* relies on QLIST_FOREACH leaving socket NULL when nothing matched.
+ * NOTE(review): a CURL_POLL_REMOVE for an fd that was never
+ * tracked also lands here and inserts an entry - confirm intended */
+ socket = g_new0(CURLSocket, 1);
+ socket->fd = fd;
+ QLIST_INSERT_HEAD(&state->sockets, socket, next);
+ }
+ socket = NULL; /* not used below; also discards the possibly-freed pointer */
+
DPRINTF("CURL (AIO): Sock action %d on fd %d\n", action, (int)fd);
switch (action) { /* (re)register handlers for fd on s->aio_context to match action */
case CURL_POLL_IN:
aio_set_fd_handler(s->aio_context, fd, false,
- curl_multi_read, NULL, state);
+ curl_multi_read, NULL, NULL, state);
break;
case CURL_POLL_OUT:
aio_set_fd_handler(s->aio_context, fd, false,
- NULL, curl_multi_do, state);
+ NULL, curl_multi_do, NULL, state);
break;
case CURL_POLL_INOUT:
aio_set_fd_handler(s->aio_context, fd, false,
- curl_multi_read, curl_multi_do, state);
+ curl_multi_read, curl_multi_do, NULL, state);
break;
case CURL_POLL_REMOVE: /* all handlers and the opaque pointer are cleared */
aio_set_fd_handler(s->aio_context, fd, false,
- NULL, NULL, NULL);
+ NULL, NULL, NULL, NULL);
break;
}
DPRINTF("CURL: Just reading %zd bytes\n", realsize);
- if (!s || !s->orig_buf)
- return 0;
+ if (!s || !s->orig_buf) { /* no state or no destination buffer: skip the copy */
+ goto read_end;
+ }
if (s->buf_off >= s->buf_len) {
/* buffer full, read nothing */
- return 0;
+ goto read_end;
}
realsize = MIN(realsize, s->buf_len - s->buf_off); /* never copy past the end of orig_buf */
memcpy(s->orig_buf + s->buf_off, ptr, realsize);
continue;
if ((s->buf_off >= acb->end)) { /* enough data buffered to satisfy this acb */
+ size_t request_length = acb->nb_sectors * BDRV_SECTOR_SIZE; /* bytes the request asked for */
+
qemu_iovec_from_buf(acb->qiov, 0, s->orig_buf + acb->start,
acb->end - acb->start);
+
+ if (acb->end - acb->start < request_length) { /* fewer bytes copied than requested:
+ * zero-fill the remainder of the iovec */
+ size_t offset = acb->end - acb->start;
+ qemu_iovec_memset(acb->qiov, offset, 0,
+ request_length - offset);
+ }
+
acb->common.cb(acb->common.opaque, 0);
qemu_aio_unref(acb);
s->acb[i] = NULL;
}
}
- return realsize;
+read_end:
+ /* curl will error out if we do not return this value; the full
+ * size * nmemb is therefore reported even on the paths above that
+ * copied less (or nothing) */
+ return size * nmemb;
}
static int curl_find_buf(BDRVCURLState *s, size_t start, size_t len,
{
int i;
size_t end = start + len;
+ size_t clamped_end = MIN(end, s->len); /* request end, limited to s->len */
+ size_t clamped_len = clamped_end - start; /* NOTE(review): size_t arithmetic - wraps if
+ * start > s->len; confirm callers rule that out */
for (i=0; i<CURL_NUM_STATES; i++) {
CURLState *state = &s->states[i];
// Does the existing buffer cover our section?
if ((start >= state->buf_start) &&
(start <= buf_end) &&
- (end >= state->buf_start) &&
- (end <= buf_end))
+ (clamped_end >= state->buf_start) &&
+ (clamped_end <= buf_end))
{
char *buf = state->orig_buf + (start - state->buf_start);
- qemu_iovec_from_buf(acb->qiov, 0, buf, len);
+ qemu_iovec_from_buf(acb->qiov, 0, buf, clamped_len);
+ if (clamped_len < len) { /* part of the request lies beyond s->len: zero-fill it */
+ qemu_iovec_memset(acb->qiov, clamped_len, 0, len - clamped_len);
+ }
acb->common.cb(acb->common.opaque, 0); /* completed here with status 0 */
return FIND_RET_OK;
if (state->in_use &&
(start >= state->buf_start) &&
(start <= buf_fend) &&
- (end >= state->buf_start) &&
- (end <= buf_fend))
+ (clamped_end >= state->buf_start) &&
+ (clamped_end <= buf_fend))
{
int j;
acb->start = start - state->buf_start;
- acb->end = acb->start + len;
+ acb->end = acb->start + clamped_len; /* buffer-relative end, using the clamped length */
for (j=0; j<CURL_NUM_ACB; j++) {
if (!state->acb[j]) {
return FIND_RET_NONE;
}
+/* Called with s->mutex held.
+ * The mutex is released around each acb completion callback below and
+ * re-acquired before the acb is unreferenced, so it is held again on return.
+ */
static void curl_multi_check_completion(BDRVCURLState *s)
{
int msgs_in_queue;
continue;
}
+ qemu_mutex_unlock(&s->mutex); /* the callback runs without the lock held */
acb->common.cb(acb->common.opaque, -EPROTO);
+ qemu_mutex_lock(&s->mutex);
qemu_aio_unref(acb);
state->acb[i] = NULL;
}
}
}
-static void curl_multi_do(void *arg)
+/* Called with s->s->mutex held (s is a CURLState here; the callers
+ * curl_multi_do() and curl_multi_read() lock &s->s->mutex).
+ * Runs curl_multi_socket_action() once for every socket tracked in
+ * s->sockets. */
+static void curl_multi_do_locked(CURLState *s)
{
- CURLState *s = (CURLState *)arg;
+ CURLSocket *socket, *next_socket;
int running;
int r;
return;
}
- do {
- r = curl_multi_socket_action(s->s->multi, s->sock_fd, 0, &running);
- } while(r == CURLM_CALL_MULTI_PERFORM);
+ /* Need to use _SAFE because curl_multi_socket_action() may trigger
+ * curl_sock_cb() which might modify this list */
+ QLIST_FOREACH_SAFE(socket, &s->sockets, next, next_socket) {
+ do { /* repeat while curl asks to be called again */
+ r = curl_multi_socket_action(s->s->multi, socket->fd, 0, &running);
+ } while (r == CURLM_CALL_MULTI_PERFORM);
+ }
+}
+static void curl_multi_do(void *arg) /* fd handler (registered for CURL_POLL_OUT and
+ * CURL_POLL_INOUT): locking wrapper around
+ * curl_multi_do_locked(); arg is the CURLState */
+{
+ CURLState *s = (CURLState *)arg;
+
+ qemu_mutex_lock(&s->s->mutex); /* the mutex is reached through the CURLState's s pointer */
+ curl_multi_do_locked(s);
+ qemu_mutex_unlock(&s->s->mutex);
}
static void curl_multi_read(void *arg) /* fd handler (registered for CURL_POLL_IN and CURL_POLL_INOUT); arg is the CURLState */
{
CURLState *s = (CURLState *)arg;
- curl_multi_do(arg);
+ qemu_mutex_lock(&s->s->mutex); /* one critical section covers both the socket action and the completion check */
+ curl_multi_do_locked(s); /* the _locked variant is used because the mutex is already held here */
curl_multi_check_completion(s->s);
+ qemu_mutex_unlock(&s->s->mutex);
}
static void curl_multi_timeout_do(void *arg)
return;
}
+ qemu_mutex_lock(&s->mutex);
curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);
curl_multi_check_completion(s);
+ qemu_mutex_unlock(&s->mutex);
#else
abort();
#endif
#endif
}
+ QLIST_INIT(&state->sockets);
state->s = s;
return state;
{
if (s->s->multi)
curl_multi_remove_handle(s->s->multi, s->curl);
+
+ while (!QLIST_EMPTY(&s->sockets)) {
+ CURLSocket *socket = QLIST_FIRST(&s->sockets);
+
+ QLIST_REMOVE(socket, next);
+ g_free(socket);
+ }
+
s->in_use = 0;
}
curl_easy_cleanup(state->curl);
state->curl = NULL;
+ qemu_mutex_init(&s->mutex);
curl_attach_aio_context(bs, bdrv_get_aio_context(bs));
qemu_opts_del(opts);
{
CURLState *state;
int running;
+ int ret = -EINPROGRESS; /* sentinel: left untouched unless an error path sets it;
+ * only other values complete the acb at out: */
CURLAIOCB *acb = p;
- BDRVCURLState *s = acb->common.bs->opaque;
-
- qemu_bh_delete(acb->bh);
- acb->bh = NULL;
+ BlockDriverState *bs = acb->common.bs;
+ BDRVCURLState *s = bs->opaque;
- size_t start = acb->sector_num * SECTOR_SIZE;
+ size_t start = acb->sector_num * BDRV_SECTOR_SIZE; /* byte offset of the request */
size_t end;
+ qemu_mutex_lock(&s->mutex); /* held until the out: label */
+
// In case we have the requested data already (e.g. read-ahead),
// we can just call the callback and be done.
- switch (curl_find_buf(s, start, acb->nb_sectors * SECTOR_SIZE, acb)) {
+ switch (curl_find_buf(s, start, acb->nb_sectors * BDRV_SECTOR_SIZE, acb)) {
case FIND_RET_OK: /* NOTE(review): curl_find_buf() has already invoked acb->common.cb,
* and did so with s->mutex held - confirm that is intended */
qemu_aio_unref(acb);
// fall through
case FIND_RET_WAIT:
- return;
+ goto out; /* ret is still -EINPROGRESS, so out: only unlocks */
default:
break;
}
// No cache found, so let's start a new request
state = curl_init_state(acb->common.bs, s);
if (!state) {
- acb->common.cb(acb->common.opaque, -EIO);
- qemu_aio_unref(acb);
- return;
+ ret = -EIO;
+ goto out;
}
acb->start = 0;
- acb->end = (acb->nb_sectors * SECTOR_SIZE);
+ acb->end = MIN(acb->nb_sectors * BDRV_SECTOR_SIZE, s->len - start); /* request length, limited to the bytes between start and s->len */
state->buf_off = 0;
g_free(state->orig_buf);
state->buf_start = start;
- state->buf_len = acb->end + s->readahead_size;
- end = MIN(start + state->buf_len, s->len) - 1;
+ state->buf_len = MIN(acb->end + s->readahead_size, s->len - start); /* request plus read-ahead, with the same limit */
+ end = start + state->buf_len - 1; /* inclusive last byte, used in the range string below */
state->orig_buf = g_try_malloc(state->buf_len);
if (state->buf_len && state->orig_buf == NULL) {
curl_clean_state(state);
- acb->common.cb(acb->common.opaque, -ENOMEM);
- qemu_aio_unref(acb);
- return;
+ ret = -ENOMEM;
+ goto out;
}
state->acb[0] = acb;
/* NOTE(review): start and end are size_t but are formatted with %zd here and in the DPRINTF below; %zu would match the type */
snprintf(state->range, 127, "%zd-%zd", start, end);
- DPRINTF("CURL (AIO): Reading %d at %zd (%s)\n",
- (acb->nb_sectors * SECTOR_SIZE), start, state->range);
+ DPRINTF("CURL (AIO): Reading %llu at %zd (%s)\n",
+ (acb->nb_sectors * BDRV_SECTOR_SIZE), start, state->range);
curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range);
curl_multi_add_handle(s->multi, state->curl);
/* Tell curl it needs to kick things off */
curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);
+
+out:
+ qemu_mutex_unlock(&s->mutex); /* drop the lock before the error callback below */
+ if (ret != -EINPROGRESS) { /* an error path set ret: complete the acb with it and release it */
+ acb->common.cb(acb->common.opaque, ret);
+ qemu_aio_unref(acb);
+ }
}
static BlockAIOCB *curl_aio_readv(BlockDriverState *bs,
acb->sector_num = sector_num;
acb->nb_sectors = nb_sectors;
- acb->bh = aio_bh_new(bdrv_get_aio_context(bs), curl_readv_bh_cb, acb);
- qemu_bh_schedule(acb->bh);
+ aio_bh_schedule_oneshot(bdrv_get_aio_context(bs), curl_readv_bh_cb, acb);
return &acb->common;
}
DPRINTF("CURL: Close\n");
curl_detach_aio_context(bs);
+ qemu_mutex_destroy(&s->mutex);
g_free(s->cookie);
g_free(s->url);
.bdrv_attach_aio_context = curl_attach_aio_context,
};
-static BlockDriver bdrv_tftp = {
- .format_name = "tftp",
- .protocol_name = "tftp",
-
- .instance_size = sizeof(BDRVCURLState),
- .bdrv_parse_filename = curl_parse_filename,
- .bdrv_file_open = curl_open,
- .bdrv_close = curl_close,
- .bdrv_getlength = curl_getlength,
-
- .bdrv_aio_readv = curl_aio_readv,
-
- .bdrv_detach_aio_context = curl_detach_aio_context,
- .bdrv_attach_aio_context = curl_attach_aio_context,
-};
-
/* Registers the http, https, ftp and ftps block drivers. The tftp
 * registration is dropped by this change, in line with CURLPROTO_TFTP
 * being removed from PROTOCOLS. */
static void curl_block_init(void)
{
bdrv_register(&bdrv_http);
bdrv_register(&bdrv_https);
bdrv_register(&bdrv_ftp);
bdrv_register(&bdrv_ftps);
- bdrv_register(&bdrv_tftp);
}
block_init(curl_block_init);