*/
#include "qemu/osdep.h"
+#include "qemu-common.h"
#include "qemu/error-report.h"
#include "trace.h"
-#include "qemu-common.h"
#include "qapi/error.h"
#include "net/net.h"
#include "net/eth.h"
#include "net/colo-compare.h"
#include "migration/colo.h"
#include "migration/migration.h"
+#include "util.h"
#define TYPE_COLO_COMPARE "colo-compare"
#define COLO_COMPARE(obj) \
char *pri_indev;
char *sec_indev;
char *outdev;
+ char *notify_dev;
CharBackend chr_pri_in;
CharBackend chr_sec_in;
CharBackend chr_out;
+ CharBackend chr_notify_dev;
SocketReadState pri_rs;
SocketReadState sec_rs;
+ SocketReadState notify_rs;
bool vnet_hdr;
/*
SECONDARY_IN,
};
-static void colo_compare_inconsistency_notify(void)
-{
- notifier_list_notify(&colo_compare_notifiers,
- migrate_get_current());
-}
static int compare_chr_send(CompareState *s,
const uint8_t *buf,
uint32_t size,
- uint32_t vnet_hdr_len);
+ uint32_t vnet_hdr_len,
+ bool notify_remote_frame);
+
+static bool packet_matches_str(const char *str,
+ const uint8_t *buf,
+ uint32_t packet_len)
+{
+ if (packet_len != strlen(str)) {
+ return false;
+ }
+
+ return !memcmp(str, buf, strlen(str));
+}
+
+static void notify_remote_frame(CompareState *s)
+{
+ char msg[] = "DO_CHECKPOINT";
+ int ret = 0;
+
+ ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
+ if (ret < 0) {
+ error_report("Notify Xen COLO-frame failed");
+ }
+}
+
+static void colo_compare_inconsistency_notify(CompareState *s)
+{
+ if (s->notify_dev) {
+ notify_remote_frame(s);
+ } else {
+ notifier_list_notify(&colo_compare_notifiers,
+ migrate_get_current());
+ }
+}
static gint seq_sorter(Packet *a, Packet *b, gpointer data)
{
- struct tcphdr *atcp, *btcp;
+ struct tcp_hdr *atcp, *btcp;
- atcp = (struct tcphdr *)(a->transport_header);
- btcp = (struct tcphdr *)(b->transport_header);
+ atcp = (struct tcp_hdr *)(a->transport_header);
+ btcp = (struct tcp_hdr *)(b->transport_header);
return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
}
static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
{
Packet *pkt = data;
- struct tcphdr *tcphd;
+ struct tcp_hdr *tcphd;
- tcphd = (struct tcphdr *)pkt->transport_header;
+ tcphd = (struct tcp_hdr *)pkt->transport_header;
pkt->tcp_seq = ntohl(tcphd->th_seq);
pkt->tcp_ack = ntohl(tcphd->th_ack);
ret = compare_chr_send(s,
pkt->data,
pkt->size,
- pkt->vnet_hdr_len);
+ pkt->vnet_hdr_len,
+ false);
if (ret < 0) {
error_report("colo send primary packet failed");
}
{
*mark = 0;
- if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
- if (colo_compare_packet_payload(ppkt, spkt,
- ppkt->header_size, spkt->header_size,
- ppkt->payload_size)) {
- *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
- return true;
- }
- }
if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
if (colo_compare_packet_payload(ppkt, spkt,
ppkt->header_size, spkt->header_size,
qemu_hexdump((char *)spkt->data, stderr,
"colo-compare spkt", spkt->size);
- colo_compare_inconsistency_notify();
+ colo_compare_inconsistency_notify(s);
}
}
}
static int colo_old_packet_check_one_conn(Connection *conn,
- void *user_data)
+ CompareState *s)
{
GList *result = NULL;
int64_t check_time = REGULAR_PACKET_CHECK_MS;
if (result) {
/* Do checkpoint will flush old packet */
- colo_compare_inconsistency_notify();
+ colo_compare_inconsistency_notify(s);
return 0;
}
* If we find one old packet, stop finding job and notify
* COLO frame do checkpoint.
*/
- g_queue_find_custom(&s->conn_list, NULL,
+ g_queue_find_custom(&s->conn_list, s,
(GCompareFunc)colo_old_packet_check_one_conn);
}
*/
trace_colo_compare_main("packet different");
g_queue_push_head(&conn->primary_list, pkt);
- colo_compare_inconsistency_notify();
+
+ colo_compare_inconsistency_notify(s);
break;
}
}
static int compare_chr_send(CompareState *s,
const uint8_t *buf,
uint32_t size,
- uint32_t vnet_hdr_len)
+ uint32_t vnet_hdr_len,
+ bool notify_remote_frame)
{
int ret = 0;
uint32_t len = htonl(size);
return 0;
}
- ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
+ if (notify_remote_frame) {
+ ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
+ (uint8_t *)&len,
+ sizeof(len));
+ } else {
+ ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
+ }
+
if (ret != sizeof(len)) {
goto err;
}
* know how to parse net packet correctly.
*/
len = htonl(vnet_hdr_len);
- ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
+
+ if (!notify_remote_frame) {
+ ret = qemu_chr_fe_write_all(&s->chr_out,
+ (uint8_t *)&len,
+ sizeof(len));
+ }
+
if (ret != sizeof(len)) {
goto err;
}
}
- ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
+ if (notify_remote_frame) {
+ ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
+ (uint8_t *)buf,
+ size);
+ } else {
+ ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
+ }
+
if (ret != size) {
goto err;
}
}
}
+static void compare_notify_chr(void *opaque, const uint8_t *buf, int size)
+{
+ CompareState *s = COLO_COMPARE(opaque);
+ int ret;
+
+ ret = net_fill_rstate(&s->notify_rs, buf, size);
+ if (ret == -1) {
+ qemu_chr_fe_set_handlers(&s->chr_notify_dev, NULL, NULL, NULL, NULL,
+ NULL, NULL, true);
+ error_report("colo-compare notify_dev error");
+ }
+}
+
/*
* Check old packet regularly so it can watch for any packets
* that the secondary hasn't produced equivalents of.
break;
}
- assert(event_unhandled_count > 0);
-
qemu_mutex_lock(&event_mtx);
+ assert(event_unhandled_count > 0);
event_unhandled_count--;
qemu_cond_broadcast(&event_complete_cond);
qemu_mutex_unlock(&event_mtx);
qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
compare_sec_chr_in, NULL, NULL,
s, s->worker_context, true);
+ if (s->notify_dev) {
+ qemu_chr_fe_set_handlers(&s->chr_notify_dev, compare_chr_can_read,
+ compare_notify_chr, NULL, NULL,
+ s, s->worker_context, true);
+ }
colo_compare_timer_init(s);
s->event_bh = qemu_bh_new(colo_compare_handle_event, s);
s->vnet_hdr = value;
}
+static char *compare_get_notify_dev(Object *obj, Error **errp)
+{
+ CompareState *s = COLO_COMPARE(obj);
+
+ return g_strdup(s->notify_dev);
+}
+
+static void compare_set_notify_dev(Object *obj, const char *value, Error **errp)
+{
+ CompareState *s = COLO_COMPARE(obj);
+
+ g_free(s->notify_dev);
+ s->notify_dev = g_strdup(value);
+}
+
static void compare_pri_rs_finalize(SocketReadState *pri_rs)
{
CompareState *s = container_of(pri_rs, CompareState, pri_rs);
compare_chr_send(s,
pri_rs->buf,
pri_rs->packet_len,
- pri_rs->vnet_hdr_len);
+ pri_rs->vnet_hdr_len,
+ false);
} else {
/* compare packet in the specified connection */
colo_compare_connection(conn, s);
}
}
+static void compare_notify_rs_finalize(SocketReadState *notify_rs)
+{
+ CompareState *s = container_of(notify_rs, CompareState, notify_rs);
+
+ const char msg[] = "COLO_COMPARE_GET_XEN_INIT";
+ int ret;
+
+ if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
+ notify_rs->buf,
+ notify_rs->packet_len)) {
+ ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
+ if (ret < 0) {
+ error_report("Notify Xen COLO-frame INIT failed");
+ }
+ } else if (packet_matches_str("COLO_CHECKPOINT",
+ notify_rs->buf,
+ notify_rs->packet_len)) {
+ /* colo-compare do checkpoint, flush pri packet and remove sec packet */
+ g_queue_foreach(&s->conn_list, colo_flush_packets, s);
+ } else {
+ error_report("COLO compare got unsupported instruction");
+ }
+}
/*
* Return 0 is success.
return 1;
}
+ if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_GCONTEXT)) {
+ error_setg(errp, "chardev \"%s\" cannot switch context",
+ chr_name);
+ return 1;
+ }
+
return 0;
}
net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
+ /* Try to enable remote notify chardev, currently just for Xen COLO */
+ if (s->notify_dev) {
+ if (find_and_check_chardev(&chr, s->notify_dev, errp) ||
+ !qemu_chr_fe_init(&s->chr_notify_dev, chr, errp)) {
+ return;
+ }
+
+ net_socket_rs_init(&s->notify_rs, compare_notify_rs_finalize,
+ s->vnet_hdr);
+ }
+
QTAILQ_INSERT_TAIL(&net_compares, s, next);
g_queue_init(&s->conn_list);
compare_chr_send(s,
pkt->data,
pkt->size,
- pkt->vnet_hdr_len);
+ pkt->vnet_hdr_len,
+ false);
packet_destroy(pkt, NULL);
}
while (!g_queue_is_empty(&conn->secondary_list)) {
(Object **)&s->iothread,
object_property_allow_set_link,
OBJ_PROP_LINK_STRONG, NULL);
+ /* This parameter just for Xen COLO */
+ object_property_add_str(obj, "notify_dev",
+ compare_get_notify_dev, compare_set_notify_dev,
+ NULL);
s->vnet_hdr = false;
object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
qemu_chr_fe_deinit(&s->chr_pri_in, false);
qemu_chr_fe_deinit(&s->chr_sec_in, false);
qemu_chr_fe_deinit(&s->chr_out, false);
+ if (s->notify_dev) {
+ qemu_chr_fe_deinit(&s->chr_notify_dev, false);
+ }
+
if (s->iothread) {
colo_compare_timer_del(s);
}
g_free(s->pri_indev);
g_free(s->sec_indev);
g_free(s->outdev);
+ g_free(s->notify_dev);
}
static const TypeInfo colo_compare_info = {