Wednesday, May 4, 2016

NFS Duplicate request hash

This post is about a rather technical point - the performance of the duplicate request cache in the NFSv3 implementation for Illumos. The duplicate request cache was invented by Chet Juszczak in 1989, and pretty much every NFSv3 implementation uses some form of it. A short overview, from the NFS version 3 protocol specification (RFC 1813), says:

The typical NFS version 3 protocol failure recovery model uses client time-out and retry to handle server crashes, network partitions, and lost server replies. A retried request is called a duplicate of the original.

When used in a file server context, the term idempotent can be used to distinguish between operation types. An idempotent request is one that a server can perform more than once with equivalent results (though it may in fact change, as a side effect, the access time on a file, say for READ). Some NFS operations are obviously non-idempotent. They cannot be reprocessed without special attention simply because they may fail if tried a second time. The CREATE request, for example, can be used to create a file for which the owner does not have write permission. A duplicate of this request cannot succeed if the original succeeded. Likewise, a file can be removed only once.

The side effects caused by performing a duplicate non-idempotent request can be destructive (for example, a truncate operation causing lost writes). The combination of a stateless design with the common choice of an unreliable network transport (UDP) implies the possibility of destructive replays of non-idempotent requests. Though to be more accurate, it is the inherent stateless design of the NFS version 3 protocol on top of an unreliable RPC mechanism that yields the possibility of destructive replays of non-idempotent requests, since even in an implementation of the NFS version 3 protocol over a reliable connection-oriented transport, a connection break with automatic reestablishment requires duplicate request processing (the client will retransmit the request, and the server needs to deal with a potential duplicate non-idempotent request).

The original Solaris/Illumos implementation is reasonable, but it has several problems under the much larger workloads of today:

  • The cache size is static and must be determined up-front when the system boots. There is a documented tunable, rpcmod:maxdupreqs, that can be tweaked (see the example after this list). Its default value of 1024 is far too small for modern applications, and there is no clear guidance on the optimal setting; values of 4096 or 8192 are commonly suggested.
  • When the request rate is high enough that entries get dropped from the cache, performance degrades significantly.
  • The implementation doesn't scale well across the many CPUs typical of modern systems: it uses a single global lock to protect the hash table.
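
For reference, the legacy tunable can be raised in /etc/system (the new value takes effect on the next boot). The value below is just one of the commonly suggested settings, not a verified optimum:

set rpcmod:maxdupreqs = 8192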

We can improve the implementation considerably using some newer ideas and features available in Illumos - mostly the kmem caching facility of the Illumos kernel memory allocator. The idea is to increase the maximum cache size substantially and to keep entries in the cache for a certain time when possible. There are two main parameters:

  • cotsmaxdupreqs is the maximum number of cached items. It should be much larger than the number of NFS daemon threads. The default value is 32768.
  • cots_dupreq_lifetime is the request's time to live in the dup cache, in clock ticks; it should be long enough to cover normal RPC timeouts.

The implementation tries to keep cached entries around for cots_dupreq_lifetime, but it also keeps the total number of allocated entries within cotsmaxdupreqs.

Here is a description of the algorithm:

  • When a new request comes in, it is always placed in the cache (even if the total number of entries exceeds cotsmaxdupreqs) and is marked as DUP_INPROGRESS. Once the request has been processed by the underlying file system, it is either marked as DUP_DONE (meaning that its results can be cached) or DUP_DROP (meaning that they can't be). If the request is marked as DUP_DONE and the total number of cached requests doesn't exceed cotsmaxdupreqs, we keep it in the cache; if the maximum cache size has been reached, we drop the response.
  • The walk of a bucket's chain to find an entry is combined with freeing any expired entries encountered along the way. There is also a separate GC process that periodically cleans up expired entries.
  • The dup cache has an interesting property: the vast majority of lookups are misses and hits are very rare, so the miss path is the fast path.
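
Before the snippet itself, here is a minimal sketch of the supporting declarations the code below assumes. The names dupcache_bucket_t, cots_duphash, XIDHASH and DRHASHSZ match the snippet, but the bucket count, the hash function, the dr_node list linkage and the dupcache_init() helper are illustrative assumptions rather than the actual source:

typedef struct dupcache_bucket {
   kmutex_t  dc_lock;      /* protects this bucket only */
   list_t    dc_entries;   /* dupreq_t entries, oldest at the head */
} dupcache_bucket_t;

#define DRHASHSZ     1024                 /* illustrative bucket count */
#define XIDHASH(xid) ((xid) % DRHASHSZ)   /* XID -> bucket index */

static dupcache_bucket_t cots_duphash[DRHASHSZ];
static kmem_cache_t *dupreq_cache;

/*
 * One-time setup: create the kmem cache (its constructor and destructor
 * are shown at the end of the post), initialize the per-bucket locks and
 * lists, and arm the first GC timeout.
 */
static void
dupcache_init(void)
{
   int i;

   dupreq_cache = kmem_cache_create("dupreq_cache", sizeof (dupreq_t), 0,
       dupreq_constructor, dupreq_destructor, NULL, NULL, NULL, 0);
   for (i = 0; i < DRHASHSZ; i++) {
      mutex_init(&cots_duphash[i].dc_lock, NULL, MUTEX_DEFAULT, NULL);
      list_create(&cots_duphash[i].dc_entries, sizeof (dupreq_t),
          offsetof(dupreq_t, dr_node));   /* dr_node: assumed linkage */
   }
   (void) timeout(dupreq_gc, NULL, cots_dupreq_lifetime);
}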
Here is the actual code snippet:


/* 
 * svc_cots_kdup searches the request cache and returns 0 if the 
 * request is not found in the cache.  If it is found, then it 
 * returns the state of the request (in progress or done) and 
 * the status or attributes that were part of the original reply. 
 *
 * If DUP_DONE (there is a duplicate), svc_cots_kdup copies over the
 * value of the response. In that case it also returns in *dupcachedp
 * whether the response free routine is cached in the dupreq - in which
 * case the caller should not free it, because that will be done later
 * by dr_free() when the dupreq is freed or reused.
 *
 */
static int
svc_cots_kdup(struct svc_req *req, caddr_t res, int size, struct dupreq **drpp,
   bool_t *dupcachedp)
{
   struct rpc_cots_server *stats = CLONE2STATS(req->rq_xprt);
   dupreq_t *dr = NULL;
   dupreq_t *dr_next = NULL;
   uint32_t xid = REQTOXID(req);
   int status;
   clock_t now = ddi_get_lbolt();
   dupcache_bucket_t *bucket = &cots_duphash[XIDHASH(xid)];
   list_t *bucket_list = &bucket->dc_entries;
   kmutex_t *bucket_lock = &bucket->dc_lock;

   RSSTAT_INCR(stats, rsdupchecks);
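   /*
    * Refresh the kstat gauges from the global counters so the current
    * cache state can be observed from userland.
    */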
   stats->rsentries.value.ui64 = cotsndupreqs;
   stats->rsoverflow.value.ui64 = cots_overflows;
   stats->rspending.value.ui64 = cots_inprogress;
   stats->rsclifetime.value.ui64 = cots_cache_lifetime;

   /*
    * Allocate a new entry outside the lock.
    * We only need it if no matching entry is already present, but that
    * is by far the common case, since most lookups are misses.
    */
   dupreq_t *dr_new = dr_alloc(req, size);
   if (dr_new == NULL) {
      RSSTAT_INCR(stats, rsnomem);
      return (DUP_ERROR);
   }

   dr_new->dr_created = now;

   /*
    * Check to see whether an entry already exists in the cache.
    */
   mutex_enter(bucket_lock);
   dr = list_head(bucket_list);
   while (dr != NULL) {
      dr_next = list_next(bucket_list, dr);
      status = dr->dr_status;
      /*
       * Remove expired DUP_DONE entries
       */
      if ((status == DUP_DONE) &&
          (dr->dr_created + cots_dupreq_lifetime < now)) {
         atomic_add_64(&cots_cache_lifetime,
            now - dr->dr_created);
         list_remove(bucket_list, dr);
         dr_free(dr);
      } else if (cots_compare(req, dr)) {
         if (status == DUP_DONE) {
            bcopy(dr->dr_resp.buf, res, size);
            if (dupcachedp != NULL)
               *dupcachedp = (dr->dr_resfree != NULL);
         } else {
            *drpp = dr;
            RSSTAT_INCR(stats, rsinprogress);
         }
         /*
          * The client retried this request, so there is a good chance
          * that it will retry again; keep this entry in the cache a
          * bit longer by moving it to the end of the line.
          */
         atomic_add_64(&cots_cache_lifetime,
            now - dr->dr_created);
         dr->dr_created = now;
         list_remove(bucket_list, dr);
         list_insert_tail(bucket_list, dr);
         mutex_exit(bucket_lock);
         RSSTAT_INCR(stats, rsdupreqs);
         dr_free(dr_new);
         return (status);
      }
      dr = dr_next;
   }

   /*
    * Entry not found in the cache so it is a new request.
    */
   dr = dr_new;
   list_insert_tail(bucket_list, dr);
   mutex_exit(bucket_lock);

   atomic_inc_32(&cots_inprogress);
   *drpp = dr;
   return (DUP_NEW);
}
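
For context, here is a hedged sketch of how a dispatch loop might use the two entry points together. This is not the actual Illumos dispatch code: do_service(), send_reply() and resfree are hypothetical placeholders.

   dupreq_t *dr = NULL;
   bool_t dupcached = B_FALSE;
   int error;

   switch (svc_cots_kdup(req, res, size, &dr, &dupcached)) {
   case DUP_NEW:
      /* First time this XID is seen: do the real work... */
      error = do_service(req, res);         /* placeholder */
      /* ...then publish (or drop) the result for later retries. */
      svc_cots_kdupdone(dr, res, resfree, size,
          error ? DUP_DROP : DUP_DONE);
      send_reply(req, res);                 /* placeholder */
      break;
   case DUP_INPROGRESS:
      /*
       * A retransmission of a request that is still being served;
       * drop it, the original invocation will send the reply.
       */
      break;
   case DUP_DONE:
      /*
       * res was filled in from the cache; just resend the reply.
       * If dupcached is B_TRUE, the cached entry owns the response
       * free routine, so the caller must not free res here.
       */
      send_reply(req, res);                 /* placeholder */
      break;
   case DUP_ERROR:
      /* No memory for a cache entry; serve without caching. */
      break;
   }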

/*
 * svc_cots_kdupdone marks the request done (DUP_DONE)
 * and stores the response.
 */
static void
svc_cots_kdupdone(struct dupreq *dr, caddr_t res, void (*dis_resfree)(),
   int size, int status)
{
   uint32_t drhash = (uint32_t)DRHASH(dr);
   dupcache_bucket_t *bucket = &cots_duphash[drhash];
   list_t *bucket_list = &bucket->dc_entries;
   kmutex_t *bucket_lock = &bucket->dc_lock;


   ASSERT(dr->dr_resfree == NULL);
   ASSERT(dr->dr_status == DUP_INPROGRESS);
   atomic_dec_32(&cots_inprogress);
   mutex_enter(bucket_lock);
   if (status != DUP_DONE) {
      dr->dr_status = status;
   } else {
      dr->dr_resfree = dis_resfree;
      bcopy(res, dr->dr_resp.buf, size);
      if (cotsndupreqs < cotsmaxdupreqs) {
         dr->dr_status = DUP_DONE;
         mutex_exit(bucket_lock);
         return;
      }
      /*
       * Cache is full. Try replacing the oldest entry in the bucket.
       * If that isn't possible, don't bother and just purge the
       * current entry.
       */
      atomic_inc_32(&cots_overflows);
      dupreq_t *head = list_head(bucket_list);
      if (head != NULL && head->dr_status == DUP_DONE) {
         /* Cut the head off. */
         atomic_add_64(&cots_cache_lifetime,
            ddi_get_lbolt() - head->dr_created);
         list_remove_head(bucket_list);
         /*
          * dr stays in the cache
          */
         dr->dr_status = DUP_DONE;
         mutex_exit(bucket_lock);
         dr_free(head);
         return;
      }
   }
   atomic_add_64(&cots_cache_lifetime,
      ddi_get_lbolt() - dr->dr_created);
   list_remove(bucket_list, dr);
   mutex_exit(bucket_lock);
   dr_free(dr);
}
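
A note on the overflow policy above: when the cache is full, we prefer to evict the oldest DUP_DONE entry in the same bucket so that the just-completed response can stay cached; only when the bucket head is still in progress (and therefore cannot be evicted) do we give up and drop the current entry instead. Since entries are inserted at the tail and moved to the tail on every hit, the head is the oldest and least valuable entry in the bucket.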

/*
 * Allocate a new dupreq structure with enough space to hold the request
 * and response data.
 */
static dupreq_t *
dr_alloc(struct svc_req *req, int resp_size)
{
   dupreq_t *dr = kmem_cache_alloc(dupreq_cache, KM_NOSLEEP);
   if (dr == NULL) {
      return (NULL);
   }

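   /*
    * The dr_addr and dr_resp buffers survive kmem_cache_free()/
    * kmem_cache_alloc() cycles - the destructor releases them only when
    * the allocator reclaims the slab - so in the steady state they can
    * be reused here without any new allocations.
    */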
   if (dr->dr_addr.buf == NULL ||
       dr->dr_addr.maxlen < req->rq_xprt->xp_rtaddr.len) {
      if (dr->dr_addr.buf != NULL) {
         kmem_free(dr->dr_addr.buf, dr->dr_addr.maxlen);
         dr->dr_addr.buf = NULL;
      }
      dr->dr_addr.maxlen = req->rq_xprt->xp_rtaddr.len;
      dr->dr_addr.buf = kmem_alloc(dr->dr_addr.maxlen, KM_NOSLEEP);
      if (dr->dr_addr.buf == NULL) {
         dr->dr_addr.maxlen = 0;
         kmem_cache_free(dupreq_cache, dr);
         return (NULL);
      }
   }

   if (dr->dr_resp.buf == NULL ||
       dr->dr_resp.maxlen < resp_size) {
      if (dr->dr_resp.buf != NULL) {
         kmem_free(dr->dr_resp.buf, dr->dr_resp.maxlen);
         dr->dr_resp.buf = NULL;
      }
      dr->dr_resp.maxlen = (unsigned int)resp_size;
      dr->dr_resp.buf = kmem_alloc(resp_size, KM_NOSLEEP);
      if (dr->dr_resp.buf == NULL) {
         dr->dr_resp.maxlen = 0;
         kmem_cache_free(dupreq_cache, dr);
         return (NULL);
      }
   }

   dr->dr_xid = REQTOXID(req);
   dr->dr_prog = req->rq_prog;
   dr->dr_vers = req->rq_vers;
   dr->dr_proc = req->rq_proc;
   dr->dr_addr.len = req->rq_xprt->xp_rtaddr.len;
   bcopy(req->rq_xprt->xp_rtaddr.buf, dr->dr_addr.buf,
      dr->dr_addr.len);
   dr->dr_status = DUP_INPROGRESS;

   atomic_inc_32(&cotsndupreqs);
   return (dr);
}

/*
 * Cache garbage collection. Called periodically to clean up expired entries.
 * Note that entries are also cleaned up in svc_cots_kdup().
 */
static void 
dupreq_gc(void *arg)
{
   int i;
   clock_t now = ddi_get_lbolt();

   /*
    * Walk through every bucket and free all expired entries
    */
   for (i = 0; i < DRHASHSZ; i++) {
      dupcache_bucket_t *bucket = &cots_duphash[i];
      list_t *bucket_list = &bucket->dc_entries;
      kmutex_t *bucket_lock = &bucket->dc_lock;
      dupreq_t *dr;

      mutex_enter(bucket_lock);
      dr = list_head(bucket_list);
      while ((dr != NULL) &&
         (dr->dr_created + cots_dupreq_lifetime < now)) {
         dupreq_t *dr_next = list_next(bucket_list, dr);
         int status = dr->dr_status;
         if (status == DUP_DONE) {
            atomic_add_64(&cots_cache_lifetime,
               now - dr->dr_created);
            list_remove(bucket_list, dr);
            dr_free(dr);
            atomic_inc_32(&cots_gc);
         }
         dr = dr_next;
      }
      mutex_exit(bucket_lock);
   }
   (void) timeout(dupreq_gc, NULL, cots_dupreq_lifetime);
}

static void
dr_free(dupreq_t *dr)
{
   atomic_dec_32(&cotsndupreqs);
   /* If there is a custom free routine for the response data, call it. */
   if (dr->dr_resfree != NULL) {
      (*dr->dr_resfree)(dr->dr_resp.buf);
      dr->dr_resfree = NULL;
   }
   dr->dr_xid = -1;
   dr->dr_proc = 0;
   dr->dr_vers = 0;
   dr->dr_prog = 0;
   dr->dr_status = DUP_INPROGRESS;
   kmem_cache_free(dupreq_cache, dr);
}

/*ARGSUSED*/
static int
dupreq_constructor(void *buf, void *cdrarg, int kmflags)
{
   dupreq_t *dr = buf;
   bzero(dr, sizeof (dupreq_t));
   dr->dr_xid = -1;
   dr->dr_status = DUP_INPROGRESS;
   return (0);
}

/*
 * Free cached address and response buffers 
 */
static void
dupreq_destructor(void *buf, void *cdrarg)
{
   dupreq_t *dr = buf;
   if (dr->dr_addr.buf != NULL)
      kmem_free(dr->dr_addr.buf, dr->dr_addr.maxlen);
   dr->dr_addr.buf = NULL;
   if (dr->dr_resp.buf != NULL)
      kmem_free(dr->dr_resp.buf, dr->dr_resp.maxlen);
   dr->dr_resp.buf = NULL;
}