]> Git Repo - qemu.git/blob - buffered_file.c
migration: just lock migrate_fd_put_ready
[qemu.git] / buffered_file.c
1 /*
2  * QEMU buffered QEMUFile
3  *
4  * Copyright IBM, Corp. 2008
5  *
6  * Authors:
7  *  Anthony Liguori   <[email protected]>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2.  See
10  * the COPYING file in the top-level directory.
11  *
12  * Contributions after 2012-01-13 are licensed under the terms of the
13  * GNU GPL, version 2 or (at your option) any later version.
14  */
15
16 #include "qemu-common.h"
17 #include "hw/hw.h"
18 #include "qemu/timer.h"
19 #include "buffered_file.h"
20 #include "qemu/thread.h"
21
22 //#define DEBUG_BUFFERED_FILE
23
24 typedef struct QEMUFileBuffered
25 {
26     MigrationState *migration_state;
27     QEMUFile *file;
28     size_t bytes_xfer;
29     size_t xfer_limit;
30     uint8_t *buffer;
31     size_t buffer_size;
32     size_t buffer_capacity;
33     QemuThread thread;
34 } QEMUFileBuffered;
35
36 #ifdef DEBUG_BUFFERED_FILE
37 #define DPRINTF(fmt, ...) \
38     do { printf("buffered-file: " fmt, ## __VA_ARGS__); } while (0)
39 #else
40 #define DPRINTF(fmt, ...) \
41     do { } while (0)
42 #endif
43
44 static void buffered_append(QEMUFileBuffered *s,
45                             const uint8_t *buf, size_t size)
46 {
47     if (size > (s->buffer_capacity - s->buffer_size)) {
48         DPRINTF("increasing buffer capacity from %zu by %zu\n",
49                 s->buffer_capacity, size + 1024);
50
51         s->buffer_capacity += size + 1024;
52
53         s->buffer = g_realloc(s->buffer, s->buffer_capacity);
54     }
55
56     memcpy(s->buffer + s->buffer_size, buf, size);
57     s->buffer_size += size;
58 }
59
60 static ssize_t buffered_flush(QEMUFileBuffered *s)
61 {
62     size_t offset = 0;
63     ssize_t ret = 0;
64
65     DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size);
66
67     while (s->bytes_xfer < s->xfer_limit && offset < s->buffer_size) {
68         size_t to_send = MIN(s->buffer_size - offset, s->xfer_limit - s->bytes_xfer);
69         ret = migrate_fd_put_buffer(s->migration_state, s->buffer + offset,
70                                     to_send);
71         if (ret <= 0) {
72             DPRINTF("error flushing data, %zd\n", ret);
73             break;
74         } else {
75             DPRINTF("flushed %zd byte(s)\n", ret);
76             offset += ret;
77             s->bytes_xfer += ret;
78         }
79     }
80
81     DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size);
82     memmove(s->buffer, s->buffer + offset, s->buffer_size - offset);
83     s->buffer_size -= offset;
84
85     if (ret < 0) {
86         return ret;
87     }
88     return offset;
89 }
90
91 static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
92 {
93     QEMUFileBuffered *s = opaque;
94     ssize_t error;
95
96     DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos);
97
98     error = qemu_file_get_error(s->file);
99     if (error) {
100         DPRINTF("flush when error, bailing: %s\n", strerror(-error));
101         return error;
102     }
103
104     if (size > 0) {
105         DPRINTF("buffering %d bytes\n", size - offset);
106         buffered_append(s, buf, size);
107     }
108
109     error = buffered_flush(s);
110     if (error < 0) {
111         DPRINTF("buffered flush error. bailing: %s\n", strerror(-error));
112         return error;
113     }
114
115     if (pos == 0 && size == 0) {
116         DPRINTF("file is ready\n");
117         if (s->bytes_xfer < s->xfer_limit) {
118             DPRINTF("notifying client\n");
119             migrate_fd_put_ready(s->migration_state);
120         }
121     }
122
123     return size;
124 }
125
126 static int buffered_close(void *opaque)
127 {
128     QEMUFileBuffered *s = opaque;
129     ssize_t ret = 0;
130     int ret2;
131
132     DPRINTF("closing\n");
133
134     s->xfer_limit = INT_MAX;
135     while (!qemu_file_get_error(s->file) && s->buffer_size) {
136         ret = buffered_flush(s);
137         if (ret < 0) {
138             break;
139         }
140     }
141
142     ret2 = migrate_fd_close(s->migration_state);
143     if (ret >= 0) {
144         ret = ret2;
145     }
146     ret = migrate_fd_close(s->migration_state);
147     s->migration_state->complete = true;
148     return ret;
149 }
150
151 /*
152  * The meaning of the return values is:
153  *   0: We can continue sending
154  *   1: Time to stop
155  *   negative: There has been an error
156  */
157 static int buffered_get_fd(void *opaque)
158 {
159     QEMUFileBuffered *s = opaque;
160
161     return qemu_get_fd(s->file);
162 }
163
164 static int buffered_rate_limit(void *opaque)
165 {
166     QEMUFileBuffered *s = opaque;
167     int ret;
168
169     ret = qemu_file_get_error(s->file);
170     if (ret) {
171         return ret;
172     }
173
174     if (s->bytes_xfer > s->xfer_limit)
175         return 1;
176
177     return 0;
178 }
179
180 static int64_t buffered_set_rate_limit(void *opaque, int64_t new_rate)
181 {
182     QEMUFileBuffered *s = opaque;
183     if (qemu_file_get_error(s->file)) {
184         goto out;
185     }
186     if (new_rate > SIZE_MAX) {
187         new_rate = SIZE_MAX;
188     }
189
190     s->xfer_limit = new_rate / 10;
191     
192 out:
193     return s->xfer_limit;
194 }
195
196 static int64_t buffered_get_rate_limit(void *opaque)
197 {
198     QEMUFileBuffered *s = opaque;
199   
200     return s->xfer_limit;
201 }
202
203 /* 10ms  xfer_limit is the limit that we should write each 10ms */
204 #define BUFFER_DELAY 100
205
206 static void *buffered_file_thread(void *opaque)
207 {
208     QEMUFileBuffered *s = opaque;
209     int64_t expire_time = qemu_get_clock_ms(rt_clock) + BUFFER_DELAY;
210
211     while (true) {
212         int64_t current_time = qemu_get_clock_ms(rt_clock);
213
214         if (s->migration_state->complete) {
215             break;
216         }
217         if (current_time >= expire_time) {
218             s->bytes_xfer = 0;
219             expire_time = current_time + BUFFER_DELAY;
220         }
221         if (s->bytes_xfer >= s->xfer_limit) {
222             /* usleep expects microseconds */
223             g_usleep((expire_time - current_time)*1000);
224         }
225         buffered_put_buffer(s, NULL, 0, 0);
226     }
227     g_free(s->buffer);
228     g_free(s);
229     return NULL;
230 }
231
232 static const QEMUFileOps buffered_file_ops = {
233     .get_fd =         buffered_get_fd,
234     .put_buffer =     buffered_put_buffer,
235     .close =          buffered_close,
236     .rate_limit =     buffered_rate_limit,
237     .get_rate_limit = buffered_get_rate_limit,
238     .set_rate_limit = buffered_set_rate_limit,
239 };
240
241 void qemu_fopen_ops_buffered(MigrationState *migration_state)
242 {
243     QEMUFileBuffered *s;
244
245     s = g_malloc0(sizeof(*s));
246
247     s->migration_state = migration_state;
248     s->xfer_limit = migration_state->bandwidth_limit / 10;
249     s->migration_state->complete = false;
250
251     s->file = qemu_fopen_ops(s, &buffered_file_ops);
252
253     migration_state->file = s->file;
254
255     qemu_thread_create(&s->thread, buffered_file_thread, s,
256                        QEMU_THREAD_DETACHED);
257 }
This page took 0.038617 seconds and 4 git commands to generate.