]> Git Repo - linux.git/blob - drivers/md/dm-vdo/data-vio.c
Merge tag 'kbuild-v6.9' of git://git.kernel.org/pub/scm/linux/kernel/git/masahiroy...
[linux.git] / drivers / md / dm-vdo / data-vio.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5
6 #include "data-vio.h"
7
8 #include <linux/atomic.h>
9 #include <linux/bio.h>
10 #include <linux/blkdev.h>
11 #include <linux/delay.h>
12 #include <linux/device-mapper.h>
13 #include <linux/jiffies.h>
14 #include <linux/kernel.h>
15 #include <linux/list.h>
16 #include <linux/lz4.h>
17 #include <linux/minmax.h>
18 #include <linux/sched.h>
19 #include <linux/spinlock.h>
20 #include <linux/wait.h>
21
22 #include "logger.h"
23 #include "memory-alloc.h"
24 #include "murmurhash3.h"
25 #include "permassert.h"
26
27 #include "block-map.h"
28 #include "dump.h"
29 #include "encodings.h"
30 #include "int-map.h"
31 #include "io-submitter.h"
32 #include "logical-zone.h"
33 #include "packer.h"
34 #include "recovery-journal.h"
35 #include "slab-depot.h"
36 #include "status-codes.h"
37 #include "types.h"
38 #include "vdo.h"
39 #include "vio.h"
40 #include "wait-queue.h"
41
42 /**
43  * DOC: Bio flags.
44  *
45  * For certain flags set on user bios, if the user bio has not yet been acknowledged, setting those
46  * flags on our own bio(s) for that request may help underlying layers better fulfill the user
47  * bio's needs. This constant contains the aggregate of those flags; VDO strips all the other
48  * flags, as they convey incorrect information.
49  *
50  * These flags are always irrelevant if we have already finished the user bio as they are only
51  * hints on IO importance. If VDO has finished the user bio, any remaining IO done doesn't care how
52  * important finishing the finished bio was.
53  *
54  * Note that bio.c contains the complete list of flags we believe may be set; the following list
55  * explains the action taken with each of those flags VDO could receive:
56  *
57  * * REQ_SYNC: Passed down if the user bio is not yet completed, since it indicates the user bio
58  *   completion is required for further work to be done by the issuer.
59  * * REQ_META: Passed down if the user bio is not yet completed, since it may mean the lower layer
60  *   treats it as more urgent, similar to REQ_SYNC.
61  * * REQ_PRIO: Passed down if the user bio is not yet completed, since it indicates the user bio is
62  *   important.
63  * * REQ_NOMERGE: Set only if the incoming bio was split; irrelevant to VDO IO.
64  * * REQ_IDLE: Set if the incoming bio had more IO quickly following; VDO's IO pattern doesn't
65  *   match incoming IO, so this flag is incorrect for it.
66  * * REQ_FUA: Handled separately, and irrelevant to VDO IO otherwise.
67  * * REQ_RAHEAD: Passed down, as, for reads, it indicates trivial importance.
68  * * REQ_BACKGROUND: Not passed down, as VIOs are a limited resource and VDO needs them recycled
69  *   ASAP to service heavy load, which is the only place where REQ_BACKGROUND might aid in load
70  *   prioritization.
71  */
72 static blk_opf_t PASSTHROUGH_FLAGS = (REQ_PRIO | REQ_META | REQ_SYNC | REQ_RAHEAD);
73
74 /**
75  * DOC:
76  *
77  * The data_vio_pool maintains the pool of data_vios which a vdo uses to service incoming bios. For
78  * correctness, and in order to avoid potentially expensive or blocking memory allocations during
79  * normal operation, the number of concurrently active data_vios is capped. Furthermore, in order
80  * to avoid starvation of reads and writes, at most 75% of the data_vios may be used for
81  * discards. The data_vio_pool is responsible for enforcing these limits. Threads submitting bios
82  * for which a data_vio or discard permit are not available will block until the necessary
83  * resources are available. The pool is also responsible for distributing resources to blocked
84  * threads and waking them. Finally, the pool attempts to batch the work of recycling data_vios by
85  * performing the work of actually assigning resources to blocked threads or placing data_vios back
86  * into the pool on a single cpu at a time.
87  *
88  * The pool contains two "limiters", one for tracking data_vios and one for tracking discard
89  * permits. The limiters also provide safe cross-thread access to pool statistics without the need
90  * to take the pool's lock. When a thread submits a bio to a vdo device, it will first attempt to
91  * get a discard permit if it is a discard, and then to get a data_vio. If the necessary resources
92  * are available, the incoming bio will be assigned to the acquired data_vio, and it will be
93  * launched. However, if either of these are unavailable, the arrival time of the bio is recorded
94  * in the bio's bi_private field, the bio and its submitter are both queued on the appropriate
95  * limiter and the submitting thread will then put itself to sleep. (note that this mechanism will
96  * break if jiffies are only 32 bits.)
97  *
98  * Whenever a data_vio has completed processing for the bio it was servicing, release_data_vio()
99  * will be called on it. This function will add the data_vio to a funnel queue, and then check the
100  * state of the pool. If the pool is not currently processing released data_vios, the pool's
101  * completion will be enqueued on a cpu queue. This obviates the need for the releasing threads to
102  * hold the pool's lock, and also batches release work while avoiding starvation of the cpu
103  * threads.
104  *
105  * Whenever the pool's completion is run on a cpu thread, it calls process_release_callback() which
106  * processes a batch of returned data_vios (currently at most 32) from the pool's funnel queue. For
107  * each data_vio, it first checks whether that data_vio was processing a discard. If so, and there
108  * is a blocked bio waiting for a discard permit, that permit is notionally transferred to the
109  * eldest discard waiter, and that waiter is moved to the end of the list of discard bios waiting
110  * for a data_vio. If there are no discard waiters, the discard permit is returned to the pool.
111  * Next, the data_vio is assigned to the oldest blocked bio which either has a discard permit, or
112  * doesn't need one and relaunched. If neither of these exist, the data_vio is returned to the
113  * pool. Finally, if any waiting bios were launched, the threads which blocked trying to submit
114  * them are awakened.
115  */
116
117 #define DATA_VIO_RELEASE_BATCH_SIZE 128
118
119 static const unsigned int VDO_SECTORS_PER_BLOCK_MASK = VDO_SECTORS_PER_BLOCK - 1;
120 static const u32 COMPRESSION_STATUS_MASK = 0xff;
121 static const u32 MAY_NOT_COMPRESS_MASK = 0x80000000;
122
123 struct limiter;
124 typedef void (*assigner_fn)(struct limiter *limiter);
125
126 /* Bookkeeping structure for a single type of resource. */
127 struct limiter {
128         /* The data_vio_pool to which this limiter belongs */
129         struct data_vio_pool *pool;
130         /* The maximum number of data_vios available */
131         data_vio_count_t limit;
132         /* The number of resources in use */
133         data_vio_count_t busy;
134         /* The maximum number of resources ever simultaneously in use */
135         data_vio_count_t max_busy;
136         /* The number of resources to release */
137         data_vio_count_t release_count;
138         /* The number of waiters to wake */
139         data_vio_count_t wake_count;
140         /* The list of waiting bios which are known to process_release_callback() */
141         struct bio_list waiters;
142         /* The list of waiting bios which are not yet known to process_release_callback() */
143         struct bio_list new_waiters;
144         /* The list of waiters which have their permits */
145         struct bio_list *permitted_waiters;
146         /* The function for assigning a resource to a waiter */
147         assigner_fn assigner;
148         /* The queue of blocked threads */
149         wait_queue_head_t blocked_threads;
150         /* The arrival time of the eldest waiter */
151         u64 arrival;
152 };
153
154 /*
155  * A data_vio_pool is a collection of preallocated data_vios which may be acquired from any thread,
156  * and are released in batches.
157  */
158 struct data_vio_pool {
159         /* Completion for scheduling releases */
160         struct vdo_completion completion;
161         /* The administrative state of the pool */
162         struct admin_state state;
163         /* Lock protecting the pool */
164         spinlock_t lock;
165         /* The main limiter controlling the total data_vios in the pool. */
166         struct limiter limiter;
167         /* The limiter controlling data_vios for discard */
168         struct limiter discard_limiter;
169         /* The list of bios which have discard permits but still need a data_vio */
170         struct bio_list permitted_discards;
171         /* The list of available data_vios */
172         struct list_head available;
173         /* The queue of data_vios waiting to be returned to the pool */
174         struct funnel_queue *queue;
175         /* Whether the pool is processing, or scheduled to process releases */
176         atomic_t processing;
177         /* The data vios in the pool */
178         struct data_vio data_vios[];
179 };
180
181 static const char * const ASYNC_OPERATION_NAMES[] = {
182         "launch",
183         "acknowledge_write",
184         "acquire_hash_lock",
185         "attempt_logical_block_lock",
186         "lock_duplicate_pbn",
187         "check_for_duplication",
188         "cleanup",
189         "compress_data_vio",
190         "find_block_map_slot",
191         "get_mapped_block_for_read",
192         "get_mapped_block_for_write",
193         "hash_data_vio",
194         "journal_remapping",
195         "vdo_attempt_packing",
196         "put_mapped_block",
197         "read_data_vio",
198         "update_dedupe_index",
199         "update_reference_counts",
200         "verify_duplication",
201         "write_data_vio",
202 };
203
204 /* The steps taken cleaning up a VIO, in the order they are performed. */
205 enum data_vio_cleanup_stage {
206         VIO_CLEANUP_START,
207         VIO_RELEASE_HASH_LOCK = VIO_CLEANUP_START,
208         VIO_RELEASE_ALLOCATED,
209         VIO_RELEASE_RECOVERY_LOCKS,
210         VIO_RELEASE_LOGICAL,
211         VIO_CLEANUP_DONE
212 };
213
214 static inline struct data_vio_pool * __must_check
215 as_data_vio_pool(struct vdo_completion *completion)
216 {
217         vdo_assert_completion_type(completion, VDO_DATA_VIO_POOL_COMPLETION);
218         return container_of(completion, struct data_vio_pool, completion);
219 }
220
221 static inline u64 get_arrival_time(struct bio *bio)
222 {
223         return (u64) bio->bi_private;
224 }
225
226 /**
227  * check_for_drain_complete_locked() - Check whether a data_vio_pool has no outstanding data_vios
228  *                                     or waiters while holding the pool's lock.
229  */
230 static bool check_for_drain_complete_locked(struct data_vio_pool *pool)
231 {
232         if (pool->limiter.busy > 0)
233                 return false;
234
235         VDO_ASSERT_LOG_ONLY((pool->discard_limiter.busy == 0),
236                             "no outstanding discard permits");
237
238         return (bio_list_empty(&pool->limiter.new_waiters) &&
239                 bio_list_empty(&pool->discard_limiter.new_waiters));
240 }
241
242 static void initialize_lbn_lock(struct data_vio *data_vio, logical_block_number_t lbn)
243 {
244         struct vdo *vdo = vdo_from_data_vio(data_vio);
245         zone_count_t zone_number;
246         struct lbn_lock *lock = &data_vio->logical;
247
248         lock->lbn = lbn;
249         lock->locked = false;
250         vdo_waitq_init(&lock->waiters);
251         zone_number = vdo_compute_logical_zone(data_vio);
252         lock->zone = &vdo->logical_zones->zones[zone_number];
253 }
254
255 static void launch_locked_request(struct data_vio *data_vio)
256 {
257         data_vio->logical.locked = true;
258         if (data_vio->write) {
259                 struct vdo *vdo = vdo_from_data_vio(data_vio);
260
261                 if (vdo_is_read_only(vdo)) {
262                         continue_data_vio_with_error(data_vio, VDO_READ_ONLY);
263                         return;
264                 }
265         }
266
267         data_vio->last_async_operation = VIO_ASYNC_OP_FIND_BLOCK_MAP_SLOT;
268         vdo_find_block_map_slot(data_vio);
269 }
270
271 static void acknowledge_data_vio(struct data_vio *data_vio)
272 {
273         struct vdo *vdo = vdo_from_data_vio(data_vio);
274         struct bio *bio = data_vio->user_bio;
275         int error = vdo_status_to_errno(data_vio->vio.completion.result);
276
277         if (bio == NULL)
278                 return;
279
280         VDO_ASSERT_LOG_ONLY((data_vio->remaining_discard <=
281                              (u32) (VDO_BLOCK_SIZE - data_vio->offset)),
282                             "data_vio to acknowledge is not an incomplete discard");
283
284         data_vio->user_bio = NULL;
285         vdo_count_bios(&vdo->stats.bios_acknowledged, bio);
286         if (data_vio->is_partial)
287                 vdo_count_bios(&vdo->stats.bios_acknowledged_partial, bio);
288
289         bio->bi_status = errno_to_blk_status(error);
290         bio_endio(bio);
291 }
292
293 static void copy_to_bio(struct bio *bio, char *data_ptr)
294 {
295         struct bio_vec biovec;
296         struct bvec_iter iter;
297
298         bio_for_each_segment(biovec, bio, iter) {
299                 memcpy_to_bvec(&biovec, data_ptr);
300                 data_ptr += biovec.bv_len;
301         }
302 }
303
304 struct data_vio_compression_status get_data_vio_compression_status(struct data_vio *data_vio)
305 {
306         u32 packed = atomic_read(&data_vio->compression.status);
307
308         /* pairs with cmpxchg in set_data_vio_compression_status */
309         smp_rmb();
310         return (struct data_vio_compression_status) {
311                 .stage = packed & COMPRESSION_STATUS_MASK,
312                 .may_not_compress = ((packed & MAY_NOT_COMPRESS_MASK) != 0),
313         };
314 }
315
316 /**
317  * pack_status() - Convert a data_vio_compression_status into a u32 which may be stored
318  *                 atomically.
319  * @status: The state to convert.
320  *
321  * Return: The compression state packed into a u32.
322  */
323 static u32 __must_check pack_status(struct data_vio_compression_status status)
324 {
325         return status.stage | (status.may_not_compress ? MAY_NOT_COMPRESS_MASK : 0);
326 }
327
328 /**
329  * set_data_vio_compression_status() - Set the compression status of a data_vio.
330  * @state: The expected current status of the data_vio.
331  * @new_state: The status to set.
332  *
333  * Return: true if the new status was set, false if the data_vio's compression status did not
334  *         match the expected state, and so was left unchanged.
335  */
336 static bool __must_check
337 set_data_vio_compression_status(struct data_vio *data_vio,
338                                 struct data_vio_compression_status status,
339                                 struct data_vio_compression_status new_status)
340 {
341         u32 actual;
342         u32 expected = pack_status(status);
343         u32 replacement = pack_status(new_status);
344
345         /*
346          * Extra barriers because this was original developed using a CAS operation that implicitly
347          * had them.
348          */
349         smp_mb__before_atomic();
350         actual = atomic_cmpxchg(&data_vio->compression.status, expected, replacement);
351         /* same as before_atomic */
352         smp_mb__after_atomic();
353         return (expected == actual);
354 }
355
356 struct data_vio_compression_status advance_data_vio_compression_stage(struct data_vio *data_vio)
357 {
358         for (;;) {
359                 struct data_vio_compression_status status =
360                         get_data_vio_compression_status(data_vio);
361                 struct data_vio_compression_status new_status = status;
362
363                 if (status.stage == DATA_VIO_POST_PACKER) {
364                         /* We're already in the last stage. */
365                         return status;
366                 }
367
368                 if (status.may_not_compress) {
369                         /*
370                          * Compression has been dis-allowed for this VIO, so skip the rest of the
371                          * path and go to the end.
372                          */
373                         new_status.stage = DATA_VIO_POST_PACKER;
374                 } else {
375                         /* Go to the next state. */
376                         new_status.stage++;
377                 }
378
379                 if (set_data_vio_compression_status(data_vio, status, new_status))
380                         return new_status;
381
382                 /* Another thread changed the status out from under us so try again. */
383         }
384 }
385
386 /**
387  * cancel_data_vio_compression() - Prevent this data_vio from being compressed or packed.
388  *
389  * Return: true if the data_vio is in the packer and the caller was the first caller to cancel it.
390  */
391 bool cancel_data_vio_compression(struct data_vio *data_vio)
392 {
393         struct data_vio_compression_status status, new_status;
394
395         for (;;) {
396                 status = get_data_vio_compression_status(data_vio);
397                 if (status.may_not_compress || (status.stage == DATA_VIO_POST_PACKER)) {
398                         /* This data_vio is already set up to not block in the packer. */
399                         break;
400                 }
401
402                 new_status.stage = status.stage;
403                 new_status.may_not_compress = true;
404
405                 if (set_data_vio_compression_status(data_vio, status, new_status))
406                         break;
407         }
408
409         return ((status.stage == DATA_VIO_PACKING) && !status.may_not_compress);
410 }
411
412 /**
413  * attempt_logical_block_lock() - Attempt to acquire the lock on a logical block.
414  * @completion: The data_vio for an external data request as a completion.
415  *
416  * This is the start of the path for all external requests. It is registered in launch_data_vio().
417  */
418 static void attempt_logical_block_lock(struct vdo_completion *completion)
419 {
420         struct data_vio *data_vio = as_data_vio(completion);
421         struct lbn_lock *lock = &data_vio->logical;
422         struct vdo *vdo = vdo_from_data_vio(data_vio);
423         struct data_vio *lock_holder;
424         int result;
425
426         assert_data_vio_in_logical_zone(data_vio);
427
428         if (data_vio->logical.lbn >= vdo->states.vdo.config.logical_blocks) {
429                 continue_data_vio_with_error(data_vio, VDO_OUT_OF_RANGE);
430                 return;
431         }
432
433         result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn,
434                                  data_vio, false, (void **) &lock_holder);
435         if (result != VDO_SUCCESS) {
436                 continue_data_vio_with_error(data_vio, result);
437                 return;
438         }
439
440         if (lock_holder == NULL) {
441                 /* We got the lock */
442                 launch_locked_request(data_vio);
443                 return;
444         }
445
446         result = VDO_ASSERT(lock_holder->logical.locked, "logical block lock held");
447         if (result != VDO_SUCCESS) {
448                 continue_data_vio_with_error(data_vio, result);
449                 return;
450         }
451
452         /*
453          * If the new request is a pure read request (not read-modify-write) and the lock_holder is
454          * writing and has received an allocation, service the read request immediately by copying
455          * data from the lock_holder to avoid having to flush the write out of the packer just to
456          * prevent the read from waiting indefinitely. If the lock_holder does not yet have an
457          * allocation, prevent it from blocking in the packer and wait on it. This is necessary in
458          * order to prevent returning data that may not have actually been written.
459          */
460         if (!data_vio->write && READ_ONCE(lock_holder->allocation_succeeded)) {
461                 copy_to_bio(data_vio->user_bio, lock_holder->vio.data + data_vio->offset);
462                 acknowledge_data_vio(data_vio);
463                 complete_data_vio(completion);
464                 return;
465         }
466
467         data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_LOGICAL_BLOCK_LOCK;
468         vdo_waitq_enqueue_waiter(&lock_holder->logical.waiters, &data_vio->waiter);
469
470         /*
471          * Prevent writes and read-modify-writes from blocking indefinitely on lock holders in the
472          * packer.
473          */
474         if (lock_holder->write && cancel_data_vio_compression(lock_holder)) {
475                 data_vio->compression.lock_holder = lock_holder;
476                 launch_data_vio_packer_callback(data_vio,
477                                                 vdo_remove_lock_holder_from_packer);
478         }
479 }
480
481 /**
482  * launch_data_vio() - (Re)initialize a data_vio to have a new logical block number, keeping the
483  *                     same parent and other state and send it on its way.
484  */
485 static void launch_data_vio(struct data_vio *data_vio, logical_block_number_t lbn)
486 {
487         struct vdo_completion *completion = &data_vio->vio.completion;
488
489         /*
490          * Clearing the tree lock must happen before initializing the LBN lock, which also adds
491          * information to the tree lock.
492          */
493         memset(&data_vio->tree_lock, 0, sizeof(data_vio->tree_lock));
494         initialize_lbn_lock(data_vio, lbn);
495         INIT_LIST_HEAD(&data_vio->hash_lock_entry);
496         INIT_LIST_HEAD(&data_vio->write_entry);
497
498         memset(&data_vio->allocation, 0, sizeof(data_vio->allocation));
499
500         data_vio->is_duplicate = false;
501
502         memset(&data_vio->record_name, 0, sizeof(data_vio->record_name));
503         memset(&data_vio->duplicate, 0, sizeof(data_vio->duplicate));
504         vdo_reset_completion(completion);
505         completion->error_handler = handle_data_vio_error;
506         set_data_vio_logical_callback(data_vio, attempt_logical_block_lock);
507         vdo_enqueue_completion(completion, VDO_DEFAULT_Q_MAP_BIO_PRIORITY);
508 }
509
510 static bool is_zero_block(char *block)
511 {
512         int i;
513
514         for (i = 0; i < VDO_BLOCK_SIZE; i += sizeof(u64)) {
515                 if (*((u64 *) &block[i]))
516                         return false;
517         }
518
519         return true;
520 }
521
522 static void copy_from_bio(struct bio *bio, char *data_ptr)
523 {
524         struct bio_vec biovec;
525         struct bvec_iter iter;
526
527         bio_for_each_segment(biovec, bio, iter) {
528                 memcpy_from_bvec(data_ptr, &biovec);
529                 data_ptr += biovec.bv_len;
530         }
531 }
532
533 static void launch_bio(struct vdo *vdo, struct data_vio *data_vio, struct bio *bio)
534 {
535         logical_block_number_t lbn;
536         /*
537          * Zero out the fields which don't need to be preserved (i.e. which are not pointers to
538          * separately allocated objects).
539          */
540         memset(data_vio, 0, offsetof(struct data_vio, vio));
541         memset(&data_vio->compression, 0, offsetof(struct compression_state, block));
542
543         data_vio->user_bio = bio;
544         data_vio->offset = to_bytes(bio->bi_iter.bi_sector & VDO_SECTORS_PER_BLOCK_MASK);
545         data_vio->is_partial = (bio->bi_iter.bi_size < VDO_BLOCK_SIZE) || (data_vio->offset != 0);
546
547         /*
548          * Discards behave very differently than other requests when coming in from device-mapper.
549          * We have to be able to handle any size discards and various sector offsets within a
550          * block.
551          */
552         if (bio_op(bio) == REQ_OP_DISCARD) {
553                 data_vio->remaining_discard = bio->bi_iter.bi_size;
554                 data_vio->write = true;
555                 data_vio->is_discard = true;
556                 if (data_vio->is_partial) {
557                         vdo_count_bios(&vdo->stats.bios_in_partial, bio);
558                         data_vio->read = true;
559                 }
560         } else if (data_vio->is_partial) {
561                 vdo_count_bios(&vdo->stats.bios_in_partial, bio);
562                 data_vio->read = true;
563                 if (bio_data_dir(bio) == WRITE)
564                         data_vio->write = true;
565         } else if (bio_data_dir(bio) == READ) {
566                 data_vio->read = true;
567         } else {
568                 /*
569                  * Copy the bio data to a char array so that we can continue to use the data after
570                  * we acknowledge the bio.
571                  */
572                 copy_from_bio(bio, data_vio->vio.data);
573                 data_vio->is_zero = is_zero_block(data_vio->vio.data);
574                 data_vio->write = true;
575         }
576
577         if (data_vio->user_bio->bi_opf & REQ_FUA)
578                 data_vio->fua = true;
579
580         lbn = (bio->bi_iter.bi_sector - vdo->starting_sector_offset) / VDO_SECTORS_PER_BLOCK;
581         launch_data_vio(data_vio, lbn);
582 }
583
584 static void assign_data_vio(struct limiter *limiter, struct data_vio *data_vio)
585 {
586         struct bio *bio = bio_list_pop(limiter->permitted_waiters);
587
588         launch_bio(limiter->pool->completion.vdo, data_vio, bio);
589         limiter->wake_count++;
590
591         bio = bio_list_peek(limiter->permitted_waiters);
592         limiter->arrival = ((bio == NULL) ? U64_MAX : get_arrival_time(bio));
593 }
594
595 static void assign_discard_permit(struct limiter *limiter)
596 {
597         struct bio *bio = bio_list_pop(&limiter->waiters);
598
599         if (limiter->arrival == U64_MAX)
600                 limiter->arrival = get_arrival_time(bio);
601
602         bio_list_add(limiter->permitted_waiters, bio);
603 }
604
605 static void get_waiters(struct limiter *limiter)
606 {
607         bio_list_merge(&limiter->waiters, &limiter->new_waiters);
608         bio_list_init(&limiter->new_waiters);
609 }
610
611 static inline struct data_vio *get_available_data_vio(struct data_vio_pool *pool)
612 {
613         struct data_vio *data_vio =
614                 list_first_entry(&pool->available, struct data_vio, pool_entry);
615
616         list_del_init(&data_vio->pool_entry);
617         return data_vio;
618 }
619
620 static void assign_data_vio_to_waiter(struct limiter *limiter)
621 {
622         assign_data_vio(limiter, get_available_data_vio(limiter->pool));
623 }
624
625 static void update_limiter(struct limiter *limiter)
626 {
627         struct bio_list *waiters = &limiter->waiters;
628         data_vio_count_t available = limiter->limit - limiter->busy;
629
630         VDO_ASSERT_LOG_ONLY((limiter->release_count <= limiter->busy),
631                             "Release count %u is not more than busy count %u",
632                             limiter->release_count, limiter->busy);
633
634         get_waiters(limiter);
635         for (; (limiter->release_count > 0) && !bio_list_empty(waiters); limiter->release_count--)
636                 limiter->assigner(limiter);
637
638         if (limiter->release_count > 0) {
639                 WRITE_ONCE(limiter->busy, limiter->busy - limiter->release_count);
640                 limiter->release_count = 0;
641                 return;
642         }
643
644         for (; (available > 0) && !bio_list_empty(waiters); available--)
645                 limiter->assigner(limiter);
646
647         WRITE_ONCE(limiter->busy, limiter->limit - available);
648         if (limiter->max_busy < limiter->busy)
649                 WRITE_ONCE(limiter->max_busy, limiter->busy);
650 }
651
652 /**
653  * schedule_releases() - Ensure that release processing is scheduled.
654  *
655  * If this call switches the state to processing, enqueue. Otherwise, some other thread has already
656  * done so.
657  */
658 static void schedule_releases(struct data_vio_pool *pool)
659 {
660         /* Pairs with the barrier in process_release_callback(). */
661         smp_mb__before_atomic();
662         if (atomic_cmpxchg(&pool->processing, false, true))
663                 return;
664
665         pool->completion.requeue = true;
666         vdo_launch_completion_with_priority(&pool->completion,
667                                             CPU_Q_COMPLETE_VIO_PRIORITY);
668 }
669
670 static void reuse_or_release_resources(struct data_vio_pool *pool,
671                                        struct data_vio *data_vio,
672                                        struct list_head *returned)
673 {
674         if (data_vio->remaining_discard > 0) {
675                 if (bio_list_empty(&pool->discard_limiter.waiters)) {
676                         /* Return the data_vio's discard permit. */
677                         pool->discard_limiter.release_count++;
678                 } else {
679                         assign_discard_permit(&pool->discard_limiter);
680                 }
681         }
682
683         if (pool->limiter.arrival < pool->discard_limiter.arrival) {
684                 assign_data_vio(&pool->limiter, data_vio);
685         } else if (pool->discard_limiter.arrival < U64_MAX) {
686                 assign_data_vio(&pool->discard_limiter, data_vio);
687         } else {
688                 list_add(&data_vio->pool_entry, returned);
689                 pool->limiter.release_count++;
690         }
691 }
692
693 /**
694  * process_release_callback() - Process a batch of data_vio releases.
695  * @completion: The pool with data_vios to release.
696  */
697 static void process_release_callback(struct vdo_completion *completion)
698 {
699         struct data_vio_pool *pool = as_data_vio_pool(completion);
700         bool reschedule;
701         bool drained;
702         data_vio_count_t processed;
703         data_vio_count_t to_wake;
704         data_vio_count_t discards_to_wake;
705         LIST_HEAD(returned);
706
707         spin_lock(&pool->lock);
708         get_waiters(&pool->discard_limiter);
709         get_waiters(&pool->limiter);
710         spin_unlock(&pool->lock);
711
712         if (pool->limiter.arrival == U64_MAX) {
713                 struct bio *bio = bio_list_peek(&pool->limiter.waiters);
714
715                 if (bio != NULL)
716                         pool->limiter.arrival = get_arrival_time(bio);
717         }
718
719         for (processed = 0; processed < DATA_VIO_RELEASE_BATCH_SIZE; processed++) {
720                 struct data_vio *data_vio;
721                 struct funnel_queue_entry *entry = vdo_funnel_queue_poll(pool->queue);
722
723                 if (entry == NULL)
724                         break;
725
726                 data_vio = as_data_vio(container_of(entry, struct vdo_completion,
727                                                     work_queue_entry_link));
728                 acknowledge_data_vio(data_vio);
729                 reuse_or_release_resources(pool, data_vio, &returned);
730         }
731
732         spin_lock(&pool->lock);
733         /*
734          * There is a race where waiters could be added while we are in the unlocked section above.
735          * Those waiters could not see the resources we are now about to release, so we assign
736          * those resources now as we have no guarantee of being rescheduled. This is handled in
737          * update_limiter().
738          */
739         update_limiter(&pool->discard_limiter);
740         list_splice(&returned, &pool->available);
741         update_limiter(&pool->limiter);
742         to_wake = pool->limiter.wake_count;
743         pool->limiter.wake_count = 0;
744         discards_to_wake = pool->discard_limiter.wake_count;
745         pool->discard_limiter.wake_count = 0;
746
747         atomic_set(&pool->processing, false);
748         /* Pairs with the barrier in schedule_releases(). */
749         smp_mb();
750
751         reschedule = !vdo_is_funnel_queue_empty(pool->queue);
752         drained = (!reschedule &&
753                    vdo_is_state_draining(&pool->state) &&
754                    check_for_drain_complete_locked(pool));
755         spin_unlock(&pool->lock);
756
757         if (to_wake > 0)
758                 wake_up_nr(&pool->limiter.blocked_threads, to_wake);
759
760         if (discards_to_wake > 0)
761                 wake_up_nr(&pool->discard_limiter.blocked_threads, discards_to_wake);
762
763         if (reschedule)
764                 schedule_releases(pool);
765         else if (drained)
766                 vdo_finish_draining(&pool->state);
767 }
768
769 static void initialize_limiter(struct limiter *limiter, struct data_vio_pool *pool,
770                                assigner_fn assigner, data_vio_count_t limit)
771 {
772         limiter->pool = pool;
773         limiter->assigner = assigner;
774         limiter->limit = limit;
775         limiter->arrival = U64_MAX;
776         init_waitqueue_head(&limiter->blocked_threads);
777 }
778
779 /**
780  * initialize_data_vio() - Allocate the components of a data_vio.
781  *
782  * The caller is responsible for cleaning up the data_vio on error.
783  *
784  * Return: VDO_SUCCESS or an error.
785  */
786 static int initialize_data_vio(struct data_vio *data_vio, struct vdo *vdo)
787 {
788         struct bio *bio;
789         int result;
790
791         BUILD_BUG_ON(VDO_BLOCK_SIZE > PAGE_SIZE);
792         result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "data_vio data",
793                                      &data_vio->vio.data);
794         if (result != VDO_SUCCESS)
795                 return vdo_log_error_strerror(result,
796                                               "data_vio data allocation failure");
797
798         result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "compressed block",
799                                      &data_vio->compression.block);
800         if (result != VDO_SUCCESS) {
801                 return vdo_log_error_strerror(result,
802                                               "data_vio compressed block allocation failure");
803         }
804
805         result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "vio scratch",
806                                      &data_vio->scratch_block);
807         if (result != VDO_SUCCESS)
808                 return vdo_log_error_strerror(result,
809                                               "data_vio scratch allocation failure");
810
811         result = vdo_create_bio(&bio);
812         if (result != VDO_SUCCESS)
813                 return vdo_log_error_strerror(result,
814                                               "data_vio data bio allocation failure");
815
816         vdo_initialize_completion(&data_vio->decrement_completion, vdo,
817                                   VDO_DECREMENT_COMPLETION);
818         initialize_vio(&data_vio->vio, bio, 1, VIO_TYPE_DATA, VIO_PRIORITY_DATA, vdo);
819
820         return VDO_SUCCESS;
821 }
822
823 static void destroy_data_vio(struct data_vio *data_vio)
824 {
825         if (data_vio == NULL)
826                 return;
827
828         vdo_free_bio(vdo_forget(data_vio->vio.bio));
829         vdo_free(vdo_forget(data_vio->vio.data));
830         vdo_free(vdo_forget(data_vio->compression.block));
831         vdo_free(vdo_forget(data_vio->scratch_block));
832 }
833
834 /**
835  * make_data_vio_pool() - Initialize a data_vio pool.
836  * @vdo: The vdo to which the pool will belong.
837  * @pool_size: The number of data_vios in the pool.
838  * @discard_limit: The maximum number of data_vios which may be used for discards.
839  * @pool: A pointer to hold the newly allocated pool.
840  */
841 int make_data_vio_pool(struct vdo *vdo, data_vio_count_t pool_size,
842                        data_vio_count_t discard_limit, struct data_vio_pool **pool_ptr)
843 {
844         int result;
845         struct data_vio_pool *pool;
846         data_vio_count_t i;
847
848         result = vdo_allocate_extended(struct data_vio_pool, pool_size, struct data_vio,
849                                        __func__, &pool);
850         if (result != VDO_SUCCESS)
851                 return result;
852
853         VDO_ASSERT_LOG_ONLY((discard_limit <= pool_size),
854                             "discard limit does not exceed pool size");
855         initialize_limiter(&pool->discard_limiter, pool, assign_discard_permit,
856                            discard_limit);
857         pool->discard_limiter.permitted_waiters = &pool->permitted_discards;
858         initialize_limiter(&pool->limiter, pool, assign_data_vio_to_waiter, pool_size);
859         pool->limiter.permitted_waiters = &pool->limiter.waiters;
860         INIT_LIST_HEAD(&pool->available);
861         spin_lock_init(&pool->lock);
862         vdo_set_admin_state_code(&pool->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
863         vdo_initialize_completion(&pool->completion, vdo, VDO_DATA_VIO_POOL_COMPLETION);
864         vdo_prepare_completion(&pool->completion, process_release_callback,
865                                process_release_callback, vdo->thread_config.cpu_thread,
866                                NULL);
867
868         result = vdo_make_funnel_queue(&pool->queue);
869         if (result != VDO_SUCCESS) {
870                 free_data_vio_pool(vdo_forget(pool));
871                 return result;
872         }
873
874         for (i = 0; i < pool_size; i++) {
875                 struct data_vio *data_vio = &pool->data_vios[i];
876
877                 result = initialize_data_vio(data_vio, vdo);
878                 if (result != VDO_SUCCESS) {
879                         destroy_data_vio(data_vio);
880                         free_data_vio_pool(pool);
881                         return result;
882                 }
883
884                 list_add(&data_vio->pool_entry, &pool->available);
885         }
886
887         *pool_ptr = pool;
888         return VDO_SUCCESS;
889 }
890
891 /**
892  * free_data_vio_pool() - Free a data_vio_pool and the data_vios in it.
893  *
894  * All data_vios must be returned to the pool before calling this function.
895  */
896 void free_data_vio_pool(struct data_vio_pool *pool)
897 {
898         struct data_vio *data_vio, *tmp;
899
900         if (pool == NULL)
901                 return;
902
903         /*
904          * Pairs with the barrier in process_release_callback(). Possibly not needed since it
905          * caters to an enqueue vs. free race.
906          */
907         smp_mb();
908         BUG_ON(atomic_read(&pool->processing));
909
910         spin_lock(&pool->lock);
911         VDO_ASSERT_LOG_ONLY((pool->limiter.busy == 0),
912                             "data_vio pool must not have %u busy entries when being freed",
913                             pool->limiter.busy);
914         VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->limiter.waiters) &&
915                              bio_list_empty(&pool->limiter.new_waiters)),
916                             "data_vio pool must not have threads waiting to read or write when being freed");
917         VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->discard_limiter.waiters) &&
918                              bio_list_empty(&pool->discard_limiter.new_waiters)),
919                             "data_vio pool must not have threads waiting to discard when being freed");
920         spin_unlock(&pool->lock);
921
922         list_for_each_entry_safe(data_vio, tmp, &pool->available, pool_entry) {
923                 list_del_init(&data_vio->pool_entry);
924                 destroy_data_vio(data_vio);
925         }
926
927         vdo_free_funnel_queue(vdo_forget(pool->queue));
928         vdo_free(pool);
929 }
930
931 static bool acquire_permit(struct limiter *limiter)
932 {
933         if (limiter->busy >= limiter->limit)
934                 return false;
935
936         WRITE_ONCE(limiter->busy, limiter->busy + 1);
937         if (limiter->max_busy < limiter->busy)
938                 WRITE_ONCE(limiter->max_busy, limiter->busy);
939         return true;
940 }
941
942 static void wait_permit(struct limiter *limiter, struct bio *bio)
943         __releases(&limiter->pool->lock)
944 {
945         DEFINE_WAIT(wait);
946
947         bio_list_add(&limiter->new_waiters, bio);
948         prepare_to_wait_exclusive(&limiter->blocked_threads, &wait,
949                                   TASK_UNINTERRUPTIBLE);
950         spin_unlock(&limiter->pool->lock);
951         io_schedule();
952         finish_wait(&limiter->blocked_threads, &wait);
953 }
954
955 /**
956  * vdo_launch_bio() - Acquire a data_vio from the pool, assign the bio to it, and launch it.
957  *
958  * This will block if data_vios or discard permits are not available.
959  */
960 void vdo_launch_bio(struct data_vio_pool *pool, struct bio *bio)
961 {
962         struct data_vio *data_vio;
963
964         VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&pool->state),
965                             "data_vio_pool not quiescent on acquire");
966
967         bio->bi_private = (void *) jiffies;
968         spin_lock(&pool->lock);
969         if ((bio_op(bio) == REQ_OP_DISCARD) &&
970             !acquire_permit(&pool->discard_limiter)) {
971                 wait_permit(&pool->discard_limiter, bio);
972                 return;
973         }
974
975         if (!acquire_permit(&pool->limiter)) {
976                 wait_permit(&pool->limiter, bio);
977                 return;
978         }
979
980         data_vio = get_available_data_vio(pool);
981         spin_unlock(&pool->lock);
982         launch_bio(pool->completion.vdo, data_vio, bio);
983 }
984
985 /* Implements vdo_admin_initiator_fn. */
986 static void initiate_drain(struct admin_state *state)
987 {
988         bool drained;
989         struct data_vio_pool *pool = container_of(state, struct data_vio_pool, state);
990
991         spin_lock(&pool->lock);
992         drained = check_for_drain_complete_locked(pool);
993         spin_unlock(&pool->lock);
994
995         if (drained)
996                 vdo_finish_draining(state);
997 }
998
999 static void assert_on_vdo_cpu_thread(const struct vdo *vdo, const char *name)
1000 {
1001         VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == vdo->thread_config.cpu_thread),
1002                             "%s called on cpu thread", name);
1003 }
1004
1005 /**
1006  * drain_data_vio_pool() - Wait asynchronously for all data_vios to be returned to the pool.
1007  * @completion: The completion to notify when the pool has drained.
1008  */
1009 void drain_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion)
1010 {
1011         assert_on_vdo_cpu_thread(completion->vdo, __func__);
1012         vdo_start_draining(&pool->state, VDO_ADMIN_STATE_SUSPENDING, completion,
1013                            initiate_drain);
1014 }
1015
1016 /**
1017  * resume_data_vio_pool() - Resume a data_vio pool.
1018  * @completion: The completion to notify when the pool has resumed.
1019  */
1020 void resume_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion)
1021 {
1022         assert_on_vdo_cpu_thread(completion->vdo, __func__);
1023         vdo_continue_completion(completion, vdo_resume_if_quiescent(&pool->state));
1024 }
1025
1026 static void dump_limiter(const char *name, struct limiter *limiter)
1027 {
1028         vdo_log_info("%s: %u of %u busy (max %u), %s", name, limiter->busy,
1029                      limiter->limit, limiter->max_busy,
1030                      ((bio_list_empty(&limiter->waiters) &&
1031                        bio_list_empty(&limiter->new_waiters)) ?
1032                       "no waiters" : "has waiters"));
1033 }
1034
1035 /**
1036  * dump_data_vio_pool() - Dump a data_vio pool to the log.
1037  * @dump_vios: Whether to dump the details of each busy data_vio as well.
1038  */
1039 void dump_data_vio_pool(struct data_vio_pool *pool, bool dump_vios)
1040 {
1041         /*
1042          * In order that syslog can empty its buffer, sleep after 35 elements for 4ms (till the
1043          * second clock tick).  These numbers were picked based on experiments with lab machines.
1044          */
1045         static const int ELEMENTS_PER_BATCH = 35;
1046         static const int SLEEP_FOR_SYSLOG = 4000;
1047
1048         if (pool == NULL)
1049                 return;
1050
1051         spin_lock(&pool->lock);
1052         dump_limiter("data_vios", &pool->limiter);
1053         dump_limiter("discard permits", &pool->discard_limiter);
1054         if (dump_vios) {
1055                 int i;
1056                 int dumped = 0;
1057
1058                 for (i = 0; i < pool->limiter.limit; i++) {
1059                         struct data_vio *data_vio = &pool->data_vios[i];
1060
1061                         if (!list_empty(&data_vio->pool_entry))
1062                                 continue;
1063
1064                         dump_data_vio(data_vio);
1065                         if (++dumped >= ELEMENTS_PER_BATCH) {
1066                                 spin_unlock(&pool->lock);
1067                                 dumped = 0;
1068                                 fsleep(SLEEP_FOR_SYSLOG);
1069                                 spin_lock(&pool->lock);
1070                         }
1071                 }
1072         }
1073
1074         spin_unlock(&pool->lock);
1075 }
1076
1077 data_vio_count_t get_data_vio_pool_active_discards(struct data_vio_pool *pool)
1078 {
1079         return READ_ONCE(pool->discard_limiter.busy);
1080 }
1081
1082 data_vio_count_t get_data_vio_pool_discard_limit(struct data_vio_pool *pool)
1083 {
1084         return READ_ONCE(pool->discard_limiter.limit);
1085 }
1086
1087 data_vio_count_t get_data_vio_pool_maximum_discards(struct data_vio_pool *pool)
1088 {
1089         return READ_ONCE(pool->discard_limiter.max_busy);
1090 }
1091
1092 int set_data_vio_pool_discard_limit(struct data_vio_pool *pool, data_vio_count_t limit)
1093 {
1094         if (get_data_vio_pool_request_limit(pool) < limit) {
1095                 // The discard limit may not be higher than the data_vio limit.
1096                 return -EINVAL;
1097         }
1098
1099         spin_lock(&pool->lock);
1100         pool->discard_limiter.limit = limit;
1101         spin_unlock(&pool->lock);
1102
1103         return VDO_SUCCESS;
1104 }
1105
1106 data_vio_count_t get_data_vio_pool_active_requests(struct data_vio_pool *pool)
1107 {
1108         return READ_ONCE(pool->limiter.busy);
1109 }
1110
1111 data_vio_count_t get_data_vio_pool_request_limit(struct data_vio_pool *pool)
1112 {
1113         return READ_ONCE(pool->limiter.limit);
1114 }
1115
1116 data_vio_count_t get_data_vio_pool_maximum_requests(struct data_vio_pool *pool)
1117 {
1118         return READ_ONCE(pool->limiter.max_busy);
1119 }
1120
1121 static void update_data_vio_error_stats(struct data_vio *data_vio)
1122 {
1123         u8 index = 0;
1124         static const char * const operations[] = {
1125                 [0] = "empty",
1126                 [1] = "read",
1127                 [2] = "write",
1128                 [3] = "read-modify-write",
1129                 [5] = "read+fua",
1130                 [6] = "write+fua",
1131                 [7] = "read-modify-write+fua",
1132         };
1133
1134         if (data_vio->read)
1135                 index = 1;
1136
1137         if (data_vio->write)
1138                 index += 2;
1139
1140         if (data_vio->fua)
1141                 index += 4;
1142
1143         update_vio_error_stats(&data_vio->vio,
1144                                "Completing %s vio for LBN %llu with error after %s",
1145                                operations[index],
1146                                (unsigned long long) data_vio->logical.lbn,
1147                                get_data_vio_operation_name(data_vio));
1148 }
1149
1150 static void perform_cleanup_stage(struct data_vio *data_vio,
1151                                   enum data_vio_cleanup_stage stage);
1152
1153 /**
1154  * release_allocated_lock() - Release the PBN lock and/or the reference on the allocated block at
1155  *                            the end of processing a data_vio.
1156  */
1157 static void release_allocated_lock(struct vdo_completion *completion)
1158 {
1159         struct data_vio *data_vio = as_data_vio(completion);
1160
1161         assert_data_vio_in_allocated_zone(data_vio);
1162         release_data_vio_allocation_lock(data_vio, false);
1163         perform_cleanup_stage(data_vio, VIO_RELEASE_RECOVERY_LOCKS);
1164 }
1165
1166 /** release_lock() - Release an uncontended LBN lock. */
1167 static void release_lock(struct data_vio *data_vio, struct lbn_lock *lock)
1168 {
1169         struct int_map *lock_map = lock->zone->lbn_operations;
1170         struct data_vio *lock_holder;
1171
1172         if (!lock->locked) {
1173                 /*  The lock is not locked, so it had better not be registered in the lock map. */
1174                 struct data_vio *lock_holder = vdo_int_map_get(lock_map, lock->lbn);
1175
1176                 VDO_ASSERT_LOG_ONLY((data_vio != lock_holder),
1177                                     "no logical block lock held for block %llu",
1178                                     (unsigned long long) lock->lbn);
1179                 return;
1180         }
1181
1182         /* Release the lock by removing the lock from the map. */
1183         lock_holder = vdo_int_map_remove(lock_map, lock->lbn);
1184         VDO_ASSERT_LOG_ONLY((data_vio == lock_holder),
1185                             "logical block lock mismatch for block %llu",
1186                             (unsigned long long) lock->lbn);
1187         lock->locked = false;
1188 }
1189
1190 /** transfer_lock() - Transfer a contended LBN lock to the eldest waiter. */
1191 static void transfer_lock(struct data_vio *data_vio, struct lbn_lock *lock)
1192 {
1193         struct data_vio *lock_holder, *next_lock_holder;
1194         int result;
1195
1196         VDO_ASSERT_LOG_ONLY(lock->locked, "lbn_lock with waiters is not locked");
1197
1198         /* Another data_vio is waiting for the lock, transfer it in a single lock map operation. */
1199         next_lock_holder =
1200                 vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&lock->waiters));
1201
1202         /* Transfer the remaining lock waiters to the next lock holder. */
1203         vdo_waitq_transfer_all_waiters(&lock->waiters,
1204                                        &next_lock_holder->logical.waiters);
1205
1206         result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn,
1207                                  next_lock_holder, true, (void **) &lock_holder);
1208         if (result != VDO_SUCCESS) {
1209                 continue_data_vio_with_error(next_lock_holder, result);
1210                 return;
1211         }
1212
1213         VDO_ASSERT_LOG_ONLY((lock_holder == data_vio),
1214                             "logical block lock mismatch for block %llu",
1215                             (unsigned long long) lock->lbn);
1216         lock->locked = false;
1217
1218         /*
1219          * If there are still waiters, other data_vios must be trying to get the lock we just
1220          * transferred. We must ensure that the new lock holder doesn't block in the packer.
1221          */
1222         if (vdo_waitq_has_waiters(&next_lock_holder->logical.waiters))
1223                 cancel_data_vio_compression(next_lock_holder);
1224
1225         /*
1226          * Avoid stack overflow on lock transfer.
1227          * FIXME: this is only an issue in the 1 thread config.
1228          */
1229         next_lock_holder->vio.completion.requeue = true;
1230         launch_locked_request(next_lock_holder);
1231 }
1232
1233 /**
1234  * release_logical_lock() - Release the logical block lock and flush generation lock at the end of
1235  *                          processing a data_vio.
1236  */
1237 static void release_logical_lock(struct vdo_completion *completion)
1238 {
1239         struct data_vio *data_vio = as_data_vio(completion);
1240         struct lbn_lock *lock = &data_vio->logical;
1241
1242         assert_data_vio_in_logical_zone(data_vio);
1243
1244         if (vdo_waitq_has_waiters(&lock->waiters))
1245                 transfer_lock(data_vio, lock);
1246         else
1247                 release_lock(data_vio, lock);
1248
1249         vdo_release_flush_generation_lock(data_vio);
1250         perform_cleanup_stage(data_vio, VIO_CLEANUP_DONE);
1251 }
1252
1253 /** clean_hash_lock() - Release the hash lock at the end of processing a data_vio. */
1254 static void clean_hash_lock(struct vdo_completion *completion)
1255 {
1256         struct data_vio *data_vio = as_data_vio(completion);
1257
1258         assert_data_vio_in_hash_zone(data_vio);
1259         if (completion->result != VDO_SUCCESS) {
1260                 vdo_clean_failed_hash_lock(data_vio);
1261                 return;
1262         }
1263
1264         vdo_release_hash_lock(data_vio);
1265         perform_cleanup_stage(data_vio, VIO_RELEASE_LOGICAL);
1266 }
1267
1268 /**
1269  * finish_cleanup() - Make some assertions about a data_vio which has finished cleaning up.
1270  *
1271  * If it is part of a multi-block discard, starts on the next block, otherwise, returns it to the
1272  * pool.
1273  */
1274 static void finish_cleanup(struct data_vio *data_vio)
1275 {
1276         struct vdo_completion *completion = &data_vio->vio.completion;
1277
1278         VDO_ASSERT_LOG_ONLY(data_vio->allocation.lock == NULL,
1279                             "complete data_vio has no allocation lock");
1280         VDO_ASSERT_LOG_ONLY(data_vio->hash_lock == NULL,
1281                             "complete data_vio has no hash lock");
1282         if ((data_vio->remaining_discard <= VDO_BLOCK_SIZE) ||
1283             (completion->result != VDO_SUCCESS)) {
1284                 struct data_vio_pool *pool = completion->vdo->data_vio_pool;
1285
1286                 vdo_funnel_queue_put(pool->queue, &completion->work_queue_entry_link);
1287                 schedule_releases(pool);
1288                 return;
1289         }
1290
1291         data_vio->remaining_discard -= min_t(u32, data_vio->remaining_discard,
1292                                              VDO_BLOCK_SIZE - data_vio->offset);
1293         data_vio->is_partial = (data_vio->remaining_discard < VDO_BLOCK_SIZE);
1294         data_vio->read = data_vio->is_partial;
1295         data_vio->offset = 0;
1296         completion->requeue = true;
1297         launch_data_vio(data_vio, data_vio->logical.lbn + 1);
1298 }
1299
1300 /** perform_cleanup_stage() - Perform the next step in the process of cleaning up a data_vio. */
1301 static void perform_cleanup_stage(struct data_vio *data_vio,
1302                                   enum data_vio_cleanup_stage stage)
1303 {
1304         struct vdo *vdo = vdo_from_data_vio(data_vio);
1305
1306         switch (stage) {
1307         case VIO_RELEASE_HASH_LOCK:
1308                 if (data_vio->hash_lock != NULL) {
1309                         launch_data_vio_hash_zone_callback(data_vio, clean_hash_lock);
1310                         return;
1311                 }
1312                 fallthrough;
1313
1314         case VIO_RELEASE_ALLOCATED:
1315                 if (data_vio_has_allocation(data_vio)) {
1316                         launch_data_vio_allocated_zone_callback(data_vio,
1317                                                                 release_allocated_lock);
1318                         return;
1319                 }
1320                 fallthrough;
1321
1322         case VIO_RELEASE_RECOVERY_LOCKS:
1323                 if ((data_vio->recovery_sequence_number > 0) &&
1324                     (READ_ONCE(vdo->read_only_notifier.read_only_error) == VDO_SUCCESS) &&
1325                     (data_vio->vio.completion.result != VDO_READ_ONLY))
1326                         vdo_log_warning("VDO not read-only when cleaning data_vio with RJ lock");
1327                 fallthrough;
1328
1329         case VIO_RELEASE_LOGICAL:
1330                 launch_data_vio_logical_callback(data_vio, release_logical_lock);
1331                 return;
1332
1333         default:
1334                 finish_cleanup(data_vio);
1335         }
1336 }
1337
1338 void complete_data_vio(struct vdo_completion *completion)
1339 {
1340         struct data_vio *data_vio = as_data_vio(completion);
1341
1342         completion->error_handler = NULL;
1343         data_vio->last_async_operation = VIO_ASYNC_OP_CLEANUP;
1344         perform_cleanup_stage(data_vio,
1345                               (data_vio->write ? VIO_CLEANUP_START : VIO_RELEASE_LOGICAL));
1346 }
1347
1348 static void enter_read_only_mode(struct vdo_completion *completion)
1349 {
1350         if (vdo_is_read_only(completion->vdo))
1351                 return;
1352
1353         if (completion->result != VDO_READ_ONLY) {
1354                 struct data_vio *data_vio = as_data_vio(completion);
1355
1356                 vdo_log_error_strerror(completion->result,
1357                                        "Preparing to enter read-only mode: data_vio for LBN %llu (becoming mapped to %llu, previously mapped to %llu, allocated %llu) is completing with a fatal error after operation %s",
1358                                        (unsigned long long) data_vio->logical.lbn,
1359                                        (unsigned long long) data_vio->new_mapped.pbn,
1360                                        (unsigned long long) data_vio->mapped.pbn,
1361                                        (unsigned long long) data_vio->allocation.pbn,
1362                                        get_data_vio_operation_name(data_vio));
1363         }
1364
1365         vdo_enter_read_only_mode(completion->vdo, completion->result);
1366 }
1367
1368 void handle_data_vio_error(struct vdo_completion *completion)
1369 {
1370         struct data_vio *data_vio = as_data_vio(completion);
1371
1372         if ((completion->result == VDO_READ_ONLY) || (data_vio->user_bio == NULL))
1373                 enter_read_only_mode(completion);
1374
1375         update_data_vio_error_stats(data_vio);
1376         complete_data_vio(completion);
1377 }
1378
1379 /**
1380  * get_data_vio_operation_name() - Get the name of the last asynchronous operation performed on a
1381  *                                 data_vio.
1382  */
1383 const char *get_data_vio_operation_name(struct data_vio *data_vio)
1384 {
1385         BUILD_BUG_ON((MAX_VIO_ASYNC_OPERATION_NUMBER - MIN_VIO_ASYNC_OPERATION_NUMBER) !=
1386                      ARRAY_SIZE(ASYNC_OPERATION_NAMES));
1387
1388         return ((data_vio->last_async_operation < MAX_VIO_ASYNC_OPERATION_NUMBER) ?
1389                 ASYNC_OPERATION_NAMES[data_vio->last_async_operation] :
1390                 "unknown async operation");
1391 }
1392
1393 /**
1394  * data_vio_allocate_data_block() - Allocate a data block.
1395  *
1396  * @write_lock_type: The type of write lock to obtain on the block.
1397  * @callback: The callback which will attempt an allocation in the current zone and continue if it
1398  *            succeeds.
1399  * @error_handler: The handler for errors while allocating.
1400  */
1401 void data_vio_allocate_data_block(struct data_vio *data_vio,
1402                                   enum pbn_lock_type write_lock_type,
1403                                   vdo_action_fn callback, vdo_action_fn error_handler)
1404 {
1405         struct allocation *allocation = &data_vio->allocation;
1406
1407         VDO_ASSERT_LOG_ONLY((allocation->pbn == VDO_ZERO_BLOCK),
1408                             "data_vio does not have an allocation");
1409         allocation->write_lock_type = write_lock_type;
1410         allocation->zone = vdo_get_next_allocation_zone(data_vio->logical.zone);
1411         allocation->first_allocation_zone = allocation->zone->zone_number;
1412
1413         data_vio->vio.completion.error_handler = error_handler;
1414         launch_data_vio_allocated_zone_callback(data_vio, callback);
1415 }
1416
1417 /**
1418  * release_data_vio_allocation_lock() - Release the PBN lock on a data_vio's allocated block.
1419  * @reset: If true, the allocation will be reset (i.e. any allocated pbn will be forgotten).
1420  *
1421  * If the reference to the locked block is still provisional, it will be released as well.
1422  */
1423 void release_data_vio_allocation_lock(struct data_vio *data_vio, bool reset)
1424 {
1425         struct allocation *allocation = &data_vio->allocation;
1426         physical_block_number_t locked_pbn = allocation->pbn;
1427
1428         assert_data_vio_in_allocated_zone(data_vio);
1429
1430         if (reset || vdo_pbn_lock_has_provisional_reference(allocation->lock))
1431                 allocation->pbn = VDO_ZERO_BLOCK;
1432
1433         vdo_release_physical_zone_pbn_lock(allocation->zone, locked_pbn,
1434                                            vdo_forget(allocation->lock));
1435 }
1436
1437 /**
1438  * uncompress_data_vio() - Uncompress the data a data_vio has just read.
1439  * @mapping_state: The mapping state indicating which fragment to decompress.
1440  * @buffer: The buffer to receive the uncompressed data.
1441  */
1442 int uncompress_data_vio(struct data_vio *data_vio,
1443                         enum block_mapping_state mapping_state, char *buffer)
1444 {
1445         int size;
1446         u16 fragment_offset, fragment_size;
1447         struct compressed_block *block = data_vio->compression.block;
1448         int result = vdo_get_compressed_block_fragment(mapping_state, block,
1449                                                        &fragment_offset, &fragment_size);
1450
1451         if (result != VDO_SUCCESS) {
1452                 vdo_log_debug("%s: compressed fragment error %d", __func__, result);
1453                 return result;
1454         }
1455
1456         size = LZ4_decompress_safe((block->data + fragment_offset), buffer,
1457                                    fragment_size, VDO_BLOCK_SIZE);
1458         if (size != VDO_BLOCK_SIZE) {
1459                 vdo_log_debug("%s: lz4 error", __func__);
1460                 return VDO_INVALID_FRAGMENT;
1461         }
1462
1463         return VDO_SUCCESS;
1464 }
1465
1466 /**
1467  * modify_for_partial_write() - Do the modify-write part of a read-modify-write cycle.
1468  * @completion: The data_vio which has just finished its read.
1469  *
1470  * This callback is registered in read_block().
1471  */
1472 static void modify_for_partial_write(struct vdo_completion *completion)
1473 {
1474         struct data_vio *data_vio = as_data_vio(completion);
1475         char *data = data_vio->vio.data;
1476         struct bio *bio = data_vio->user_bio;
1477
1478         assert_data_vio_on_cpu_thread(data_vio);
1479
1480         if (bio_op(bio) == REQ_OP_DISCARD) {
1481                 memset(data + data_vio->offset, '\0', min_t(u32,
1482                                                             data_vio->remaining_discard,
1483                                                             VDO_BLOCK_SIZE - data_vio->offset));
1484         } else {
1485                 copy_from_bio(bio, data + data_vio->offset);
1486         }
1487
1488         data_vio->is_zero = is_zero_block(data);
1489         data_vio->read = false;
1490         launch_data_vio_logical_callback(data_vio,
1491                                          continue_data_vio_with_block_map_slot);
1492 }
1493
1494 static void complete_read(struct vdo_completion *completion)
1495 {
1496         struct data_vio *data_vio = as_data_vio(completion);
1497         char *data = data_vio->vio.data;
1498         bool compressed = vdo_is_state_compressed(data_vio->mapped.state);
1499
1500         assert_data_vio_on_cpu_thread(data_vio);
1501
1502         if (compressed) {
1503                 int result = uncompress_data_vio(data_vio, data_vio->mapped.state, data);
1504
1505                 if (result != VDO_SUCCESS) {
1506                         continue_data_vio_with_error(data_vio, result);
1507                         return;
1508                 }
1509         }
1510
1511         if (data_vio->write) {
1512                 modify_for_partial_write(completion);
1513                 return;
1514         }
1515
1516         if (compressed || data_vio->is_partial)
1517                 copy_to_bio(data_vio->user_bio, data + data_vio->offset);
1518
1519         acknowledge_data_vio(data_vio);
1520         complete_data_vio(completion);
1521 }
1522
1523 static void read_endio(struct bio *bio)
1524 {
1525         struct data_vio *data_vio = vio_as_data_vio(bio->bi_private);
1526         int result = blk_status_to_errno(bio->bi_status);
1527
1528         vdo_count_completed_bios(bio);
1529         if (result != VDO_SUCCESS) {
1530                 continue_data_vio_with_error(data_vio, result);
1531                 return;
1532         }
1533
1534         launch_data_vio_cpu_callback(data_vio, complete_read,
1535                                      CPU_Q_COMPLETE_READ_PRIORITY);
1536 }
1537
1538 static void complete_zero_read(struct vdo_completion *completion)
1539 {
1540         struct data_vio *data_vio = as_data_vio(completion);
1541
1542         assert_data_vio_on_cpu_thread(data_vio);
1543
1544         if (data_vio->is_partial) {
1545                 memset(data_vio->vio.data, 0, VDO_BLOCK_SIZE);
1546                 if (data_vio->write) {
1547                         modify_for_partial_write(completion);
1548                         return;
1549                 }
1550         } else {
1551                 zero_fill_bio(data_vio->user_bio);
1552         }
1553
1554         complete_read(completion);
1555 }
1556
1557 /**
1558  * read_block() - Read a block asynchronously.
1559  *
1560  * This is the callback registered in read_block_mapping().
1561  */
1562 static void read_block(struct vdo_completion *completion)
1563 {
1564         struct data_vio *data_vio = as_data_vio(completion);
1565         struct vio *vio = as_vio(completion);
1566         int result = VDO_SUCCESS;
1567
1568         if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) {
1569                 launch_data_vio_cpu_callback(data_vio, complete_zero_read,
1570                                              CPU_Q_COMPLETE_VIO_PRIORITY);
1571                 return;
1572         }
1573
1574         data_vio->last_async_operation = VIO_ASYNC_OP_READ_DATA_VIO;
1575         if (vdo_is_state_compressed(data_vio->mapped.state)) {
1576                 result = vio_reset_bio(vio, (char *) data_vio->compression.block,
1577                                        read_endio, REQ_OP_READ, data_vio->mapped.pbn);
1578         } else {
1579                 blk_opf_t opf = ((data_vio->user_bio->bi_opf & PASSTHROUGH_FLAGS) | REQ_OP_READ);
1580
1581                 if (data_vio->is_partial) {
1582                         result = vio_reset_bio(vio, vio->data, read_endio, opf,
1583                                                data_vio->mapped.pbn);
1584                 } else {
1585                         /* A full 4k read. Use the incoming bio to avoid having to copy the data */
1586                         bio_reset(vio->bio, vio->bio->bi_bdev, opf);
1587                         bio_init_clone(data_vio->user_bio->bi_bdev, vio->bio,
1588                                        data_vio->user_bio, GFP_KERNEL);
1589
1590                         /* Copy over the original bio iovec and opflags. */
1591                         vdo_set_bio_properties(vio->bio, vio, read_endio, opf,
1592                                                data_vio->mapped.pbn);
1593                 }
1594         }
1595
1596         if (result != VDO_SUCCESS) {
1597                 continue_data_vio_with_error(data_vio, result);
1598                 return;
1599         }
1600
1601         vdo_submit_data_vio(data_vio);
1602 }
1603
1604 static inline struct data_vio *
1605 reference_count_update_completion_as_data_vio(struct vdo_completion *completion)
1606 {
1607         if (completion->type == VIO_COMPLETION)
1608                 return as_data_vio(completion);
1609
1610         return container_of(completion, struct data_vio, decrement_completion);
1611 }
1612
1613 /**
1614  * update_block_map() - Rendezvous of the data_vio and decrement completions after each has
1615  *                      made its reference updates. Handle any error from either, or proceed
1616  *                      to updating the block map.
1617  * @completion: The completion of the write in progress.
1618  */
1619 static void update_block_map(struct vdo_completion *completion)
1620 {
1621         struct data_vio *data_vio = reference_count_update_completion_as_data_vio(completion);
1622
1623         assert_data_vio_in_logical_zone(data_vio);
1624
1625         if (!data_vio->first_reference_operation_complete) {
1626                 /* Rendezvous, we're first */
1627                 data_vio->first_reference_operation_complete = true;
1628                 return;
1629         }
1630
1631         completion = &data_vio->vio.completion;
1632         vdo_set_completion_result(completion, data_vio->decrement_completion.result);
1633         if (completion->result != VDO_SUCCESS) {
1634                 handle_data_vio_error(completion);
1635                 return;
1636         }
1637
1638         completion->error_handler = handle_data_vio_error;
1639         if (data_vio->hash_lock != NULL)
1640                 set_data_vio_hash_zone_callback(data_vio, vdo_continue_hash_lock);
1641         else
1642                 completion->callback = complete_data_vio;
1643
1644         data_vio->last_async_operation = VIO_ASYNC_OP_PUT_MAPPED_BLOCK;
1645         vdo_put_mapped_block(data_vio);
1646 }
1647
1648 static void decrement_reference_count(struct vdo_completion *completion)
1649 {
1650         struct data_vio *data_vio = container_of(completion, struct data_vio,
1651                                                  decrement_completion);
1652
1653         assert_data_vio_in_mapped_zone(data_vio);
1654
1655         vdo_set_completion_callback(completion, update_block_map,
1656                                     data_vio->logical.zone->thread_id);
1657         completion->error_handler = update_block_map;
1658         vdo_modify_reference_count(completion, &data_vio->decrement_updater);
1659 }
1660
1661 static void increment_reference_count(struct vdo_completion *completion)
1662 {
1663         struct data_vio *data_vio = as_data_vio(completion);
1664
1665         assert_data_vio_in_new_mapped_zone(data_vio);
1666
1667         if (data_vio->downgrade_allocation_lock) {
1668                 /*
1669                  * Now that the data has been written, it's safe to deduplicate against the
1670                  * block. Downgrade the allocation lock to a read lock so it can be used later by
1671                  * the hash lock. This is done here since it needs to happen sometime before we
1672                  * return to the hash zone, and we are currently on the correct thread. For
1673                  * compressed blocks, the downgrade will have already been done.
1674                  */
1675                 vdo_downgrade_pbn_write_lock(data_vio->allocation.lock, false);
1676         }
1677
1678         set_data_vio_logical_callback(data_vio, update_block_map);
1679         completion->error_handler = update_block_map;
1680         vdo_modify_reference_count(completion, &data_vio->increment_updater);
1681 }
1682
1683 /** journal_remapping() - Add a recovery journal entry for a data remapping. */
1684 static void journal_remapping(struct vdo_completion *completion)
1685 {
1686         struct data_vio *data_vio = as_data_vio(completion);
1687
1688         assert_data_vio_in_journal_zone(data_vio);
1689
1690         data_vio->decrement_updater.operation = VDO_JOURNAL_DATA_REMAPPING;
1691         data_vio->decrement_updater.zpbn = data_vio->mapped;
1692         if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) {
1693                 data_vio->first_reference_operation_complete = true;
1694                 if (data_vio->mapped.pbn == VDO_ZERO_BLOCK)
1695                         set_data_vio_logical_callback(data_vio, update_block_map);
1696         } else {
1697                 set_data_vio_new_mapped_zone_callback(data_vio,
1698                                                       increment_reference_count);
1699         }
1700
1701         if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) {
1702                 data_vio->first_reference_operation_complete = true;
1703         } else {
1704                 vdo_set_completion_callback(&data_vio->decrement_completion,
1705                                             decrement_reference_count,
1706                                             data_vio->mapped.zone->thread_id);
1707         }
1708
1709         data_vio->last_async_operation = VIO_ASYNC_OP_JOURNAL_REMAPPING;
1710         vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, data_vio);
1711 }
1712
1713 /**
1714  * read_old_block_mapping() - Get the previous PBN/LBN mapping of an in-progress write.
1715  *
1716  * Gets the previous PBN mapped to this LBN from the block map, so as to make an appropriate
1717  * journal entry referencing the removal of this LBN->PBN mapping.
1718  */
1719 static void read_old_block_mapping(struct vdo_completion *completion)
1720 {
1721         struct data_vio *data_vio = as_data_vio(completion);
1722
1723         assert_data_vio_in_logical_zone(data_vio);
1724
1725         data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_WRITE;
1726         set_data_vio_journal_callback(data_vio, journal_remapping);
1727         vdo_get_mapped_block(data_vio);
1728 }
1729
1730 void update_metadata_for_data_vio_write(struct data_vio *data_vio, struct pbn_lock *lock)
1731 {
1732         data_vio->increment_updater = (struct reference_updater) {
1733                 .operation = VDO_JOURNAL_DATA_REMAPPING,
1734                 .increment = true,
1735                 .zpbn = data_vio->new_mapped,
1736                 .lock = lock,
1737         };
1738
1739         launch_data_vio_logical_callback(data_vio, read_old_block_mapping);
1740 }
1741
1742 /**
1743  * pack_compressed_data() - Attempt to pack the compressed data_vio into a block.
1744  *
1745  * This is the callback registered in launch_compress_data_vio().
1746  */
1747 static void pack_compressed_data(struct vdo_completion *completion)
1748 {
1749         struct data_vio *data_vio = as_data_vio(completion);
1750
1751         assert_data_vio_in_packer_zone(data_vio);
1752
1753         if (!vdo_get_compressing(vdo_from_data_vio(data_vio)) ||
1754             get_data_vio_compression_status(data_vio).may_not_compress) {
1755                 write_data_vio(data_vio);
1756                 return;
1757         }
1758
1759         data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_PACKING;
1760         vdo_attempt_packing(data_vio);
1761 }
1762
1763 /**
1764  * compress_data_vio() - Do the actual work of compressing the data on a CPU queue.
1765  *
1766  * This callback is registered in launch_compress_data_vio().
1767  */
1768 static void compress_data_vio(struct vdo_completion *completion)
1769 {
1770         struct data_vio *data_vio = as_data_vio(completion);
1771         int size;
1772
1773         assert_data_vio_on_cpu_thread(data_vio);
1774
1775         /*
1776          * By putting the compressed data at the start of the compressed block data field, we won't
1777          * need to copy it if this data_vio becomes a compressed write agent.
1778          */
1779         size = LZ4_compress_default(data_vio->vio.data,
1780                                     data_vio->compression.block->data, VDO_BLOCK_SIZE,
1781                                     VDO_MAX_COMPRESSED_FRAGMENT_SIZE,
1782                                     (char *) vdo_get_work_queue_private_data());
1783         if ((size > 0) && (size < VDO_COMPRESSED_BLOCK_DATA_SIZE)) {
1784                 data_vio->compression.size = size;
1785                 launch_data_vio_packer_callback(data_vio, pack_compressed_data);
1786                 return;
1787         }
1788
1789         write_data_vio(data_vio);
1790 }
1791
1792 /**
1793  * launch_compress_data_vio() - Continue a write by attempting to compress the data.
1794  *
1795  * This is a re-entry point to vio_write used by hash locks.
1796  */
1797 void launch_compress_data_vio(struct data_vio *data_vio)
1798 {
1799         VDO_ASSERT_LOG_ONLY(!data_vio->is_duplicate, "compressing a non-duplicate block");
1800         VDO_ASSERT_LOG_ONLY(data_vio->hash_lock != NULL,
1801                             "data_vio to compress has a hash_lock");
1802         VDO_ASSERT_LOG_ONLY(data_vio_has_allocation(data_vio),
1803                             "data_vio to compress has an allocation");
1804
1805         /*
1806          * There are 4 reasons why a data_vio which has reached this point will not be eligible for
1807          * compression:
1808          *
1809          * 1) Since data_vios can block indefinitely in the packer, it would be bad to do so if the
1810          * write request also requests FUA.
1811          *
1812          * 2) A data_vio should not be compressed when compression is disabled for the vdo.
1813          *
1814          * 3) A data_vio could be doing a partial write on behalf of a larger discard which has not
1815          * yet been acknowledged and hence blocking in the packer would be bad.
1816          *
1817          * 4) Some other data_vio may be waiting on this data_vio in which case blocking in the
1818          * packer would also be bad.
1819          */
1820         if (data_vio->fua ||
1821             !vdo_get_compressing(vdo_from_data_vio(data_vio)) ||
1822             ((data_vio->user_bio != NULL) && (bio_op(data_vio->user_bio) == REQ_OP_DISCARD)) ||
1823             (advance_data_vio_compression_stage(data_vio).stage != DATA_VIO_COMPRESSING)) {
1824                 write_data_vio(data_vio);
1825                 return;
1826         }
1827
1828         data_vio->last_async_operation = VIO_ASYNC_OP_COMPRESS_DATA_VIO;
1829         launch_data_vio_cpu_callback(data_vio, compress_data_vio,
1830                                      CPU_Q_COMPRESS_BLOCK_PRIORITY);
1831 }
1832
1833 /**
1834  * hash_data_vio() - Hash the data in a data_vio and set the hash zone (which also flags the record
1835  *                   name as set).
1836
1837  * This callback is registered in prepare_for_dedupe().
1838  */
1839 static void hash_data_vio(struct vdo_completion *completion)
1840 {
1841         struct data_vio *data_vio = as_data_vio(completion);
1842
1843         assert_data_vio_on_cpu_thread(data_vio);
1844         VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "zero blocks should not be hashed");
1845
1846         murmurhash3_128(data_vio->vio.data, VDO_BLOCK_SIZE, 0x62ea60be,
1847                         &data_vio->record_name);
1848
1849         data_vio->hash_zone = vdo_select_hash_zone(vdo_from_data_vio(data_vio)->hash_zones,
1850                                                    &data_vio->record_name);
1851         data_vio->last_async_operation = VIO_ASYNC_OP_ACQUIRE_VDO_HASH_LOCK;
1852         launch_data_vio_hash_zone_callback(data_vio, vdo_acquire_hash_lock);
1853 }
1854
1855 /** prepare_for_dedupe() - Prepare for the dedupe path after attempting to get an allocation. */
1856 static void prepare_for_dedupe(struct data_vio *data_vio)
1857 {
1858         /* We don't care what thread we are on. */
1859         VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "must not prepare to dedupe zero blocks");
1860
1861         /*
1862          * Before we can dedupe, we need to know the record name, so the first
1863          * step is to hash the block data.
1864          */
1865         data_vio->last_async_operation = VIO_ASYNC_OP_HASH_DATA_VIO;
1866         launch_data_vio_cpu_callback(data_vio, hash_data_vio, CPU_Q_HASH_BLOCK_PRIORITY);
1867 }
1868
1869 /**
1870  * write_bio_finished() - This is the bio_end_io function registered in write_block() to be called
1871  *                        when a data_vio's write to the underlying storage has completed.
1872  */
1873 static void write_bio_finished(struct bio *bio)
1874 {
1875         struct data_vio *data_vio = vio_as_data_vio((struct vio *) bio->bi_private);
1876
1877         vdo_count_completed_bios(bio);
1878         vdo_set_completion_result(&data_vio->vio.completion,
1879                                   blk_status_to_errno(bio->bi_status));
1880         data_vio->downgrade_allocation_lock = true;
1881         update_metadata_for_data_vio_write(data_vio, data_vio->allocation.lock);
1882 }
1883
1884 /** write_data_vio() - Write a data block to storage without compression. */
1885 void write_data_vio(struct data_vio *data_vio)
1886 {
1887         struct data_vio_compression_status status, new_status;
1888         int result;
1889
1890         if (!data_vio_has_allocation(data_vio)) {
1891                 /*
1892                  * There was no space to write this block and we failed to deduplicate or compress
1893                  * it.
1894                  */
1895                 continue_data_vio_with_error(data_vio, VDO_NO_SPACE);
1896                 return;
1897         }
1898
1899         new_status = (struct data_vio_compression_status) {
1900                 .stage = DATA_VIO_POST_PACKER,
1901                 .may_not_compress = true,
1902         };
1903
1904         do {
1905                 status = get_data_vio_compression_status(data_vio);
1906         } while ((status.stage != DATA_VIO_POST_PACKER) &&
1907                  !set_data_vio_compression_status(data_vio, status, new_status));
1908
1909         /* Write the data from the data block buffer. */
1910         result = vio_reset_bio(&data_vio->vio, data_vio->vio.data,
1911                                write_bio_finished, REQ_OP_WRITE,
1912                                data_vio->allocation.pbn);
1913         if (result != VDO_SUCCESS) {
1914                 continue_data_vio_with_error(data_vio, result);
1915                 return;
1916         }
1917
1918         data_vio->last_async_operation = VIO_ASYNC_OP_WRITE_DATA_VIO;
1919         vdo_submit_data_vio(data_vio);
1920 }
1921
1922 /**
1923  * acknowledge_write_callback() - Acknowledge a write to the requestor.
1924  *
1925  * This callback is registered in allocate_block() and continue_write_with_block_map_slot().
1926  */
1927 static void acknowledge_write_callback(struct vdo_completion *completion)
1928 {
1929         struct data_vio *data_vio = as_data_vio(completion);
1930         struct vdo *vdo = completion->vdo;
1931
1932         VDO_ASSERT_LOG_ONLY((!vdo_uses_bio_ack_queue(vdo) ||
1933                              (vdo_get_callback_thread_id() == vdo->thread_config.bio_ack_thread)),
1934                             "%s() called on bio ack queue", __func__);
1935         VDO_ASSERT_LOG_ONLY(data_vio_has_flush_generation_lock(data_vio),
1936                             "write VIO to be acknowledged has a flush generation lock");
1937         acknowledge_data_vio(data_vio);
1938         if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) {
1939                 /* This is a zero write or discard */
1940                 update_metadata_for_data_vio_write(data_vio, NULL);
1941                 return;
1942         }
1943
1944         prepare_for_dedupe(data_vio);
1945 }
1946
1947 /**
1948  * allocate_block() - Attempt to allocate a block in the current allocation zone.
1949  *
1950  * This callback is registered in continue_write_with_block_map_slot().
1951  */
1952 static void allocate_block(struct vdo_completion *completion)
1953 {
1954         struct data_vio *data_vio = as_data_vio(completion);
1955
1956         assert_data_vio_in_allocated_zone(data_vio);
1957
1958         if (!vdo_allocate_block_in_zone(data_vio))
1959                 return;
1960
1961         completion->error_handler = handle_data_vio_error;
1962         WRITE_ONCE(data_vio->allocation_succeeded, true);
1963         data_vio->new_mapped = (struct zoned_pbn) {
1964                 .zone = data_vio->allocation.zone,
1965                 .pbn = data_vio->allocation.pbn,
1966                 .state = VDO_MAPPING_STATE_UNCOMPRESSED,
1967         };
1968
1969         if (data_vio->fua) {
1970                 prepare_for_dedupe(data_vio);
1971                 return;
1972         }
1973
1974         data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE;
1975         launch_data_vio_on_bio_ack_queue(data_vio, acknowledge_write_callback);
1976 }
1977
1978 /**
1979  * handle_allocation_error() - Handle an error attempting to allocate a block.
1980  *
1981  * This error handler is registered in continue_write_with_block_map_slot().
1982  */
1983 static void handle_allocation_error(struct vdo_completion *completion)
1984 {
1985         struct data_vio *data_vio = as_data_vio(completion);
1986
1987         if (completion->result == VDO_NO_SPACE) {
1988                 /* We failed to get an allocation, but we can try to dedupe. */
1989                 vdo_reset_completion(completion);
1990                 completion->error_handler = handle_data_vio_error;
1991                 prepare_for_dedupe(data_vio);
1992                 return;
1993         }
1994
1995         /* We got a "real" error, not just a failure to allocate, so fail the request. */
1996         handle_data_vio_error(completion);
1997 }
1998
1999 static int assert_is_discard(struct data_vio *data_vio)
2000 {
2001         int result = VDO_ASSERT(data_vio->is_discard,
2002                                 "data_vio with no block map page is a discard");
2003
2004         return ((result == VDO_SUCCESS) ? result : VDO_READ_ONLY);
2005 }
2006
2007 /**
2008  * continue_data_vio_with_block_map_slot() - Read the data_vio's mapping from the block map.
2009  *
2010  * This callback is registered in launch_read_data_vio().
2011  */
2012 void continue_data_vio_with_block_map_slot(struct vdo_completion *completion)
2013 {
2014         struct data_vio *data_vio = as_data_vio(completion);
2015
2016         assert_data_vio_in_logical_zone(data_vio);
2017         if (data_vio->read) {
2018                 set_data_vio_logical_callback(data_vio, read_block);
2019                 data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_READ;
2020                 vdo_get_mapped_block(data_vio);
2021                 return;
2022         }
2023
2024         vdo_acquire_flush_generation_lock(data_vio);
2025
2026         if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) {
2027                 /*
2028                  * This is a discard for a block on a block map page which has not been allocated, so
2029                  * there's nothing more we need to do.
2030                  */
2031                 completion->callback = complete_data_vio;
2032                 continue_data_vio_with_error(data_vio, assert_is_discard(data_vio));
2033                 return;
2034         }
2035
2036         /*
2037          * We need an allocation if this is neither a full-block discard nor a
2038          * full-block zero write.
2039          */
2040         if (!data_vio->is_zero && (!data_vio->is_discard || data_vio->is_partial)) {
2041                 data_vio_allocate_data_block(data_vio, VIO_WRITE_LOCK, allocate_block,
2042                                              handle_allocation_error);
2043                 return;
2044         }
2045
2046
2047         /*
2048          * We don't need to write any data, so skip allocation and just update the block map and
2049          * reference counts (via the journal).
2050          */
2051         data_vio->new_mapped.pbn = VDO_ZERO_BLOCK;
2052         if (data_vio->is_zero)
2053                 data_vio->new_mapped.state = VDO_MAPPING_STATE_UNCOMPRESSED;
2054
2055         if (data_vio->remaining_discard > VDO_BLOCK_SIZE) {
2056                 /* This is not the final block of a discard so we can't acknowledge it yet. */
2057                 update_metadata_for_data_vio_write(data_vio, NULL);
2058                 return;
2059         }
2060
2061         data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE;
2062         launch_data_vio_on_bio_ack_queue(data_vio, acknowledge_write_callback);
2063 }
This page took 0.159321 seconds and 4 git commands to generate.