]> Git Repo - linux.git/blob - drivers/md/dm-vdo/dedupe.c
Merge tag 'kbuild-v6.9' of git://git.kernel.org/pub/scm/linux/kernel/git/masahiroy...
[linux.git] / drivers / md / dm-vdo / dedupe.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5
6 /**
7  * DOC:
8  *
9  * Hash Locks:
10  *
11  * A hash_lock controls and coordinates writing, index access, and dedupe among groups of data_vios
12  * concurrently writing identical blocks, allowing them to deduplicate not only against advice but
13  * also against each other. This saves on index queries and allows those data_vios to concurrently
14  * deduplicate against a single block instead of being serialized through a PBN read lock. Only one
15  * index query is needed for each hash_lock, instead of one for every data_vio.
16  *
17  * Hash_locks are assigned to hash_zones by computing a modulus on the hash itself. Each hash_zone
18  * has a single dedicated queue and thread for performing all operations on the hash_locks assigned
19  * to that zone. The concurrency guarantees of this single-threaded model allow the code to omit
20  * more fine-grained locking for the hash_lock structures.
21  *
22  * A hash_lock acts like a state machine perhaps more than as a lock. Other than the starting and
23  * ending states INITIALIZING and BYPASSING, every state represents and is held for the duration of
24  * an asynchronous operation. All state transitions are performed on the thread of the hash_zone
25  * containing the lock. An asynchronous operation is almost always performed upon entering a state,
26  * and the callback from that operation triggers exiting the state and entering a new state.
27  *
28  * In all states except DEDUPING, there is a single data_vio, called the lock agent, performing the
29  * asynchronous operations on behalf of the lock. The agent will change during the lifetime of the
30  * lock if the lock is shared by more than one data_vio. data_vios waiting to deduplicate are kept
31  * on a wait queue. Viewed a different way, the agent holds the lock exclusively until the lock
32  * enters the DEDUPING state, at which point it becomes a shared lock that all the waiters (and any
33  * new data_vios that arrive) use to share a PBN lock. In state DEDUPING, there is no agent. When
34  * the last data_vio in the lock calls back in DEDUPING, it becomes the agent and the lock becomes
35  * exclusive again. New data_vios that arrive in the lock will also go on the wait queue.
36  *
37  * The existence of lock waiters is a key factor controlling which state the lock transitions to
38  * next. When the lock is new or has waiters, it will always try to reach DEDUPING, and when it
39  * doesn't, it will try to clean up and exit.
40  *
41  * Deduping requires holding a PBN lock on a block that is known to contain data identical to the
42  * data_vios in the lock, so the lock will send the agent to the duplicate zone to acquire the PBN
43  * lock (LOCKING), to the kernel I/O threads to read and verify the data (VERIFYING), or to write a
44  * new copy of the data to a full data block or a slot in a compressed block (WRITING).
45  *
46  * Cleaning up consists of updating the index when the data location is different from the initial
47  * index query (UPDATING, triggered by stale advice, compression, and rollover), releasing the PBN
48  * lock on the duplicate block (UNLOCKING), and if the agent is the last data_vio referencing the
49  * lock, releasing the hash_lock itself back to the hash zone (BYPASSING).
50  *
51  * The shortest sequence of states is for non-concurrent writes of new data:
52  *   INITIALIZING -> QUERYING -> WRITING -> BYPASSING
53  * This sequence is short because no PBN read lock or index update is needed.
54  *
55  * Non-concurrent, finding valid advice looks like this (endpoints elided):
56  *   -> QUERYING -> LOCKING -> VERIFYING -> DEDUPING -> UNLOCKING ->
57  * Or with stale advice (endpoints elided):
58  *   -> QUERYING -> LOCKING -> VERIFYING -> UNLOCKING -> WRITING -> UPDATING ->
59  *
60  * When there are not enough available reference count increments available on a PBN for a data_vio
61  * to deduplicate, a new lock is forked and the excess waiters roll over to the new lock (which
62  * goes directly to WRITING). The new lock takes the place of the old lock in the lock map so new
63  * data_vios will be directed to it. The two locks will proceed independently, but only the new
64  * lock will have the right to update the index (unless it also forks).
65  *
66  * Since rollover happens in a lock instance, once a valid data location has been selected, it will
67  * not change. QUERYING and WRITING are only performed once per lock lifetime. All other
68  * non-endpoint states can be re-entered.
69  *
70  * The function names in this module follow a convention referencing the states and transitions in
71  * the state machine. For example, for the LOCKING state, there are start_locking() and
72  * finish_locking() functions.  start_locking() is invoked by the finish function of the state (or
73  * states) that transition to LOCKING. It performs the actual lock state change and must be invoked
74  * on the hash zone thread.  finish_locking() is called by (or continued via callback from) the
75  * code actually obtaining the lock. It does any bookkeeping or decision-making required and
76  * invokes the appropriate start function of the state being transitioned to after LOCKING.
77  *
78  * ----------------------------------------------------------------------
79  *
80  * Index Queries:
81  *
82  * A query to the UDS index is handled asynchronously by the index's threads. When the query is
83  * complete, a callback supplied with the query will be called from one of the those threads. Under
84  * heavy system load, the index may be slower to respond than is desirable for reasonable I/O
85  * throughput. Since deduplication of writes is not necessary for correct operation of a VDO
86  * device, it is acceptable to timeout out slow index queries and proceed to fulfill a write
87  * request without deduplicating. However, because the uds_request struct itself is supplied by the
88  * caller, we can not simply reuse a uds_request object which we have chosen to timeout. Hence,
89  * each hash_zone maintains a pool of dedupe_contexts which each contain a uds_request along with a
90  * reference to the data_vio on behalf of which they are performing a query.
91  *
92  * When a hash_lock needs to query the index, it attempts to acquire an unused dedupe_context from
93  * its hash_zone's pool. If one is available, that context is prepared, associated with the
94  * hash_lock's agent, added to the list of pending contexts, and then sent to the index. The
95  * context's state will be transitioned from DEDUPE_CONTEXT_IDLE to DEDUPE_CONTEXT_PENDING. If all
96  * goes well, the dedupe callback will be called by the index which will change the context's state
97  * to DEDUPE_CONTEXT_COMPLETE, and the associated data_vio will be enqueued to run back in the hash
98  * zone where the query results will be processed and the context will be put back in the idle
99  * state and returned to the hash_zone's available list.
100  *
101  * The first time an index query is launched from a given hash_zone, a timer is started. When the
102  * timer fires, the hash_zone's completion is enqueued to run in the hash_zone where the zone's
103  * pending list will be searched for any contexts in the pending state which have been running for
104  * too long. Those contexts are transitioned to the DEDUPE_CONTEXT_TIMED_OUT state and moved to the
105  * zone's timed_out list where they won't be examined again if there is a subsequent time out). The
106  * data_vios associated with timed out contexts are sent to continue processing their write
107  * operation without deduplicating. The timer is also restarted.
108  *
109  * When the dedupe callback is run for a context which is in the timed out state, that context is
110  * moved to the DEDUPE_CONTEXT_TIMED_OUT_COMPLETE state. No other action need be taken as the
111  * associated data_vios have already been dispatched.
112  *
113  * If a hash_lock needs a dedupe context, and the available list is empty, the timed_out list will
114  * be searched for any contexts which are timed out and complete. One of these will be used
115  * immediately, and the rest will be returned to the available list and marked idle.
116  */
117
118 #include "dedupe.h"
119
120 #include <linux/atomic.h>
121 #include <linux/jiffies.h>
122 #include <linux/kernel.h>
123 #include <linux/list.h>
124 #include <linux/ratelimit.h>
125 #include <linux/spinlock.h>
126 #include <linux/timer.h>
127
128 #include "logger.h"
129 #include "memory-alloc.h"
130 #include "numeric.h"
131 #include "permassert.h"
132 #include "string-utils.h"
133
134 #include "indexer.h"
135
136 #include "action-manager.h"
137 #include "admin-state.h"
138 #include "completion.h"
139 #include "constants.h"
140 #include "data-vio.h"
141 #include "int-map.h"
142 #include "io-submitter.h"
143 #include "packer.h"
144 #include "physical-zone.h"
145 #include "slab-depot.h"
146 #include "statistics.h"
147 #include "types.h"
148 #include "vdo.h"
149 #include "wait-queue.h"
150
151 struct uds_attribute {
152         struct attribute attr;
153         const char *(*show_string)(struct hash_zones *hash_zones);
154 };
155
156 #define DEDUPE_QUERY_TIMER_IDLE 0
157 #define DEDUPE_QUERY_TIMER_RUNNING 1
158 #define DEDUPE_QUERY_TIMER_FIRED 2
159
160 enum dedupe_context_state {
161         DEDUPE_CONTEXT_IDLE,
162         DEDUPE_CONTEXT_PENDING,
163         DEDUPE_CONTEXT_TIMED_OUT,
164         DEDUPE_CONTEXT_COMPLETE,
165         DEDUPE_CONTEXT_TIMED_OUT_COMPLETE,
166 };
167
168 /* Possible index states: closed, opened, or transitioning between those two. */
169 enum index_state {
170         IS_CLOSED,
171         IS_CHANGING,
172         IS_OPENED,
173 };
174
175 static const char *CLOSED = "closed";
176 static const char *CLOSING = "closing";
177 static const char *ERROR = "error";
178 static const char *OFFLINE = "offline";
179 static const char *ONLINE = "online";
180 static const char *OPENING = "opening";
181 static const char *SUSPENDED = "suspended";
182 static const char *UNKNOWN = "unknown";
183
184 /* Version 2 uses the kernel space UDS index and is limited to 16 bytes */
185 #define UDS_ADVICE_VERSION 2
186 /* version byte + state byte + 64-bit little-endian PBN */
187 #define UDS_ADVICE_SIZE (1 + 1 + sizeof(u64))
188
189 enum hash_lock_state {
190         /* State for locks that are not in use or are being initialized. */
191         VDO_HASH_LOCK_INITIALIZING,
192
193         /* This is the sequence of states typically used on the non-dedupe path. */
194         VDO_HASH_LOCK_QUERYING,
195         VDO_HASH_LOCK_WRITING,
196         VDO_HASH_LOCK_UPDATING,
197
198         /* The remaining states are typically used on the dedupe path in this order. */
199         VDO_HASH_LOCK_LOCKING,
200         VDO_HASH_LOCK_VERIFYING,
201         VDO_HASH_LOCK_DEDUPING,
202         VDO_HASH_LOCK_UNLOCKING,
203
204         /*
205          * Terminal state for locks returning to the pool. Must be last both because it's the final
206          * state, and also because it's used to count the states.
207          */
208         VDO_HASH_LOCK_BYPASSING,
209 };
210
211 static const char * const LOCK_STATE_NAMES[] = {
212         [VDO_HASH_LOCK_BYPASSING] = "BYPASSING",
213         [VDO_HASH_LOCK_DEDUPING] = "DEDUPING",
214         [VDO_HASH_LOCK_INITIALIZING] = "INITIALIZING",
215         [VDO_HASH_LOCK_LOCKING] = "LOCKING",
216         [VDO_HASH_LOCK_QUERYING] = "QUERYING",
217         [VDO_HASH_LOCK_UNLOCKING] = "UNLOCKING",
218         [VDO_HASH_LOCK_UPDATING] = "UPDATING",
219         [VDO_HASH_LOCK_VERIFYING] = "VERIFYING",
220         [VDO_HASH_LOCK_WRITING] = "WRITING",
221 };
222
223 struct hash_lock {
224         /* The block hash covered by this lock */
225         struct uds_record_name hash;
226
227         /* When the lock is unused, this list entry allows the lock to be pooled */
228         struct list_head pool_node;
229
230         /*
231          * A list containing the data VIOs sharing this lock, all having the same record name and
232          * data block contents, linked by their hash_lock_node fields.
233          */
234         struct list_head duplicate_ring;
235
236         /* The number of data_vios sharing this lock instance */
237         data_vio_count_t reference_count;
238
239         /* The maximum value of reference_count in the lifetime of this lock */
240         data_vio_count_t max_references;
241
242         /* The current state of this lock */
243         enum hash_lock_state state;
244
245         /* True if the UDS index should be updated with new advice */
246         bool update_advice;
247
248         /* True if the advice has been verified to be a true duplicate */
249         bool verified;
250
251         /* True if the lock has already accounted for an initial verification */
252         bool verify_counted;
253
254         /* True if this lock is registered in the lock map (cleared on rollover) */
255         bool registered;
256
257         /*
258          * If verified is false, this is the location of a possible duplicate. If verified is true,
259          * it is the verified location of a true duplicate.
260          */
261         struct zoned_pbn duplicate;
262
263         /* The PBN lock on the block containing the duplicate data */
264         struct pbn_lock *duplicate_lock;
265
266         /* The data_vio designated to act on behalf of the lock */
267         struct data_vio *agent;
268
269         /*
270          * Other data_vios with data identical to the agent who are currently waiting for the agent
271          * to get the information they all need to deduplicate--either against each other, or
272          * against an existing duplicate on disk.
273          */
274         struct vdo_wait_queue waiters;
275 };
276
277 #define LOCK_POOL_CAPACITY MAXIMUM_VDO_USER_VIOS
278
279 struct hash_zones {
280         struct action_manager *manager;
281         struct uds_parameters parameters;
282         struct uds_index_session *index_session;
283         struct ratelimit_state ratelimiter;
284         atomic64_t timeouts;
285         atomic64_t dedupe_context_busy;
286
287         /* This spinlock protects the state fields and the starting of dedupe requests. */
288         spinlock_t lock;
289
290         /* The fields in the next block are all protected by the lock */
291         struct vdo_completion completion;
292         enum index_state index_state;
293         enum index_state index_target;
294         struct admin_state state;
295         bool changing;
296         bool create_flag;
297         bool dedupe_flag;
298         bool error_flag;
299         u64 reported_timeouts;
300
301         /* The number of zones */
302         zone_count_t zone_count;
303         /* The hash zones themselves */
304         struct hash_zone zones[];
305 };
306
307 /* These are in milliseconds. */
308 unsigned int vdo_dedupe_index_timeout_interval = 5000;
309 unsigned int vdo_dedupe_index_min_timer_interval = 100;
310 /* Same two variables, in jiffies for easier consumption. */
311 static u64 vdo_dedupe_index_timeout_jiffies;
312 static u64 vdo_dedupe_index_min_timer_jiffies;
313
314 static inline struct hash_zone *as_hash_zone(struct vdo_completion *completion)
315 {
316         vdo_assert_completion_type(completion, VDO_HASH_ZONE_COMPLETION);
317         return container_of(completion, struct hash_zone, completion);
318 }
319
320 static inline struct hash_zones *as_hash_zones(struct vdo_completion *completion)
321 {
322         vdo_assert_completion_type(completion, VDO_HASH_ZONES_COMPLETION);
323         return container_of(completion, struct hash_zones, completion);
324 }
325
326 static inline void assert_in_hash_zone(struct hash_zone *zone, const char *name)
327 {
328         VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == zone->thread_id),
329                             "%s called on hash zone thread", name);
330 }
331
332 static inline bool change_context_state(struct dedupe_context *context, int old, int new)
333 {
334         return (atomic_cmpxchg(&context->state, old, new) == old);
335 }
336
337 static inline bool change_timer_state(struct hash_zone *zone, int old, int new)
338 {
339         return (atomic_cmpxchg(&zone->timer_state, old, new) == old);
340 }
341
342 /**
343  * return_hash_lock_to_pool() - (Re)initialize a hash lock and return it to its pool.
344  * @zone: The zone from which the lock was borrowed.
345  * @lock: The lock that is no longer in use.
346  */
347 static void return_hash_lock_to_pool(struct hash_zone *zone, struct hash_lock *lock)
348 {
349         memset(lock, 0, sizeof(*lock));
350         INIT_LIST_HEAD(&lock->pool_node);
351         INIT_LIST_HEAD(&lock->duplicate_ring);
352         vdo_waitq_init(&lock->waiters);
353         list_add_tail(&lock->pool_node, &zone->lock_pool);
354 }
355
356 /**
357  * vdo_get_duplicate_lock() - Get the PBN lock on the duplicate data location for a data_vio from
358  *                            the hash_lock the data_vio holds (if there is one).
359  * @data_vio: The data_vio to query.
360  *
361  * Return: The PBN lock on the data_vio's duplicate location.
362  */
363 struct pbn_lock *vdo_get_duplicate_lock(struct data_vio *data_vio)
364 {
365         if (data_vio->hash_lock == NULL)
366                 return NULL;
367
368         return data_vio->hash_lock->duplicate_lock;
369 }
370
371 /**
372  * hash_lock_key() - Return hash_lock's record name as a hash code.
373  * @lock: The hash lock.
374  *
375  * Return: The key to use for the int map.
376  */
377 static inline u64 hash_lock_key(struct hash_lock *lock)
378 {
379         return get_unaligned_le64(&lock->hash.name);
380 }
381
382 /**
383  * get_hash_lock_state_name() - Get the string representation of a hash lock state.
384  * @state: The hash lock state.
385  *
386  * Return: The short string representing the state
387  */
388 static const char *get_hash_lock_state_name(enum hash_lock_state state)
389 {
390         /* Catch if a state has been added without updating the name array. */
391         BUILD_BUG_ON((VDO_HASH_LOCK_BYPASSING + 1) != ARRAY_SIZE(LOCK_STATE_NAMES));
392         return (state < ARRAY_SIZE(LOCK_STATE_NAMES)) ? LOCK_STATE_NAMES[state] : "INVALID";
393 }
394
395 /**
396  * assert_hash_lock_agent() - Assert that a data_vio is the agent of its hash lock, and that this
397  *                            is being called in the hash zone.
398  * @data_vio: The data_vio expected to be the lock agent.
399  * @where: A string describing the function making the assertion.
400  */
401 static void assert_hash_lock_agent(struct data_vio *data_vio, const char *where)
402 {
403         /* Not safe to access the agent field except from the hash zone. */
404         assert_data_vio_in_hash_zone(data_vio);
405         VDO_ASSERT_LOG_ONLY(data_vio == data_vio->hash_lock->agent,
406                             "%s must be for the hash lock agent", where);
407 }
408
409 /**
410  * set_duplicate_lock() - Set the duplicate lock held by a hash lock. May only be called in the
411  *                        physical zone of the PBN lock.
412  * @hash_lock: The hash lock to update.
413  * @pbn_lock: The PBN read lock to use as the duplicate lock.
414  */
415 static void set_duplicate_lock(struct hash_lock *hash_lock, struct pbn_lock *pbn_lock)
416 {
417         VDO_ASSERT_LOG_ONLY((hash_lock->duplicate_lock == NULL),
418                             "hash lock must not already hold a duplicate lock");
419         pbn_lock->holder_count += 1;
420         hash_lock->duplicate_lock = pbn_lock;
421 }
422
423 /**
424  * dequeue_lock_waiter() - Remove the first data_vio from the lock's waitq and return it.
425  * @lock: The lock containing the wait queue.
426  *
427  * Return: The first (oldest) waiter in the queue, or NULL if the queue is empty.
428  */
429 static inline struct data_vio *dequeue_lock_waiter(struct hash_lock *lock)
430 {
431         return vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&lock->waiters));
432 }
433
434 /**
435  * set_hash_lock() - Set, change, or clear the hash lock a data_vio is using.
436  * @data_vio: The data_vio to update.
437  * @new_lock: The hash lock the data_vio is joining.
438  *
439  * Updates the hash lock (or locks) to reflect the change in membership.
440  */
441 static void set_hash_lock(struct data_vio *data_vio, struct hash_lock *new_lock)
442 {
443         struct hash_lock *old_lock = data_vio->hash_lock;
444
445         if (old_lock != NULL) {
446                 VDO_ASSERT_LOG_ONLY(data_vio->hash_zone != NULL,
447                                     "must have a hash zone when holding a hash lock");
448                 VDO_ASSERT_LOG_ONLY(!list_empty(&data_vio->hash_lock_entry),
449                                     "must be on a hash lock ring when holding a hash lock");
450                 VDO_ASSERT_LOG_ONLY(old_lock->reference_count > 0,
451                                     "hash lock reference must be counted");
452
453                 if ((old_lock->state != VDO_HASH_LOCK_BYPASSING) &&
454                     (old_lock->state != VDO_HASH_LOCK_UNLOCKING)) {
455                         /*
456                          * If the reference count goes to zero in a non-terminal state, we're most
457                          * likely leaking this lock.
458                          */
459                         VDO_ASSERT_LOG_ONLY(old_lock->reference_count > 1,
460                                             "hash locks should only become unreferenced in a terminal state, not state %s",
461                                             get_hash_lock_state_name(old_lock->state));
462                 }
463
464                 list_del_init(&data_vio->hash_lock_entry);
465                 old_lock->reference_count -= 1;
466
467                 data_vio->hash_lock = NULL;
468         }
469
470         if (new_lock != NULL) {
471                 /*
472                  * Keep all data_vios sharing the lock on a ring since they can complete in any
473                  * order and we'll always need a pointer to one to compare data.
474                  */
475                 list_move_tail(&data_vio->hash_lock_entry, &new_lock->duplicate_ring);
476                 new_lock->reference_count += 1;
477                 if (new_lock->max_references < new_lock->reference_count)
478                         new_lock->max_references = new_lock->reference_count;
479
480                 data_vio->hash_lock = new_lock;
481         }
482 }
483
484 /* There are loops in the state diagram, so some forward decl's are needed. */
485 static void start_deduping(struct hash_lock *lock, struct data_vio *agent,
486                            bool agent_is_done);
487 static void start_locking(struct hash_lock *lock, struct data_vio *agent);
488 static void start_writing(struct hash_lock *lock, struct data_vio *agent);
489 static void unlock_duplicate_pbn(struct vdo_completion *completion);
490 static void transfer_allocation_lock(struct data_vio *data_vio);
491
492 /**
493  * exit_hash_lock() - Bottleneck for data_vios that have written or deduplicated and that are no
494  *                    longer needed to be an agent for the hash lock.
495  * @data_vio: The data_vio to complete and send to be cleaned up.
496  */
497 static void exit_hash_lock(struct data_vio *data_vio)
498 {
499         /* Release the hash lock now, saving a thread transition in cleanup. */
500         vdo_release_hash_lock(data_vio);
501
502         /* Complete the data_vio and start the clean-up path to release any locks it still holds. */
503         data_vio->vio.completion.callback = complete_data_vio;
504
505         continue_data_vio(data_vio);
506 }
507
508 /**
509  * set_duplicate_location() - Set the location of the duplicate block for data_vio, updating the
510  *                            is_duplicate and duplicate fields from a zoned_pbn.
511  * @data_vio: The data_vio to modify.
512  * @source: The location of the duplicate.
513  */
514 static void set_duplicate_location(struct data_vio *data_vio,
515                                    const struct zoned_pbn source)
516 {
517         data_vio->is_duplicate = (source.pbn != VDO_ZERO_BLOCK);
518         data_vio->duplicate = source;
519 }
520
521 /**
522  * retire_lock_agent() - Retire the active lock agent, replacing it with the first lock waiter, and
523  *                       make the retired agent exit the hash lock.
524  * @lock: The hash lock to update.
525  *
526  * Return: The new lock agent (which will be NULL if there was no waiter)
527  */
528 static struct data_vio *retire_lock_agent(struct hash_lock *lock)
529 {
530         struct data_vio *old_agent = lock->agent;
531         struct data_vio *new_agent = dequeue_lock_waiter(lock);
532
533         lock->agent = new_agent;
534         exit_hash_lock(old_agent);
535         if (new_agent != NULL)
536                 set_duplicate_location(new_agent, lock->duplicate);
537         return new_agent;
538 }
539
540 /**
541  * wait_on_hash_lock() - Add a data_vio to the lock's queue of waiters.
542  * @lock: The hash lock on which to wait.
543  * @data_vio: The data_vio to add to the queue.
544  */
545 static void wait_on_hash_lock(struct hash_lock *lock, struct data_vio *data_vio)
546 {
547         vdo_waitq_enqueue_waiter(&lock->waiters, &data_vio->waiter);
548
549         /*
550          * Make sure the agent doesn't block indefinitely in the packer since it now has at least
551          * one other data_vio waiting on it.
552          */
553         if ((lock->state != VDO_HASH_LOCK_WRITING) || !cancel_data_vio_compression(lock->agent))
554                 return;
555
556         /*
557          * Even though we're waiting, we also have to send ourselves as a one-way message to the
558          * packer to ensure the agent continues executing. This is safe because
559          * cancel_vio_compression() guarantees the agent won't continue executing until this
560          * message arrives in the packer, and because the wait queue link isn't used for sending
561          * the message.
562          */
563         data_vio->compression.lock_holder = lock->agent;
564         launch_data_vio_packer_callback(data_vio, vdo_remove_lock_holder_from_packer);
565 }
566
567 /**
568  * abort_waiter() - waiter_callback_fn function that shunts waiters to write their blocks without
569  *                  optimization.
570  * @waiter: The data_vio's waiter link.
571  * @context: Not used.
572  */
573 static void abort_waiter(struct vdo_waiter *waiter, void *context __always_unused)
574 {
575         write_data_vio(vdo_waiter_as_data_vio(waiter));
576 }
577
578 /**
579  * start_bypassing() - Stop using the hash lock.
580  * @lock: The hash lock.
581  * @agent: The data_vio acting as the agent for the lock.
582  *
583  * Stops using the hash lock. This is the final transition for hash locks which did not get an
584  * error.
585  */
586 static void start_bypassing(struct hash_lock *lock, struct data_vio *agent)
587 {
588         lock->state = VDO_HASH_LOCK_BYPASSING;
589         exit_hash_lock(agent);
590 }
591
592 void vdo_clean_failed_hash_lock(struct data_vio *data_vio)
593 {
594         struct hash_lock *lock = data_vio->hash_lock;
595
596         if (lock->state == VDO_HASH_LOCK_BYPASSING) {
597                 exit_hash_lock(data_vio);
598                 return;
599         }
600
601         if (lock->agent == NULL) {
602                 lock->agent = data_vio;
603         } else if (data_vio != lock->agent) {
604                 exit_hash_lock(data_vio);
605                 return;
606         }
607
608         lock->state = VDO_HASH_LOCK_BYPASSING;
609
610         /* Ensure we don't attempt to update advice when cleaning up. */
611         lock->update_advice = false;
612
613         vdo_waitq_notify_all_waiters(&lock->waiters, abort_waiter, NULL);
614
615         if (lock->duplicate_lock != NULL) {
616                 /* The agent must reference the duplicate zone to launch it. */
617                 data_vio->duplicate = lock->duplicate;
618                 launch_data_vio_duplicate_zone_callback(data_vio, unlock_duplicate_pbn);
619                 return;
620         }
621
622         lock->agent = NULL;
623         data_vio->is_duplicate = false;
624         exit_hash_lock(data_vio);
625 }
626
627 /**
628  * finish_unlocking() - Handle the result of the agent for the lock releasing a read lock on
629  *                      duplicate candidate.
630  * @completion: The completion of the data_vio acting as the lock's agent.
631  *
632  * This continuation is registered in unlock_duplicate_pbn().
633  */
634 static void finish_unlocking(struct vdo_completion *completion)
635 {
636         struct data_vio *agent = as_data_vio(completion);
637         struct hash_lock *lock = agent->hash_lock;
638
639         assert_hash_lock_agent(agent, __func__);
640
641         VDO_ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
642                             "must have released the duplicate lock for the hash lock");
643
644         if (!lock->verified) {
645                 /*
646                  * UNLOCKING -> WRITING transition: The lock we released was on an unverified
647                  * block, so it must have been a lock on advice we were verifying, not on a
648                  * location that was used for deduplication. Go write (or compress) the block to
649                  * get a location to dedupe against.
650                  */
651                 start_writing(lock, agent);
652                 return;
653         }
654
655         /*
656          * With the lock released, the verified duplicate block may already have changed and will
657          * need to be re-verified if a waiter arrived.
658          */
659         lock->verified = false;
660
661         if (vdo_waitq_has_waiters(&lock->waiters)) {
662                 /*
663                  * UNLOCKING -> LOCKING transition: A new data_vio entered the hash lock while the
664                  * agent was releasing the PBN lock. The current agent exits and the waiter has to
665                  * re-lock and re-verify the duplicate location.
666                  *
667                  * TODO: If we used the current agent to re-acquire the PBN lock we wouldn't need
668                  * to re-verify.
669                  */
670                 agent = retire_lock_agent(lock);
671                 start_locking(lock, agent);
672                 return;
673         }
674
675         /*
676          * UNLOCKING -> BYPASSING transition: The agent is done with the lock and no other
677          * data_vios reference it, so remove it from the lock map and return it to the pool.
678          */
679         start_bypassing(lock, agent);
680 }
681
682 /**
683  * unlock_duplicate_pbn() - Release a read lock on the PBN of the block that may or may not have
684  *                          contained duplicate data.
685  * @completion: The completion of the data_vio acting as the lock's agent.
686  *
687  * This continuation is launched by start_unlocking(), and calls back to finish_unlocking() on the
688  * hash zone thread.
689  */
690 static void unlock_duplicate_pbn(struct vdo_completion *completion)
691 {
692         struct data_vio *agent = as_data_vio(completion);
693         struct hash_lock *lock = agent->hash_lock;
694
695         assert_data_vio_in_duplicate_zone(agent);
696         VDO_ASSERT_LOG_ONLY(lock->duplicate_lock != NULL,
697                             "must have a duplicate lock to release");
698
699         vdo_release_physical_zone_pbn_lock(agent->duplicate.zone, agent->duplicate.pbn,
700                                            vdo_forget(lock->duplicate_lock));
701         if (lock->state == VDO_HASH_LOCK_BYPASSING) {
702                 complete_data_vio(completion);
703                 return;
704         }
705
706         launch_data_vio_hash_zone_callback(agent, finish_unlocking);
707 }
708
709 /**
710  * start_unlocking() - Release a read lock on the PBN of the block that may or may not have
711  *                     contained duplicate data.
712  * @lock: The hash lock.
713  * @agent: The data_vio currently acting as the agent for the lock.
714  */
715 static void start_unlocking(struct hash_lock *lock, struct data_vio *agent)
716 {
717         lock->state = VDO_HASH_LOCK_UNLOCKING;
718         launch_data_vio_duplicate_zone_callback(agent, unlock_duplicate_pbn);
719 }
720
721 static void release_context(struct dedupe_context *context)
722 {
723         struct hash_zone *zone = context->zone;
724
725         WRITE_ONCE(zone->active, zone->active - 1);
726         list_move(&context->list_entry, &zone->available);
727 }
728
729 static void process_update_result(struct data_vio *agent)
730 {
731         struct dedupe_context *context = agent->dedupe_context;
732
733         if ((context == NULL) ||
734             !change_context_state(context, DEDUPE_CONTEXT_COMPLETE, DEDUPE_CONTEXT_IDLE))
735                 return;
736
737         release_context(context);
738 }
739
740 /**
741  * finish_updating() - Process the result of a UDS update performed by the agent for the lock.
742  * @completion: The completion of the data_vio that performed the update
743  *
744  * This continuation is registered in start_querying().
745  */
746 static void finish_updating(struct vdo_completion *completion)
747 {
748         struct data_vio *agent = as_data_vio(completion);
749         struct hash_lock *lock = agent->hash_lock;
750
751         assert_hash_lock_agent(agent, __func__);
752
753         process_update_result(agent);
754
755         /*
756          * UDS was updated successfully, so don't update again unless the duplicate location
757          * changes due to rollover.
758          */
759         lock->update_advice = false;
760
761         if (vdo_waitq_has_waiters(&lock->waiters)) {
762                 /*
763                  * UPDATING -> DEDUPING transition: A new data_vio arrived during the UDS update.
764                  * Send it on the verified dedupe path. The agent is done with the lock, but the
765                  * lock may still need to use it to clean up after rollover.
766                  */
767                 start_deduping(lock, agent, true);
768                 return;
769         }
770
771         if (lock->duplicate_lock != NULL) {
772                 /*
773                  * UPDATING -> UNLOCKING transition: No one is waiting to dedupe, but we hold a
774                  * duplicate PBN lock, so go release it.
775                  */
776                 start_unlocking(lock, agent);
777                 return;
778         }
779
780         /*
781          * UPDATING -> BYPASSING transition: No one is waiting to dedupe and there's no lock to
782          * release.
783          */
784         start_bypassing(lock, agent);
785 }
786
787 static void query_index(struct data_vio *data_vio, enum uds_request_type operation);
788
789 /**
790  * start_updating() - Continue deduplication with the last step, updating UDS with the location of
791  *                    the duplicate that should be returned as advice in the future.
792  * @lock: The hash lock.
793  * @agent: The data_vio currently acting as the agent for the lock.
794  */
795 static void start_updating(struct hash_lock *lock, struct data_vio *agent)
796 {
797         lock->state = VDO_HASH_LOCK_UPDATING;
798
799         VDO_ASSERT_LOG_ONLY(lock->verified, "new advice should have been verified");
800         VDO_ASSERT_LOG_ONLY(lock->update_advice, "should only update advice if needed");
801
802         agent->last_async_operation = VIO_ASYNC_OP_UPDATE_DEDUPE_INDEX;
803         set_data_vio_hash_zone_callback(agent, finish_updating);
804         query_index(agent, UDS_UPDATE);
805 }
806
807 /**
808  * finish_deduping() - Handle a data_vio that has finished deduplicating against the block locked
809  *                     by the hash lock.
810  * @lock: The hash lock.
811  * @data_vio: The lock holder that has finished deduplicating.
812  *
813  * If there are other data_vios still sharing the lock, this will just release the data_vio's share
814  * of the lock and finish processing the data_vio. If this is the last data_vio holding the lock,
815  * this makes the data_vio the lock agent and uses it to advance the state of the lock so it can
816  * eventually be released.
817  */
818 static void finish_deduping(struct hash_lock *lock, struct data_vio *data_vio)
819 {
820         struct data_vio *agent = data_vio;
821
822         VDO_ASSERT_LOG_ONLY(lock->agent == NULL, "shouldn't have an agent in DEDUPING");
823         VDO_ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&lock->waiters),
824                             "shouldn't have any lock waiters in DEDUPING");
825
826         /* Just release the lock reference if other data_vios are still deduping. */
827         if (lock->reference_count > 1) {
828                 exit_hash_lock(data_vio);
829                 return;
830         }
831
832         /* The hash lock must have an agent for all other lock states. */
833         lock->agent = agent;
834         if (lock->update_advice) {
835                 /*
836                  * DEDUPING -> UPDATING transition: The location of the duplicate block changed
837                  * since the initial UDS query because of compression, rollover, or because the
838                  * query agent didn't have an allocation. The UDS update was delayed in case there
839                  * was another change in location, but with only this data_vio using the hash lock,
840                  * it's time to update the advice.
841                  */
842                 start_updating(lock, agent);
843         } else {
844                 /*
845                  * DEDUPING -> UNLOCKING transition: Release the PBN read lock on the duplicate
846                  * location so the hash lock itself can be released (contingent on no new data_vios
847                  * arriving in the lock before the agent returns).
848                  */
849                 start_unlocking(lock, agent);
850         }
851 }
852
853 /**
854  * acquire_lock() - Get the lock for a record name.
855  * @zone: The zone responsible for the hash.
856  * @hash: The hash to lock.
857  * @replace_lock: If non-NULL, the lock already registered for the hash which should be replaced by
858  *                the new lock.
859  * @lock_ptr: A pointer to receive the hash lock.
860  *
861  * Gets the lock for the hash (record name) of the data in a data_vio, or if one does not exist (or
862  * if we are explicitly rolling over), initialize a new lock for the hash and register it in the
863  * zone. This must only be called in the correct thread for the zone.
864  *
865  * Return: VDO_SUCCESS or an error code.
866  */
867 static int __must_check acquire_lock(struct hash_zone *zone,
868                                      const struct uds_record_name *hash,
869                                      struct hash_lock *replace_lock,
870                                      struct hash_lock **lock_ptr)
871 {
872         struct hash_lock *lock, *new_lock;
873         int result;
874
875         /*
876          * Borrow and prepare a lock from the pool so we don't have to do two int_map accesses
877          * in the common case of no lock contention.
878          */
879         result = VDO_ASSERT(!list_empty(&zone->lock_pool),
880                             "never need to wait for a free hash lock");
881         if (result != VDO_SUCCESS)
882                 return result;
883
884         new_lock = list_entry(zone->lock_pool.prev, struct hash_lock, pool_node);
885         list_del_init(&new_lock->pool_node);
886
887         /*
888          * Fill in the hash of the new lock so we can map it, since we have to use the hash as the
889          * map key.
890          */
891         new_lock->hash = *hash;
892
893         result = vdo_int_map_put(zone->hash_lock_map, hash_lock_key(new_lock),
894                                  new_lock, (replace_lock != NULL), (void **) &lock);
895         if (result != VDO_SUCCESS) {
896                 return_hash_lock_to_pool(zone, vdo_forget(new_lock));
897                 return result;
898         }
899
900         if (replace_lock != NULL) {
901                 /* On mismatch put the old lock back and return a severe error */
902                 VDO_ASSERT_LOG_ONLY(lock == replace_lock,
903                                     "old lock must have been in the lock map");
904                 /* TODO: Check earlier and bail out? */
905                 VDO_ASSERT_LOG_ONLY(replace_lock->registered,
906                                     "old lock must have been marked registered");
907                 replace_lock->registered = false;
908         }
909
910         if (lock == replace_lock) {
911                 lock = new_lock;
912                 lock->registered = true;
913         } else {
914                 /* There's already a lock for the hash, so we don't need the borrowed lock. */
915                 return_hash_lock_to_pool(zone, vdo_forget(new_lock));
916         }
917
918         *lock_ptr = lock;
919         return VDO_SUCCESS;
920 }
921
922 /**
923  * enter_forked_lock() - Bind the data_vio to a new hash lock.
924  *
925  * Implements waiter_callback_fn. Binds the data_vio that was waiting to a new hash lock and waits
926  * on that lock.
927  */
928 static void enter_forked_lock(struct vdo_waiter *waiter, void *context)
929 {
930         struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
931         struct hash_lock *new_lock = context;
932
933         set_hash_lock(data_vio, new_lock);
934         wait_on_hash_lock(new_lock, data_vio);
935 }
936
937 /**
938  * fork_hash_lock() - Fork a hash lock because it has run out of increments on the duplicate PBN.
939  * @old_lock: The hash lock to fork.
940  * @new_agent: The data_vio that will be the agent for the new lock.
941  *
942  * Transfers the new agent and any lock waiters to a new hash lock instance which takes the place
943  * of the old lock in the lock map. The old lock remains active, but will not update advice.
944  */
945 static void fork_hash_lock(struct hash_lock *old_lock, struct data_vio *new_agent)
946 {
947         struct hash_lock *new_lock;
948         int result;
949
950         result = acquire_lock(new_agent->hash_zone, &new_agent->record_name, old_lock,
951                               &new_lock);
952         if (result != VDO_SUCCESS) {
953                 continue_data_vio_with_error(new_agent, result);
954                 return;
955         }
956
957         /*
958          * Only one of the two locks should update UDS. The old lock is out of references, so it
959          * would be poor dedupe advice in the short term.
960          */
961         old_lock->update_advice = false;
962         new_lock->update_advice = true;
963
964         set_hash_lock(new_agent, new_lock);
965         new_lock->agent = new_agent;
966
967         vdo_waitq_notify_all_waiters(&old_lock->waiters, enter_forked_lock, new_lock);
968
969         new_agent->is_duplicate = false;
970         start_writing(new_lock, new_agent);
971 }
972
973 /**
974  * launch_dedupe() - Reserve a reference count increment for a data_vio and launch it on the dedupe
975  *                   path.
976  * @lock: The hash lock.
977  * @data_vio: The data_vio to deduplicate using the hash lock.
978  * @has_claim: true if the data_vio already has claimed an increment from the duplicate lock.
979  *
980  * If no increments are available, this will roll over to a new hash lock and launch the data_vio
981  * as the writing agent for that lock.
982  */
983 static void launch_dedupe(struct hash_lock *lock, struct data_vio *data_vio,
984                           bool has_claim)
985 {
986         if (!has_claim && !vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
987                 /* Out of increments, so must roll over to a new lock. */
988                 fork_hash_lock(lock, data_vio);
989                 return;
990         }
991
992         /* Deduplicate against the lock's verified location. */
993         set_duplicate_location(data_vio, lock->duplicate);
994         data_vio->new_mapped = data_vio->duplicate;
995         update_metadata_for_data_vio_write(data_vio, lock->duplicate_lock);
996 }
997
998 /**
999  * start_deduping() - Enter the hash lock state where data_vios deduplicate in parallel against a
1000  *                    true copy of their data on disk.
1001  * @lock: The hash lock.
1002  * @agent: The data_vio acting as the agent for the lock.
1003  * @agent_is_done: true only if the agent has already written or deduplicated against its data.
1004  *
1005  * If the agent itself needs to deduplicate, an increment for it must already have been claimed
1006  * from the duplicate lock, ensuring the hash lock will still have a data_vio holding it.
1007  */
1008 static void start_deduping(struct hash_lock *lock, struct data_vio *agent,
1009                            bool agent_is_done)
1010 {
1011         lock->state = VDO_HASH_LOCK_DEDUPING;
1012
1013         /*
1014          * We don't take the downgraded allocation lock from the agent unless we actually need to
1015          * deduplicate against it.
1016          */
1017         if (lock->duplicate_lock == NULL) {
1018                 VDO_ASSERT_LOG_ONLY(!vdo_is_state_compressed(agent->new_mapped.state),
1019                                     "compression must have shared a lock");
1020                 VDO_ASSERT_LOG_ONLY(agent_is_done,
1021                                     "agent must have written the new duplicate");
1022                 transfer_allocation_lock(agent);
1023         }
1024
1025         VDO_ASSERT_LOG_ONLY(vdo_is_pbn_read_lock(lock->duplicate_lock),
1026                             "duplicate_lock must be a PBN read lock");
1027
1028         /*
1029          * This state is not like any of the other states. There is no designated agent--the agent
1030          * transitioning to this state and all the waiters will be launched to deduplicate in
1031          * parallel.
1032          */
1033         lock->agent = NULL;
1034
1035         /*
1036          * Launch the agent (if not already deduplicated) and as many lock waiters as we have
1037          * available increments for on the dedupe path. If we run out of increments, rollover will
1038          * be triggered and the remaining waiters will be transferred to the new lock.
1039          */
1040         if (!agent_is_done) {
1041                 launch_dedupe(lock, agent, true);
1042                 agent = NULL;
1043         }
1044         while (vdo_waitq_has_waiters(&lock->waiters))
1045                 launch_dedupe(lock, dequeue_lock_waiter(lock), false);
1046
1047         if (agent_is_done) {
1048                 /*
1049                  * In the degenerate case where all the waiters rolled over to a new lock, this
1050                  * will continue to use the old agent to clean up this lock, and otherwise it just
1051                  * lets the agent exit the lock.
1052                  */
1053                 finish_deduping(lock, agent);
1054         }
1055 }
1056
1057 /**
1058  * increment_stat() - Increment a statistic counter in a non-atomic yet thread-safe manner.
1059  * @stat: The statistic field to increment.
1060  */
1061 static inline void increment_stat(u64 *stat)
1062 {
1063         /*
1064          * Must only be mutated on the hash zone thread. Prevents any compiler shenanigans from
1065          * affecting other threads reading stats.
1066          */
1067         WRITE_ONCE(*stat, *stat + 1);
1068 }
1069
1070 /**
1071  * finish_verifying() - Handle the result of the agent for the lock comparing its data to the
1072  *                      duplicate candidate.
1073  * @completion: The completion of the data_vio used to verify dedupe
1074  *
1075  * This continuation is registered in start_verifying().
1076  */
1077 static void finish_verifying(struct vdo_completion *completion)
1078 {
1079         struct data_vio *agent = as_data_vio(completion);
1080         struct hash_lock *lock = agent->hash_lock;
1081
1082         assert_hash_lock_agent(agent, __func__);
1083
1084         lock->verified = agent->is_duplicate;
1085
1086         /*
1087          * Only count the result of the initial verification of the advice as valid or stale, and
1088          * not any re-verifications due to PBN lock releases.
1089          */
1090         if (!lock->verify_counted) {
1091                 lock->verify_counted = true;
1092                 if (lock->verified)
1093                         increment_stat(&agent->hash_zone->statistics.dedupe_advice_valid);
1094                 else
1095                         increment_stat(&agent->hash_zone->statistics.dedupe_advice_stale);
1096         }
1097
1098         /*
1099          * Even if the block is a verified duplicate, we can't start to deduplicate unless we can
1100          * claim a reference count increment for the agent.
1101          */
1102         if (lock->verified && !vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
1103                 agent->is_duplicate = false;
1104                 lock->verified = false;
1105         }
1106
1107         if (lock->verified) {
1108                 /*
1109                  * VERIFYING -> DEDUPING transition: The advice is for a true duplicate, so start
1110                  * deduplicating against it, if references are available.
1111                  */
1112                 start_deduping(lock, agent, false);
1113         } else {
1114                 /*
1115                  * VERIFYING -> UNLOCKING transition: Either the verify failed or we'd try to
1116                  * dedupe and roll over immediately, which would fail because it would leave the
1117                  * lock without an agent to release the PBN lock. In both cases, the data will have
1118                  * to be written or compressed, but first the advice PBN must be unlocked by the
1119                  * VERIFYING agent.
1120                  */
1121                 lock->update_advice = true;
1122                 start_unlocking(lock, agent);
1123         }
1124 }
1125
1126 static bool blocks_equal(char *block1, char *block2)
1127 {
1128         int i;
1129
1130         for (i = 0; i < VDO_BLOCK_SIZE; i += sizeof(u64)) {
1131                 if (*((u64 *) &block1[i]) != *((u64 *) &block2[i]))
1132                         return false;
1133         }
1134
1135         return true;
1136 }
1137
1138 static void verify_callback(struct vdo_completion *completion)
1139 {
1140         struct data_vio *agent = as_data_vio(completion);
1141
1142         agent->is_duplicate = blocks_equal(agent->vio.data, agent->scratch_block);
1143         launch_data_vio_hash_zone_callback(agent, finish_verifying);
1144 }
1145
1146 static void uncompress_and_verify(struct vdo_completion *completion)
1147 {
1148         struct data_vio *agent = as_data_vio(completion);
1149         int result;
1150
1151         result = uncompress_data_vio(agent, agent->duplicate.state,
1152                                      agent->scratch_block);
1153         if (result == VDO_SUCCESS) {
1154                 verify_callback(completion);
1155                 return;
1156         }
1157
1158         agent->is_duplicate = false;
1159         launch_data_vio_hash_zone_callback(agent, finish_verifying);
1160 }
1161
1162 static void verify_endio(struct bio *bio)
1163 {
1164         struct data_vio *agent = vio_as_data_vio(bio->bi_private);
1165         int result = blk_status_to_errno(bio->bi_status);
1166
1167         vdo_count_completed_bios(bio);
1168         if (result != VDO_SUCCESS) {
1169                 agent->is_duplicate = false;
1170                 launch_data_vio_hash_zone_callback(agent, finish_verifying);
1171                 return;
1172         }
1173
1174         if (vdo_is_state_compressed(agent->duplicate.state)) {
1175                 launch_data_vio_cpu_callback(agent, uncompress_and_verify,
1176                                              CPU_Q_COMPRESS_BLOCK_PRIORITY);
1177                 return;
1178         }
1179
1180         launch_data_vio_cpu_callback(agent, verify_callback,
1181                                      CPU_Q_COMPLETE_READ_PRIORITY);
1182 }
1183
1184 /**
1185  * start_verifying() - Begin the data verification phase.
1186  * @lock: The hash lock (must be LOCKING).
1187  * @agent: The data_vio to use to read and compare candidate data.
1188  *
1189  * Continue the deduplication path for a hash lock by using the agent to read (and possibly
1190  * decompress) the data at the candidate duplicate location, comparing it to the data in the agent
1191  * to verify that the candidate is identical to all the data_vios sharing the hash. If so, it can
1192  * be deduplicated against, otherwise a data_vio allocation will have to be written to and used for
1193  * dedupe.
1194  */
1195 static void start_verifying(struct hash_lock *lock, struct data_vio *agent)
1196 {
1197         int result;
1198         struct vio *vio = &agent->vio;
1199         char *buffer = (vdo_is_state_compressed(agent->duplicate.state) ?
1200                         (char *) agent->compression.block :
1201                         agent->scratch_block);
1202
1203         lock->state = VDO_HASH_LOCK_VERIFYING;
1204         VDO_ASSERT_LOG_ONLY(!lock->verified, "hash lock only verifies advice once");
1205
1206         agent->last_async_operation = VIO_ASYNC_OP_VERIFY_DUPLICATION;
1207         result = vio_reset_bio(vio, buffer, verify_endio, REQ_OP_READ,
1208                                agent->duplicate.pbn);
1209         if (result != VDO_SUCCESS) {
1210                 set_data_vio_hash_zone_callback(agent, finish_verifying);
1211                 continue_data_vio_with_error(agent, result);
1212                 return;
1213         }
1214
1215         set_data_vio_bio_zone_callback(agent, vdo_submit_vio);
1216         vdo_launch_completion_with_priority(&vio->completion, BIO_Q_VERIFY_PRIORITY);
1217 }
1218
1219 /**
1220  * finish_locking() - Handle the result of the agent for the lock attempting to obtain a PBN read
1221  *                    lock on the candidate duplicate block.
1222  * @completion: The completion of the data_vio that attempted to get the read lock.
1223  *
1224  * This continuation is registered in lock_duplicate_pbn().
1225  */
1226 static void finish_locking(struct vdo_completion *completion)
1227 {
1228         struct data_vio *agent = as_data_vio(completion);
1229         struct hash_lock *lock = agent->hash_lock;
1230
1231         assert_hash_lock_agent(agent, __func__);
1232
1233         if (!agent->is_duplicate) {
1234                 VDO_ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
1235                                     "must not hold duplicate_lock if not flagged as a duplicate");
1236                 /*
1237                  * LOCKING -> WRITING transition: The advice block is being modified or has no
1238                  * available references, so try to write or compress the data, remembering to
1239                  * update UDS later with the new advice.
1240                  */
1241                 increment_stat(&agent->hash_zone->statistics.dedupe_advice_stale);
1242                 lock->update_advice = true;
1243                 start_writing(lock, agent);
1244                 return;
1245         }
1246
1247         VDO_ASSERT_LOG_ONLY(lock->duplicate_lock != NULL,
1248                             "must hold duplicate_lock if flagged as a duplicate");
1249
1250         if (!lock->verified) {
1251                 /*
1252                  * LOCKING -> VERIFYING transition: Continue on the unverified dedupe path, reading
1253                  * the candidate duplicate and comparing it to the agent's data to decide whether
1254                  * it is a true duplicate or stale advice.
1255                  */
1256                 start_verifying(lock, agent);
1257                 return;
1258         }
1259
1260         if (!vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
1261                 /*
1262                  * LOCKING -> UNLOCKING transition: The verified block was re-locked, but has no
1263                  * available increments left. Must first release the useless PBN read lock before
1264                  * rolling over to a new copy of the block.
1265                  */
1266                 agent->is_duplicate = false;
1267                 lock->verified = false;
1268                 lock->update_advice = true;
1269                 start_unlocking(lock, agent);
1270                 return;
1271         }
1272
1273         /*
1274          * LOCKING -> DEDUPING transition: Continue on the verified dedupe path, deduplicating
1275          * against a location that was previously verified or written to.
1276          */
1277         start_deduping(lock, agent, false);
1278 }
1279
1280 static bool acquire_provisional_reference(struct data_vio *agent, struct pbn_lock *lock,
1281                                           struct slab_depot *depot)
1282 {
1283         /* Ensure that the newly-locked block is referenced. */
1284         struct vdo_slab *slab = vdo_get_slab(depot, agent->duplicate.pbn);
1285         int result = vdo_acquire_provisional_reference(slab, agent->duplicate.pbn, lock);
1286
1287         if (result == VDO_SUCCESS)
1288                 return true;
1289
1290         vdo_log_warning_strerror(result,
1291                                  "Error acquiring provisional reference for dedupe candidate; aborting dedupe");
1292         agent->is_duplicate = false;
1293         vdo_release_physical_zone_pbn_lock(agent->duplicate.zone,
1294                                            agent->duplicate.pbn, lock);
1295         continue_data_vio_with_error(agent, result);
1296         return false;
1297 }
1298
1299 /**
1300  * lock_duplicate_pbn() - Acquire a read lock on the PBN of the block containing candidate
1301  *                        duplicate data (compressed or uncompressed).
1302  * @completion: The completion of the data_vio attempting to acquire the physical block lock on
1303  *              behalf of its hash lock.
1304  *
1305  * If the PBN is already locked for writing, the lock attempt is abandoned and is_duplicate will be
1306  * cleared before calling back. This continuation is launched from start_locking(), and calls back
1307  * to finish_locking() on the hash zone thread.
1308  */
1309 static void lock_duplicate_pbn(struct vdo_completion *completion)
1310 {
1311         unsigned int increment_limit;
1312         struct pbn_lock *lock;
1313         int result;
1314
1315         struct data_vio *agent = as_data_vio(completion);
1316         struct slab_depot *depot = vdo_from_data_vio(agent)->depot;
1317         struct physical_zone *zone = agent->duplicate.zone;
1318
1319         assert_data_vio_in_duplicate_zone(agent);
1320
1321         set_data_vio_hash_zone_callback(agent, finish_locking);
1322
1323         /*
1324          * While in the zone that owns it, find out how many additional references can be made to
1325          * the block if it turns out to truly be a duplicate.
1326          */
1327         increment_limit = vdo_get_increment_limit(depot, agent->duplicate.pbn);
1328         if (increment_limit == 0) {
1329                 /*
1330                  * We could deduplicate against it later if a reference happened to be released
1331                  * during verification, but it's probably better to bail out now.
1332                  */
1333                 agent->is_duplicate = false;
1334                 continue_data_vio(agent);
1335                 return;
1336         }
1337
1338         result = vdo_attempt_physical_zone_pbn_lock(zone, agent->duplicate.pbn,
1339                                                     VIO_READ_LOCK, &lock);
1340         if (result != VDO_SUCCESS) {
1341                 continue_data_vio_with_error(agent, result);
1342                 return;
1343         }
1344
1345         if (!vdo_is_pbn_read_lock(lock)) {
1346                 /*
1347                  * There are three cases of write locks: uncompressed data block writes, compressed
1348                  * (packed) block writes, and block map page writes. In all three cases, we give up
1349                  * on trying to verify the advice and don't bother to try deduplicate against the
1350                  * data in the write lock holder.
1351                  *
1352                  * 1) We don't ever want to try to deduplicate against a block map page.
1353                  *
1354                  * 2a) It's very unlikely we'd deduplicate against an entire packed block, both
1355                  * because of the chance of matching it, and because we don't record advice for it,
1356                  * but for the uncompressed representation of all the fragments it contains. The
1357                  * only way we'd be getting lock contention is if we've written the same
1358                  * representation coincidentally before, had it become unreferenced, and it just
1359                  * happened to be packed together from compressed writes when we go to verify the
1360                  * lucky advice. Giving up is a minuscule loss of potential dedupe.
1361                  *
1362                  * 2b) If the advice is for a slot of a compressed block, it's about to get
1363                  * smashed, and the write smashing it cannot contain our data--it would have to be
1364                  * writing on behalf of our hash lock, but that's impossible since we're the lock
1365                  * agent.
1366                  *
1367                  * 3a) If the lock is held by a data_vio with different data, the advice is already
1368                  * stale or is about to become stale.
1369                  *
1370                  * 3b) If the lock is held by a data_vio that matches us, we may as well either
1371                  * write it ourselves (or reference the copy we already wrote) instead of
1372                  * potentially having many duplicates wait for the lock holder to write, journal,
1373                  * hash, and finally arrive in the hash lock. We lose a chance to avoid a UDS
1374                  * update in the very rare case of advice for a free block that just happened to be
1375                  * allocated to a data_vio with the same hash. There's also a chance to save on a
1376                  * block write, at the cost of a block verify. Saving on a full block compare in
1377                  * all stale advice cases almost certainly outweighs saving a UDS update and
1378                  * trading a write for a read in a lucky case where advice would have been saved
1379                  * from becoming stale.
1380                  */
1381                 agent->is_duplicate = false;
1382                 continue_data_vio(agent);
1383                 return;
1384         }
1385
1386         if (lock->holder_count == 0) {
1387                 if (!acquire_provisional_reference(agent, lock, depot))
1388                         return;
1389
1390                 /*
1391                  * The increment limit we grabbed earlier is still valid. The lock now holds the
1392                  * rights to acquire all those references. Those rights will be claimed by hash
1393                  * locks sharing this read lock.
1394                  */
1395                 lock->increment_limit = increment_limit;
1396         }
1397
1398         /*
1399          * We've successfully acquired a read lock on behalf of the hash lock, so mark it as such.
1400          */
1401         set_duplicate_lock(agent->hash_lock, lock);
1402
1403         /*
1404          * TODO: Optimization: We could directly launch the block verify, then switch to a hash
1405          * thread.
1406          */
1407         continue_data_vio(agent);
1408 }
1409
1410 /**
1411  * start_locking() - Continue deduplication for a hash lock that has obtained valid advice of a
1412  *                   potential duplicate through its agent.
1413  * @lock: The hash lock (currently must be QUERYING).
1414  * @agent: The data_vio bearing the dedupe advice.
1415  */
1416 static void start_locking(struct hash_lock *lock, struct data_vio *agent)
1417 {
1418         VDO_ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
1419                             "must not acquire a duplicate lock when already holding it");
1420
1421         lock->state = VDO_HASH_LOCK_LOCKING;
1422
1423         /*
1424          * TODO: Optimization: If we arrange to continue on the duplicate zone thread when
1425          * accepting the advice, and don't explicitly change lock states (or use an agent-local
1426          * state, or an atomic), we can avoid a thread transition here.
1427          */
1428         agent->last_async_operation = VIO_ASYNC_OP_LOCK_DUPLICATE_PBN;
1429         launch_data_vio_duplicate_zone_callback(agent, lock_duplicate_pbn);
1430 }
1431
1432 /**
1433  * finish_writing() - Re-entry point for the lock agent after it has finished writing or
1434  *                    compressing its copy of the data block.
1435  * @lock: The hash lock, which must be in state WRITING.
1436  * @agent: The data_vio that wrote its data for the lock.
1437  *
1438  * The agent will never need to dedupe against anything, so it's done with the lock, but the lock
1439  * may not be finished with it, as a UDS update might still be needed.
1440  *
1441  * If there are other lock holders, the agent will hand the job to one of them and exit, leaving
1442  * the lock to deduplicate against the just-written block. If there are no other lock holders, the
1443  * agent either exits (and later tears down the hash lock), or it remains the agent and updates
1444  * UDS.
1445  */
1446 static void finish_writing(struct hash_lock *lock, struct data_vio *agent)
1447 {
1448         /*
1449          * Dedupe against the data block or compressed block slot the agent wrote. Since we know
1450          * the write succeeded, there's no need to verify it.
1451          */
1452         lock->duplicate = agent->new_mapped;
1453         lock->verified = true;
1454
1455         if (vdo_is_state_compressed(lock->duplicate.state) && lock->registered) {
1456                 /*
1457                  * Compression means the location we gave in the UDS query is not the location
1458                  * we're using to deduplicate.
1459                  */
1460                 lock->update_advice = true;
1461         }
1462
1463         /* If there are any waiters, we need to start deduping them. */
1464         if (vdo_waitq_has_waiters(&lock->waiters)) {
1465                 /*
1466                  * WRITING -> DEDUPING transition: an asynchronously-written block failed to
1467                  * compress, so the PBN lock on the written copy was already transferred. The agent
1468                  * is done with the lock, but the lock may still need to use it to clean up after
1469                  * rollover.
1470                  */
1471                 start_deduping(lock, agent, true);
1472                 return;
1473         }
1474
1475         /*
1476          * There are no waiters and the agent has successfully written, so take a step towards
1477          * being able to release the hash lock (or just release it).
1478          */
1479         if (lock->update_advice) {
1480                 /*
1481                  * WRITING -> UPDATING transition: There's no waiter and a UDS update is needed, so
1482                  * retain the WRITING agent and use it to launch the update. The happens on
1483                  * compression, rollover, or the QUERYING agent not having an allocation.
1484                  */
1485                 start_updating(lock, agent);
1486         } else if (lock->duplicate_lock != NULL) {
1487                 /*
1488                  * WRITING -> UNLOCKING transition: There's no waiter and no update needed, but the
1489                  * compressed write gave us a shared duplicate lock that we must release.
1490                  */
1491                 set_duplicate_location(agent, lock->duplicate);
1492                 start_unlocking(lock, agent);
1493         } else {
1494                 /*
1495                  * WRITING -> BYPASSING transition: There's no waiter, no update needed, and no
1496                  * duplicate lock held, so both the agent and lock have no more work to do. The
1497                  * agent will release its allocation lock in cleanup.
1498                  */
1499                 start_bypassing(lock, agent);
1500         }
1501 }
1502
1503 /**
1504  * select_writing_agent() - Search through the lock waiters for a data_vio that has an allocation.
1505  * @lock: The hash lock to modify.
1506  *
1507  * If an allocation is found, swap agents, put the old agent at the head of the wait queue, then
1508  * return the new agent. Otherwise, just return the current agent.
1509  */
1510 static struct data_vio *select_writing_agent(struct hash_lock *lock)
1511 {
1512         struct vdo_wait_queue temp_queue;
1513         struct data_vio *data_vio;
1514
1515         vdo_waitq_init(&temp_queue);
1516
1517         /*
1518          * Move waiters to the temp queue one-by-one until we find an allocation. Not ideal to
1519          * search, but it only happens when nearly out of space.
1520          */
1521         while (((data_vio = dequeue_lock_waiter(lock)) != NULL) &&
1522                !data_vio_has_allocation(data_vio)) {
1523                 /* Use the lower-level enqueue since we're just moving waiters around. */
1524                 vdo_waitq_enqueue_waiter(&temp_queue, &data_vio->waiter);
1525         }
1526
1527         if (data_vio != NULL) {
1528                 /*
1529                  * Move the rest of the waiters over to the temp queue, preserving the order they
1530                  * arrived at the lock.
1531                  */
1532                 vdo_waitq_transfer_all_waiters(&lock->waiters, &temp_queue);
1533
1534                 /*
1535                  * The current agent is being replaced and will have to wait to dedupe; make it the
1536                  * first waiter since it was the first to reach the lock.
1537                  */
1538                 vdo_waitq_enqueue_waiter(&lock->waiters, &lock->agent->waiter);
1539                 lock->agent = data_vio;
1540         } else {
1541                 /* No one has an allocation, so keep the current agent. */
1542                 data_vio = lock->agent;
1543         }
1544
1545         /* Swap all the waiters back onto the lock's queue. */
1546         vdo_waitq_transfer_all_waiters(&temp_queue, &lock->waiters);
1547         return data_vio;
1548 }
1549
1550 /**
1551  * start_writing() - Begin the non-duplicate write path.
1552  * @lock: The hash lock (currently must be QUERYING).
1553  * @agent: The data_vio currently acting as the agent for the lock.
1554  *
1555  * Begins the non-duplicate write path for a hash lock that had no advice, selecting a data_vio
1556  * with an allocation as a new agent, if necessary, then resuming the agent on the data_vio write
1557  * path.
1558  */
1559 static void start_writing(struct hash_lock *lock, struct data_vio *agent)
1560 {
1561         lock->state = VDO_HASH_LOCK_WRITING;
1562
1563         /*
1564          * The agent might not have received an allocation and so can't be used for writing, but
1565          * it's entirely possible that one of the waiters did.
1566          */
1567         if (!data_vio_has_allocation(agent)) {
1568                 agent = select_writing_agent(lock);
1569                 /* If none of the waiters had an allocation, the writes all have to fail. */
1570                 if (!data_vio_has_allocation(agent)) {
1571                         /*
1572                          * TODO: Should we keep a variant of BYPASSING that causes new arrivals to
1573                          * fail immediately if they don't have an allocation? It might be possible
1574                          * that on some path there would be non-waiters still referencing the lock,
1575                          * so it would remain in the map as everything is currently spelled, even
1576                          * if the agent and all waiters release.
1577                          */
1578                         continue_data_vio_with_error(agent, VDO_NO_SPACE);
1579                         return;
1580                 }
1581         }
1582
1583         /*
1584          * If the agent compresses, it might wait indefinitely in the packer, which would be bad if
1585          * there are any other data_vios waiting.
1586          */
1587         if (vdo_waitq_has_waiters(&lock->waiters))
1588                 cancel_data_vio_compression(agent);
1589
1590         /*
1591          * Send the agent to the compress/pack/write path in vioWrite. If it succeeds, it will
1592          * return to the hash lock via vdo_continue_hash_lock() and call finish_writing().
1593          */
1594         launch_compress_data_vio(agent);
1595 }
1596
1597 /*
1598  * Decode VDO duplicate advice from the old_metadata field of a UDS request.
1599  * Returns true if valid advice was found and decoded
1600  */
1601 static bool decode_uds_advice(struct dedupe_context *context)
1602 {
1603         const struct uds_request *request = &context->request;
1604         struct data_vio *data_vio = context->requestor;
1605         size_t offset = 0;
1606         const struct uds_record_data *encoding = &request->old_metadata;
1607         struct vdo *vdo = vdo_from_data_vio(data_vio);
1608         struct zoned_pbn *advice = &data_vio->duplicate;
1609         u8 version;
1610         int result;
1611
1612         if ((request->status != UDS_SUCCESS) || !request->found)
1613                 return false;
1614
1615         version = encoding->data[offset++];
1616         if (version != UDS_ADVICE_VERSION) {
1617                 vdo_log_error("invalid UDS advice version code %u", version);
1618                 return false;
1619         }
1620
1621         advice->state = encoding->data[offset++];
1622         advice->pbn = get_unaligned_le64(&encoding->data[offset]);
1623         offset += sizeof(u64);
1624         BUG_ON(offset != UDS_ADVICE_SIZE);
1625
1626         /* Don't use advice that's clearly meaningless. */
1627         if ((advice->state == VDO_MAPPING_STATE_UNMAPPED) || (advice->pbn == VDO_ZERO_BLOCK)) {
1628                 vdo_log_debug("Invalid advice from deduplication server: pbn %llu, state %u. Giving up on deduplication of logical block %llu",
1629                               (unsigned long long) advice->pbn, advice->state,
1630                               (unsigned long long) data_vio->logical.lbn);
1631                 atomic64_inc(&vdo->stats.invalid_advice_pbn_count);
1632                 return false;
1633         }
1634
1635         result = vdo_get_physical_zone(vdo, advice->pbn, &advice->zone);
1636         if ((result != VDO_SUCCESS) || (advice->zone == NULL)) {
1637                 vdo_log_debug("Invalid physical block number from deduplication server: %llu, giving up on deduplication of logical block %llu",
1638                               (unsigned long long) advice->pbn,
1639                               (unsigned long long) data_vio->logical.lbn);
1640                 atomic64_inc(&vdo->stats.invalid_advice_pbn_count);
1641                 return false;
1642         }
1643
1644         return true;
1645 }
1646
1647 static void process_query_result(struct data_vio *agent)
1648 {
1649         struct dedupe_context *context = agent->dedupe_context;
1650
1651         if (context == NULL)
1652                 return;
1653
1654         if (change_context_state(context, DEDUPE_CONTEXT_COMPLETE, DEDUPE_CONTEXT_IDLE)) {
1655                 agent->is_duplicate = decode_uds_advice(context);
1656                 release_context(context);
1657         }
1658 }
1659
1660 /**
1661  * finish_querying() - Process the result of a UDS query performed by the agent for the lock.
1662  * @completion: The completion of the data_vio that performed the query.
1663  *
1664  * This continuation is registered in start_querying().
1665  */
1666 static void finish_querying(struct vdo_completion *completion)
1667 {
1668         struct data_vio *agent = as_data_vio(completion);
1669         struct hash_lock *lock = agent->hash_lock;
1670
1671         assert_hash_lock_agent(agent, __func__);
1672
1673         process_query_result(agent);
1674
1675         if (agent->is_duplicate) {
1676                 lock->duplicate = agent->duplicate;
1677                 /*
1678                  * QUERYING -> LOCKING transition: Valid advice was obtained from UDS. Use the
1679                  * QUERYING agent to start the hash lock on the unverified dedupe path, verifying
1680                  * that the advice can be used.
1681                  */
1682                 start_locking(lock, agent);
1683         } else {
1684                 /*
1685                  * The agent will be used as the duplicate if has an allocation; if it does, that
1686                  * location was posted to UDS, so no update will be needed.
1687                  */
1688                 lock->update_advice = !data_vio_has_allocation(agent);
1689                 /*
1690                  * QUERYING -> WRITING transition: There was no advice or the advice wasn't valid,
1691                  * so try to write or compress the data.
1692                  */
1693                 start_writing(lock, agent);
1694         }
1695 }
1696
1697 /**
1698  * start_querying() - Start deduplication for a hash lock.
1699  * @lock: The initialized hash lock.
1700  * @data_vio: The data_vio that has just obtained the new lock.
1701  *
1702  * Starts deduplication for a hash lock that has finished initializing by making the data_vio that
1703  * requested it the agent, entering the QUERYING state, and using the agent to perform the UDS
1704  * query on behalf of the lock.
1705  */
1706 static void start_querying(struct hash_lock *lock, struct data_vio *data_vio)
1707 {
1708         lock->agent = data_vio;
1709         lock->state = VDO_HASH_LOCK_QUERYING;
1710         data_vio->last_async_operation = VIO_ASYNC_OP_CHECK_FOR_DUPLICATION;
1711         set_data_vio_hash_zone_callback(data_vio, finish_querying);
1712         query_index(data_vio,
1713                     (data_vio_has_allocation(data_vio) ? UDS_POST : UDS_QUERY));
1714 }
1715
1716 /**
1717  * report_bogus_lock_state() - Complain that a data_vio has entered a hash_lock that is in an
1718  *                             unimplemented or unusable state and continue the data_vio with an
1719  *                             error.
1720  * @lock: The hash lock.
1721  * @data_vio: The data_vio attempting to enter the lock.
1722  */
1723 static void report_bogus_lock_state(struct hash_lock *lock, struct data_vio *data_vio)
1724 {
1725         VDO_ASSERT_LOG_ONLY(false, "hash lock must not be in unimplemented state %s",
1726                             get_hash_lock_state_name(lock->state));
1727         continue_data_vio_with_error(data_vio, VDO_LOCK_ERROR);
1728 }
1729
1730 /**
1731  * vdo_continue_hash_lock() - Continue the processing state after writing, compressing, or
1732  *                            deduplicating.
1733  * @data_vio: The data_vio to continue processing in its hash lock.
1734  *
1735  * Asynchronously continue processing a data_vio in its hash lock after it has finished writing,
1736  * compressing, or deduplicating, so it can share the result with any data_vios waiting in the hash
1737  * lock, or update the UDS index, or simply release its share of the lock.
1738  *
1739  * Context: This must only be called in the correct thread for the hash zone.
1740  */
1741 void vdo_continue_hash_lock(struct vdo_completion *completion)
1742 {
1743         struct data_vio *data_vio = as_data_vio(completion);
1744         struct hash_lock *lock = data_vio->hash_lock;
1745
1746         switch (lock->state) {
1747         case VDO_HASH_LOCK_WRITING:
1748                 VDO_ASSERT_LOG_ONLY(data_vio == lock->agent,
1749                                     "only the lock agent may continue the lock");
1750                 finish_writing(lock, data_vio);
1751                 break;
1752
1753         case VDO_HASH_LOCK_DEDUPING:
1754                 finish_deduping(lock, data_vio);
1755                 break;
1756
1757         case VDO_HASH_LOCK_BYPASSING:
1758                 /* This data_vio has finished the write path and the lock doesn't need it. */
1759                 exit_hash_lock(data_vio);
1760                 break;
1761
1762         case VDO_HASH_LOCK_INITIALIZING:
1763         case VDO_HASH_LOCK_QUERYING:
1764         case VDO_HASH_LOCK_UPDATING:
1765         case VDO_HASH_LOCK_LOCKING:
1766         case VDO_HASH_LOCK_VERIFYING:
1767         case VDO_HASH_LOCK_UNLOCKING:
1768                 /* A lock in this state should never be re-entered. */
1769                 report_bogus_lock_state(lock, data_vio);
1770                 break;
1771
1772         default:
1773                 report_bogus_lock_state(lock, data_vio);
1774         }
1775 }
1776
1777 /**
1778  * is_hash_collision() - Check to see if a hash collision has occurred.
1779  * @lock: The lock to check.
1780  * @candidate: The data_vio seeking to share the lock.
1781  *
1782  * Check whether the data in data_vios sharing a lock is different than in a data_vio seeking to
1783  * share the lock, which should only be possible in the extremely unlikely case of a hash
1784  * collision.
1785  *
1786  * Return: true if the given data_vio must not share the lock because it doesn't have the same data
1787  *         as the lock holders.
1788  */
1789 static bool is_hash_collision(struct hash_lock *lock, struct data_vio *candidate)
1790 {
1791         struct data_vio *lock_holder;
1792         struct hash_zone *zone;
1793         bool collides;
1794
1795         if (list_empty(&lock->duplicate_ring))
1796                 return false;
1797
1798         lock_holder = list_first_entry(&lock->duplicate_ring, struct data_vio,
1799                                        hash_lock_entry);
1800         zone = candidate->hash_zone;
1801         collides = !blocks_equal(lock_holder->vio.data, candidate->vio.data);
1802         if (collides)
1803                 increment_stat(&zone->statistics.concurrent_hash_collisions);
1804         else
1805                 increment_stat(&zone->statistics.concurrent_data_matches);
1806
1807         return collides;
1808 }
1809
1810 static inline int assert_hash_lock_preconditions(const struct data_vio *data_vio)
1811 {
1812         int result;
1813
1814         /* FIXME: BUG_ON() and/or enter read-only mode? */
1815         result = VDO_ASSERT(data_vio->hash_lock == NULL,
1816                             "must not already hold a hash lock");
1817         if (result != VDO_SUCCESS)
1818                 return result;
1819
1820         result = VDO_ASSERT(list_empty(&data_vio->hash_lock_entry),
1821                             "must not already be a member of a hash lock ring");
1822         if (result != VDO_SUCCESS)
1823                 return result;
1824
1825         return VDO_ASSERT(data_vio->recovery_sequence_number == 0,
1826                           "must not hold a recovery lock when getting a hash lock");
1827 }
1828
1829 /**
1830  * vdo_acquire_hash_lock() - Acquire or share a lock on a record name.
1831  * @data_vio: The data_vio acquiring a lock on its record name.
1832  *
1833  * Acquire or share a lock on the hash (record name) of the data in a data_vio, updating the
1834  * data_vio to reference the lock. This must only be called in the correct thread for the zone. In
1835  * the unlikely case of a hash collision, this function will succeed, but the data_vio will not get
1836  * a lock reference.
1837  */
1838 void vdo_acquire_hash_lock(struct vdo_completion *completion)
1839 {
1840         struct data_vio *data_vio = as_data_vio(completion);
1841         struct hash_lock *lock;
1842         int result;
1843
1844         assert_data_vio_in_hash_zone(data_vio);
1845
1846         result = assert_hash_lock_preconditions(data_vio);
1847         if (result != VDO_SUCCESS) {
1848                 continue_data_vio_with_error(data_vio, result);
1849                 return;
1850         }
1851
1852         result = acquire_lock(data_vio->hash_zone, &data_vio->record_name, NULL, &lock);
1853         if (result != VDO_SUCCESS) {
1854                 continue_data_vio_with_error(data_vio, result);
1855                 return;
1856         }
1857
1858         if (is_hash_collision(lock, data_vio)) {
1859                 /*
1860                  * Hash collisions are extremely unlikely, but the bogus dedupe would be a data
1861                  * corruption. Bypass optimization entirely. We can't compress a data_vio without
1862                  * a hash_lock as the compressed write depends on the hash_lock to manage the
1863                  * references for the compressed block.
1864                  */
1865                 write_data_vio(data_vio);
1866                 return;
1867         }
1868
1869         set_hash_lock(data_vio, lock);
1870         switch (lock->state) {
1871         case VDO_HASH_LOCK_INITIALIZING:
1872                 start_querying(lock, data_vio);
1873                 return;
1874
1875         case VDO_HASH_LOCK_QUERYING:
1876         case VDO_HASH_LOCK_WRITING:
1877         case VDO_HASH_LOCK_UPDATING:
1878         case VDO_HASH_LOCK_LOCKING:
1879         case VDO_HASH_LOCK_VERIFYING:
1880         case VDO_HASH_LOCK_UNLOCKING:
1881                 /* The lock is busy, and can't be shared yet. */
1882                 wait_on_hash_lock(lock, data_vio);
1883                 return;
1884
1885         case VDO_HASH_LOCK_BYPASSING:
1886                 /* We can't use this lock, so bypass optimization entirely. */
1887                 vdo_release_hash_lock(data_vio);
1888                 write_data_vio(data_vio);
1889                 return;
1890
1891         case VDO_HASH_LOCK_DEDUPING:
1892                 launch_dedupe(lock, data_vio, false);
1893                 return;
1894
1895         default:
1896                 /* A lock in this state should not be acquired by new VIOs. */
1897                 report_bogus_lock_state(lock, data_vio);
1898         }
1899 }
1900
1901 /**
1902  * vdo_release_hash_lock() - Release a data_vio's share of a hash lock, if held, and null out the
1903  *                           data_vio's reference to it.
1904  * @data_vio: The data_vio releasing its hash lock.
1905  *
1906  * If the data_vio is the only one holding the lock, this also releases any resources or locks used
1907  * by the hash lock (such as a PBN read lock on a block containing data with the same hash) and
1908  * returns the lock to the hash zone's lock pool.
1909  *
1910  * Context: This must only be called in the correct thread for the hash zone.
1911  */
1912 void vdo_release_hash_lock(struct data_vio *data_vio)
1913 {
1914         u64 lock_key;
1915         struct hash_lock *lock = data_vio->hash_lock;
1916         struct hash_zone *zone = data_vio->hash_zone;
1917
1918         if (lock == NULL)
1919                 return;
1920
1921         set_hash_lock(data_vio, NULL);
1922
1923         if (lock->reference_count > 0) {
1924                 /* The lock is still in use by other data_vios. */
1925                 return;
1926         }
1927
1928         lock_key = hash_lock_key(lock);
1929         if (lock->registered) {
1930                 struct hash_lock *removed;
1931
1932                 removed = vdo_int_map_remove(zone->hash_lock_map, lock_key);
1933                 VDO_ASSERT_LOG_ONLY(lock == removed,
1934                                     "hash lock being released must have been mapped");
1935         } else {
1936                 VDO_ASSERT_LOG_ONLY(lock != vdo_int_map_get(zone->hash_lock_map, lock_key),
1937                                     "unregistered hash lock must not be in the lock map");
1938         }
1939
1940         VDO_ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&lock->waiters),
1941                             "hash lock returned to zone must have no waiters");
1942         VDO_ASSERT_LOG_ONLY((lock->duplicate_lock == NULL),
1943                             "hash lock returned to zone must not reference a PBN lock");
1944         VDO_ASSERT_LOG_ONLY((lock->state == VDO_HASH_LOCK_BYPASSING),
1945                             "returned hash lock must not be in use with state %s",
1946                             get_hash_lock_state_name(lock->state));
1947         VDO_ASSERT_LOG_ONLY(list_empty(&lock->pool_node),
1948                             "hash lock returned to zone must not be in a pool ring");
1949         VDO_ASSERT_LOG_ONLY(list_empty(&lock->duplicate_ring),
1950                             "hash lock returned to zone must not reference DataVIOs");
1951
1952         return_hash_lock_to_pool(zone, lock);
1953 }
1954
1955 /**
1956  * transfer_allocation_lock() - Transfer a data_vio's downgraded allocation PBN lock to the
1957  *                              data_vio's hash lock, converting it to a duplicate PBN lock.
1958  * @data_vio: The data_vio holding the allocation lock to transfer.
1959  */
1960 static void transfer_allocation_lock(struct data_vio *data_vio)
1961 {
1962         struct allocation *allocation = &data_vio->allocation;
1963         struct hash_lock *hash_lock = data_vio->hash_lock;
1964
1965         VDO_ASSERT_LOG_ONLY(data_vio->new_mapped.pbn == allocation->pbn,
1966                             "transferred lock must be for the block written");
1967
1968         allocation->pbn = VDO_ZERO_BLOCK;
1969
1970         VDO_ASSERT_LOG_ONLY(vdo_is_pbn_read_lock(allocation->lock),
1971                             "must have downgraded the allocation lock before transfer");
1972
1973         hash_lock->duplicate = data_vio->new_mapped;
1974         data_vio->duplicate = data_vio->new_mapped;
1975
1976         /*
1977          * Since the lock is being transferred, the holder count doesn't change (and isn't even
1978          * safe to examine on this thread).
1979          */
1980         hash_lock->duplicate_lock = vdo_forget(allocation->lock);
1981 }
1982
1983 /**
1984  * vdo_share_compressed_write_lock() - Make a data_vio's hash lock a shared holder of the PBN lock
1985  *                                     on the compressed block to which its data was just written.
1986  * @data_vio: The data_vio which was just compressed.
1987  * @pbn_lock: The PBN lock on the compressed block.
1988  *
1989  * If the lock is still a write lock (as it will be for the first share), it will be converted to a
1990  * read lock. This also reserves a reference count increment for the data_vio.
1991  */
1992 void vdo_share_compressed_write_lock(struct data_vio *data_vio,
1993                                      struct pbn_lock *pbn_lock)
1994 {
1995         bool claimed;
1996
1997         VDO_ASSERT_LOG_ONLY(vdo_get_duplicate_lock(data_vio) == NULL,
1998                             "a duplicate PBN lock should not exist when writing");
1999         VDO_ASSERT_LOG_ONLY(vdo_is_state_compressed(data_vio->new_mapped.state),
2000                             "lock transfer must be for a compressed write");
2001         assert_data_vio_in_new_mapped_zone(data_vio);
2002
2003         /* First sharer downgrades the lock. */
2004         if (!vdo_is_pbn_read_lock(pbn_lock))
2005                 vdo_downgrade_pbn_write_lock(pbn_lock, true);
2006
2007         /*
2008          * Get a share of the PBN lock, ensuring it cannot be released until after this data_vio
2009          * has had a chance to journal a reference.
2010          */
2011         data_vio->duplicate = data_vio->new_mapped;
2012         data_vio->hash_lock->duplicate = data_vio->new_mapped;
2013         set_duplicate_lock(data_vio->hash_lock, pbn_lock);
2014
2015         /*
2016          * Claim a reference for this data_vio. Necessary since another hash_lock might start
2017          * deduplicating against it before our incRef.
2018          */
2019         claimed = vdo_claim_pbn_lock_increment(pbn_lock);
2020         VDO_ASSERT_LOG_ONLY(claimed, "impossible to fail to claim an initial increment");
2021 }
2022
2023 static void start_uds_queue(void *ptr)
2024 {
2025         /*
2026          * Allow the UDS dedupe worker thread to do memory allocations. It will only do allocations
2027          * during the UDS calls that open or close an index, but those allocations can safely sleep
2028          * while reserving a large amount of memory. We could use an allocations_allowed boolean
2029          * (like the base threads do), but it would be an unnecessary embellishment.
2030          */
2031         struct vdo_thread *thread = vdo_get_work_queue_owner(vdo_get_current_work_queue());
2032
2033         vdo_register_allocating_thread(&thread->allocating_thread, NULL);
2034 }
2035
2036 static void finish_uds_queue(void *ptr __always_unused)
2037 {
2038         vdo_unregister_allocating_thread();
2039 }
2040
2041 static void close_index(struct hash_zones *zones)
2042         __must_hold(&zones->lock)
2043 {
2044         int result;
2045
2046         /*
2047          * Change the index state so that get_index_statistics() will not try to use the index
2048          * session we are closing.
2049          */
2050         zones->index_state = IS_CHANGING;
2051         /* Close the index session, while not holding the lock. */
2052         spin_unlock(&zones->lock);
2053         result = uds_close_index(zones->index_session);
2054
2055         if (result != UDS_SUCCESS)
2056                 vdo_log_error_strerror(result, "Error closing index");
2057         spin_lock(&zones->lock);
2058         zones->index_state = IS_CLOSED;
2059         zones->error_flag |= result != UDS_SUCCESS;
2060         /* ASSERTION: We leave in IS_CLOSED state. */
2061 }
2062
2063 static void open_index(struct hash_zones *zones)
2064         __must_hold(&zones->lock)
2065 {
2066         /* ASSERTION: We enter in IS_CLOSED state. */
2067         int result;
2068         bool create_flag = zones->create_flag;
2069
2070         zones->create_flag = false;
2071         /*
2072          * Change the index state so that the it will be reported to the outside world as
2073          * "opening".
2074          */
2075         zones->index_state = IS_CHANGING;
2076         zones->error_flag = false;
2077
2078         /* Open the index session, while not holding the lock */
2079         spin_unlock(&zones->lock);
2080         result = uds_open_index(create_flag ? UDS_CREATE : UDS_LOAD,
2081                                 &zones->parameters, zones->index_session);
2082         if (result != UDS_SUCCESS)
2083                 vdo_log_error_strerror(result, "Error opening index");
2084
2085         spin_lock(&zones->lock);
2086         if (!create_flag) {
2087                 switch (result) {
2088                 case -ENOENT:
2089                         /*
2090                          * Either there is no index, or there is no way we can recover the index.
2091                          * We will be called again and try to create a new index.
2092                          */
2093                         zones->index_state = IS_CLOSED;
2094                         zones->create_flag = true;
2095                         return;
2096                 default:
2097                         break;
2098                 }
2099         }
2100         if (result == UDS_SUCCESS) {
2101                 zones->index_state = IS_OPENED;
2102         } else {
2103                 zones->index_state = IS_CLOSED;
2104                 zones->index_target = IS_CLOSED;
2105                 zones->error_flag = true;
2106                 spin_unlock(&zones->lock);
2107                 vdo_log_info("Setting UDS index target state to error");
2108                 spin_lock(&zones->lock);
2109         }
2110         /*
2111          * ASSERTION: On success, we leave in IS_OPENED state.
2112          * ASSERTION: On failure, we leave in IS_CLOSED state.
2113          */
2114 }
2115
2116 static void change_dedupe_state(struct vdo_completion *completion)
2117 {
2118         struct hash_zones *zones = as_hash_zones(completion);
2119
2120         spin_lock(&zones->lock);
2121
2122         /* Loop until the index is in the target state and the create flag is clear. */
2123         while (vdo_is_state_normal(&zones->state) &&
2124                ((zones->index_state != zones->index_target) || zones->create_flag)) {
2125                 if (zones->index_state == IS_OPENED)
2126                         close_index(zones);
2127                 else
2128                         open_index(zones);
2129         }
2130
2131         zones->changing = false;
2132         spin_unlock(&zones->lock);
2133 }
2134
2135 static void start_expiration_timer(struct dedupe_context *context)
2136 {
2137         u64 start_time = context->submission_jiffies;
2138         u64 end_time;
2139
2140         if (!change_timer_state(context->zone, DEDUPE_QUERY_TIMER_IDLE,
2141                                 DEDUPE_QUERY_TIMER_RUNNING))
2142                 return;
2143
2144         end_time = max(start_time + vdo_dedupe_index_timeout_jiffies,
2145                        jiffies + vdo_dedupe_index_min_timer_jiffies);
2146         mod_timer(&context->zone->timer, end_time);
2147 }
2148
2149 /**
2150  * report_dedupe_timeouts() - Record and eventually report that some dedupe requests reached their
2151  *                            expiration time without getting answers, so we timed them out.
2152  * @zones: the hash zones.
2153  * @timeouts: the number of newly timed out requests.
2154  */
2155 static void report_dedupe_timeouts(struct hash_zones *zones, unsigned int timeouts)
2156 {
2157         atomic64_add(timeouts, &zones->timeouts);
2158         spin_lock(&zones->lock);
2159         if (__ratelimit(&zones->ratelimiter)) {
2160                 u64 unreported = atomic64_read(&zones->timeouts);
2161
2162                 unreported -= zones->reported_timeouts;
2163                 vdo_log_debug("UDS index timeout on %llu requests",
2164                               (unsigned long long) unreported);
2165                 zones->reported_timeouts += unreported;
2166         }
2167         spin_unlock(&zones->lock);
2168 }
2169
2170 static int initialize_index(struct vdo *vdo, struct hash_zones *zones)
2171 {
2172         int result;
2173         off_t uds_offset;
2174         struct volume_geometry geometry = vdo->geometry;
2175         static const struct vdo_work_queue_type uds_queue_type = {
2176                 .start = start_uds_queue,
2177                 .finish = finish_uds_queue,
2178                 .max_priority = UDS_Q_MAX_PRIORITY,
2179                 .default_priority = UDS_Q_PRIORITY,
2180         };
2181
2182         vdo_set_dedupe_index_timeout_interval(vdo_dedupe_index_timeout_interval);
2183         vdo_set_dedupe_index_min_timer_interval(vdo_dedupe_index_min_timer_interval);
2184
2185         /*
2186          * Since we will save up the timeouts that would have been reported but were ratelimited,
2187          * we don't need to report ratelimiting.
2188          */
2189         ratelimit_default_init(&zones->ratelimiter);
2190         ratelimit_set_flags(&zones->ratelimiter, RATELIMIT_MSG_ON_RELEASE);
2191         uds_offset = ((vdo_get_index_region_start(geometry) -
2192                        geometry.bio_offset) * VDO_BLOCK_SIZE);
2193         zones->parameters = (struct uds_parameters) {
2194                 .bdev = vdo->device_config->owned_device->bdev,
2195                 .offset = uds_offset,
2196                 .size = (vdo_get_index_region_size(geometry) * VDO_BLOCK_SIZE),
2197                 .memory_size = geometry.index_config.mem,
2198                 .sparse = geometry.index_config.sparse,
2199                 .nonce = (u64) geometry.nonce,
2200         };
2201
2202         result = uds_create_index_session(&zones->index_session);
2203         if (result != UDS_SUCCESS)
2204                 return result;
2205
2206         result = vdo_make_thread(vdo, vdo->thread_config.dedupe_thread, &uds_queue_type,
2207                                  1, NULL);
2208         if (result != VDO_SUCCESS) {
2209                 uds_destroy_index_session(vdo_forget(zones->index_session));
2210                 vdo_log_error("UDS index queue initialization failed (%d)", result);
2211                 return result;
2212         }
2213
2214         vdo_initialize_completion(&zones->completion, vdo, VDO_HASH_ZONES_COMPLETION);
2215         vdo_set_completion_callback(&zones->completion, change_dedupe_state,
2216                                     vdo->thread_config.dedupe_thread);
2217         return VDO_SUCCESS;
2218 }
2219
2220 /**
2221  * finish_index_operation() - This is the UDS callback for index queries.
2222  * @request: The uds request which has just completed.
2223  */
2224 static void finish_index_operation(struct uds_request *request)
2225 {
2226         struct dedupe_context *context = container_of(request, struct dedupe_context,
2227                                                       request);
2228
2229         if (change_context_state(context, DEDUPE_CONTEXT_PENDING,
2230                                  DEDUPE_CONTEXT_COMPLETE)) {
2231                 /*
2232                  * This query has not timed out, so send its data_vio back to its hash zone to
2233                  * process the results.
2234                  */
2235                 continue_data_vio(context->requestor);
2236                 return;
2237         }
2238
2239         /*
2240          * This query has timed out, so try to mark it complete and hence eligible for reuse. Its
2241          * data_vio has already moved on.
2242          */
2243         if (!change_context_state(context, DEDUPE_CONTEXT_TIMED_OUT,
2244                                   DEDUPE_CONTEXT_TIMED_OUT_COMPLETE)) {
2245                 VDO_ASSERT_LOG_ONLY(false, "uds request was timed out (state %d)",
2246                                     atomic_read(&context->state));
2247         }
2248
2249         vdo_funnel_queue_put(context->zone->timed_out_complete, &context->queue_entry);
2250 }
2251
2252 /**
2253  * check_for_drain_complete() - Check whether this zone has drained.
2254  * @zone: The zone to check.
2255  */
2256 static void check_for_drain_complete(struct hash_zone *zone)
2257 {
2258         data_vio_count_t recycled = 0;
2259
2260         if (!vdo_is_state_draining(&zone->state))
2261                 return;
2262
2263         if ((atomic_read(&zone->timer_state) == DEDUPE_QUERY_TIMER_IDLE) ||
2264             change_timer_state(zone, DEDUPE_QUERY_TIMER_RUNNING,
2265                                DEDUPE_QUERY_TIMER_IDLE)) {
2266                 del_timer_sync(&zone->timer);
2267         } else {
2268                 /*
2269                  * There is an in flight time-out, which must get processed before we can continue.
2270                  */
2271                 return;
2272         }
2273
2274         for (;;) {
2275                 struct dedupe_context *context;
2276                 struct funnel_queue_entry *entry;
2277
2278                 entry = vdo_funnel_queue_poll(zone->timed_out_complete);
2279                 if (entry == NULL)
2280                         break;
2281
2282                 context = container_of(entry, struct dedupe_context, queue_entry);
2283                 atomic_set(&context->state, DEDUPE_CONTEXT_IDLE);
2284                 list_add(&context->list_entry, &zone->available);
2285                 recycled++;
2286         }
2287
2288         if (recycled > 0)
2289                 WRITE_ONCE(zone->active, zone->active - recycled);
2290         VDO_ASSERT_LOG_ONLY(READ_ONCE(zone->active) == 0, "all contexts inactive");
2291         vdo_finish_draining(&zone->state);
2292 }
2293
2294 static void timeout_index_operations_callback(struct vdo_completion *completion)
2295 {
2296         struct dedupe_context *context, *tmp;
2297         struct hash_zone *zone = as_hash_zone(completion);
2298         u64 timeout_jiffies = msecs_to_jiffies(vdo_dedupe_index_timeout_interval);
2299         unsigned long cutoff = jiffies - timeout_jiffies;
2300         unsigned int timed_out = 0;
2301
2302         atomic_set(&zone->timer_state, DEDUPE_QUERY_TIMER_IDLE);
2303         list_for_each_entry_safe(context, tmp, &zone->pending, list_entry) {
2304                 if (cutoff <= context->submission_jiffies) {
2305                         /*
2306                          * We have reached the oldest query which has not timed out yet, so restart
2307                          * the timer.
2308                          */
2309                         start_expiration_timer(context);
2310                         break;
2311                 }
2312
2313                 if (!change_context_state(context, DEDUPE_CONTEXT_PENDING,
2314                                           DEDUPE_CONTEXT_TIMED_OUT)) {
2315                         /*
2316                          * This context completed between the time the timeout fired, and now. We
2317                          * can treat it as a successful query, its requestor is already enqueued
2318                          * to process it.
2319                          */
2320                         continue;
2321                 }
2322
2323                 /*
2324                  * Remove this context from the pending list so we won't look at it again on a
2325                  * subsequent timeout. Once the index completes it, it will be reused. Meanwhile,
2326                  * send its requestor on its way.
2327                  */
2328                 list_del_init(&context->list_entry);
2329                 continue_data_vio(context->requestor);
2330                 timed_out++;
2331         }
2332
2333         if (timed_out > 0)
2334                 report_dedupe_timeouts(completion->vdo->hash_zones, timed_out);
2335
2336         check_for_drain_complete(zone);
2337 }
2338
2339 static void timeout_index_operations(struct timer_list *t)
2340 {
2341         struct hash_zone *zone = from_timer(zone, t, timer);
2342
2343         if (change_timer_state(zone, DEDUPE_QUERY_TIMER_RUNNING,
2344                                DEDUPE_QUERY_TIMER_FIRED))
2345                 vdo_launch_completion(&zone->completion);
2346 }
2347
2348 static int __must_check initialize_zone(struct vdo *vdo, struct hash_zones *zones,
2349                                         zone_count_t zone_number)
2350 {
2351         int result;
2352         data_vio_count_t i;
2353         struct hash_zone *zone = &zones->zones[zone_number];
2354
2355         result = vdo_int_map_create(VDO_LOCK_MAP_CAPACITY, &zone->hash_lock_map);
2356         if (result != VDO_SUCCESS)
2357                 return result;
2358
2359         vdo_set_admin_state_code(&zone->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
2360         zone->zone_number = zone_number;
2361         zone->thread_id = vdo->thread_config.hash_zone_threads[zone_number];
2362         vdo_initialize_completion(&zone->completion, vdo, VDO_HASH_ZONE_COMPLETION);
2363         vdo_set_completion_callback(&zone->completion, timeout_index_operations_callback,
2364                                     zone->thread_id);
2365         INIT_LIST_HEAD(&zone->lock_pool);
2366         result = vdo_allocate(LOCK_POOL_CAPACITY, struct hash_lock, "hash_lock array",
2367                               &zone->lock_array);
2368         if (result != VDO_SUCCESS)
2369                 return result;
2370
2371         for (i = 0; i < LOCK_POOL_CAPACITY; i++)
2372                 return_hash_lock_to_pool(zone, &zone->lock_array[i]);
2373
2374         INIT_LIST_HEAD(&zone->available);
2375         INIT_LIST_HEAD(&zone->pending);
2376         result = vdo_make_funnel_queue(&zone->timed_out_complete);
2377         if (result != VDO_SUCCESS)
2378                 return result;
2379
2380         timer_setup(&zone->timer, timeout_index_operations, 0);
2381
2382         for (i = 0; i < MAXIMUM_VDO_USER_VIOS; i++) {
2383                 struct dedupe_context *context = &zone->contexts[i];
2384
2385                 context->zone = zone;
2386                 context->request.callback = finish_index_operation;
2387                 context->request.session = zones->index_session;
2388                 list_add(&context->list_entry, &zone->available);
2389         }
2390
2391         return vdo_make_default_thread(vdo, zone->thread_id);
2392 }
2393
2394 /** get_thread_id_for_zone() - Implements vdo_zone_thread_getter_fn. */
2395 static thread_id_t get_thread_id_for_zone(void *context, zone_count_t zone_number)
2396 {
2397         struct hash_zones *zones = context;
2398
2399         return zones->zones[zone_number].thread_id;
2400 }
2401
2402 /**
2403  * vdo_make_hash_zones() - Create the hash zones.
2404  *
2405  * @vdo: The vdo to which the zone will belong.
2406  * @zones_ptr: A pointer to hold the zones.
2407  *
2408  * Return: VDO_SUCCESS or an error code.
2409  */
2410 int vdo_make_hash_zones(struct vdo *vdo, struct hash_zones **zones_ptr)
2411 {
2412         int result;
2413         struct hash_zones *zones;
2414         zone_count_t z;
2415         zone_count_t zone_count = vdo->thread_config.hash_zone_count;
2416
2417         if (zone_count == 0)
2418                 return VDO_SUCCESS;
2419
2420         result = vdo_allocate_extended(struct hash_zones, zone_count, struct hash_zone,
2421                                        __func__, &zones);
2422         if (result != VDO_SUCCESS)
2423                 return result;
2424
2425         result = initialize_index(vdo, zones);
2426         if (result != VDO_SUCCESS) {
2427                 vdo_free(zones);
2428                 return result;
2429         }
2430
2431         vdo_set_admin_state_code(&zones->state, VDO_ADMIN_STATE_NEW);
2432
2433         zones->zone_count = zone_count;
2434         for (z = 0; z < zone_count; z++) {
2435                 result = initialize_zone(vdo, zones, z);
2436                 if (result != VDO_SUCCESS) {
2437                         vdo_free_hash_zones(zones);
2438                         return result;
2439                 }
2440         }
2441
2442         result = vdo_make_action_manager(zones->zone_count, get_thread_id_for_zone,
2443                                          vdo->thread_config.admin_thread, zones, NULL,
2444                                          vdo, &zones->manager);
2445         if (result != VDO_SUCCESS) {
2446                 vdo_free_hash_zones(zones);
2447                 return result;
2448         }
2449
2450         *zones_ptr = zones;
2451         return VDO_SUCCESS;
2452 }
2453
2454 void vdo_finish_dedupe_index(struct hash_zones *zones)
2455 {
2456         if (zones == NULL)
2457                 return;
2458
2459         uds_destroy_index_session(vdo_forget(zones->index_session));
2460 }
2461
2462 /**
2463  * vdo_free_hash_zones() - Free the hash zones.
2464  * @zones: The zone to free.
2465  */
2466 void vdo_free_hash_zones(struct hash_zones *zones)
2467 {
2468         zone_count_t i;
2469
2470         if (zones == NULL)
2471                 return;
2472
2473         vdo_free(vdo_forget(zones->manager));
2474
2475         for (i = 0; i < zones->zone_count; i++) {
2476                 struct hash_zone *zone = &zones->zones[i];
2477
2478                 vdo_free_funnel_queue(vdo_forget(zone->timed_out_complete));
2479                 vdo_int_map_free(vdo_forget(zone->hash_lock_map));
2480                 vdo_free(vdo_forget(zone->lock_array));
2481         }
2482
2483         if (zones->index_session != NULL)
2484                 vdo_finish_dedupe_index(zones);
2485
2486         ratelimit_state_exit(&zones->ratelimiter);
2487         vdo_free(zones);
2488 }
2489
2490 static void initiate_suspend_index(struct admin_state *state)
2491 {
2492         struct hash_zones *zones = container_of(state, struct hash_zones, state);
2493         enum index_state index_state;
2494
2495         spin_lock(&zones->lock);
2496         index_state = zones->index_state;
2497         spin_unlock(&zones->lock);
2498
2499         if (index_state != IS_CLOSED) {
2500                 bool save = vdo_is_state_saving(&zones->state);
2501                 int result;
2502
2503                 result = uds_suspend_index_session(zones->index_session, save);
2504                 if (result != UDS_SUCCESS)
2505                         vdo_log_error_strerror(result, "Error suspending dedupe index");
2506         }
2507
2508         vdo_finish_draining(state);
2509 }
2510
2511 /**
2512  * suspend_index() - Suspend the UDS index prior to draining hash zones.
2513  *
2514  * Implements vdo_action_preamble_fn
2515  */
2516 static void suspend_index(void *context, struct vdo_completion *completion)
2517 {
2518         struct hash_zones *zones = context;
2519
2520         vdo_start_draining(&zones->state,
2521                            vdo_get_current_manager_operation(zones->manager), completion,
2522                            initiate_suspend_index);
2523 }
2524
2525 /**
2526  * initiate_drain() - Initiate a drain.
2527  *
2528  * Implements vdo_admin_initiator_fn.
2529  */
2530 static void initiate_drain(struct admin_state *state)
2531 {
2532         check_for_drain_complete(container_of(state, struct hash_zone, state));
2533 }
2534
2535 /**
2536  * drain_hash_zone() - Drain a hash zone.
2537  *
2538  * Implements vdo_zone_action_fn.
2539  */
2540 static void drain_hash_zone(void *context, zone_count_t zone_number,
2541                             struct vdo_completion *parent)
2542 {
2543         struct hash_zones *zones = context;
2544
2545         vdo_start_draining(&zones->zones[zone_number].state,
2546                            vdo_get_current_manager_operation(zones->manager), parent,
2547                            initiate_drain);
2548 }
2549
2550 /** vdo_drain_hash_zones() - Drain all hash zones. */
2551 void vdo_drain_hash_zones(struct hash_zones *zones, struct vdo_completion *parent)
2552 {
2553         vdo_schedule_operation(zones->manager, parent->vdo->suspend_type, suspend_index,
2554                                drain_hash_zone, NULL, parent);
2555 }
2556
2557 static void launch_dedupe_state_change(struct hash_zones *zones)
2558         __must_hold(&zones->lock)
2559 {
2560         /* ASSERTION: We enter with the lock held. */
2561         if (zones->changing || !vdo_is_state_normal(&zones->state))
2562                 /* Either a change is already in progress, or changes are not allowed. */
2563                 return;
2564
2565         if (zones->create_flag || (zones->index_state != zones->index_target)) {
2566                 zones->changing = true;
2567                 vdo_launch_completion(&zones->completion);
2568                 return;
2569         }
2570
2571         /* ASSERTION: We exit with the lock held. */
2572 }
2573
2574 /**
2575  * resume_index() - Resume the UDS index prior to resuming hash zones.
2576  *
2577  * Implements vdo_action_preamble_fn
2578  */
2579 static void resume_index(void *context, struct vdo_completion *parent)
2580 {
2581         struct hash_zones *zones = context;
2582         struct device_config *config = parent->vdo->device_config;
2583         int result;
2584
2585         zones->parameters.bdev = config->owned_device->bdev;
2586         result = uds_resume_index_session(zones->index_session, zones->parameters.bdev);
2587         if (result != UDS_SUCCESS)
2588                 vdo_log_error_strerror(result, "Error resuming dedupe index");
2589
2590         spin_lock(&zones->lock);
2591         vdo_resume_if_quiescent(&zones->state);
2592
2593         if (config->deduplication) {
2594                 zones->index_target = IS_OPENED;
2595                 WRITE_ONCE(zones->dedupe_flag, true);
2596         } else {
2597                 zones->index_target = IS_CLOSED;
2598         }
2599
2600         launch_dedupe_state_change(zones);
2601         spin_unlock(&zones->lock);
2602
2603         vdo_finish_completion(parent);
2604 }
2605
2606 /**
2607  * resume_hash_zone() - Resume a hash zone.
2608  *
2609  * Implements vdo_zone_action_fn.
2610  */
2611 static void resume_hash_zone(void *context, zone_count_t zone_number,
2612                              struct vdo_completion *parent)
2613 {
2614         struct hash_zone *zone = &(((struct hash_zones *) context)->zones[zone_number]);
2615
2616         vdo_fail_completion(parent, vdo_resume_if_quiescent(&zone->state));
2617 }
2618
2619 /**
2620  * vdo_resume_hash_zones() - Resume a set of hash zones.
2621  * @zones: The hash zones to resume.
2622  * @parent: The object to notify when the zones have resumed.
2623  */
2624 void vdo_resume_hash_zones(struct hash_zones *zones, struct vdo_completion *parent)
2625 {
2626         if (vdo_is_read_only(parent->vdo)) {
2627                 vdo_launch_completion(parent);
2628                 return;
2629         }
2630
2631         vdo_schedule_operation(zones->manager, VDO_ADMIN_STATE_RESUMING, resume_index,
2632                                resume_hash_zone, NULL, parent);
2633 }
2634
2635 /**
2636  * get_hash_zone_statistics() - Add the statistics for this hash zone to the tally for all zones.
2637  * @zone: The hash zone to query.
2638  * @tally: The tally
2639  */
2640 static void get_hash_zone_statistics(const struct hash_zone *zone,
2641                                      struct hash_lock_statistics *tally)
2642 {
2643         const struct hash_lock_statistics *stats = &zone->statistics;
2644
2645         tally->dedupe_advice_valid += READ_ONCE(stats->dedupe_advice_valid);
2646         tally->dedupe_advice_stale += READ_ONCE(stats->dedupe_advice_stale);
2647         tally->concurrent_data_matches += READ_ONCE(stats->concurrent_data_matches);
2648         tally->concurrent_hash_collisions += READ_ONCE(stats->concurrent_hash_collisions);
2649         tally->curr_dedupe_queries += READ_ONCE(zone->active);
2650 }
2651
2652 static void get_index_statistics(struct hash_zones *zones,
2653                                  struct index_statistics *stats)
2654 {
2655         enum index_state state;
2656         struct uds_index_stats index_stats;
2657         int result;
2658
2659         spin_lock(&zones->lock);
2660         state = zones->index_state;
2661         spin_unlock(&zones->lock);
2662
2663         if (state != IS_OPENED)
2664                 return;
2665
2666         result = uds_get_index_session_stats(zones->index_session, &index_stats);
2667         if (result != UDS_SUCCESS) {
2668                 vdo_log_error_strerror(result, "Error reading index stats");
2669                 return;
2670         }
2671
2672         stats->entries_indexed = index_stats.entries_indexed;
2673         stats->posts_found = index_stats.posts_found;
2674         stats->posts_not_found = index_stats.posts_not_found;
2675         stats->queries_found = index_stats.queries_found;
2676         stats->queries_not_found = index_stats.queries_not_found;
2677         stats->updates_found = index_stats.updates_found;
2678         stats->updates_not_found = index_stats.updates_not_found;
2679         stats->entries_discarded = index_stats.entries_discarded;
2680 }
2681
2682 /**
2683  * vdo_get_dedupe_statistics() - Tally the statistics from all the hash zones and the UDS index.
2684  * @hash_zones: The hash zones to query
2685  *
2686  * Return: The sum of the hash lock statistics from all hash zones plus the statistics from the UDS
2687  *         index
2688  */
2689 void vdo_get_dedupe_statistics(struct hash_zones *zones, struct vdo_statistics *stats)
2690
2691 {
2692         zone_count_t zone;
2693
2694         for (zone = 0; zone < zones->zone_count; zone++)
2695                 get_hash_zone_statistics(&zones->zones[zone], &stats->hash_lock);
2696
2697         get_index_statistics(zones, &stats->index);
2698
2699         /*
2700          * zones->timeouts gives the number of timeouts, and dedupe_context_busy gives the number
2701          * of queries not made because of earlier timeouts.
2702          */
2703         stats->dedupe_advice_timeouts =
2704                 (atomic64_read(&zones->timeouts) + atomic64_read(&zones->dedupe_context_busy));
2705 }
2706
2707 /**
2708  * vdo_select_hash_zone() - Select the hash zone responsible for locking a given record name.
2709  * @zones: The hash_zones from which to select.
2710  * @name: The record name.
2711  *
2712  * Return: The hash zone responsible for the record name.
2713  */
2714 struct hash_zone *vdo_select_hash_zone(struct hash_zones *zones,
2715                                        const struct uds_record_name *name)
2716 {
2717         /*
2718          * Use a fragment of the record name as a hash code. Eight bits of hash should suffice
2719          * since the number of hash zones is small.
2720          * TODO: Verify that the first byte is independent enough.
2721          */
2722         u32 hash = name->name[0];
2723
2724         /*
2725          * Scale the 8-bit hash fragment to a zone index by treating it as a binary fraction and
2726          * multiplying that by the zone count. If the hash is uniformly distributed over [0 ..
2727          * 2^8-1], then (hash * count / 2^8) should be uniformly distributed over [0 .. count-1].
2728          * The multiply and shift is much faster than a divide (modulus) on X86 CPUs.
2729          */
2730         hash = (hash * zones->zone_count) >> 8;
2731         return &zones->zones[hash];
2732 }
2733
2734 /**
2735  * dump_hash_lock() - Dump a compact description of hash_lock to the log if the lock is not on the
2736  *                    free list.
2737  * @lock: The hash lock to dump.
2738  */
2739 static void dump_hash_lock(const struct hash_lock *lock)
2740 {
2741         const char *state;
2742
2743         if (!list_empty(&lock->pool_node)) {
2744                 /* This lock is on the free list. */
2745                 return;
2746         }
2747
2748         /*
2749          * Necessarily cryptic since we can log a lot of these. First three chars of state is
2750          * unambiguous. 'U' indicates a lock not registered in the map.
2751          */
2752         state = get_hash_lock_state_name(lock->state);
2753         vdo_log_info("  hl %px: %3.3s %c%llu/%u rc=%u wc=%zu agt=%px",
2754                      lock, state, (lock->registered ? 'D' : 'U'),
2755                      (unsigned long long) lock->duplicate.pbn,
2756                      lock->duplicate.state, lock->reference_count,
2757                      vdo_waitq_num_waiters(&lock->waiters), lock->agent);
2758 }
2759
2760 static const char *index_state_to_string(struct hash_zones *zones,
2761                                          enum index_state state)
2762 {
2763         if (!vdo_is_state_normal(&zones->state))
2764                 return SUSPENDED;
2765
2766         switch (state) {
2767         case IS_CLOSED:
2768                 return zones->error_flag ? ERROR : CLOSED;
2769         case IS_CHANGING:
2770                 return zones->index_target == IS_OPENED ? OPENING : CLOSING;
2771         case IS_OPENED:
2772                 return READ_ONCE(zones->dedupe_flag) ? ONLINE : OFFLINE;
2773         default:
2774                 return UNKNOWN;
2775         }
2776 }
2777
2778 /**
2779  * dump_hash_zone() - Dump information about a hash zone to the log for debugging.
2780  * @zone: The zone to dump.
2781  */
2782 static void dump_hash_zone(const struct hash_zone *zone)
2783 {
2784         data_vio_count_t i;
2785
2786         if (zone->hash_lock_map == NULL) {
2787                 vdo_log_info("struct hash_zone %u: NULL map", zone->zone_number);
2788                 return;
2789         }
2790
2791         vdo_log_info("struct hash_zone %u: mapSize=%zu",
2792                      zone->zone_number, vdo_int_map_size(zone->hash_lock_map));
2793         for (i = 0; i < LOCK_POOL_CAPACITY; i++)
2794                 dump_hash_lock(&zone->lock_array[i]);
2795 }
2796
2797 /**
2798  * vdo_dump_hash_zones() - Dump information about the hash zones to the log for debugging.
2799  * @zones: The zones to dump.
2800  */
2801 void vdo_dump_hash_zones(struct hash_zones *zones)
2802 {
2803         const char *state, *target;
2804         zone_count_t zone;
2805
2806         spin_lock(&zones->lock);
2807         state = index_state_to_string(zones, zones->index_state);
2808         target = (zones->changing ? index_state_to_string(zones, zones->index_target) : NULL);
2809         spin_unlock(&zones->lock);
2810
2811         vdo_log_info("UDS index: state: %s", state);
2812         if (target != NULL)
2813                 vdo_log_info("UDS index: changing to state: %s", target);
2814
2815         for (zone = 0; zone < zones->zone_count; zone++)
2816                 dump_hash_zone(&zones->zones[zone]);
2817 }
2818
2819 void vdo_set_dedupe_index_timeout_interval(unsigned int value)
2820 {
2821         u64 alb_jiffies;
2822
2823         /* Arbitrary maximum value is two minutes */
2824         if (value > 120000)
2825                 value = 120000;
2826         /* Arbitrary minimum value is 2 jiffies */
2827         alb_jiffies = msecs_to_jiffies(value);
2828
2829         if (alb_jiffies < 2) {
2830                 alb_jiffies = 2;
2831                 value = jiffies_to_msecs(alb_jiffies);
2832         }
2833         vdo_dedupe_index_timeout_interval = value;
2834         vdo_dedupe_index_timeout_jiffies = alb_jiffies;
2835 }
2836
2837 void vdo_set_dedupe_index_min_timer_interval(unsigned int value)
2838 {
2839         u64 min_jiffies;
2840
2841         /* Arbitrary maximum value is one second */
2842         if (value > 1000)
2843                 value = 1000;
2844
2845         /* Arbitrary minimum value is 2 jiffies */
2846         min_jiffies = msecs_to_jiffies(value);
2847
2848         if (min_jiffies < 2) {
2849                 min_jiffies = 2;
2850                 value = jiffies_to_msecs(min_jiffies);
2851         }
2852
2853         vdo_dedupe_index_min_timer_interval = value;
2854         vdo_dedupe_index_min_timer_jiffies = min_jiffies;
2855 }
2856
2857 /**
2858  * acquire_context() - Acquire a dedupe context from a hash_zone if any are available.
2859  * @zone: the hash zone
2860  *
2861  * Return: A dedupe_context or NULL if none are available
2862  */
2863 static struct dedupe_context * __must_check acquire_context(struct hash_zone *zone)
2864 {
2865         struct dedupe_context *context;
2866         struct funnel_queue_entry *entry;
2867
2868         assert_in_hash_zone(zone, __func__);
2869
2870         if (!list_empty(&zone->available)) {
2871                 WRITE_ONCE(zone->active, zone->active + 1);
2872                 context = list_first_entry(&zone->available, struct dedupe_context,
2873                                            list_entry);
2874                 list_del_init(&context->list_entry);
2875                 return context;
2876         }
2877
2878         entry = vdo_funnel_queue_poll(zone->timed_out_complete);
2879         return ((entry == NULL) ?
2880                 NULL : container_of(entry, struct dedupe_context, queue_entry));
2881 }
2882
2883 static void prepare_uds_request(struct uds_request *request, struct data_vio *data_vio,
2884                                 enum uds_request_type operation)
2885 {
2886         request->record_name = data_vio->record_name;
2887         request->type = operation;
2888         if ((operation == UDS_POST) || (operation == UDS_UPDATE)) {
2889                 size_t offset = 0;
2890                 struct uds_record_data *encoding = &request->new_metadata;
2891
2892                 encoding->data[offset++] = UDS_ADVICE_VERSION;
2893                 encoding->data[offset++] = data_vio->new_mapped.state;
2894                 put_unaligned_le64(data_vio->new_mapped.pbn, &encoding->data[offset]);
2895                 offset += sizeof(u64);
2896                 BUG_ON(offset != UDS_ADVICE_SIZE);
2897         }
2898 }
2899
2900 /*
2901  * The index operation will inquire about data_vio.record_name, providing (if the operation is
2902  * appropriate) advice from the data_vio's new_mapped fields. The advice found in the index (or
2903  * NULL if none) will be returned via receive_data_vio_dedupe_advice(). dedupe_context.status is
2904  * set to the return status code of any asynchronous index processing.
2905  */
2906 static void query_index(struct data_vio *data_vio, enum uds_request_type operation)
2907 {
2908         int result;
2909         struct dedupe_context *context;
2910         struct vdo *vdo = vdo_from_data_vio(data_vio);
2911         struct hash_zone *zone = data_vio->hash_zone;
2912
2913         assert_data_vio_in_hash_zone(data_vio);
2914
2915         if (!READ_ONCE(vdo->hash_zones->dedupe_flag)) {
2916                 continue_data_vio(data_vio);
2917                 return;
2918         }
2919
2920         context = acquire_context(zone);
2921         if (context == NULL) {
2922                 atomic64_inc(&vdo->hash_zones->dedupe_context_busy);
2923                 continue_data_vio(data_vio);
2924                 return;
2925         }
2926
2927         data_vio->dedupe_context = context;
2928         context->requestor = data_vio;
2929         context->submission_jiffies = jiffies;
2930         prepare_uds_request(&context->request, data_vio, operation);
2931         atomic_set(&context->state, DEDUPE_CONTEXT_PENDING);
2932         list_add_tail(&context->list_entry, &zone->pending);
2933         start_expiration_timer(context);
2934         result = uds_launch_request(&context->request);
2935         if (result != UDS_SUCCESS) {
2936                 context->request.status = result;
2937                 finish_index_operation(&context->request);
2938         }
2939 }
2940
2941 static void set_target_state(struct hash_zones *zones, enum index_state target,
2942                              bool change_dedupe, bool dedupe, bool set_create)
2943 {
2944         const char *old_state, *new_state;
2945
2946         spin_lock(&zones->lock);
2947         old_state = index_state_to_string(zones, zones->index_target);
2948         if (change_dedupe)
2949                 WRITE_ONCE(zones->dedupe_flag, dedupe);
2950
2951         if (set_create)
2952                 zones->create_flag = true;
2953
2954         zones->index_target = target;
2955         launch_dedupe_state_change(zones);
2956         new_state = index_state_to_string(zones, zones->index_target);
2957         spin_unlock(&zones->lock);
2958
2959         if (old_state != new_state)
2960                 vdo_log_info("Setting UDS index target state to %s", new_state);
2961 }
2962
2963 const char *vdo_get_dedupe_index_state_name(struct hash_zones *zones)
2964 {
2965         const char *state;
2966
2967         spin_lock(&zones->lock);
2968         state = index_state_to_string(zones, zones->index_state);
2969         spin_unlock(&zones->lock);
2970
2971         return state;
2972 }
2973
2974 /* Handle a dmsetup message relevant to the index. */
2975 int vdo_message_dedupe_index(struct hash_zones *zones, const char *name)
2976 {
2977         if (strcasecmp(name, "index-close") == 0) {
2978                 set_target_state(zones, IS_CLOSED, false, false, false);
2979                 return 0;
2980         } else if (strcasecmp(name, "index-create") == 0) {
2981                 set_target_state(zones, IS_OPENED, false, false, true);
2982                 return 0;
2983         } else if (strcasecmp(name, "index-disable") == 0) {
2984                 set_target_state(zones, IS_OPENED, true, false, false);
2985                 return 0;
2986         } else if (strcasecmp(name, "index-enable") == 0) {
2987                 set_target_state(zones, IS_OPENED, true, true, false);
2988                 return 0;
2989         }
2990
2991         return -EINVAL;
2992 }
2993
2994 void vdo_set_dedupe_state_normal(struct hash_zones *zones)
2995 {
2996         vdo_set_admin_state_code(&zones->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
2997 }
2998
2999 /* If create_flag, create a new index without first attempting to load an existing index. */
3000 void vdo_start_dedupe_index(struct hash_zones *zones, bool create_flag)
3001 {
3002         set_target_state(zones, IS_OPENED, true, true, create_flag);
3003 }
This page took 0.210698 seconds and 4 git commands to generate.