Git Repo - qemu.git/blobdiff - net/colo-compare.c
Merge remote-tracking branch 'remotes/stefanha/tags/tracing-pull-request' into staging
[qemu.git] / net / colo-compare.c
index 0ebdec936c42edd62a573f69bc385850ac562330..8622b0b35a92862e73a3e73d2f6bd18e650b9379 100644 (file)
@@ -37,6 +37,9 @@
 #define COMPARE_READ_LEN_MAX NET_BUFSIZE
 #define MAX_QUEUE_SIZE 1024
 
+#define COLO_COMPARE_FREE_PRIMARY     0x01
+#define COLO_COMPARE_FREE_SECONDARY   0x02
+
 /* TODO: Should be configurable */
 #define REGULAR_PACKET_CHECK_MS 3000
 
@@ -111,14 +114,32 @@ static gint seq_sorter(Packet *a, Packet *b, gpointer data)
     return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
 }
 
+/* Cache seq/ack/flags and header/payload sizes from the TCP header in pkt;
+ * *max_ack is raised to this packet's ack number when that one is larger. */
+static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
+{
+    Packet *pkt = data;
+    struct tcphdr *th = (struct tcphdr *)pkt->transport_header;
+
+    pkt->tcp_seq = ntohl(th->th_seq);
+    pkt->tcp_ack = ntohl(th->th_ack);
+    *max_ack = pkt->tcp_ack > *max_ack ? pkt->tcp_ack : *max_ack;
+    pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
+                       + (th->th_off << 2) - pkt->vnet_hdr_len;
+    pkt->payload_size = pkt->size - pkt->header_size;
+    pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
+    pkt->flags = th->th_flags;
+}
+
 /*
  * Return 1 on success, if return 0 means the
  * packet will be dropped
  */
-static int colo_insert_packet(GQueue *queue, Packet *pkt)
+static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
 {
     if (g_queue_get_length(queue) <= MAX_QUEUE_SIZE) {
         if (pkt->ip->ip_p == IPPROTO_TCP) {
+            fill_pkt_tcp_info(pkt, max_ack);
             g_queue_insert_sorted(queue,
                                   pkt,
                                   (GCompareDataFunc)seq_sorter,
@@ -168,12 +189,12 @@ static int packet_enqueue(CompareState *s, int mode, Connection **con)
     }
 
     if (mode == PRIMARY_IN) {
-        if (!colo_insert_packet(&conn->primary_list, pkt)) {
+        if (!colo_insert_packet(&conn->primary_list, pkt, &conn->pack)) {
             error_report("colo compare primary queue size too big,"
                          "drop packet");
         }
     } else {
-        if (!colo_insert_packet(&conn->secondary_list, pkt)) {
+        if (!colo_insert_packet(&conn->secondary_list, pkt, &conn->sack)) {
             error_report("colo compare secondary queue size too big,"
                          "drop packet");
         }
@@ -183,6 +204,25 @@ static int packet_enqueue(CompareState *s, int mode, Connection **con)
     return 0;
 }
 
+static inline bool after(uint32_t seq1, uint32_t seq2)
+{
+        return (int32_t)(seq1 - seq2) > 0; /* wrap-safe "seq1 > seq2" */
+}
+
+/*
+ * Hand pkt to compare_chr_send() (failure is only reported), then destroy it.
+ */
+static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
+{
+    int ret = compare_chr_send(s, pkt->data, pkt->size, pkt->vnet_hdr_len);
+
+    if (ret < 0) {
+        error_report("colo send primary packet failed");
+    }
+    trace_colo_compare_main("packet same and release packet");
+    packet_destroy(pkt, NULL);
+}
+
 /*
  * The IP packets sent by primary and secondary
  * will be compared in here
@@ -190,10 +230,12 @@ static int packet_enqueue(CompareState *s, int mode, Connection **con)
  * return:    0  means packet same
  *            > 0 || < 0 means packet different
  */
-static int colo_packet_compare_common(Packet *ppkt,
-                                      Packet *spkt,
-                                      int poffset,
-                                      int soffset)
+static int colo_compare_packet_payload(Packet *ppkt,
+                                       Packet *spkt,
+                                       uint16_t poffset,
+                                       uint16_t soffset,
+                                       uint16_t len)
+
 {
     if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
         char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
@@ -208,131 +250,187 @@ static int colo_packet_compare_common(Packet *ppkt,
                                    sec_ip_src, sec_ip_dst);
     }
 
-    poffset = ppkt->vnet_hdr_len + poffset;
-    soffset = ppkt->vnet_hdr_len + soffset;
-
-    if (ppkt->size - poffset == spkt->size - soffset) {
-        return memcmp(ppkt->data + poffset,
-                      spkt->data + soffset,
-                      spkt->size - soffset);
-    } else {
-        trace_colo_compare_main("Net packet size are not the same");
-        return -1;
-    }
+    return memcmp(ppkt->data + poffset, spkt->data + soffset, len);
 }
 
 /*
- * Called from the compare thread on the primary
- * for compare tcp packet
- * compare_tcp copied from Dr. David Alan Gilbert's branch
- */
-static int colo_packet_compare_tcp(Packet *spkt, Packet *ppkt)
+ * return true means that the payload is consist and
+ * need to make the next comparison, false means do
+ * the checkpoint
+*/
+static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
+                              int8_t *mark, uint32_t max_ack)
 {
-    struct tcphdr *ptcp, *stcp;
-    int res;
+    *mark = 0;
+
+    if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
+        if (colo_compare_packet_payload(ppkt, spkt,
+                                        ppkt->header_size, spkt->header_size,
+                                        ppkt->payload_size)) {
+            *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
+            return true;
+        }
+    }
+    if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
+        if (colo_compare_packet_payload(ppkt, spkt,
+                                        ppkt->header_size, spkt->header_size,
+                                        ppkt->payload_size)) {
+            *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
+            return true;
+        }
+    }
+
+    /* one part of secondary packet payload still need to be compared */
+    if (!after(ppkt->seq_end, spkt->seq_end)) {
+        if (colo_compare_packet_payload(ppkt, spkt,
+                                        ppkt->header_size + ppkt->offset,
+                                        spkt->header_size + spkt->offset,
+                                        ppkt->payload_size - ppkt->offset)) {
+            if (!after(ppkt->tcp_ack, max_ack)) {
+                *mark = COLO_COMPARE_FREE_PRIMARY;
+                spkt->offset += ppkt->payload_size - ppkt->offset;
+                return true;
+            } else {
+                /* secondary guest hasn't ack the data, don't send
+                 * out this packet
+                 */
+                return false;
+            }
+        }
+    } else {
+        /* primary packet is longer than secondary packet, compare
+         * the same part and mark the primary packet offset
+         */
+        if (colo_compare_packet_payload(ppkt, spkt,
+                                        ppkt->header_size + ppkt->offset,
+                                        spkt->header_size + spkt->offset,
+                                        spkt->payload_size - spkt->offset)) {
+            *mark = COLO_COMPARE_FREE_SECONDARY;
+            ppkt->offset += spkt->payload_size - spkt->offset;
+            return true;
+        }
+    }
 
-    trace_colo_compare_main("compare tcp");
+    return false;
+}
 
-    ptcp = (struct tcphdr *)ppkt->transport_header;
-    stcp = (struct tcphdr *)spkt->transport_header;
+static void colo_compare_tcp(CompareState *s, Connection *conn)
+{
+    Packet *ppkt = NULL, *spkt = NULL;
+    int8_t mark;
 
     /*
-     * The 'identification' field in the IP header is *very* random
-     * it almost never matches.  Fudge this by ignoring differences in
-     * unfragmented packets; they'll normally sort themselves out if different
-     * anyway, and it should recover at the TCP level.
-     * An alternative would be to get both the primary and secondary to rewrite
-     * somehow; but that would need some sync traffic to sync the state
-     */
-    if (ntohs(ppkt->ip->ip_off) & IP_DF) {
-        spkt->ip->ip_id = ppkt->ip->ip_id;
-        /* and the sum will be different if the IDs were different */
-        spkt->ip->ip_sum = ppkt->ip->ip_sum;
+     * Compare the packets at the head of the primary and secondary queue.
+     *
+     * If ppkt and spkt have the same payload but ppkt's ACK is greater
+     * than spkt's ACK, we cannot send ppkt because that would cause the
+     * secondary guest to miss sending some data later on. Therefore the
+     * maximum ACK seen while enqueueing is recorded for the primary
+     * (conn->pack) and the secondary (conn->sack) side; only an ACK that
+     * is not after the smaller of the two is known to be acknowledged by
+     * both primary and secondary.
+     */
+    uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack;
+
+pri:
+    if (g_queue_is_empty(&conn->primary_list)) {
+        return;
+    }
+    ppkt = g_queue_pop_head(&conn->primary_list);
+sec:
+    if (g_queue_is_empty(&conn->secondary_list)) {
+        g_queue_push_head(&conn->primary_list, ppkt);
+        return;
     }
+    spkt = g_queue_pop_head(&conn->secondary_list);
 
-    /*
-     * Check tcp header length for tcp option field.
-     * th_off > 5 means this tcp packet have options field.
-     * The tcp options maybe always different.
-     * for example:
-     * From RFC 7323.
-     * TCP Timestamps option (TSopt):
-     * Kind: 8
-     *
-     * Length: 10 bytes
-     *
-     *    +-------+-------+---------------------+---------------------+
-     *    |Kind=8 |  10   |   TS Value (TSval)  |TS Echo Reply (TSecr)|
-     *    +-------+-------+---------------------+---------------------+
-     *       1       1              4                     4
-     *
-     * In this case the primary guest's timestamp always different with
-     * the secondary guest's timestamp. COLO just focus on payload,
-     * so we just need skip this field.
-     */
-    if (ptcp->th_off > 5) {
-        ptrdiff_t ptcp_offset, stcp_offset;
+    if (ppkt->tcp_seq == ppkt->seq_end) { /* primary has no payload */
+        colo_release_primary_pkt(s, ppkt);
+        ppkt = NULL;
+    }
 
-        ptcp_offset = ppkt->transport_header - (uint8_t *)ppkt->data
-                      + (ptcp->th_off * 4) - ppkt->vnet_hdr_len;
-        stcp_offset = spkt->transport_header - (uint8_t *)spkt->data
-                      + (stcp->th_off * 4) - spkt->vnet_hdr_len;
+    if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) {
+        trace_colo_compare_main("pri: this packet has compared");
+        colo_release_primary_pkt(s, ppkt);
+        ppkt = NULL;
+    }
 
-        /*
-         * When network is busy, some tcp options(like sack) will unpredictable
-         * occur in primary side or secondary side. it will make packet size
-         * not same, but the two packet's payload is identical. colo just
-         * care about packet payload, so we skip the option field.
-         */
-        res = colo_packet_compare_common(ppkt, spkt, ptcp_offset, stcp_offset);
-    } else if (ptcp->th_sum == stcp->th_sum) {
-        res = colo_packet_compare_common(ppkt, spkt, ETH_HLEN, ETH_HLEN);
+    if (spkt->tcp_seq == spkt->seq_end) { /* secondary has no payload */
+        packet_destroy(spkt, NULL);
+        if (!ppkt) {
+            goto pri;
+        } else {
+            goto sec;
+        }
     } else {
-        res = -1;
+        if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) {
+            trace_colo_compare_main("sec: this packet has compared");
+            packet_destroy(spkt, NULL);
+            if (!ppkt) {
+                goto pri;
+            } else {
+                goto sec;
+            }
+        }
+        if (!ppkt) {
+            g_queue_push_head(&conn->secondary_list, spkt);
+            goto pri;
+        }
     }
 
-    if (res != 0 &&
-        trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
-        char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
-
-        strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
-        strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
-        strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
-        strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
-
-        trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
-                                   pri_ip_dst, spkt->size,
-                                   sec_ip_src, sec_ip_dst);
-
-        trace_colo_compare_tcp_info("pri tcp packet",
-                                    ntohl(ptcp->th_seq),
-                                    ntohl(ptcp->th_ack),
-                                    res, ptcp->th_flags,
-                                    ppkt->size);
-
-        trace_colo_compare_tcp_info("sec tcp packet",
-                                    ntohl(stcp->th_seq),
-                                    ntohl(stcp->th_ack),
-                                    res, stcp->th_flags,
-                                    spkt->size);
+    if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) {
+        trace_colo_compare_tcp_info("pri",
+                                    ppkt->tcp_seq, ppkt->tcp_ack,
+                                    ppkt->header_size, ppkt->payload_size,
+                                    ppkt->offset, ppkt->flags);
+
+        trace_colo_compare_tcp_info("sec",
+                                    spkt->tcp_seq, spkt->tcp_ack,
+                                    spkt->header_size, spkt->payload_size,
+                                    spkt->offset, spkt->flags);
+
+        if (mark == COLO_COMPARE_FREE_PRIMARY) {
+            conn->compare_seq = ppkt->seq_end;
+            colo_release_primary_pkt(s, ppkt);
+            g_queue_push_head(&conn->secondary_list, spkt);
+            goto pri;
+        }
+        if (mark == COLO_COMPARE_FREE_SECONDARY) {
+            conn->compare_seq = spkt->seq_end;
+            packet_destroy(spkt, NULL);
+            goto sec;
+        }
+        if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
+            conn->compare_seq = ppkt->seq_end;
+            colo_release_primary_pkt(s, ppkt);
+            packet_destroy(spkt, NULL);
+            goto pri;
+        }
+    } else {
+        g_queue_push_head(&conn->primary_list, ppkt);
+        g_queue_push_head(&conn->secondary_list, spkt);
 
         qemu_hexdump((char *)ppkt->data, stderr,
                      "colo-compare ppkt", ppkt->size);
         qemu_hexdump((char *)spkt->data, stderr,
                      "colo-compare spkt", spkt->size);
-    }
 
-    return res;
+        /*
+         * TODO: notify the checkpoint logic about the inconsistency,
+         * e.g. via colo_compare_inconsistent_notify().
+         */
+    }
 }
 
+
 /*
  * Called from the compare thread on the primary
  * for compare udp packet
  */
 static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
 {
-    int ret;
-    int network_header_length = ppkt->ip->ip_hl * 4;
+    uint16_t network_header_length = ppkt->ip->ip_hl << 2;
+    uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
 
     trace_colo_compare_main("compare udp");
 
@@ -346,11 +444,12 @@ static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
      * other field like TOS,TTL,IP Checksum. we only need to compare
      * the ip payload here.
      */
-    ret = colo_packet_compare_common(ppkt, spkt,
-                                     network_header_length + ETH_HLEN,
-                                     network_header_length + ETH_HLEN);
-
-    if (ret) {
+    if (ppkt->size != spkt->size) {
+        trace_colo_compare_main("UDP: payload size of packets are different");
+        return -1;
+    }
+    if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
+                                    ppkt->size - offset)) {
         trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
         trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
         if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
@@ -359,9 +458,10 @@ static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
             qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
                          spkt->size);
         }
+        return -1;
+    } else {
+        return 0;
     }
-
-    return ret;
 }
 
 /*
@@ -370,7 +470,8 @@ static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
  */
 static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
 {
-    int network_header_length = ppkt->ip->ip_hl * 4;
+    uint16_t network_header_length = ppkt->ip->ip_hl << 2;
+    uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
 
     trace_colo_compare_main("compare icmp");
 
@@ -384,9 +485,12 @@ static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
      * other field like TOS,TTL,IP Checksum. we only need to compare
      * the ip payload here.
      */
-    if (colo_packet_compare_common(ppkt, spkt,
-                                   network_header_length + ETH_HLEN,
-                                   network_header_length + ETH_HLEN)) {
+    if (ppkt->size != spkt->size) {
+        trace_colo_compare_main("ICMP: payload size of packets are different");
+        return -1;
+    }
+    if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
+                                    ppkt->size - offset)) {
         trace_colo_compare_icmp_miscompare("primary pkt size",
                                            ppkt->size);
         trace_colo_compare_icmp_miscompare("Secondary pkt size",
@@ -409,6 +513,8 @@ static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
  */
 static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
 {
+    uint16_t offset = ppkt->vnet_hdr_len;
+
     trace_colo_compare_main("compare other");
     if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
         char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
@@ -423,7 +529,12 @@ static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
                                    sec_ip_src, sec_ip_dst);
     }
 
-    return colo_packet_compare_common(ppkt, spkt, 0, 0);
+    if (ppkt->size != spkt->size) {
+        trace_colo_compare_main("Other: payload size of packets are different");
+        return -1;
+    }
+    return colo_compare_packet_payload(ppkt, spkt, offset, offset,
+                                       ppkt->size - offset);
 }
 
 static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
@@ -477,53 +588,22 @@ static void colo_old_packet_check(void *opaque)
                         (GCompareFunc)colo_old_packet_check_one_conn);
 }
 
-/*
- * Called from the compare thread on the primary
- * for compare packet with secondary list of the
- * specified connection when a new packet was
- * queued to it.
- */
-static void colo_compare_connection(void *opaque, void *user_data)
+static void colo_compare_packet(CompareState *s, Connection *conn,
+                                int (*HandlePacket)(Packet *spkt,
+                                Packet *ppkt))
 {
-    CompareState *s = user_data;
-    Connection *conn = opaque;
     Packet *pkt = NULL;
     GList *result = NULL;
-    int ret;
 
     while (!g_queue_is_empty(&conn->primary_list) &&
            !g_queue_is_empty(&conn->secondary_list)) {
         pkt = g_queue_pop_head(&conn->primary_list);
-        switch (conn->ip_proto) {
-        case IPPROTO_TCP:
-            result = g_queue_find_custom(&conn->secondary_list,
-                     pkt, (GCompareFunc)colo_packet_compare_tcp);
-            break;
-        case IPPROTO_UDP:
-            result = g_queue_find_custom(&conn->secondary_list,
-                     pkt, (GCompareFunc)colo_packet_compare_udp);
-            break;
-        case IPPROTO_ICMP:
-            result = g_queue_find_custom(&conn->secondary_list,
-                     pkt, (GCompareFunc)colo_packet_compare_icmp);
-            break;
-        default:
-            result = g_queue_find_custom(&conn->secondary_list,
-                     pkt, (GCompareFunc)colo_packet_compare_other);
-            break;
-        }
+        result = g_queue_find_custom(&conn->secondary_list,
+                 pkt, (GCompareFunc)HandlePacket);
 
         if (result) {
-            ret = compare_chr_send(s,
-                                   pkt->data,
-                                   pkt->size,
-                                   pkt->vnet_hdr_len);
-            if (ret < 0) {
-                error_report("colo_send_primary_packet failed");
-            }
-            trace_colo_compare_main("packet same and release packet");
+            colo_release_primary_pkt(s, pkt);
             g_queue_remove(&conn->secondary_list, result->data);
-            packet_destroy(pkt, NULL);
         } else {
             /*
              * If one packet arrive late, the secondary_list or
@@ -538,6 +618,33 @@ static void colo_compare_connection(void *opaque, void *user_data)
     }
 }
 
+/*
+ * Called from the compare thread on the primary: compares the queued
+ * packets of the specified connection when a new packet was queued to
+ * it. TCP has its own comparison routine; every other protocol goes
+ * through colo_compare_packet() with the matching per-packet comparator.
+ */
+static void colo_compare_connection(void *opaque, void *user_data)
+{
+    Connection *conn = opaque;
+    CompareState *s = user_data;
+    int (*cmp)(Packet *spkt, Packet *ppkt);
+
+    if (conn->ip_proto == IPPROTO_TCP) {
+        colo_compare_tcp(s, conn);
+        return;
+    }
+
+    if (conn->ip_proto == IPPROTO_UDP) {
+        cmp = colo_packet_compare_udp;
+    } else if (conn->ip_proto == IPPROTO_ICMP) {
+        cmp = colo_packet_compare_icmp;
+    } else {
+        cmp = colo_packet_compare_other;
+    }
+    colo_compare_packet(s, conn, cmp);
+}
+
 static int compare_chr_send(CompareState *s,
                             const uint8_t *buf,
                             uint32_t size,
This page took 0.038435 seconds and 4 git commands to generate.