1 // SPDX-License-Identifier: GPL-2.0-only
3 * Copyright 2023 Red Hat
10 #include "memory-alloc.h"
12 #include "funnel-requestqueue.h"
13 #include "hash-utils.h"
14 #include "sparse-cache.h"
16 static const u64 NO_LAST_SAVE = U64_MAX;
19 * When searching for deduplication records, the index first searches the volume index, and then
20 * searches the chapter index for the relevant chapter. If the chapter has been fully committed to
21 * storage, the chapter pages are loaded into the page cache. If the chapter has not yet been
22 * committed (either the open chapter or a recently closed one), the index searches the in-memory
23 * representation of the chapter. Finally, if the volume index does not find a record and the index
24 * is sparse, the index will search the sparse cache.
26 * The index sends two kinds of messages to coordinate between zones: chapter close messages for the
27 * chapter writer, and sparse cache barrier messages for the sparse cache.
29 * The chapter writer is responsible for committing chapters of records to storage. Since zones can
30 * get different numbers of records, some zones may fall behind others. Each time a zone fills up
31 * its available space in a chapter, it informs the chapter writer that the chapter is complete,
32 * and also informs all other zones that it has closed the chapter. Each other zone will then close
33 * the chapter immediately, regardless of how full it is, in order to minimize skew between zones.
34 * Once every zone has closed the chapter, the chapter writer will commit that chapter to storage.
36 * The last zone to close the chapter also removes the oldest chapter from the volume index.
37 * Although that chapter is invalid for zones that have moved on, the existence of the open chapter
38 * means that those zones will never ask the volume index about it. No zone is allowed to get more
39 * than one chapter ahead of any other. If a zone is so far ahead that it tries to close another
40 * chapter before the previous one has been closed by all zones, it is forced to wait.
42 * The sparse cache relies on having the same set of chapter indexes available to all zones. When a
43 * request wants to add a chapter to the sparse cache, it sends a barrier message to each zone
44 * during the triage stage that acts as a rendezvous. Once every zone has reached the barrier and
45 * paused its operations, the cache membership is changed and each zone is then informed that it
46 * can proceed. More details can be found in the sparse cache documentation.
48 * If a sparse cache has only one zone, it will not create a triage queue, but it still needs the
49 * barrier message to change the sparse cache membership, so the index simulates the message by
50 * invoking the handler directly.
/*
 * The chapter writer collects each zone's filled open chapter and commits the combined chapter to
 * storage on its own thread. The trailing flexible array holds one submitted chapter per zone.
 */
53 struct chapter_writer {
54 /* The index to which we belong */
55 struct uds_index *index;
56 /* The thread to do the writing */
57 struct thread *thread;
58 /* The lock protecting the following fields */
60 /* The condition signalled on state changes */
62 /* Set to true to stop the thread */
64 /* The result from the most recent write */
66 /* The number of bytes allocated by the chapter writer */
68 /* The number of zones which have submitted a chapter for writing */
69 unsigned int zones_to_write;
70 /* Open chapter index used by uds_close_open_chapter() */
71 struct open_chapter_index *open_chapter_index;
72 /* Collated records used by uds_close_open_chapter() */
73 struct uds_volume_record *collated_records;
74 /* The chapters to write (one per zone) */
75 struct open_chapter_zone *chapters[];
/* Return whether the given virtual chapter is sparse from this zone's point of view. */
78 static bool is_zone_chapter_sparse(const struct index_zone *zone, u64 virtual_chapter)
80 return uds_is_chapter_sparse(zone->index->volume->geometry,
81 zone->oldest_virtual_chapter,
82 zone->newest_virtual_chapter, virtual_chapter);
/*
 * Allocate an unbatched control request carrying the given message and enqueue it for the
 * specified zone. The request is freed by the zone thread after the message is processed.
 */
85 static int launch_zone_message(struct uds_zone_message message, unsigned int zone,
86 			       struct uds_index *index)
89 struct uds_request *request;
91 result = vdo_allocate(1, struct uds_request, __func__, &request);
92 if (result != VDO_SUCCESS)
95 request->index = index;
96 request->unbatched = true;
97 request->zone_number = zone;
98 request->zone_message = message;
100 uds_enqueue_request(request, STAGE_MESSAGE);
/* Send a sparse cache barrier message to every zone; an allocation failure is only logged. */
104 static void enqueue_barrier_messages(struct uds_index *index, u64 virtual_chapter)
106 struct uds_zone_message message = {
107 .type = UDS_MESSAGE_SPARSE_CACHE_BARRIER,
108 .virtual_chapter = virtual_chapter,
112 for (zone = 0; zone < index->zone_count; zone++) {
113 int result = launch_zone_message(message, zone, index);
115 VDO_ASSERT_LOG_ONLY((result == UDS_SUCCESS), "barrier message allocation");
120 * Determine whether this request should trigger a sparse cache barrier message to change the
121 * membership of the sparse cache. If a change in membership is desired, the function returns the
122 * chapter number to add.
124 static u64 triage_index_request(struct uds_index *index, struct uds_request *request)
127 struct index_zone *zone;
129 virtual_chapter = uds_lookup_volume_index_name(index->volume_index,
130 &request->record_name);
131 if (virtual_chapter == NO_CHAPTER)
134 zone = index->zones[request->zone_number];
135 if (!is_zone_chapter_sparse(zone, virtual_chapter))
139 * FIXME: Optimize for a common case by remembering the chapter from the most recent
140 * barrier message and skipping this chapter if it is the same.
143 return virtual_chapter;
147 * Simulate a message to change the sparse cache membership for a single-zone sparse index. This
148 * allows us to forgo the complicated locking required by a multi-zone sparse index. Any other kind
149 * of index does nothing here.
/* NOTE: a single-zone sparse index has no triage queue, so the barrier is handled inline here. */
151 static int simulate_index_zone_barrier_message(struct index_zone *zone,
152 					       struct uds_request *request)
154 u64 sparse_virtual_chapter;
156 if ((zone->index->zone_count > 1) ||
157 !uds_is_sparse_index_geometry(zone->index->volume->geometry))
160 sparse_virtual_chapter = triage_index_request(zone->index, request);
161 if (sparse_virtual_chapter == NO_CHAPTER)
164 return uds_update_sparse_cache(zone, sparse_virtual_chapter);
167 /* This is the request processing function for the triage queue. */
168 static void triage_request(struct uds_request *request)
170 struct uds_index *index = request->index;
171 u64 sparse_virtual_chapter = triage_index_request(index, request);
173 if (sparse_virtual_chapter != NO_CHAPTER)
174 enqueue_barrier_messages(index, sparse_virtual_chapter);
/* The request continues to the index stage whether or not a barrier was sent. */
176 uds_enqueue_request(request, STAGE_INDEX);
/*
 * Wait (under the writer lock) until the chapter writer has committed every chapter older than
 * current_chapter_number, then report the result of the most recent write.
 */
179 static int finish_previous_chapter(struct uds_index *index, u64 current_chapter_number)
182 struct chapter_writer *writer = index->chapter_writer;
184 mutex_lock(&writer->mutex);
185 while (index->newest_virtual_chapter < current_chapter_number)
186 uds_wait_cond(&writer->cond, &writer->mutex);
187 result = writer->result;
188 mutex_unlock(&writer->mutex);
190 if (result != UDS_SUCCESS)
191 return vdo_log_error_strerror(result,
192 "Writing of previous open chapter failed");
/* Wait for the chapter being written to be committed, then exchange it with the open chapter. */
197 static int swap_open_chapter(struct index_zone *zone)
200 struct open_chapter_zone *temporary_chapter;
202 result = finish_previous_chapter(zone->index, zone->newest_virtual_chapter);
203 if (result != UDS_SUCCESS)
206 temporary_chapter = zone->open_chapter;
207 zone->open_chapter = zone->writing_chapter;
208 zone->writing_chapter = temporary_chapter;
213 * Inform the chapter writer that this zone is done with this chapter. The chapter won't start
214 * writing until all zones have closed it.
216 static unsigned int start_closing_chapter(struct uds_index *index,
217 					  unsigned int zone_number,
218 					  struct open_chapter_zone *chapter)
220 unsigned int finished_zones;
221 struct chapter_writer *writer = index->chapter_writer;
223 mutex_lock(&writer->mutex);
224 finished_zones = ++writer->zones_to_write;
225 writer->chapters[zone_number] = chapter;
/* Wake the chapter writer thread, which waits on this condition. */
226 uds_broadcast_cond(&writer->cond);
227 mutex_unlock(&writer->mutex);
229 return finished_zones;
/* Notify the other zones that this zone has closed the given chapter. */
232 static int announce_chapter_closed(struct index_zone *zone, u64 closed_chapter)
236 struct uds_zone_message zone_message = {
237 .type = UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED,
238 .virtual_chapter = closed_chapter,
241 for (i = 0; i < zone->index->zone_count; i++) {
245 result = launch_zone_message(zone_message, i, zone->index);
246 if (result != UDS_SUCCESS)
/*
 * Close this zone's open chapter, advance to the next virtual chapter, and expire the oldest
 * chapters. The last zone to close the chapter forgets the expired chapters in the volume.
 */
253 static int open_next_chapter(struct index_zone *zone)
258 unsigned int finished_zones;
261 vdo_log_debug("closing chapter %llu of zone %u after %u entries (%u short)",
262 (unsigned long long) zone->newest_virtual_chapter, zone->id,
263 zone->open_chapter->size,
264 zone->open_chapter->capacity - zone->open_chapter->size);
266 result = swap_open_chapter(zone);
267 if (result != UDS_SUCCESS)
270 closed_chapter = zone->newest_virtual_chapter++;
271 uds_set_volume_index_zone_open_chapter(zone->index->volume_index, zone->id,
272 zone->newest_virtual_chapter);
273 uds_reset_open_chapter(zone->open_chapter);
275 finished_zones = start_closing_chapter(zone->index, zone->id,
276 zone->writing_chapter);
/* The first zone to close a chapter announces it so the other zones close it too. */
277 if ((finished_zones == 1) && (zone->index->zone_count > 1)) {
278 result = announce_chapter_closed(zone, closed_chapter);
279 if (result != UDS_SUCCESS)
283 expiring = zone->oldest_virtual_chapter;
284 expire_chapters = uds_chapters_to_expire(zone->index->volume->geometry,
285 zone->newest_virtual_chapter);
286 zone->oldest_virtual_chapter += expire_chapters;
288 if (finished_zones < zone->index->zone_count)
291 while (expire_chapters-- > 0)
292 uds_forget_chapter(zone->index->volume, expiring++);
/* Close this zone's chapter in response to another zone's announcement, if not already done. */
297 static int handle_chapter_closed(struct index_zone *zone, u64 virtual_chapter)
299 if (zone->newest_virtual_chapter == virtual_chapter)
300 return open_next_chapter(zone);
/* Process a control message delivered to a zone's request queue. */
305 static int dispatch_index_zone_control_request(struct uds_request *request)
307 struct uds_zone_message *message = &request->zone_message;
308 struct index_zone *zone = request->index->zones[request->zone_number];
310 switch (message->type) {
311 case UDS_MESSAGE_SPARSE_CACHE_BARRIER:
312 return uds_update_sparse_cache(zone, message->virtual_chapter);
314 case UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED:
315 return handle_chapter_closed(zone, message->virtual_chapter);
318 vdo_log_error("invalid message type: %d", message->type);
319 return UDS_INVALID_ARGUMENT;
/* Record where the request's record was located, and whether that counts as "found". */
323 static void set_request_location(struct uds_request *request,
324 				 enum uds_index_region new_location)
326 request->location = new_location;
327 request->found = ((new_location == UDS_LOCATION_IN_OPEN_CHAPTER) ||
328 (new_location == UDS_LOCATION_IN_DENSE) ||
329 (new_location == UDS_LOCATION_IN_SPARSE));
/* Classify the chapter holding the record as the open chapter, a sparse, or a dense chapter. */
332 static void set_chapter_location(struct uds_request *request,
333 				 const struct index_zone *zone, u64 virtual_chapter)
335 request->found = true;
336 if (virtual_chapter == zone->newest_virtual_chapter)
337 request->location = UDS_LOCATION_IN_OPEN_CHAPTER;
338 else if (is_zone_chapter_sparse(zone, virtual_chapter))
339 request->location = UDS_LOCATION_IN_SPARSE;
341 request->location = UDS_LOCATION_IN_DENSE;
/*
 * Search the sparse cache for the record name, then search the matching cached record page.
 * NOTE(review): passing NO_CHAPTER appears to search all cached chapters — confirm against
 * uds_search_sparse_cache().
 */
344 static int search_sparse_cache_in_zone(struct index_zone *zone, struct uds_request *request,
345 					u64 virtual_chapter, bool *found)
348 struct volume *volume;
349 u16 record_page_number;
352 result = uds_search_sparse_cache(zone, &request->record_name, &virtual_chapter,
353 &record_page_number);
354 if ((result != UDS_SUCCESS) || (virtual_chapter == NO_CHAPTER))
357 request->virtual_chapter = virtual_chapter;
358 volume = zone->index->volume;
359 chapter = uds_map_to_physical_chapter(volume->geometry, virtual_chapter);
360 return uds_search_cached_record_page(volume, request, chapter,
361 record_page_number, found);
/*
 * Look for the record in this zone: first in the open chapter, then in the chapter currently
 * being written, and otherwise in the sparse cache or the volume page cache, depending on which
 * kind of chapter holds it.
 */
364 static int get_record_from_zone(struct index_zone *zone, struct uds_request *request,
367 struct volume *volume;
369 if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
372 } else if (request->location == UDS_LOCATION_UNAVAILABLE) {
377 if (request->virtual_chapter == zone->newest_virtual_chapter) {
378 uds_search_open_chapter(zone->open_chapter, &request->record_name,
379 &request->old_metadata, found);
/* The previous chapter may still be in memory if it has not finished writing. */
383 if ((zone->newest_virtual_chapter > 0) &&
384 (request->virtual_chapter == (zone->newest_virtual_chapter - 1)) &&
385 (zone->writing_chapter->size > 0)) {
386 uds_search_open_chapter(zone->writing_chapter, &request->record_name,
387 &request->old_metadata, found);
391 volume = zone->index->volume;
392 if (is_zone_chapter_sparse(zone, request->virtual_chapter) &&
393 uds_sparse_cache_contains(volume->sparse_cache, request->virtual_chapter,
394 request->zone_number))
395 return search_sparse_cache_in_zone(zone, request,
396 request->virtual_chapter, found);
398 return uds_search_volume_page_cache(volume, request, found);
/* Add the record to the zone's open chapter, opening the next chapter if this one fills up. */
401 static int put_record_in_zone(struct index_zone *zone, struct uds_request *request,
402 			      const struct uds_record_data *metadata)
404 unsigned int remaining;
406 remaining = uds_put_open_chapter(zone->open_chapter, &request->record_name,
409 return open_next_chapter(zone);
/*
 * Search for a record in this zone and update the volume index and open chapter as required by
 * the request type (query, query-no-update, or update).
 */
414 static int search_index_zone(struct index_zone *zone, struct uds_request *request)
417 struct volume_index_record record;
418 bool overflow_record, found = false;
419 struct uds_record_data *metadata;
422 result = uds_get_volume_index_record(zone->index->volume_index,
423 &request->record_name, &record);
424 if (result != UDS_SUCCESS)
427 if (record.is_found) {
/* A requeued request's saved chapter may be stale; discard the saved location. */
428 if (request->requeued && request->virtual_chapter != record.virtual_chapter)
429 set_request_location(request, UDS_LOCATION_UNKNOWN);
431 request->virtual_chapter = record.virtual_chapter;
432 result = get_record_from_zone(zone, request, &found);
433 if (result != UDS_SUCCESS)
438 set_chapter_location(request, zone, record.virtual_chapter);
441 * If a record has overflowed a chapter index in more than one chapter (or overflowed in
442 * one chapter and collided with an existing record), it will exist as a collision record
443 * in the volume index, but we won't find it in the volume. This case needs special
446 overflow_record = (record.is_found && record.is_collision && !found);
447 chapter = zone->newest_virtual_chapter;
448 if (found || overflow_record) {
449 if ((request->type == UDS_QUERY_NO_UPDATE) ||
450 ((request->type == UDS_QUERY) && overflow_record)) {
451 /* There is nothing left to do. */
455 if (record.virtual_chapter != chapter) {
457 * Update the volume index to reference the new chapter for the block. If
458 * the record had been deleted or dropped from the chapter index, it will
461 result = uds_set_volume_index_record_chapter(&record, chapter);
462 } else if (request->type != UDS_UPDATE) {
463 /* The record is already in the open chapter. */
468 * The record wasn't in the volume index, so check whether the
469 * name is in a cached sparse chapter. If we found the name on
470 * a previous search, use that result instead.
472 if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
474 } else if (request->location == UDS_LOCATION_UNAVAILABLE) {
476 } else if (uds_is_sparse_index_geometry(zone->index->volume->geometry) &&
477 !uds_is_volume_index_sample(zone->index->volume_index,
478 &request->record_name)) {
479 result = search_sparse_cache_in_zone(zone, request, NO_CHAPTER,
481 if (result != UDS_SUCCESS)
486 set_request_location(request, UDS_LOCATION_IN_SPARSE);
488 if ((request->type == UDS_QUERY_NO_UPDATE) ||
489 ((request->type == UDS_QUERY) && !found)) {
490 /* There is nothing left to do. */
495 * Add a new entry to the volume index referencing the open chapter. This needs to
496 * be done both for new records, and for records from cached sparse chapters.
498 result = uds_put_volume_index_record(&record, chapter);
501 if (result == UDS_OVERFLOW) {
503 * The volume index encountered a delta list overflow. The condition was already
504 * logged. We will go on without adding the record to the open chapter.
509 if (result != UDS_SUCCESS)
512 if (!found || (request->type == UDS_UPDATE)) {
513 /* This is a new record or we're updating an existing record. */
514 metadata = &request->new_metadata;
516 /* Move the existing record to the open chapter. */
517 metadata = &request->old_metadata;
520 return put_record_in_zone(zone, request, metadata);
/* Remove a record from this zone's view of the index, if it is present. */
523 static int remove_from_index_zone(struct index_zone *zone, struct uds_request *request)
526 struct volume_index_record record;
528 result = uds_get_volume_index_record(zone->index->volume_index,
529 &request->record_name, &record);
530 if (result != UDS_SUCCESS)
533 if (!record.is_found)
536 /* If the request was requeued, check whether the saved state is still valid. */
538 if (record.is_collision) {
539 set_chapter_location(request, zone, record.virtual_chapter);
541 /* Non-collision records are hints, so resolve the name in the chapter. */
544 if (request->requeued && request->virtual_chapter != record.virtual_chapter)
545 set_request_location(request, UDS_LOCATION_UNKNOWN);
547 request->virtual_chapter = record.virtual_chapter;
548 result = get_record_from_zone(zone, request, &found);
549 if (result != UDS_SUCCESS)
553 /* There is no record to remove. */
558 set_chapter_location(request, zone, record.virtual_chapter);
561 * Delete the volume index entry for the named record only. Note that a later search might
562 * return stale advice if there is a colliding name in the same chapter, but it's a
563 * very rare case (1 in 2^21).
565 result = uds_remove_volume_index_record(&record);
566 if (result != UDS_SUCCESS)
570 * If the record is in the open chapter, we must remove it or mark it deleted to avoid
571 * trouble if the record is added again later.
573 if (request->location == UDS_LOCATION_IN_OPEN_CHAPTER)
574 uds_remove_from_open_chapter(zone->open_chapter, &request->record_name);
/* Route a normal (non-message) request to the handler for its request type. */
579 static int dispatch_index_request(struct uds_index *index, struct uds_request *request)
582 struct index_zone *zone = index->zones[request->zone_number];
/* Requeued requests have already passed the barrier simulation once. */
584 if (!request->requeued) {
585 result = simulate_index_zone_barrier_message(zone, request);
586 if (result != UDS_SUCCESS)
590 switch (request->type) {
594 case UDS_QUERY_NO_UPDATE:
595 result = search_index_zone(zone, request);
599 result = remove_from_index_zone(zone, request);
603 result = vdo_log_warning_strerror(UDS_INVALID_ARGUMENT,
604 "invalid request type: %d",
612 /* This is the request processing function invoked by each zone's thread. */
613 static void execute_zone_request(struct uds_request *request)
616 struct uds_index *index = request->index;
618 if (request->zone_message.type != UDS_MESSAGE_NONE) {
619 result = dispatch_index_zone_control_request(request);
620 if (result != UDS_SUCCESS) {
621 vdo_log_error_strerror(result, "error executing message: %d",
622 request->zone_message.type);
625 /* Once the message is processed it can be freed. */
626 vdo_free(vdo_forget(request));
/* Any non-message request means the index has new state to save. */
630 index->need_to_save = true;
631 if (request->requeued && (request->status != UDS_SUCCESS)) {
632 set_request_location(request, UDS_LOCATION_UNAVAILABLE);
633 index->callback(request);
637 result = dispatch_index_request(index, request);
638 if (result == UDS_QUEUED) {
639 /* The request has been requeued so don't let it complete. */
644 set_request_location(request, UDS_LOCATION_UNAVAILABLE);
646 request->status = result;
647 index->callback(request);
/* Create one request queue per zone, plus a triage queue for sparse multi-zone indexes. */
650 static int initialize_index_queues(struct uds_index *index,
651 				   const struct index_geometry *geometry)
656 for (i = 0; i < index->zone_count; i++) {
657 result = uds_make_request_queue("indexW", &execute_zone_request,
658 &index->zone_queues[i]);
659 if (result != UDS_SUCCESS)
663 /* The triage queue is only needed for sparse multi-zone indexes. */
664 if ((index->zone_count > 1) && uds_is_sparse_index_geometry(geometry)) {
665 result = uds_make_request_queue("triageW", &triage_request,
666 &index->triage_queue);
667 if (result != UDS_SUCCESS)
674 /* This is the driver function for the chapter writer thread. */
675 static void close_chapters(void *arg)
678 struct chapter_writer *writer = arg;
679 struct uds_index *index = writer->index;
681 vdo_log_debug("chapter writer starting");
682 mutex_lock(&writer->mutex);
/* Wait until every zone has submitted its chapter, or until told to stop. */
684 while (writer->zones_to_write < index->zone_count) {
685 if (writer->stop && (writer->zones_to_write == 0)) {
687 * We've been told to stop, and all of the zones are in the same
688 * open chapter, so we can exit now.
690 mutex_unlock(&writer->mutex);
691 vdo_log_debug("chapter writer stopping");
694 uds_wait_cond(&writer->cond, &writer->mutex);
698 * Release the lock while closing a chapter. We probably don't need to do this, but
699 * it seems safer in principle. It's OK to access the chapter and chapter_number
700 * fields without the lock since those aren't allowed to change until we're done.
702 mutex_unlock(&writer->mutex);
704 if (index->has_saved_open_chapter) {
706 * Remove the saved open chapter the first time we close an open chapter
707 * after loading from a clean shutdown, or after doing a clean save. The
708 * lack of the saved open chapter will indicate that a recovery is
711 index->has_saved_open_chapter = false;
712 result = uds_discard_open_chapter(index->layout);
713 if (result == UDS_SUCCESS)
714 vdo_log_debug("Discarding saved open chapter");
/* Commit the closed chapter using the collated records and open chapter index. */
717 result = uds_close_open_chapter(writer->chapters, index->zone_count,
719 writer->open_chapter_index,
720 writer->collated_records,
721 index->newest_virtual_chapter);
723 mutex_lock(&writer->mutex);
724 index->newest_virtual_chapter++;
725 index->oldest_virtual_chapter +=
726 uds_chapters_to_expire(index->volume->geometry,
727 index->newest_virtual_chapter);
728 writer->result = result;
729 writer->zones_to_write = 0;
/* Wake any zone threads waiting in finish_previous_chapter(). */
730 uds_broadcast_cond(&writer->cond);
/* Detach the writer thread under the lock, wake it, and wait for it to exit. */
734 static void stop_chapter_writer(struct chapter_writer *writer)
736 struct thread *writer_thread = NULL;
738 mutex_lock(&writer->mutex);
739 if (writer->thread != NULL) {
740 writer_thread = writer->thread;
741 writer->thread = NULL;
743 uds_broadcast_cond(&writer->cond);
745 mutex_unlock(&writer->mutex);
747 if (writer_thread != NULL)
748 vdo_join_threads(writer_thread);
/* Stop the writer thread and release the resources it owns. */
751 static void free_chapter_writer(struct chapter_writer *writer)
756 stop_chapter_writer(writer);
757 uds_free_open_chapter_index(writer->open_chapter_index);
758 vdo_free(writer->collated_records);
/*
 * Allocate and start the chapter writer for this index, including the collated record buffer and
 * the open chapter index used to commit chapters. On any failure, the partially constructed
 * writer is freed before returning.
 */
762 static int make_chapter_writer(struct uds_index *index,
763 			       struct chapter_writer **writer_ptr)
766 struct chapter_writer *writer;
767 size_t collated_records_size =
768 (sizeof(struct uds_volume_record) * index->volume->geometry->records_per_chapter);
770 result = vdo_allocate_extended(struct chapter_writer, index->zone_count,
771 struct open_chapter_zone *, "Chapter Writer",
773 if (result != VDO_SUCCESS)
776 writer->index = index;
777 mutex_init(&writer->mutex);
778 uds_init_cond(&writer->cond);
780 result = vdo_allocate_cache_aligned(collated_records_size, "collated records",
781 &writer->collated_records);
782 if (result != VDO_SUCCESS) {
783 free_chapter_writer(writer);
787 result = uds_make_open_chapter_index(&writer->open_chapter_index,
788 index->volume->geometry,
789 index->volume->nonce);
790 if (result != UDS_SUCCESS) {
791 free_chapter_writer(writer);
/* Track the total memory footprint for reporting purposes. */
795 writer->memory_size = (sizeof(struct chapter_writer) +
796 index->zone_count * sizeof(struct open_chapter_zone *) +
797 collated_records_size +
798 writer->open_chapter_index->memory_size);
800 result = vdo_create_thread(close_chapters, writer, "writer", &writer->thread);
801 if (result != VDO_SUCCESS) {
802 free_chapter_writer(writer);
806 *writer_ptr = writer;
/* Load a saved index state from the layout; any failure is reported as an unclean save. */
810 static int load_index(struct uds_index *index)
813 u64 last_save_chapter;
815 result = uds_load_index_state(index->layout, index);
816 if (result != UDS_SUCCESS)
817 return UDS_INDEX_NOT_SAVED_CLEANLY;
819 last_save_chapter = ((index->last_save != NO_LAST_SAVE) ? index->last_save : 0);
821 vdo_log_info("loaded index from chapter %llu through chapter %llu",
822 (unsigned long long) index->oldest_virtual_chapter,
823 (unsigned long long) last_save_chapter);
/*
 * Rebuild the index page map entries for one chapter by reading each of its index pages and
 * recording the range of delta lists covered by each page. The pages must cover the delta lists
 * contiguously, starting from list 0.
 */
828 static int rebuild_index_page_map(struct uds_index *index, u64 vcn)
831 struct delta_index_page *chapter_index_page;
832 struct index_geometry *geometry = index->volume->geometry;
833 u32 chapter = uds_map_to_physical_chapter(geometry, vcn);
834 u32 expected_list_number = 0;
835 u32 index_page_number;
836 u32 lowest_delta_list;
837 u32 highest_delta_list;
839 for (index_page_number = 0;
840 index_page_number < geometry->index_pages_per_chapter;
841 index_page_number++) {
842 result = uds_get_volume_index_page(index->volume, chapter,
844 &chapter_index_page);
845 if (result != UDS_SUCCESS) {
846 return vdo_log_error_strerror(result,
847 "failed to read index page %u in chapter %u",
848 index_page_number, chapter);
851 lowest_delta_list = chapter_index_page->lowest_list_number;
852 highest_delta_list = chapter_index_page->highest_list_number;
/* A gap in the delta list coverage indicates a corrupt chapter. */
853 if (lowest_delta_list != expected_list_number) {
854 return vdo_log_error_strerror(UDS_CORRUPT_DATA,
855 "chapter %u index page %u is corrupt",
856 chapter, index_page_number);
859 uds_update_index_page_map(index->volume->index_page_map, vcn, chapter,
860 index_page_number, highest_delta_list);
861 expected_list_number = highest_delta_list + 1;
/*
 * Add one record back into the volume index during rebuild, skipping non-sample records destined
 * for sparse chapters, and resolving collisions against existing volume index entries.
 */
867 static int replay_record(struct uds_index *index, const struct uds_record_name *name,
868 			 u64 virtual_chapter, bool will_be_sparse_chapter)
871 struct volume_index_record record;
874 if (will_be_sparse_chapter &&
875 !uds_is_volume_index_sample(index->volume_index, name)) {
877 * This entry will be in a sparse chapter after the rebuild completes, and it is
878 * not a sample, so just skip over it.
883 result = uds_get_volume_index_record(index->volume_index, name, &record);
884 if (result != UDS_SUCCESS)
887 if (record.is_found) {
888 if (record.is_collision) {
889 if (record.virtual_chapter == virtual_chapter) {
890 /* The record is already correct. */
894 update_record = true;
895 } else if (record.virtual_chapter == virtual_chapter) {
897 * There is a volume index entry pointing to the current chapter, but we
898 * don't know if it is for the same name as the one we are currently
899 * working on or not. For now, we're just going to assume that it isn't.
900 * This will create one extra collision record if there was a deleted
901 * record in the current chapter.
903 update_record = false;
906 * If we're rebuilding, we don't normally want to go to disk to see if the
907 * record exists, since we will likely have just read the record from disk
908 * (i.e. we know it's there). The exception to this is when we find an
909 * entry in the volume index that has a different chapter. In this case, we
910 * need to search that chapter to determine if the volume index entry was
911 * for the same record or a different one.
913 result = uds_search_volume_page_cache_for_rebuild(index->volume,
915 record.virtual_chapter,
917 if (result != UDS_SUCCESS)
921 update_record = false;
926 * Update the volume index to reference the new chapter for the block. If the
927 * record had been deleted or dropped from the chapter index, it will be back.
929 result = uds_set_volume_index_record_chapter(&record, virtual_chapter);
932 * Add a new entry to the volume index referencing the open chapter. This should be
933 * done regardless of whether we are a brand new record or a sparse record, i.e.
934 * one that doesn't exist in the index but does on disk, since for a sparse record,
935 * we would want to un-sparsify if it did exist.
937 result = uds_put_volume_index_record(&record, virtual_chapter);
940 if ((result == UDS_DUPLICATE_NAME) || (result == UDS_OVERFLOW)) {
941 /* The rebuilt index will lose these records. */
/*
 * Check whether the index is being suspended; if so, block until it is resumed or freed.
 * Returns whether the index is being freed.
 */
948 static bool check_for_suspend(struct uds_index *index)
952 if (index->load_context == NULL)
955 mutex_lock(&index->load_context->mutex);
956 if (index->load_context->status != INDEX_SUSPENDING) {
957 mutex_unlock(&index->load_context->mutex);
961 /* Notify that we are suspended and wait for the resume. */
962 index->load_context->status = INDEX_SUSPENDED;
963 uds_broadcast_cond(&index->load_context->cond);
965 while ((index->load_context->status != INDEX_OPENING) &&
966 (index->load_context->status != INDEX_FREEING))
967 uds_wait_cond(&index->load_context->cond, &index->load_context->mutex);
969 closing = (index->load_context->status == INDEX_FREEING);
970 mutex_unlock(&index->load_context->mutex);
/* Replay one chapter of the volume into the volume index, rebuilding its index page map. */
974 static int replay_chapter(struct uds_index *index, u64 virtual, bool sparse)
979 const struct index_geometry *geometry;
980 u32 physical_chapter;
982 if (check_for_suspend(index)) {
983 vdo_log_info("Replay interrupted by index shutdown at chapter %llu",
984 (unsigned long long) virtual);
988 geometry = index->volume->geometry;
989 physical_chapter = uds_map_to_physical_chapter(geometry, virtual);
990 uds_prefetch_volume_chapter(index->volume, physical_chapter);
991 uds_set_volume_index_open_chapter(index->volume_index, virtual);
993 result = rebuild_index_page_map(index, virtual);
994 if (result != UDS_SUCCESS) {
995 return vdo_log_error_strerror(result,
996 "could not rebuild index page map for chapter %u",
/* Record pages follow the index pages within each chapter. */
1000 for (i = 0; i < geometry->record_pages_per_chapter; i++) {
1002 u32 record_page_number;
1004 record_page_number = geometry->index_pages_per_chapter + i;
1005 result = uds_get_volume_record_page(index->volume, physical_chapter,
1006 record_page_number, &record_page);
1007 if (result != UDS_SUCCESS) {
1008 return vdo_log_error_strerror(result, "could not get page %d",
1009 record_page_number);
1012 for (j = 0; j < geometry->records_per_page; j++) {
1013 const u8 *name_bytes;
1014 struct uds_record_name name;
1016 name_bytes = record_page + (j * BYTES_PER_RECORD);
1017 memcpy(&name.name, name_bytes, UDS_RECORD_NAME_SIZE);
1018 result = replay_record(index, &name, virtual, sparse);
1019 if (result != UDS_SUCCESS)
/* Rebuild the volume index by replaying every committed chapter of the volume. */
1027 static int replay_volume(struct uds_index *index)
1033 u64 from_virtual = index->oldest_virtual_chapter;
1034 u64 upto_virtual = index->newest_virtual_chapter;
1035 bool will_be_sparse;
1037 vdo_log_info("Replaying volume from chapter %llu through chapter %llu",
1038 (unsigned long long) from_virtual,
1039 (unsigned long long) upto_virtual);
1042 * The index failed to load, so the volume index is empty. Add records to the volume index
1043 * in order, skipping non-hooks in chapters which will be sparse to save time.
1045 * Go through each record page of each chapter and add the records back to the volume
1046 * index. This should not cause anything to be written to either the open chapter or the
1047 * on-disk volume. Also skip the on-disk chapter corresponding to upto_virtual, as this
1048 * would have already been purged from the volume index when the chapter was opened.
1050 * Also, go through each index page for each chapter and rebuild the index page map.
1052 old_map_update = index->volume->index_page_map->last_update;
1053 for (virtual = from_virtual; virtual < upto_virtual; virtual++) {
1054 will_be_sparse = uds_is_chapter_sparse(index->volume->geometry,
1055 from_virtual, upto_virtual,
1057 result = replay_chapter(index, virtual, will_be_sparse);
1058 if (result != UDS_SUCCESS)
1062 /* Also reap the chapter being replaced by the open chapter. */
1063 uds_set_volume_index_open_chapter(index->volume_index, upto_virtual);
1065 new_map_update = index->volume->index_page_map->last_update;
1066 if (new_map_update != old_map_update) {
1067 vdo_log_info("replay changed index page map update from %llu to %llu",
1068 (unsigned long long) old_map_update,
1069 (unsigned long long) new_map_update);
/* Rebuild the in-memory index from the on-disk volume contents. */
1075 static int rebuild_index(struct uds_index *index)
1080 bool is_empty = false;
1081 u32 chapters_per_volume = index->volume->geometry->chapters_per_volume;
1083 index->volume->lookup_mode = LOOKUP_FOR_REBUILD;
1084 result = uds_find_volume_chapter_boundaries(index->volume, &lowest, &highest,
1086 if (result != UDS_SUCCESS) {
1087 return vdo_log_fatal_strerror(result,
1088 "cannot rebuild index: unknown volume chapter boundaries");
1092 index->newest_virtual_chapter = 0;
1093 index->oldest_virtual_chapter = 0;
1094 index->volume->lookup_mode = LOOKUP_NORMAL;
1098 index->newest_virtual_chapter = highest + 1;
1099 index->oldest_virtual_chapter = lowest;
1100 if (index->newest_virtual_chapter ==
1101 (index->oldest_virtual_chapter + chapters_per_volume)) {
1102 /* Skip the chapter shadowed by the open chapter. */
1103 index->oldest_virtual_chapter++;
1106 result = replay_volume(index);
1107 if (result != UDS_SUCCESS)
1110 index->volume->lookup_mode = LOOKUP_NORMAL;
/*
 * Release the chapters owned by an index zone: the open chapter accepting new
 * records and the chapter being written. NOTE(review): freeing of the zone
 * structure itself is presumably done by a line elided from this excerpt —
 * confirm against the full source. Safe teardown helper for make_index_zone().
 */
1114 static void free_index_zone(struct index_zone *zone)
1119 uds_free_open_chapter(zone->open_chapter);
1120 uds_free_open_chapter(zone->writing_chapter);
/*
 * Allocate and initialize one index zone and record it in
 * index->zones[zone_number]. Each zone owns two open chapter structures: one
 * that accumulates new records, and one (writing_chapter) presumably used
 * while a closed chapter is being committed — confirm against chapter writer.
 * Any allocation failure tears down the partial zone via free_index_zone().
 */
1124 static int make_index_zone(struct uds_index *index, unsigned int zone_number)
1127 struct index_zone *zone;
1129 result = vdo_allocate(1, struct index_zone, "index zone", &zone);
1130 if (result != VDO_SUCCESS)
/* The open chapter collects this zone's new records until the chapter closes. */
1133 result = uds_make_open_chapter(index->volume->geometry, index->zone_count,
1134 &zone->open_chapter);
1135 if (result != UDS_SUCCESS) {
1136 free_index_zone(zone);
/* Second chapter structure, same geometry and zone count as the open chapter. */
1140 result = uds_make_open_chapter(index->volume->geometry, index->zone_count,
1141 &zone->writing_chapter);
1142 if (result != UDS_SUCCESS) {
1143 free_index_zone(zone);
/* Wire the zone back to its parent index and publish it in the zone table. */
1147 zone->index = index;
1148 zone->id = zone_number;
1149 index->zones[zone_number] = zone;
/*
 * Construct a complete uds_index: the layout, volume, per-zone state, volume
 * index, request queues, and chapter writer. Depending on open_type, either
 * create a fresh index (UDS_CREATE) or load the saved state, falling back to a
 * full rebuild for UDS_LOAD when the load fails recoverably. Every failure
 * path tears down the partially built index with uds_free_index().
 *
 * @config: the index configuration (zone count, geometry, etc.)
 * @open_type: UDS_CREATE, UDS_LOAD, or another uds_open_index_type
 * @load_context: optional; if non-NULL, its status/cond are updated when ready
 * @callback: completion callback stored on the index for request processing
 * @new_index: on success, receives the constructed index
 */
1154 int uds_make_index(struct uds_configuration *config, enum uds_open_index_type open_type,
1155 struct index_load_context *load_context, index_callback_fn callback,
1156 struct uds_index **new_index)
1159 bool loaded = false;
/* Only UDS_CREATE builds a brand-new layout; other types load existing state. */
1160 bool new = (open_type == UDS_CREATE);
1161 struct uds_index *index = NULL;
1162 struct index_zone *zone;
/* The index struct is allocated with a trailing per-zone request queue array. */
1166 result = vdo_allocate_extended(struct uds_index, config->zone_count,
1167 struct uds_request_queue *, "index", &index);
1168 if (result != VDO_SUCCESS)
1171 index->zone_count = config->zone_count;
1173 result = uds_make_index_layout(config, new, &index->layout);
1174 if (result != UDS_SUCCESS) {
1175 uds_free_index(index);
1179 result = vdo_allocate(index->zone_count, struct index_zone *, "zones",
1181 if (result != VDO_SUCCESS) {
1182 uds_free_index(index);
1186 result = uds_make_volume(config, index->layout, &index->volume);
1187 if (result != UDS_SUCCESS) {
1188 uds_free_index(index);
1192 index->volume->lookup_mode = LOOKUP_NORMAL;
/* Build each zone's open chapters; see make_index_zone(). */
1193 for (z = 0; z < index->zone_count; z++) {
1194 result = make_index_zone(index, z);
1195 if (result != UDS_SUCCESS) {
1196 uds_free_index(index);
1197 return vdo_log_error_strerror(result,
1198 "Could not create index zone");
/* The volume nonce ties the volume index to this specific volume layout. */
1202 nonce = uds_get_volume_nonce(index->layout);
1203 result = uds_make_volume_index(config, nonce, &index->volume_index);
1204 if (result != UDS_SUCCESS) {
1205 uds_free_index(index);
1206 return vdo_log_error_strerror(result, "could not make volume index");
1209 index->load_context = load_context;
1210 index->callback = callback;
1212 result = initialize_index_queues(index, config->geometry);
1213 if (result != UDS_SUCCESS) {
1214 uds_free_index(index);
1218 result = make_chapter_writer(index, &index->chapter_writer);
1219 if (result != UDS_SUCCESS) {
1220 uds_free_index(index);
/*
 * Attempt to load the saved index state. NOTE(review): the branching on the
 * load result (which errors permit rebuild vs. which are fatal) is partly
 * elided from this excerpt — confirm against the full source.
 */
1225 result = load_index(index);
1231 /* We should not try a rebuild for this error. */
1232 vdo_log_error_strerror(result, "index could not be loaded");
1235 vdo_log_error_strerror(result, "index could not be loaded");
/* Only an explicit UDS_LOAD falls back to rebuilding from the volume. */
1236 if (open_type == UDS_LOAD) {
1237 result = rebuild_index(index);
1238 if (result != UDS_SUCCESS) {
1239 vdo_log_error_strerror(result,
1240 "index could not be rebuilt");
1247 if (result != UDS_SUCCESS) {
1248 uds_free_index(index);
1249 return vdo_log_error_strerror(result, "fatal error in %s()", __func__);
/* Propagate the (loaded or rebuilt) chapter bounds to every zone. */
1252 for (z = 0; z < index->zone_count; z++) {
1253 zone = index->zones[z];
1254 zone->oldest_virtual_chapter = index->oldest_virtual_chapter;
1255 zone->newest_virtual_chapter = index->newest_virtual_chapter;
1258 if (index->load_context != NULL) {
1259 mutex_lock(&index->load_context->mutex);
1260 index->load_context->status = INDEX_READY;
1262 * If we get here, suspend is meaningless, but notify any thread trying to suspend
1263 * us so it doesn't hang.
1265 uds_broadcast_cond(&index->load_context->cond);
1266 mutex_unlock(&index->load_context->mutex);
/* A freshly loaded index already has its open chapter saved; otherwise a save is due. */
1269 index->has_saved_open_chapter = loaded;
1270 index->need_to_save = !loaded;
/*
 * Tear down a uds_index and everything it owns, in dependency order: finish
 * all request queues first (so no request touches freed state), then the
 * chapter writer, the volume index, the zones, the volume, and the layout.
 * Tolerates a partially constructed index (e.g. zones may be NULL), so it is
 * used on every failure path of uds_make_index().
 */
1275 void uds_free_index(struct uds_index *index)
/* Drain and destroy the queues before freeing anything requests might use. */
1282 uds_request_queue_finish(index->triage_queue);
1283 for (i = 0; i < index->zone_count; i++)
1284 uds_request_queue_finish(index->zone_queues[i]);
1286 free_chapter_writer(index->chapter_writer);
1288 uds_free_volume_index(index->volume_index);
/* zones may be NULL if construction failed before the zone array was allocated. */
1289 if (index->zones != NULL) {
1290 for (i = 0; i < index->zone_count; i++)
1291 free_index_zone(index->zones[i]);
1292 vdo_free(index->zones);
1295 uds_free_volume(index->volume);
/* vdo_forget() clears the pointer as it is handed off for freeing. */
1296 uds_free_index_layout(vdo_forget(index->layout));
1300 /* Wait for the chapter writer to complete any outstanding writes. */
/*
 * Blocks the caller until writer->zones_to_write drops to zero, i.e. no zone
 * still has a chapter pending commit. The condition is re-checked under the
 * writer's mutex each time the writer signals its condition variable.
 */
1301 void uds_wait_for_idle_index(struct uds_index *index)
1303 struct chapter_writer *writer = index->chapter_writer;
1305 mutex_lock(&writer->mutex);
1306 while (writer->zones_to_write > 0)
1307 uds_wait_cond(&writer->cond, &writer->mutex);
1308 mutex_unlock(&writer->mutex);
1311 /* This function assumes that all requests have been drained. */
/*
 * Save the index state to its layout. A no-op when nothing has changed since
 * the last save (need_to_save is false). Records the chapter number being
 * saved in last_save (NO_LAST_SAVE when no chapter has ever been written),
 * and rolls last_save back to prev_save if the save fails, so a failed save
 * leaves the bookkeeping consistent with what is actually on storage.
 */
1312 int uds_save_index(struct uds_index *index)
1316 if (!index->need_to_save)
/* Let the chapter writer finish before snapshotting state. */
1319 uds_wait_for_idle_index(index);
/* Keep the previous save marker so a failed save can be rolled back. */
1320 index->prev_save = index->last_save;
1321 index->last_save = ((index->newest_virtual_chapter == 0) ?
1322 NO_LAST_SAVE : index->newest_virtual_chapter - 1);
1323 vdo_log_info("beginning save (vcn %llu)", (unsigned long long) index->last_save);
1325 result = uds_save_index_state(index->layout, index);
1326 if (result != UDS_SUCCESS) {
1327 vdo_log_info("save index failed");
1328 index->last_save = index->prev_save;
/* Success: the open chapter is now on storage and no further save is pending. */
1330 index->has_saved_open_chapter = true;
1331 index->need_to_save = false;
1332 vdo_log_info("finished save (vcn %llu)",
1333 (unsigned long long) index->last_save);
/*
 * Point the index's volume at a new backing block device. Thin wrapper that
 * delegates entirely to uds_replace_volume_storage(); returns its result.
 */
1339 int uds_replace_index_storage(struct uds_index *index, struct block_device *bdev)
1341 return uds_replace_volume_storage(index->volume, index->layout, bdev);
1344 /* Accessing statistics should be safe from any thread. */
/*
 * Fill in the caller-supplied counters from the volume index statistics plus
 * the memory footprint of the three main components (volume index, volume
 * page cache, and chapter writer). Only reads index state; never blocks.
 */
1345 void uds_get_index_stats(struct uds_index *index, struct uds_index_stats *counters)
1347 struct volume_index_stats stats;
1349 uds_get_volume_index_stats(index->volume_index, &stats);
1350 counters->entries_indexed = stats.record_count;
1351 counters->collisions = stats.collision_count;
1352 counters->entries_discarded = stats.discard_count;
/* Total memory is the sum across the index's major subcomponents. */
1354 counters->memory_used = (index->volume_index->memory_size +
1355 index->volume->cache_size +
1356 index->chapter_writer->memory_size);
/*
 * Route a request to the appropriate queue for the given processing stage:
 * the triage queue when one exists, otherwise the zone queue selected by
 * hashing the record name through the volume index. An unrecognized stage
 * trips an assertion. NOTE(review): the switch/case scaffolding over `stage`
 * appears to be elided from this excerpt — confirm the exact fallthrough
 * behavior between stages against the full source.
 */
1359 void uds_enqueue_request(struct uds_request *request, enum request_stage stage)
1361 struct uds_index *index = request->index;
1362 struct uds_request_queue *queue;
/* Triage is optional; when configured, triage-stage requests go there first. */
1366 if (index->triage_queue != NULL) {
1367 queue = index->triage_queue;
/* The record name determines which zone (and thus which queue) owns the request. */
1374 request->zone_number =
1375 uds_get_volume_index_zone(index->volume_index, &request->record_name);
1379 queue = index->zone_queues[request->zone_number];
1383 VDO_ASSERT_LOG_ONLY(false, "invalid index stage: %d", stage);
1387 uds_request_queue_enqueue(queue, request);