]> Git Repo - linux.git/blob - drivers/md/dm-vdo/indexer/index.c
Merge tag 'kbuild-v6.9' of git://git.kernel.org/pub/scm/linux/kernel/git/masahiroy...
[linux.git] / drivers / md / dm-vdo / indexer / index.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5
6
7 #include "index.h"
8
9 #include "logger.h"
10 #include "memory-alloc.h"
11
12 #include "funnel-requestqueue.h"
13 #include "hash-utils.h"
14 #include "sparse-cache.h"
15
16 static const u64 NO_LAST_SAVE = U64_MAX;
17
18 /*
19  * When searching for deduplication records, the index first searches the volume index, and then
20  * searches the chapter index for the relevant chapter. If the chapter has been fully committed to
21  * storage, the chapter pages are loaded into the page cache. If the chapter has not yet been
22  * committed (either the open chapter or a recently closed one), the index searches the in-memory
23  * representation of the chapter. Finally, if the volume index does not find a record and the index
24  * is sparse, the index will search the sparse cache.
25  *
26  * The index send two kinds of messages to coordinate between zones: chapter close messages for the
27  * chapter writer, and sparse cache barrier messages for the sparse cache.
28  *
29  * The chapter writer is responsible for committing chapters of records to storage. Since zones can
30  * get different numbers of records, some zones may fall behind others. Each time a zone fills up
31  * its available space in a chapter, it informs the chapter writer that the chapter is complete,
32  * and also informs all other zones that it has closed the chapter. Each other zone will then close
33  * the chapter immediately, regardless of how full it is, in order to minimize skew between zones.
34  * Once every zone has closed the chapter, the chapter writer will commit that chapter to storage.
35  *
36  * The last zone to close the chapter also removes the oldest chapter from the volume index.
37  * Although that chapter is invalid for zones that have moved on, the existence of the open chapter
38  * means that those zones will never ask the volume index about it. No zone is allowed to get more
39  * than one chapter ahead of any other. If a zone is so far ahead that it tries to close another
40  * chapter before the previous one has been closed by all zones, it is forced to wait.
41  *
42  * The sparse cache relies on having the same set of chapter indexes available to all zones. When a
43  * request wants to add a chapter to the sparse cache, it sends a barrier message to each zone
44  * during the triage stage that acts as a rendezvous. Once every zone has reached the barrier and
45  * paused its operations, the cache membership is changed and each zone is then informed that it
46  * can proceed. More details can be found in the sparse cache documentation.
47  *
48  * If a sparse cache has only one zone, it will not create a triage queue, but it still needs the
49  * barrier message to change the sparse cache membership, so the index simulates the message by
50  * invoking the handler directly.
51  */
52
53 struct chapter_writer {
54         /* The index to which we belong */
55         struct uds_index *index;
56         /* The thread to do the writing */
57         struct thread *thread;
58         /* The lock protecting the following fields */
59         struct mutex mutex;
60         /* The condition signalled on state changes */
61         struct cond_var cond;
62         /* Set to true to stop the thread */
63         bool stop;
64         /* The result from the most recent write */
65         int result;
66         /* The number of bytes allocated by the chapter writer */
67         size_t memory_size;
68         /* The number of zones which have submitted a chapter for writing */
69         unsigned int zones_to_write;
70         /* Open chapter index used by uds_close_open_chapter() */
71         struct open_chapter_index *open_chapter_index;
72         /* Collated records used by uds_close_open_chapter() */
73         struct uds_volume_record *collated_records;
74         /* The chapters to write (one per zone) */
75         struct open_chapter_zone *chapters[];
76 };
77
78 static bool is_zone_chapter_sparse(const struct index_zone *zone, u64 virtual_chapter)
79 {
80         return uds_is_chapter_sparse(zone->index->volume->geometry,
81                                      zone->oldest_virtual_chapter,
82                                      zone->newest_virtual_chapter, virtual_chapter);
83 }
84
85 static int launch_zone_message(struct uds_zone_message message, unsigned int zone,
86                                struct uds_index *index)
87 {
88         int result;
89         struct uds_request *request;
90
91         result = vdo_allocate(1, struct uds_request, __func__, &request);
92         if (result != VDO_SUCCESS)
93                 return result;
94
95         request->index = index;
96         request->unbatched = true;
97         request->zone_number = zone;
98         request->zone_message = message;
99
100         uds_enqueue_request(request, STAGE_MESSAGE);
101         return UDS_SUCCESS;
102 }
103
104 static void enqueue_barrier_messages(struct uds_index *index, u64 virtual_chapter)
105 {
106         struct uds_zone_message message = {
107                 .type = UDS_MESSAGE_SPARSE_CACHE_BARRIER,
108                 .virtual_chapter = virtual_chapter,
109         };
110         unsigned int zone;
111
112         for (zone = 0; zone < index->zone_count; zone++) {
113                 int result = launch_zone_message(message, zone, index);
114
115                 VDO_ASSERT_LOG_ONLY((result == UDS_SUCCESS), "barrier message allocation");
116         }
117 }
118
119 /*
120  * Determine whether this request should trigger a sparse cache barrier message to change the
121  * membership of the sparse cache. If a change in membership is desired, the function returns the
122  * chapter number to add.
123  */
124 static u64 triage_index_request(struct uds_index *index, struct uds_request *request)
125 {
126         u64 virtual_chapter;
127         struct index_zone *zone;
128
129         virtual_chapter = uds_lookup_volume_index_name(index->volume_index,
130                                                        &request->record_name);
131         if (virtual_chapter == NO_CHAPTER)
132                 return NO_CHAPTER;
133
134         zone = index->zones[request->zone_number];
135         if (!is_zone_chapter_sparse(zone, virtual_chapter))
136                 return NO_CHAPTER;
137
138         /*
139          * FIXME: Optimize for a common case by remembering the chapter from the most recent
140          * barrier message and skipping this chapter if is it the same.
141          */
142
143         return virtual_chapter;
144 }
145
146 /*
147  * Simulate a message to change the sparse cache membership for a single-zone sparse index. This
148  * allows us to forgo the complicated locking required by a multi-zone sparse index. Any other kind
149  * of index does nothing here.
150  */
151 static int simulate_index_zone_barrier_message(struct index_zone *zone,
152                                                struct uds_request *request)
153 {
154         u64 sparse_virtual_chapter;
155
156         if ((zone->index->zone_count > 1) ||
157             !uds_is_sparse_index_geometry(zone->index->volume->geometry))
158                 return UDS_SUCCESS;
159
160         sparse_virtual_chapter = triage_index_request(zone->index, request);
161         if (sparse_virtual_chapter == NO_CHAPTER)
162                 return UDS_SUCCESS;
163
164         return uds_update_sparse_cache(zone, sparse_virtual_chapter);
165 }
166
167 /* This is the request processing function for the triage queue. */
168 static void triage_request(struct uds_request *request)
169 {
170         struct uds_index *index = request->index;
171         u64 sparse_virtual_chapter = triage_index_request(index, request);
172
173         if (sparse_virtual_chapter != NO_CHAPTER)
174                 enqueue_barrier_messages(index, sparse_virtual_chapter);
175
176         uds_enqueue_request(request, STAGE_INDEX);
177 }
178
179 static int finish_previous_chapter(struct uds_index *index, u64 current_chapter_number)
180 {
181         int result;
182         struct chapter_writer *writer = index->chapter_writer;
183
184         mutex_lock(&writer->mutex);
185         while (index->newest_virtual_chapter < current_chapter_number)
186                 uds_wait_cond(&writer->cond, &writer->mutex);
187         result = writer->result;
188         mutex_unlock(&writer->mutex);
189
190         if (result != UDS_SUCCESS)
191                 return vdo_log_error_strerror(result,
192                                               "Writing of previous open chapter failed");
193
194         return UDS_SUCCESS;
195 }
196
197 static int swap_open_chapter(struct index_zone *zone)
198 {
199         int result;
200         struct open_chapter_zone *temporary_chapter;
201
202         result = finish_previous_chapter(zone->index, zone->newest_virtual_chapter);
203         if (result != UDS_SUCCESS)
204                 return result;
205
206         temporary_chapter = zone->open_chapter;
207         zone->open_chapter = zone->writing_chapter;
208         zone->writing_chapter = temporary_chapter;
209         return UDS_SUCCESS;
210 }
211
212 /*
213  * Inform the chapter writer that this zone is done with this chapter. The chapter won't start
214  * writing until all zones have closed it.
215  */
216 static unsigned int start_closing_chapter(struct uds_index *index,
217                                           unsigned int zone_number,
218                                           struct open_chapter_zone *chapter)
219 {
220         unsigned int finished_zones;
221         struct chapter_writer *writer = index->chapter_writer;
222
223         mutex_lock(&writer->mutex);
224         finished_zones = ++writer->zones_to_write;
225         writer->chapters[zone_number] = chapter;
226         uds_broadcast_cond(&writer->cond);
227         mutex_unlock(&writer->mutex);
228
229         return finished_zones;
230 }
231
232 static int announce_chapter_closed(struct index_zone *zone, u64 closed_chapter)
233 {
234         int result;
235         unsigned int i;
236         struct uds_zone_message zone_message = {
237                 .type = UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED,
238                 .virtual_chapter = closed_chapter,
239         };
240
241         for (i = 0; i < zone->index->zone_count; i++) {
242                 if (zone->id == i)
243                         continue;
244
245                 result = launch_zone_message(zone_message, i, zone->index);
246                 if (result != UDS_SUCCESS)
247                         return result;
248         }
249
250         return UDS_SUCCESS;
251 }
252
253 static int open_next_chapter(struct index_zone *zone)
254 {
255         int result;
256         u64 closed_chapter;
257         u64 expiring;
258         unsigned int finished_zones;
259         u32 expire_chapters;
260
261         vdo_log_debug("closing chapter %llu of zone %u after %u entries (%u short)",
262                       (unsigned long long) zone->newest_virtual_chapter, zone->id,
263                       zone->open_chapter->size,
264                       zone->open_chapter->capacity - zone->open_chapter->size);
265
266         result = swap_open_chapter(zone);
267         if (result != UDS_SUCCESS)
268                 return result;
269
270         closed_chapter = zone->newest_virtual_chapter++;
271         uds_set_volume_index_zone_open_chapter(zone->index->volume_index, zone->id,
272                                                zone->newest_virtual_chapter);
273         uds_reset_open_chapter(zone->open_chapter);
274
275         finished_zones = start_closing_chapter(zone->index, zone->id,
276                                                zone->writing_chapter);
277         if ((finished_zones == 1) && (zone->index->zone_count > 1)) {
278                 result = announce_chapter_closed(zone, closed_chapter);
279                 if (result != UDS_SUCCESS)
280                         return result;
281         }
282
283         expiring = zone->oldest_virtual_chapter;
284         expire_chapters = uds_chapters_to_expire(zone->index->volume->geometry,
285                                                  zone->newest_virtual_chapter);
286         zone->oldest_virtual_chapter += expire_chapters;
287
288         if (finished_zones < zone->index->zone_count)
289                 return UDS_SUCCESS;
290
291         while (expire_chapters-- > 0)
292                 uds_forget_chapter(zone->index->volume, expiring++);
293
294         return UDS_SUCCESS;
295 }
296
297 static int handle_chapter_closed(struct index_zone *zone, u64 virtual_chapter)
298 {
299         if (zone->newest_virtual_chapter == virtual_chapter)
300                 return open_next_chapter(zone);
301
302         return UDS_SUCCESS;
303 }
304
305 static int dispatch_index_zone_control_request(struct uds_request *request)
306 {
307         struct uds_zone_message *message = &request->zone_message;
308         struct index_zone *zone = request->index->zones[request->zone_number];
309
310         switch (message->type) {
311         case UDS_MESSAGE_SPARSE_CACHE_BARRIER:
312                 return uds_update_sparse_cache(zone, message->virtual_chapter);
313
314         case UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED:
315                 return handle_chapter_closed(zone, message->virtual_chapter);
316
317         default:
318                 vdo_log_error("invalid message type: %d", message->type);
319                 return UDS_INVALID_ARGUMENT;
320         }
321 }
322
323 static void set_request_location(struct uds_request *request,
324                                  enum uds_index_region new_location)
325 {
326         request->location = new_location;
327         request->found = ((new_location == UDS_LOCATION_IN_OPEN_CHAPTER) ||
328                           (new_location == UDS_LOCATION_IN_DENSE) ||
329                           (new_location == UDS_LOCATION_IN_SPARSE));
330 }
331
332 static void set_chapter_location(struct uds_request *request,
333                                  const struct index_zone *zone, u64 virtual_chapter)
334 {
335         request->found = true;
336         if (virtual_chapter == zone->newest_virtual_chapter)
337                 request->location = UDS_LOCATION_IN_OPEN_CHAPTER;
338         else if (is_zone_chapter_sparse(zone, virtual_chapter))
339                 request->location = UDS_LOCATION_IN_SPARSE;
340         else
341                 request->location = UDS_LOCATION_IN_DENSE;
342 }
343
344 static int search_sparse_cache_in_zone(struct index_zone *zone, struct uds_request *request,
345                                        u64 virtual_chapter, bool *found)
346 {
347         int result;
348         struct volume *volume;
349         u16 record_page_number;
350         u32 chapter;
351
352         result = uds_search_sparse_cache(zone, &request->record_name, &virtual_chapter,
353                                          &record_page_number);
354         if ((result != UDS_SUCCESS) || (virtual_chapter == NO_CHAPTER))
355                 return result;
356
357         request->virtual_chapter = virtual_chapter;
358         volume = zone->index->volume;
359         chapter = uds_map_to_physical_chapter(volume->geometry, virtual_chapter);
360         return uds_search_cached_record_page(volume, request, chapter,
361                                              record_page_number, found);
362 }
363
364 static int get_record_from_zone(struct index_zone *zone, struct uds_request *request,
365                                 bool *found)
366 {
367         struct volume *volume;
368
369         if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
370                 *found = true;
371                 return UDS_SUCCESS;
372         } else if (request->location == UDS_LOCATION_UNAVAILABLE) {
373                 *found = false;
374                 return UDS_SUCCESS;
375         }
376
377         if (request->virtual_chapter == zone->newest_virtual_chapter) {
378                 uds_search_open_chapter(zone->open_chapter, &request->record_name,
379                                         &request->old_metadata, found);
380                 return UDS_SUCCESS;
381         }
382
383         if ((zone->newest_virtual_chapter > 0) &&
384             (request->virtual_chapter == (zone->newest_virtual_chapter - 1)) &&
385             (zone->writing_chapter->size > 0)) {
386                 uds_search_open_chapter(zone->writing_chapter, &request->record_name,
387                                         &request->old_metadata, found);
388                 return UDS_SUCCESS;
389         }
390
391         volume = zone->index->volume;
392         if (is_zone_chapter_sparse(zone, request->virtual_chapter) &&
393             uds_sparse_cache_contains(volume->sparse_cache, request->virtual_chapter,
394                                       request->zone_number))
395                 return search_sparse_cache_in_zone(zone, request,
396                                                    request->virtual_chapter, found);
397
398         return uds_search_volume_page_cache(volume, request, found);
399 }
400
401 static int put_record_in_zone(struct index_zone *zone, struct uds_request *request,
402                               const struct uds_record_data *metadata)
403 {
404         unsigned int remaining;
405
406         remaining = uds_put_open_chapter(zone->open_chapter, &request->record_name,
407                                          metadata);
408         if (remaining == 0)
409                 return open_next_chapter(zone);
410
411         return UDS_SUCCESS;
412 }
413
414 static int search_index_zone(struct index_zone *zone, struct uds_request *request)
415 {
416         int result;
417         struct volume_index_record record;
418         bool overflow_record, found = false;
419         struct uds_record_data *metadata;
420         u64 chapter;
421
422         result = uds_get_volume_index_record(zone->index->volume_index,
423                                              &request->record_name, &record);
424         if (result != UDS_SUCCESS)
425                 return result;
426
427         if (record.is_found) {
428                 if (request->requeued && request->virtual_chapter != record.virtual_chapter)
429                         set_request_location(request, UDS_LOCATION_UNKNOWN);
430
431                 request->virtual_chapter = record.virtual_chapter;
432                 result = get_record_from_zone(zone, request, &found);
433                 if (result != UDS_SUCCESS)
434                         return result;
435         }
436
437         if (found)
438                 set_chapter_location(request, zone, record.virtual_chapter);
439
440         /*
441          * If a record has overflowed a chapter index in more than one chapter (or overflowed in
442          * one chapter and collided with an existing record), it will exist as a collision record
443          * in the volume index, but we won't find it in the volume. This case needs special
444          * handling.
445          */
446         overflow_record = (record.is_found && record.is_collision && !found);
447         chapter = zone->newest_virtual_chapter;
448         if (found || overflow_record) {
449                 if ((request->type == UDS_QUERY_NO_UPDATE) ||
450                     ((request->type == UDS_QUERY) && overflow_record)) {
451                         /* There is nothing left to do. */
452                         return UDS_SUCCESS;
453                 }
454
455                 if (record.virtual_chapter != chapter) {
456                         /*
457                          * Update the volume index to reference the new chapter for the block. If
458                          * the record had been deleted or dropped from the chapter index, it will
459                          * be back.
460                          */
461                         result = uds_set_volume_index_record_chapter(&record, chapter);
462                 } else if (request->type != UDS_UPDATE) {
463                         /* The record is already in the open chapter. */
464                         return UDS_SUCCESS;
465                 }
466         } else {
467                 /*
468                  * The record wasn't in the volume index, so check whether the
469                  * name is in a cached sparse chapter. If we found the name on
470                  * a previous search, use that result instead.
471                  */
472                 if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
473                         found = true;
474                 } else if (request->location == UDS_LOCATION_UNAVAILABLE) {
475                         found = false;
476                 } else if (uds_is_sparse_index_geometry(zone->index->volume->geometry) &&
477                            !uds_is_volume_index_sample(zone->index->volume_index,
478                                                        &request->record_name)) {
479                         result = search_sparse_cache_in_zone(zone, request, NO_CHAPTER,
480                                                              &found);
481                         if (result != UDS_SUCCESS)
482                                 return result;
483                 }
484
485                 if (found)
486                         set_request_location(request, UDS_LOCATION_IN_SPARSE);
487
488                 if ((request->type == UDS_QUERY_NO_UPDATE) ||
489                     ((request->type == UDS_QUERY) && !found)) {
490                         /* There is nothing left to do. */
491                         return UDS_SUCCESS;
492                 }
493
494                 /*
495                  * Add a new entry to the volume index referencing the open chapter. This needs to
496                  * be done both for new records, and for records from cached sparse chapters.
497                  */
498                 result = uds_put_volume_index_record(&record, chapter);
499         }
500
501         if (result == UDS_OVERFLOW) {
502                 /*
503                  * The volume index encountered a delta list overflow. The condition was already
504                  * logged. We will go on without adding the record to the open chapter.
505                  */
506                 return UDS_SUCCESS;
507         }
508
509         if (result != UDS_SUCCESS)
510                 return result;
511
512         if (!found || (request->type == UDS_UPDATE)) {
513                 /* This is a new record or we're updating an existing record. */
514                 metadata = &request->new_metadata;
515         } else {
516                 /* Move the existing record to the open chapter. */
517                 metadata = &request->old_metadata;
518         }
519
520         return put_record_in_zone(zone, request, metadata);
521 }
522
523 static int remove_from_index_zone(struct index_zone *zone, struct uds_request *request)
524 {
525         int result;
526         struct volume_index_record record;
527
528         result = uds_get_volume_index_record(zone->index->volume_index,
529                                              &request->record_name, &record);
530         if (result != UDS_SUCCESS)
531                 return result;
532
533         if (!record.is_found)
534                 return UDS_SUCCESS;
535
536         /* If the request was requeued, check whether the saved state is still valid. */
537
538         if (record.is_collision) {
539                 set_chapter_location(request, zone, record.virtual_chapter);
540         } else {
541                 /* Non-collision records are hints, so resolve the name in the chapter. */
542                 bool found;
543
544                 if (request->requeued && request->virtual_chapter != record.virtual_chapter)
545                         set_request_location(request, UDS_LOCATION_UNKNOWN);
546
547                 request->virtual_chapter = record.virtual_chapter;
548                 result = get_record_from_zone(zone, request, &found);
549                 if (result != UDS_SUCCESS)
550                         return result;
551
552                 if (!found) {
553                         /* There is no record to remove. */
554                         return UDS_SUCCESS;
555                 }
556         }
557
558         set_chapter_location(request, zone, record.virtual_chapter);
559
560         /*
561          * Delete the volume index entry for the named record only. Note that a later search might
562          * later return stale advice if there is a colliding name in the same chapter, but it's a
563          * very rare case (1 in 2^21).
564          */
565         result = uds_remove_volume_index_record(&record);
566         if (result != UDS_SUCCESS)
567                 return result;
568
569         /*
570          * If the record is in the open chapter, we must remove it or mark it deleted to avoid
571          * trouble if the record is added again later.
572          */
573         if (request->location == UDS_LOCATION_IN_OPEN_CHAPTER)
574                 uds_remove_from_open_chapter(zone->open_chapter, &request->record_name);
575
576         return UDS_SUCCESS;
577 }
578
579 static int dispatch_index_request(struct uds_index *index, struct uds_request *request)
580 {
581         int result;
582         struct index_zone *zone = index->zones[request->zone_number];
583
584         if (!request->requeued) {
585                 result = simulate_index_zone_barrier_message(zone, request);
586                 if (result != UDS_SUCCESS)
587                         return result;
588         }
589
590         switch (request->type) {
591         case UDS_POST:
592         case UDS_UPDATE:
593         case UDS_QUERY:
594         case UDS_QUERY_NO_UPDATE:
595                 result = search_index_zone(zone, request);
596                 break;
597
598         case UDS_DELETE:
599                 result = remove_from_index_zone(zone, request);
600                 break;
601
602         default:
603                 result = vdo_log_warning_strerror(UDS_INVALID_ARGUMENT,
604                                                   "invalid request type: %d",
605                                                   request->type);
606                 break;
607         }
608
609         return result;
610 }
611
612 /* This is the request processing function invoked by each zone's thread. */
613 static void execute_zone_request(struct uds_request *request)
614 {
615         int result;
616         struct uds_index *index = request->index;
617
618         if (request->zone_message.type != UDS_MESSAGE_NONE) {
619                 result = dispatch_index_zone_control_request(request);
620                 if (result != UDS_SUCCESS) {
621                         vdo_log_error_strerror(result, "error executing message: %d",
622                                                request->zone_message.type);
623                 }
624
625                 /* Once the message is processed it can be freed. */
626                 vdo_free(vdo_forget(request));
627                 return;
628         }
629
630         index->need_to_save = true;
631         if (request->requeued && (request->status != UDS_SUCCESS)) {
632                 set_request_location(request, UDS_LOCATION_UNAVAILABLE);
633                 index->callback(request);
634                 return;
635         }
636
637         result = dispatch_index_request(index, request);
638         if (result == UDS_QUEUED) {
639                 /* The request has been requeued so don't let it complete. */
640                 return;
641         }
642
643         if (!request->found)
644                 set_request_location(request, UDS_LOCATION_UNAVAILABLE);
645
646         request->status = result;
647         index->callback(request);
648 }
649
650 static int initialize_index_queues(struct uds_index *index,
651                                    const struct index_geometry *geometry)
652 {
653         int result;
654         unsigned int i;
655
656         for (i = 0; i < index->zone_count; i++) {
657                 result = uds_make_request_queue("indexW", &execute_zone_request,
658                                                 &index->zone_queues[i]);
659                 if (result != UDS_SUCCESS)
660                         return result;
661         }
662
663         /* The triage queue is only needed for sparse multi-zone indexes. */
664         if ((index->zone_count > 1) && uds_is_sparse_index_geometry(geometry)) {
665                 result = uds_make_request_queue("triageW", &triage_request,
666                                                 &index->triage_queue);
667                 if (result != UDS_SUCCESS)
668                         return result;
669         }
670
671         return UDS_SUCCESS;
672 }
673
674 /* This is the driver function for the chapter writer thread. */
675 static void close_chapters(void *arg)
676 {
677         int result;
678         struct chapter_writer *writer = arg;
679         struct uds_index *index = writer->index;
680
681         vdo_log_debug("chapter writer starting");
682         mutex_lock(&writer->mutex);
683         for (;;) {
684                 while (writer->zones_to_write < index->zone_count) {
685                         if (writer->stop && (writer->zones_to_write == 0)) {
686                                 /*
687                                  * We've been told to stop, and all of the zones are in the same
688                                  * open chapter, so we can exit now.
689                                  */
690                                 mutex_unlock(&writer->mutex);
691                                 vdo_log_debug("chapter writer stopping");
692                                 return;
693                         }
694                         uds_wait_cond(&writer->cond, &writer->mutex);
695                 }
696
697                 /*
698                  * Release the lock while closing a chapter. We probably don't need to do this, but
699                  * it seems safer in principle. It's OK to access the chapter and chapter_number
700                  * fields without the lock since those aren't allowed to change until we're done.
701                  */
702                 mutex_unlock(&writer->mutex);
703
704                 if (index->has_saved_open_chapter) {
705                         /*
706                          * Remove the saved open chapter the first time we close an open chapter
707                          * after loading from a clean shutdown, or after doing a clean save. The
708                          * lack of the saved open chapter will indicate that a recovery is
709                          * necessary.
710                          */
711                         index->has_saved_open_chapter = false;
712                         result = uds_discard_open_chapter(index->layout);
713                         if (result == UDS_SUCCESS)
714                                 vdo_log_debug("Discarding saved open chapter");
715                 }
716
717                 result = uds_close_open_chapter(writer->chapters, index->zone_count,
718                                                 index->volume,
719                                                 writer->open_chapter_index,
720                                                 writer->collated_records,
721                                                 index->newest_virtual_chapter);
722
723                 mutex_lock(&writer->mutex);
724                 index->newest_virtual_chapter++;
725                 index->oldest_virtual_chapter +=
726                         uds_chapters_to_expire(index->volume->geometry,
727                                                index->newest_virtual_chapter);
728                 writer->result = result;
729                 writer->zones_to_write = 0;
730                 uds_broadcast_cond(&writer->cond);
731         }
732 }
733
734 static void stop_chapter_writer(struct chapter_writer *writer)
735 {
736         struct thread *writer_thread = NULL;
737
738         mutex_lock(&writer->mutex);
739         if (writer->thread != NULL) {
740                 writer_thread = writer->thread;
741                 writer->thread = NULL;
742                 writer->stop = true;
743                 uds_broadcast_cond(&writer->cond);
744         }
745         mutex_unlock(&writer->mutex);
746
747         if (writer_thread != NULL)
748                 vdo_join_threads(writer_thread);
749 }
750
751 static void free_chapter_writer(struct chapter_writer *writer)
752 {
753         if (writer == NULL)
754                 return;
755
756         stop_chapter_writer(writer);
757         uds_free_open_chapter_index(writer->open_chapter_index);
758         vdo_free(writer->collated_records);
759         vdo_free(writer);
760 }
761
762 static int make_chapter_writer(struct uds_index *index,
763                                struct chapter_writer **writer_ptr)
764 {
765         int result;
766         struct chapter_writer *writer;
767         size_t collated_records_size =
768                 (sizeof(struct uds_volume_record) * index->volume->geometry->records_per_chapter);
769
770         result = vdo_allocate_extended(struct chapter_writer, index->zone_count,
771                                        struct open_chapter_zone *, "Chapter Writer",
772                                        &writer);
773         if (result != VDO_SUCCESS)
774                 return result;
775
776         writer->index = index;
777         mutex_init(&writer->mutex);
778         uds_init_cond(&writer->cond);
779
780         result = vdo_allocate_cache_aligned(collated_records_size, "collated records",
781                                             &writer->collated_records);
782         if (result != VDO_SUCCESS) {
783                 free_chapter_writer(writer);
784                 return result;
785         }
786
787         result = uds_make_open_chapter_index(&writer->open_chapter_index,
788                                              index->volume->geometry,
789                                              index->volume->nonce);
790         if (result != UDS_SUCCESS) {
791                 free_chapter_writer(writer);
792                 return result;
793         }
794
795         writer->memory_size = (sizeof(struct chapter_writer) +
796                                index->zone_count * sizeof(struct open_chapter_zone *) +
797                                collated_records_size +
798                                writer->open_chapter_index->memory_size);
799
800         result = vdo_create_thread(close_chapters, writer, "writer", &writer->thread);
801         if (result != VDO_SUCCESS) {
802                 free_chapter_writer(writer);
803                 return result;
804         }
805
806         *writer_ptr = writer;
807         return UDS_SUCCESS;
808 }
809
810 static int load_index(struct uds_index *index)
811 {
812         int result;
813         u64 last_save_chapter;
814
815         result = uds_load_index_state(index->layout, index);
816         if (result != UDS_SUCCESS)
817                 return UDS_INDEX_NOT_SAVED_CLEANLY;
818
819         last_save_chapter = ((index->last_save != NO_LAST_SAVE) ? index->last_save : 0);
820
821         vdo_log_info("loaded index from chapter %llu through chapter %llu",
822                      (unsigned long long) index->oldest_virtual_chapter,
823                      (unsigned long long) last_save_chapter);
824
825         return UDS_SUCCESS;
826 }
827
828 static int rebuild_index_page_map(struct uds_index *index, u64 vcn)
829 {
830         int result;
831         struct delta_index_page *chapter_index_page;
832         struct index_geometry *geometry = index->volume->geometry;
833         u32 chapter = uds_map_to_physical_chapter(geometry, vcn);
834         u32 expected_list_number = 0;
835         u32 index_page_number;
836         u32 lowest_delta_list;
837         u32 highest_delta_list;
838
839         for (index_page_number = 0;
840              index_page_number < geometry->index_pages_per_chapter;
841              index_page_number++) {
842                 result = uds_get_volume_index_page(index->volume, chapter,
843                                                    index_page_number,
844                                                    &chapter_index_page);
845                 if (result != UDS_SUCCESS) {
846                         return vdo_log_error_strerror(result,
847                                                       "failed to read index page %u in chapter %u",
848                                                       index_page_number, chapter);
849                 }
850
851                 lowest_delta_list = chapter_index_page->lowest_list_number;
852                 highest_delta_list = chapter_index_page->highest_list_number;
853                 if (lowest_delta_list != expected_list_number) {
854                         return vdo_log_error_strerror(UDS_CORRUPT_DATA,
855                                                       "chapter %u index page %u is corrupt",
856                                                       chapter, index_page_number);
857                 }
858
859                 uds_update_index_page_map(index->volume->index_page_map, vcn, chapter,
860                                           index_page_number, highest_delta_list);
861                 expected_list_number = highest_delta_list + 1;
862         }
863
864         return UDS_SUCCESS;
865 }
866
867 static int replay_record(struct uds_index *index, const struct uds_record_name *name,
868                          u64 virtual_chapter, bool will_be_sparse_chapter)
869 {
870         int result;
871         struct volume_index_record record;
872         bool update_record;
873
874         if (will_be_sparse_chapter &&
875             !uds_is_volume_index_sample(index->volume_index, name)) {
876                 /*
877                  * This entry will be in a sparse chapter after the rebuild completes, and it is
878                  * not a sample, so just skip over it.
879                  */
880                 return UDS_SUCCESS;
881         }
882
883         result = uds_get_volume_index_record(index->volume_index, name, &record);
884         if (result != UDS_SUCCESS)
885                 return result;
886
887         if (record.is_found) {
888                 if (record.is_collision) {
889                         if (record.virtual_chapter == virtual_chapter) {
890                                 /* The record is already correct. */
891                                 return UDS_SUCCESS;
892                         }
893
894                         update_record = true;
895                 } else if (record.virtual_chapter == virtual_chapter) {
896                         /*
897                          * There is a volume index entry pointing to the current chapter, but we
898                          * don't know if it is for the same name as the one we are currently
899                          * working on or not. For now, we're just going to assume that it isn't.
900                          * This will create one extra collision record if there was a deleted
901                          * record in the current chapter.
902                          */
903                         update_record = false;
904                 } else {
905                         /*
906                          * If we're rebuilding, we don't normally want to go to disk to see if the
907                          * record exists, since we will likely have just read the record from disk
908                          * (i.e. we know it's there). The exception to this is when we find an
909                          * entry in the volume index that has a different chapter. In this case, we
910                          * need to search that chapter to determine if the volume index entry was
911                          * for the same record or a different one.
912                          */
913                         result = uds_search_volume_page_cache_for_rebuild(index->volume,
914                                                                           name,
915                                                                           record.virtual_chapter,
916                                                                           &update_record);
917                         if (result != UDS_SUCCESS)
918                                 return result;
919                         }
920         } else {
921                 update_record = false;
922         }
923
924         if (update_record) {
925                 /*
926                  * Update the volume index to reference the new chapter for the block. If the
927                  * record had been deleted or dropped from the chapter index, it will be back.
928                  */
929                 result = uds_set_volume_index_record_chapter(&record, virtual_chapter);
930         } else {
931                 /*
932                  * Add a new entry to the volume index referencing the open chapter. This should be
933                  * done regardless of whether we are a brand new record or a sparse record, i.e.
934                  * one that doesn't exist in the index but does on disk, since for a sparse record,
935                  * we would want to un-sparsify if it did exist.
936                  */
937                 result = uds_put_volume_index_record(&record, virtual_chapter);
938         }
939
940         if ((result == UDS_DUPLICATE_NAME) || (result == UDS_OVERFLOW)) {
941                 /* The rebuilt index will lose these records. */
942                 return UDS_SUCCESS;
943         }
944
945         return result;
946 }
947
948 static bool check_for_suspend(struct uds_index *index)
949 {
950         bool closing;
951
952         if (index->load_context == NULL)
953                 return false;
954
955         mutex_lock(&index->load_context->mutex);
956         if (index->load_context->status != INDEX_SUSPENDING) {
957                 mutex_unlock(&index->load_context->mutex);
958                 return false;
959         }
960
961         /* Notify that we are suspended and wait for the resume. */
962         index->load_context->status = INDEX_SUSPENDED;
963         uds_broadcast_cond(&index->load_context->cond);
964
965         while ((index->load_context->status != INDEX_OPENING) &&
966                (index->load_context->status != INDEX_FREEING))
967                 uds_wait_cond(&index->load_context->cond, &index->load_context->mutex);
968
969         closing = (index->load_context->status == INDEX_FREEING);
970         mutex_unlock(&index->load_context->mutex);
971         return closing;
972 }
973
974 static int replay_chapter(struct uds_index *index, u64 virtual, bool sparse)
975 {
976         int result;
977         u32 i;
978         u32 j;
979         const struct index_geometry *geometry;
980         u32 physical_chapter;
981
982         if (check_for_suspend(index)) {
983                 vdo_log_info("Replay interrupted by index shutdown at chapter %llu",
984                              (unsigned long long) virtual);
985                 return -EBUSY;
986         }
987
988         geometry = index->volume->geometry;
989         physical_chapter = uds_map_to_physical_chapter(geometry, virtual);
990         uds_prefetch_volume_chapter(index->volume, physical_chapter);
991         uds_set_volume_index_open_chapter(index->volume_index, virtual);
992
993         result = rebuild_index_page_map(index, virtual);
994         if (result != UDS_SUCCESS) {
995                 return vdo_log_error_strerror(result,
996                                               "could not rebuild index page map for chapter %u",
997                                               physical_chapter);
998         }
999
1000         for (i = 0; i < geometry->record_pages_per_chapter; i++) {
1001                 u8 *record_page;
1002                 u32 record_page_number;
1003
1004                 record_page_number = geometry->index_pages_per_chapter + i;
1005                 result = uds_get_volume_record_page(index->volume, physical_chapter,
1006                                                     record_page_number, &record_page);
1007                 if (result != UDS_SUCCESS) {
1008                         return vdo_log_error_strerror(result, "could not get page %d",
1009                                                       record_page_number);
1010                 }
1011
1012                 for (j = 0; j < geometry->records_per_page; j++) {
1013                         const u8 *name_bytes;
1014                         struct uds_record_name name;
1015
1016                         name_bytes = record_page + (j * BYTES_PER_RECORD);
1017                         memcpy(&name.name, name_bytes, UDS_RECORD_NAME_SIZE);
1018                         result = replay_record(index, &name, virtual, sparse);
1019                         if (result != UDS_SUCCESS)
1020                                 return result;
1021                 }
1022         }
1023
1024         return UDS_SUCCESS;
1025 }
1026
1027 static int replay_volume(struct uds_index *index)
1028 {
1029         int result;
1030         u64 old_map_update;
1031         u64 new_map_update;
1032         u64 virtual;
1033         u64 from_virtual = index->oldest_virtual_chapter;
1034         u64 upto_virtual = index->newest_virtual_chapter;
1035         bool will_be_sparse;
1036
1037         vdo_log_info("Replaying volume from chapter %llu through chapter %llu",
1038                      (unsigned long long) from_virtual,
1039                      (unsigned long long) upto_virtual);
1040
1041         /*
1042          * The index failed to load, so the volume index is empty. Add records to the volume index
1043          * in order, skipping non-hooks in chapters which will be sparse to save time.
1044          *
1045          * Go through each record page of each chapter and add the records back to the volume
1046          * index. This should not cause anything to be written to either the open chapter or the
1047          * on-disk volume. Also skip the on-disk chapter corresponding to upto_virtual, as this
1048          * would have already been purged from the volume index when the chapter was opened.
1049          *
1050          * Also, go through each index page for each chapter and rebuild the index page map.
1051          */
1052         old_map_update = index->volume->index_page_map->last_update;
1053         for (virtual = from_virtual; virtual < upto_virtual; virtual++) {
1054                 will_be_sparse = uds_is_chapter_sparse(index->volume->geometry,
1055                                                        from_virtual, upto_virtual,
1056                                                        virtual);
1057                 result = replay_chapter(index, virtual, will_be_sparse);
1058                 if (result != UDS_SUCCESS)
1059                         return result;
1060         }
1061
1062         /* Also reap the chapter being replaced by the open chapter. */
1063         uds_set_volume_index_open_chapter(index->volume_index, upto_virtual);
1064
1065         new_map_update = index->volume->index_page_map->last_update;
1066         if (new_map_update != old_map_update) {
1067                 vdo_log_info("replay changed index page map update from %llu to %llu",
1068                              (unsigned long long) old_map_update,
1069                              (unsigned long long) new_map_update);
1070         }
1071
1072         return UDS_SUCCESS;
1073 }
1074
1075 static int rebuild_index(struct uds_index *index)
1076 {
1077         int result;
1078         u64 lowest;
1079         u64 highest;
1080         bool is_empty = false;
1081         u32 chapters_per_volume = index->volume->geometry->chapters_per_volume;
1082
1083         index->volume->lookup_mode = LOOKUP_FOR_REBUILD;
1084         result = uds_find_volume_chapter_boundaries(index->volume, &lowest, &highest,
1085                                                     &is_empty);
1086         if (result != UDS_SUCCESS) {
1087                 return vdo_log_fatal_strerror(result,
1088                                               "cannot rebuild index: unknown volume chapter boundaries");
1089         }
1090
1091         if (is_empty) {
1092                 index->newest_virtual_chapter = 0;
1093                 index->oldest_virtual_chapter = 0;
1094                 index->volume->lookup_mode = LOOKUP_NORMAL;
1095                 return UDS_SUCCESS;
1096         }
1097
1098         index->newest_virtual_chapter = highest + 1;
1099         index->oldest_virtual_chapter = lowest;
1100         if (index->newest_virtual_chapter ==
1101             (index->oldest_virtual_chapter + chapters_per_volume)) {
1102                 /* Skip the chapter shadowed by the open chapter. */
1103                 index->oldest_virtual_chapter++;
1104         }
1105
1106         result = replay_volume(index);
1107         if (result != UDS_SUCCESS)
1108                 return result;
1109
1110         index->volume->lookup_mode = LOOKUP_NORMAL;
1111         return UDS_SUCCESS;
1112 }
1113
1114 static void free_index_zone(struct index_zone *zone)
1115 {
1116         if (zone == NULL)
1117                 return;
1118
1119         uds_free_open_chapter(zone->open_chapter);
1120         uds_free_open_chapter(zone->writing_chapter);
1121         vdo_free(zone);
1122 }
1123
1124 static int make_index_zone(struct uds_index *index, unsigned int zone_number)
1125 {
1126         int result;
1127         struct index_zone *zone;
1128
1129         result = vdo_allocate(1, struct index_zone, "index zone", &zone);
1130         if (result != VDO_SUCCESS)
1131                 return result;
1132
1133         result = uds_make_open_chapter(index->volume->geometry, index->zone_count,
1134                                        &zone->open_chapter);
1135         if (result != UDS_SUCCESS) {
1136                 free_index_zone(zone);
1137                 return result;
1138         }
1139
1140         result = uds_make_open_chapter(index->volume->geometry, index->zone_count,
1141                                        &zone->writing_chapter);
1142         if (result != UDS_SUCCESS) {
1143                 free_index_zone(zone);
1144                 return result;
1145         }
1146
1147         zone->index = index;
1148         zone->id = zone_number;
1149         index->zones[zone_number] = zone;
1150
1151         return UDS_SUCCESS;
1152 }
1153
1154 int uds_make_index(struct uds_configuration *config, enum uds_open_index_type open_type,
1155                    struct index_load_context *load_context, index_callback_fn callback,
1156                    struct uds_index **new_index)
1157 {
1158         int result;
1159         bool loaded = false;
1160         bool new = (open_type == UDS_CREATE);
1161         struct uds_index *index = NULL;
1162         struct index_zone *zone;
1163         u64 nonce;
1164         unsigned int z;
1165
1166         result = vdo_allocate_extended(struct uds_index, config->zone_count,
1167                                        struct uds_request_queue *, "index", &index);
1168         if (result != VDO_SUCCESS)
1169                 return result;
1170
1171         index->zone_count = config->zone_count;
1172
1173         result = uds_make_index_layout(config, new, &index->layout);
1174         if (result != UDS_SUCCESS) {
1175                 uds_free_index(index);
1176                 return result;
1177         }
1178
1179         result = vdo_allocate(index->zone_count, struct index_zone *, "zones",
1180                               &index->zones);
1181         if (result != VDO_SUCCESS) {
1182                 uds_free_index(index);
1183                 return result;
1184         }
1185
1186         result = uds_make_volume(config, index->layout, &index->volume);
1187         if (result != UDS_SUCCESS) {
1188                 uds_free_index(index);
1189                 return result;
1190         }
1191
1192         index->volume->lookup_mode = LOOKUP_NORMAL;
1193         for (z = 0; z < index->zone_count; z++) {
1194                 result = make_index_zone(index, z);
1195                 if (result != UDS_SUCCESS) {
1196                         uds_free_index(index);
1197                         return vdo_log_error_strerror(result,
1198                                                       "Could not create index zone");
1199                 }
1200         }
1201
1202         nonce = uds_get_volume_nonce(index->layout);
1203         result = uds_make_volume_index(config, nonce, &index->volume_index);
1204         if (result != UDS_SUCCESS) {
1205                 uds_free_index(index);
1206                 return vdo_log_error_strerror(result, "could not make volume index");
1207         }
1208
1209         index->load_context = load_context;
1210         index->callback = callback;
1211
1212         result = initialize_index_queues(index, config->geometry);
1213         if (result != UDS_SUCCESS) {
1214                 uds_free_index(index);
1215                 return result;
1216         }
1217
1218         result = make_chapter_writer(index, &index->chapter_writer);
1219         if (result != UDS_SUCCESS) {
1220                 uds_free_index(index);
1221                 return result;
1222         }
1223
1224         if (!new) {
1225                 result = load_index(index);
1226                 switch (result) {
1227                 case UDS_SUCCESS:
1228                         loaded = true;
1229                         break;
1230                 case -ENOMEM:
1231                         /* We should not try a rebuild for this error. */
1232                         vdo_log_error_strerror(result, "index could not be loaded");
1233                         break;
1234                 default:
1235                         vdo_log_error_strerror(result, "index could not be loaded");
1236                         if (open_type == UDS_LOAD) {
1237                                 result = rebuild_index(index);
1238                                 if (result != UDS_SUCCESS) {
1239                                         vdo_log_error_strerror(result,
1240                                                                "index could not be rebuilt");
1241                                 }
1242                         }
1243                         break;
1244                 }
1245         }
1246
1247         if (result != UDS_SUCCESS) {
1248                 uds_free_index(index);
1249                 return vdo_log_error_strerror(result, "fatal error in %s()", __func__);
1250         }
1251
1252         for (z = 0; z < index->zone_count; z++) {
1253                 zone = index->zones[z];
1254                 zone->oldest_virtual_chapter = index->oldest_virtual_chapter;
1255                 zone->newest_virtual_chapter = index->newest_virtual_chapter;
1256         }
1257
1258         if (index->load_context != NULL) {
1259                 mutex_lock(&index->load_context->mutex);
1260                 index->load_context->status = INDEX_READY;
1261                 /*
1262                  * If we get here, suspend is meaningless, but notify any thread trying to suspend
1263                  * us so it doesn't hang.
1264                  */
1265                 uds_broadcast_cond(&index->load_context->cond);
1266                 mutex_unlock(&index->load_context->mutex);
1267         }
1268
1269         index->has_saved_open_chapter = loaded;
1270         index->need_to_save = !loaded;
1271         *new_index = index;
1272         return UDS_SUCCESS;
1273 }
1274
1275 void uds_free_index(struct uds_index *index)
1276 {
1277         unsigned int i;
1278
1279         if (index == NULL)
1280                 return;
1281
1282         uds_request_queue_finish(index->triage_queue);
1283         for (i = 0; i < index->zone_count; i++)
1284                 uds_request_queue_finish(index->zone_queues[i]);
1285
1286         free_chapter_writer(index->chapter_writer);
1287
1288         uds_free_volume_index(index->volume_index);
1289         if (index->zones != NULL) {
1290                 for (i = 0; i < index->zone_count; i++)
1291                         free_index_zone(index->zones[i]);
1292                 vdo_free(index->zones);
1293         }
1294
1295         uds_free_volume(index->volume);
1296         uds_free_index_layout(vdo_forget(index->layout));
1297         vdo_free(index);
1298 }
1299
1300 /* Wait for the chapter writer to complete any outstanding writes. */
1301 void uds_wait_for_idle_index(struct uds_index *index)
1302 {
1303         struct chapter_writer *writer = index->chapter_writer;
1304
1305         mutex_lock(&writer->mutex);
1306         while (writer->zones_to_write > 0)
1307                 uds_wait_cond(&writer->cond, &writer->mutex);
1308         mutex_unlock(&writer->mutex);
1309 }
1310
1311 /* This function assumes that all requests have been drained. */
1312 int uds_save_index(struct uds_index *index)
1313 {
1314         int result;
1315
1316         if (!index->need_to_save)
1317                 return UDS_SUCCESS;
1318
1319         uds_wait_for_idle_index(index);
1320         index->prev_save = index->last_save;
1321         index->last_save = ((index->newest_virtual_chapter == 0) ?
1322                             NO_LAST_SAVE : index->newest_virtual_chapter - 1);
1323         vdo_log_info("beginning save (vcn %llu)", (unsigned long long) index->last_save);
1324
1325         result = uds_save_index_state(index->layout, index);
1326         if (result != UDS_SUCCESS) {
1327                 vdo_log_info("save index failed");
1328                 index->last_save = index->prev_save;
1329         } else {
1330                 index->has_saved_open_chapter = true;
1331                 index->need_to_save = false;
1332                 vdo_log_info("finished save (vcn %llu)",
1333                              (unsigned long long) index->last_save);
1334         }
1335
1336         return result;
1337 }
1338
1339 int uds_replace_index_storage(struct uds_index *index, struct block_device *bdev)
1340 {
1341         return uds_replace_volume_storage(index->volume, index->layout, bdev);
1342 }
1343
1344 /* Accessing statistics should be safe from any thread. */
1345 void uds_get_index_stats(struct uds_index *index, struct uds_index_stats *counters)
1346 {
1347         struct volume_index_stats stats;
1348
1349         uds_get_volume_index_stats(index->volume_index, &stats);
1350         counters->entries_indexed = stats.record_count;
1351         counters->collisions = stats.collision_count;
1352         counters->entries_discarded = stats.discard_count;
1353
1354         counters->memory_used = (index->volume_index->memory_size +
1355                                  index->volume->cache_size +
1356                                  index->chapter_writer->memory_size);
1357 }
1358
1359 void uds_enqueue_request(struct uds_request *request, enum request_stage stage)
1360 {
1361         struct uds_index *index = request->index;
1362         struct uds_request_queue *queue;
1363
1364         switch (stage) {
1365         case STAGE_TRIAGE:
1366                 if (index->triage_queue != NULL) {
1367                         queue = index->triage_queue;
1368                         break;
1369                 }
1370
1371                 fallthrough;
1372
1373         case STAGE_INDEX:
1374                 request->zone_number =
1375                         uds_get_volume_index_zone(index->volume_index, &request->record_name);
1376                 fallthrough;
1377
1378         case STAGE_MESSAGE:
1379                 queue = index->zone_queues[request->zone_number];
1380                 break;
1381
1382         default:
1383                 VDO_ASSERT_LOG_ONLY(false, "invalid index stage: %d", stage);
1384                 return;
1385         }
1386
1387         uds_request_queue_enqueue(queue, request);
1388 }
This page took 0.120107 seconds and 4 git commands to generate.