Alex Kolbasov technology blog
Tuesday, July 10, 2018
Anatomy of a Cinder driver
In this post we'll discuss how to implement a full NFS-based Cinder driver for world-famous ACME NFS appliance. This driver is different from many other similar drivers in that it maintains a separate NFS share per each volume. Most (if not all) other NFS drivers keep all volumes mounted under a fixed shares.
We will assume that our appliance can create share snapshots and clones. The driver fully supports snapshots and clones.
Since the driver can manage many volumes, it doesn't attempt to mount all of them on the cinder host. Instead a volume is mounted only when it is needed and then immediately unmounted.Note that it is a major difference from other NFS-based drivers.
Note that Cinder sometimes mounts volumes by itself (for example, when it creates a volume from an image). It does such mount using remotefs brick. To avoid dangling mounts we do the following:
Note that we are not using any of the existing NFS drivers.
We will use a few global variables:
We will assume that our appliance can create share snapshots and clones. The driver fully supports snapshots and clones.
Since the driver can manage many volumes, it doesn't attempt to mount all of them on the cinder host. Instead a volume is mounted only when it is needed and then immediately unmounted.Note that it is a major difference from other NFS-based drivers.
Note that Cinder sometimes mounts volumes by itself (for example, when it creates a volume from an image). It does such mount using remotefs brick. To avoid dangling mounts we do the following:
- Use the same remotefs brick for our mounts to use the same mountpoints. As a result, if a volume is already mounted by a driver, it wouldn't be mounted again by a remotefs brick.
- We explicitly mount volume in create_export() and unmount it in remove_export(). As a result, Cinder will skip already mounted volume and it will be properly unmounted when remove_export is called.
- We explicitly unmount a volume on delete. This is safe since unmounting a volume that isn't mounted is fine.
- 'id' is the acme share ID corresponding to the volume
- 'provider_location' is the share NFS export in a format acceptable for the mount.nfs command, e.g. 1.2.3.4:/foo/bar
So let's look at the actual code.
We'll need a bunch of imports.
import re
from os import path, rename
import errno
# remotefs location changed between releases
try:
from os_brick.remotefs import remotefs as remotefs_brick
except Exception:
from cinder.brick.remotefs import remotefs as remotefs_brick
from oslo_concurrency import processutils as putils
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import units
from cinder import exception, utils, context
from cinder.i18n import _LI, _LW, _
from cinder.image import image_utils
from cinder.volume import driver
from cinder.volume import utils as volume_utils
VERSION = '1.0.0'LOG = logging.getLogger(__name__)Now let's define some configuration options.
acme_OPTS = [ cfg.StrOpt('nas_user', default='', help='User name for NAS administration'), cfg.StrOpt('nas_password', default='', help='User password for NAS administration'), cfg.BoolOpt('nfs_sparsed_volumes', default=True, help=('Create volumes as sparsed files which take no space.' 'If set to False volume is created as regular file.' 'In such case volume creation takes a lot of time.')), cfg.StrOpt('nfs_mount_point_base', default='$state_path/mnt', help='Base dir containing mount points for NFS shares.'), cfg.StrOpt('nfs_mount_options', help=('Mount options passed to the NFS client. See section ' 'of the NFS man page for details.')), cfg.IntOpt('nfs_mount_attempts', default=3, help=('The number of attempts to mount NFS shares before ' 'raising an error. At least one attempt will be ' 'made to mount an NFS share, regardless of the ' 'value specified.')), cfg.StrOpt('nas_secure_file_operations', default='auto', help=('Allow network-attached storage systems to operate in a ' 'secure environment where root level access is not ' 'permitted. If set to False, access is as the root user ' 'and insecure. If set to True, access is not as root. ' 'If set to auto, a check is done to determine if this is ' 'a new installation: True is used if so, otherwise ' 'False. Default is auto.')), cfg.StrOpt('nas_secure_file_permissions', default='auto', help=('Set more secure file permissions on network-attached ' 'storage volume files to restrict broad other/world ' 'access. If set to False, volumes are created with open ' 'permissions. If set to True, volumes are created with ' 'permissions for the cinder user and group (660). If ' 'set to auto, a check is done to determine if ' 'this is a new installation: True is used if so, ' 'otherwise False. Default is auto.')),
CONF = cfg.CONF
CONF.register_opts(acme_OPTS)
Our class will inherit from a few other Cinder base classes.Note that we are not using any of the existing NFS drivers.
class acmeNfsDriver(driver.ExtendVD, driver.LocalVD, driver.TransferVD,
driver.BaseVD):
"""
acme NFS Driver
"""
volume_backend_name = 'acme_NFS'
protocol = driver_prefix = driver_volume_type = 'nfs'
VERSION = VERSION
VOLUME_FILE_NAME = 'volume'
__VENDOR = 'ACME STORAGE, Inc'
__VOLUME_PREFIX = 'cinder_'
__NFS_OPTIONS = 'v3'
The init() method is pretty simple:def __init__(self, execute=putils.execute, *args, **kwargs): self.__root_helper = utils.get_root_helper() self.__execute_as_root = True super(acmeNfsDriver, self).__init__(execute, *args, **kwargs) if self.configuration: self.configuration.append_config_values(acme_OPTS) self.base = path.realpath(getattr(self.configuration, 'nfs_mount_point_base', CONF.nfs_mount_point_base)) self._sparse_copy_volume_data = True self.reserved_percentage = self.configuration.reserved_percentage self.max_over_subscription_ratio = ( self.configuration.max_over_subscription_ratio) self.shares = {} self.__mount_paths = {}
We need to define do_setup method which wouldn't be doing anything useful here.
def do_setup(self, ctx): """acme driver initialization"""
pass
We can also check for any setup errors which we skip for simplicity:
def check_for_setup_error(self): """Check for any setup errors This method is called immediately after do_setup() """ # Perform any necessary checkspassDrivers usually define local_path() but it isn't clear who is actually using it.def local_path(self, volume): """ Return path to the volume on a local file system (from LocalVD) NOTE: It isn't clear who actually uses it. :param volume: Volume reference :type volume: cinder.objects.volume.Volume :returns: path to the volume """ return path.join(self.__get_mount_point(volume), volume['name'])
Creating and destroying the volume
Now we come to the important part - creating the volume.
def create_volume(self, volume): """Create volume When the volume is created, it has the following attributes: - proider_location: host:/path used to mount the share - provider_id: acme share UUID for the volume Every Cinder NFS volume is backed by its own acme NFS Share. The share has only the volume file in it. :param volume: volume object :type volume: cinder.objects.volume.Volume :returns: volumeDict -- dictionary of volume attributes """ LOG.debug(_LI('acme: Creating volume %s'), volume.id) # Create acme share for this volume share_name = self._get_volume_name(volume) uri = volume['provider_location'] share_uuid = None # Create backing acme share share_uuid, uri = create_acme_share(share_name) if not uri: raise exception.VolumeBackendAPIException( data='Missing share URI information') # Store share UUID in provider_id field addr, share_path = self._parse_uri(uri[0]) volume['provider_location'] = addr + ":" + share_path volume['provider_id'] = share_uuid # Create backing file try: self._mount(volume) volume_path = self.local_path(volume) self._create_sparse_file(volume_path, volume['size']) except Exception as me: LOG.warning(_LW('acme: failed create volume: %s'), me) # Failed to mount, try to delete a share try: pool_ops.delete_share(share_uuid) except Exception as e: LOG.warning(_LW('acme: failed to destroy share: %s'), e) raise try: self._set_rw_permissions(volume_path) except Exception as e: LOG.warning(_LW('acme: failed set permissions: %s'), e) # Unmount volume - we don't need it any more try: self._unmount(volume) except Exception as e: LOG.warning(_LW('acme: failed to unmount volume: %s'), e) LOG.debug(_LI('acme: Created volume %s'), volume.id) return {'provider_location': volume['provider_location'], 'provider_id': share_uuid, }
And the counterpart - deleting the volume:
def delete_volume(self, volume): """Delete Cinder volume If volume has snapshots we attemot to delete them as well. :param volume: volume to delete :type volume: cinder.objects.volume.Volume """ # Unmount the volume if it is mounted for some reason # noinspection PyBroadException LOG.debug(_LI('acme: delete_volue(%s)'), volume.id) # Unmount the volume if someone left it in the mounted state try: self._unmount(volume) except Exception as e: LOG.warning(_LW('acme: failed to unmount volume %s: %s'), volume.id, e) share_uuid = volume['provider_id'] # Are there any snapshots? If there are, attempt to delete them try: snapshots = acme_list_share_snapshots(share_uuid) except Exception as e: LOG.warning(_LW('acme: failed to get snapshot list for %s: %s'), share_uuid, e) else: # Attempt to delete snapshots. If this fails we can't destroy a # volume for s in snapshots: try: LOG.debug(_LI('acme: delete snapshot %s for share %s'), s, share_uuid) acme_delete_snapshot(share_uuid, s) except Exception as e: LOG.warning(_LW('acme: failed to delete snapshot %s'), e) raise exception.VolumeBackendAPIException(data=str(e)) # Attempt to delete share from the acme appliance try: acme_delete_share(share_uuid) except Exception as e: LOG.warning(_LW('acme: failed to delete share: %s'), e) LOG.debug(_LI('acme: Deleted volume %s'), volume.id)
Connecting to Nova instance
The following code is called when the driver is attached to a Nova instance:
def initialize_connection(self, volume, connector, initiator_data=None): """ Allow connection to connector and return connection info. This method is called when a volume is attached to a Nova instance or when a volume is used as a source for another volume. :param volume: volume reference :param connector: connector reference """ data = {'export': volume['provider_location'], 'name': volume['name'], 'options': '-o v3', } return {'driver_volume_type': self.driver_volume_type, 'data': data, 'mount_point_base': self._base(volume), }
Snapshots and clones
Creating a snapshot
def create_snapshot(self, snapshot): """ Create snapshot We store snapshot UUID in the provider_id field :param snapshot: Snapshot """ LOG.debug(_LI('acme: create_snapshot(%s)'), snapshot) volume = self.__get_snapshot_volume(snapshot) try: uuid = acme_create_share_snapshot(volume['provider_id'], snapshot['name']) except Exception as e: LOG.warning(_LW('acme: got exception %s'), e) raise else: snapshot['provider_id'] = uuid LOG.debug(_LI('acme: created_snapshot %s'), uuid) return {'provider_id': uuid}
Destroying a snapshot
def delete_snapshot(self, snapshot): """ Delete snapshot :param snapshot: Snapshot """ LOG.debug(_LI('acme: delete_snapshot(%s)'), snapshot) volume = self.__get_snapshot_volume(snapshot) share_uuid = volume['provider_id'] uuid = snapshot['provider_id'] try: acme_delete_snapshot(share_uuid, uuid) except Exception as e: LOG.warning(_LW('acme: got exception %s'), e) raise else: LOG.debug(_LI('acme: deleted_snapshot(%s)'), uuid)
Cloning a snapshot into a volume
def create_volume_from_snapshot(self, volume, snapshot): """Create new volume from other's snapshot on appliance. :param volume: reference of volume to be created :param snapshot: reference of source snapshot """ LOG.debug(_LI('acme: create_volume_from_snapshot(%s)'), snapshot) snapshot_uuid = snapshot['provider_id'] LOG.debug(_LI('acme: clone %s'), snapshot_uuid) share_uuid, uris = acme_create_share(self._get_volume_name(volume), snapshot_uuid) # Store share UUID in provider_id field addr, share_path = self._parse_uri(uris[0]) if self.__pool_address: addr = self.__pool_address provider_location = addr + ":" + share_path volume['provider_location'] = provider_location volume['provider_id'] = share_uuid self._mount(volume) # Get origin volume of the snapshot orig_volume = self.__get_snapshot_volume(snapshot) try: self._mount(orig_volume) except Exception as e: LOG.warning(_LW('acme: failed to unmount volume: %s: %s'), orig_volume.id, e) try: self._unmount(volume) except Exception as e1: LOG.warning(_LW('acme: failed to unmount volume: %s: %s'), volume.id, e1) raise # Rename the volume file # Expected file name is based on volume['name'] but after the clone # we have orig_volume['name'] new_name = self.local_path(volume) old_name = path.join(self.__get_mount_point(volume), orig_volume['name']) try: rename(old_name, new_name) except Exception as e: LOG.warning(_LW('acme: rename failed: %s'), e) raise finally: try: self._unmount(volume) self._unmount(orig_volume) except Exception as e: LOG.warning(_LW('acme: failed to unmount volume: %s'), e) LOG.debug(_LI('acme: Created volume %s'), volume) return {'provider_location': provider_location, 'provider_id': share_uuid, }
def create_cloned_volume(self, volume, src_vref): """ Creates a clone of the specified volume. :param volume: new volume reference :param src_vref: source volume reference """ LOG.info(_LI('Creating clone of volume: %s'), src_vref['id']) snapshot = {'volume_name': src_vref['name'], 'volume_id': src_vref['id'], 'name': 'cinder-snapshot=%(id)s' % volume, } # We don't delete this snapshot, because this snapshot will be origin # of new volume. This snapshot will be automatically promoted by NMS # when user will delete its origin. self.create_snapshot(snapshot) try: return self.create_volume_from_snapshot(volume, snapshot) except Exception as e: LOG.warning(_LW('acme: snapshot creation failed: %s'), e) self.delete_snapshot(snapshot) raise
Wednesday, March 8, 2017
Synchronization tricks
While working on Sentry HA project I had to solve a little synchronization puzzle that seems to be somewhat interesting.
For keeping track of waiters we need some sort of a sorted queue (so that when the value increases we don't have to go through all waiters to check who should be waken up). Priority queue seems just the right data structure. We will write the code in Java and will use PriorityBlockingQueue because it is thread-safe version of the priority queue.
We also need to make sure that we can safely wakeup a waiter before it decides to wait and in this case, subsequent call to wait() will simply return immediately. We will use Semaphore for this purpose.
For the counter value we use volatile variable. It is a little bit cleaner to use Atomic (and the actual code uses it), but it seems that volatile is sufficient, so I am using it here for demo purposes.
Problem
Suppose that there is some variable that is initialized to zero and monotonically increases over time. Only a single thread may increase the value. Many other threads need to wait until the variable reaches certain value. They should block until the value is reached and wake up once it is reached. We should guarantee that when for any value of the variable, once it reaches the value, there are no sleeping threads waiting for that value.
So the interface is pretty simple. There is one thread that periodically calls inc() method and many threads that may call waitFor(int value) method. A call to waitFor() will either return immediately if the value is already reached or will block until it is reached.
Solution
I would like to have a solution that doesn't use locks - at least explicit locks. It can use thread-safe data structures which, of course, use locks internally.
We also need to make sure that we can safely wakeup a waiter before it decides to wait and in this case, subsequent call to wait() will simply return immediately. We will use Semaphore for this purpose.
For the counter value we use volatile variable. It is a little bit cleaner to use Atomic (and the actual code uses it), but it seems that volatile is sufficient, so I am using it here for demo purposes.
Code
import net.jcip.annotations.ThreadSafe; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.Semaphore; /** * Waiting for counter to reach certain value. * The counter starts from zero and its value increases over time. * The class allows for multiple consumers waiting until the value of the * counter reaches some value interesting to them. * Consumers call {@link #waitFor(int)} which may either return * immediately if the counter reached the specified value, or block * until this value is reached. * <p> * All waiters should be waken up when the counter becomes equal or higher * then the value they are waiting for. * <p> * The counter is updated by a single updater that should only increase the * counter value. * The updater calls the {@link #inc()} method to update the counter * value and this should wake up all threads waiting for any value smaller or * equal to the new one. * <p> * The class is thread-safe. * It is designed for use by multiple waiter threads and a single * updater thread, but it will work correctly even in the presence of multiple * updater threads. */ @ThreadSafe public final class CounterWait { // Implementation notes. // // The implementation is based on: // // 1) Using an atomic counter value which guarantees consistency. // Since everyone needs only to know when the counter value reached the // certain value and the counter may only increase its value, // it is safe to update the counter by another thread after its value // was read. // // 2) Priority queue of waiters, sorted by their expected values. The smallest // value is always at the top of the queue. The priority queue itself // is thread-safe, so no locks are needed to protect access to it. // // Each waiter is implemented using a binary semaphore. // This solves the problem of a wakeup that happens before the sleep - // in this case the acquire() doesn't block and returns immediately. // // NOTE: We use PriorityBlockingQueue for waiters because it is thread-safe, // we are not using its blocking queue semantics. /** Counter value. May only increase. */ private volatile int currentId = 0; /** * Waiters sorted by the value of the counter they are waiting for. * Note that {@link PriorityBlockingQueue} is thread-safe. * We are not using this as a blocking queue, but as a synchronized * PriorityQueue. */ private final PriorityBlockingQueue<ValueEvent> waiters = new PriorityBlockingQueue<>(); /** * Update the counter value and wake up all threads waiting for this * value or any value below it. * <p> * The counter value should only increase. * An attempt to decrease the value is raising * {@link IllegalArgumentException}. * The usual case is to have a single updater thread, but we enforce this * by synchronizing the call. */ public synchronized void inc() { currentId++; // Wake up any threads waiting for a counter to reach this value. wakeup(currentId); } /** * Wait for specified counter value. * Returns immediately if the value is reached or blocks until the value * is reached. * Multiple threads can call the method concurrently. * * @param value requested counter value * @return current counter value that should be no smaller then the requested * value */ public int waitFor(int value) { // Fast path - counter value already reached, no need to block if (value <= currentId) { return currentId; } // Enqueue the waiter for this value ValueEvent eid = new ValueEvent(value); waiters.put(eid); // It is possible that between the fast path check and the time the // value event is enqueued, the counter value already reached the requested // value. In this case we return immediately. if (value <= currentId) { return currentId; } // At this point we may be sure that by the time the event was enqueued, // the counter was below the requested value. This means that update() // is guaranteed to wake us up when the counter reaches the requested value. // The wake up may actually happen before we start waiting, in this case // the event's blocking queue will be non-empty and the waitFor() below // will not block, so it is safe to wake up before the wait. // So sit tight and wait patiently. eid.waitFor(); return currentId; } /** * Wake up any threads waiting for a counter to reach specified value * Peek at the top of the queue. If the queue is empty or the top value * exceeds the current value, we are done. Otherwise wakeup the top thread, * remove the corresponding waiter and continue. * <p> * Note that the waiter may be removed under our nose by * {@link #waitFor(long)} method, but this is Ok - in this case * waiters.remove() will just return false. * * @param value current counter value */ private void wakeup(int value) { while (true) { // Get the top of the waiters queue or null if it is empty ValueEvent e = waiters.poll(); if (e == null) { // Queue is empty - return. return; } // No one to wake up, return event to the queue and exit if (e.getValue() > value) { waiters.add(e); return; } // Due for wake-up call e.wakeup(); } } /** * Return number of waiters. This is mostly useful for metrics/debugging * * @return number of sleeping waiters */ public int waitersCount() { return waiters.size(); } /** * Representation of the waiting event. * The waiting event consists of the expected value and a binary semaphore. * <p> * Each thread waiting for the given value, creates a ValueEvent and tries * to acquire a semaphore. This blocks until the semaphore is released. * <p> * ValueEvents are stored in priority queue sorted by value, so they should be * comparable by the value. */ private static class ValueEvent implements Comparable<ValueEvent> { /** Value waited for. */ private final int value; /** Binary semaphore to synchronize waiters */ private final Semaphore semaphore = new Semaphore(1); /** * Instantiates a new Value event. * * @param v the expected value */ ValueEvent(int v) { this.value = v; // Acquire the semaphore. Subsequent calls to waitFor() will block until // wakeup() releases the semaphore. semaphore.acquireUninterruptibly(); // Will not block } /** Wait until signaled. May return immediately if already signalled. */ void waitFor() { semaphore.acquireUninterruptibly(); } /** @return the value we are waiting for */ long getValue() { return value; } /** Wakeup the waiting thread. */ void wakeup() { semaphore.release(); } /** * Compare objects by value */ @Override public int compareTo(final ValueEvent o) { return value == o.value ? 0 : value < o.value ? -1 : 1; } /** * Use identity comparison of objects */ @Override public boolean equals(final Object o) { return (this == o); } @Override public String toString() { return String.valueOf(value); } } }
Wednesday, May 4, 2016
NFS Duplicate request hash
This post is about a rather technical point - the performance of the duplicate request cache in NFSv3 implementation for Illumos OS. The duplicate request cache was invented by Chet Juszczak in 1989 and pretty much any NFSv3 implementation uses some form of it. A short overview says:
The typical NFS version 3 protocol failure recovery model uses client time-out and retry to handle server crashes, network partitions, and lost server replies. A retried request is called a duplicate of the original.
When used in a file server context, the term idempotent can be used to distinguish between operation types. An idempotent request is one that a server can perform more than once with equivalent results (though it may in fact change, as a side effect, the access time on a file, say for READ). Some NFS operations are obviously non-idempotent. They cannot be reprocessed without special attention simply because they may fail if tried a second time. The CREATE request, for example, can be used to create a file for which the owner does not have write permission. A duplicate of this request cannot succeed if the original succeeded. Likewise, a file can be removed only once.
The side effects caused by performing a duplicate non-idempotent request can be destructive (for example, a truncate operation causing lost writes). The combination of a stateless design with the common choice of an unreliable network transport (UDP) implies the possibility of destructive replays of non-idempotent requests. Though to be more accurate, it is the inherent stateless design of the NFS version 3 protocol on top of an unreliable RPC mechanism that yields the possibility of destructive replays of non-idempotent requests, since even in an implementation of the NFS version 3 protocol over a reliable connection-oriented transport, a connection break with automatic reestablishment requires duplicate request processing (the client will retransmit the request, and the server needs to deal with a potential duplicate non-idempotent request).
The original Solaris/Illumos implementation is reasonable but is has several problems under seriously bigger workloads of today:
- The cache size is static and should be determined up-front when the system boots. There is a documented tuneable rpcmod:maxdupreqs that can be tweaked. Its default value is 1024 which is way too small for modern applications. There is no clear idea about the optimal settings and values of 4096 or 8192 are suggested.
- When the rate of the duplicate requests causes cache drops, performance drops significantly.
- The implementation doesn't scale well over many CPUs typical in modern systems. It uses a single global lock to protect the hash table.
We can actually improve the implementation using some newer ideas and features available in Illumos - mostly by using the kmem caching facility of the Illumos kernel memory allocator. The idea is to increase the maximum cache size a lot and to keep entries in the cache for certain time if possible. There are two main parameters:
- cotsmaxdupreqs is the maximum number of cached items. Should be much larger then the number of NFS daemon threads. The default value is 32768.
- cots_dupreq_lifetime is the request time to live in the dup cache - should be enough for normal RPC timeouts.
The implementation tries to keep cached entries for cots_dupreq_lifetime but it also keeps total number of allocated entries within cotsmaxdupreqs.
Here is the description of the algorithm:
- When a new request comes in, it is always placed in a cache (even if total number of entries exceeds cotsmaxdupreqs)and is marked as DUP_INPROGRESS. Once the request is processed by the underlying file system it is either marked as DUP_DONE (meaning that its results can be cached) or DUP_DROP (meaning that it can't be cached). If the request is marked as DUP_DONE and the total number of cached request doesn't exceed MAXDUPREQS, we keep it in the cache. If the total maximum cache size is reached, we drop the response.
- We combine walk of the cache bucket chain to find the entry with freeing expired entries. Also there is a separate GC process that cleans up expired entries periodically.
- The dup cache has an interesting property - most of the checks are misses and hits are very rare. So cache miss is a fast path.
Here is the actual code snippet:
/*
*
* svc_cots_kdup searches the request cache and returns 0 if the
* request is not found in the cache. If it is found, then it
* returns the state of the request (in progress or done) and
* the status or attributes that were part of the original reply.
*
* If DUP_DONE (there is a duplicate) svc_cots_kdup copies over the
* value of the response. In that case, also return in *dupcachedp
* whether the response free routine is cached in the dupreq - in which case
* the caller should not be freeing it, because it will be done later
* in the svc_cots_kdup code when the dupreq is reused.
*
*/
static int
svc_cots_kdup(struct svc_req *req, caddr_t res, int size, struct dupreq **drpp,
bool_t *dupcachedp)
{
struct rpc_cots_server *stats = CLONE2STATS(req->rq_xprt);
dupreq_t *dr = NULL;
dupreq_t *dr_next = NULL;
uint32_t xid = REQTOXID(req);
uint32_t drhash;
int status;
clock_t now = ddi_get_lbolt();
dupreq_t *head;
int nfreed;
dupcache_bucket_t *bucket = &cots_duphash[XIDHASH(xid)];
list_t *bucket_list = &bucket->dc_entries;
kmutex_t *bucket_lock = &bucket->dc_lock;
RSSTAT_INCR(stats, rsdupchecks);
stats->rsentries.value.ui64 = cotsndupreqs;
stats->rsoverflow.value.ui64 = cots_overflows;
stats->rspending.value.ui64 = cots_inprogress;
stats->rsclifetime.value.ui64 = cots_cache_lifetime;
/*
* Allocate a new entry outside the lock.
* We only need the new entry if there isn't one already present, but
* usually this is the case.
*/
dupreq_t *dr_new = dr_alloc(req, size); if (dr_new == NULL) { RSSTAT_INCR(stats, rsnomem); return (DUP_ERROR); } dr_new->dr_created = now; /*
* Check to see whether an entry already exists in the cache.
*/
mutex_enter(bucket_lock); dr = list_head(bucket_list); while (dr != NULL) { dr_next = list_next(bucket_list, dr); status = dr->dr_status; /*
* Remove expired DUP_DONE entries
*/
if ((status == DUP_DONE) && (dr->dr_created + cots_dupreq_lifetime < now)) { atomic_add_64(&cots_cache_lifetime, now - dr->dr_created); list_remove(bucket_list, dr); dr_free(dr); } else if (cots_compare(req, dr)) { if (status == DUP_DONE) { bcopy(dr->dr_resp.buf, res, size); if (dupcachedp != NULL && dr->dr_resfree != NULL) { *dupcachedp = B_TRUE; } } else { *drpp = dr; RSSTAT_INCR(stats, rsinprogress); } /*
* Client retried this request so it is a good chance
* that it will retry again, so keep this entry in the
* cache a bit longer = move it to the end of line.
*/
atomic_add_64(&cots_cache_lifetime, now - dr->dr_created); dr->dr_created = now; list_remove(bucket_list, dr); list_insert_tail(bucket_list, dr); mutex_exit(bucket_lock); RSSTAT_INCR(stats, rsdupreqs); dr_free(dr_new); return (status); } dr = dr_next; } /*
* Entry not found in the cache so it is a new request.
*/
dr = dr_new; list_insert_tail(bucket_list, dr); mutex_exit(bucket_lock); atomic_inc_32(&cots_inprogress); *drpp = dr; return (DUP_NEW); } /*
* svc_cots_kdupdone marks the request done (DUP_DONE)
* and stores the response.
*/
static void
svc_cots_kdupdone(struct dupreq *dr, caddr_t res, void (*dis_resfree)(), int size, int status) { uint32_t drhash = (uint32_t)DRHASH(dr); dupcache_bucket_t *bucket = &cots_duphash[drhash]; list_t *bucket_list = &bucket->dc_entries; kmutex_t *bucket_lock = &bucket->dc_lock; ASSERT(dr->dr_resfree == NULL); ASSERT(dr->dr_status == DUP_INPROGRESS); atomic_dec_32(&cots_inprogress); mutex_enter(bucket_lock); if (status != DUP_DONE) { dr->dr_status = status; } else { dr->dr_resfree = dis_resfree; bcopy(res, dr->dr_resp.buf, size); if (cotsndupreqs < cotsmaxdupreqs) { dr->dr_status = DUP_DONE; mutex_exit(bucket_lock); return; } /*
* Cache is full. Try replacing the oldest entry in the bucket
* If this isn't possible, don't bother and just purge the
* current entry
*/
atomic_inc_32(&cots_overflows);
dupreq_t *head = list_head(bucket_list);
if (head != NULL && head->dr_status == DUP_DONE) {
/* Cut the head off */ atomic_add_64(&cots_cache_lifetime,
ddi_get_lbolt() - head->dr_created);
list_remove_head(bucket_list);
/*
* dr stays in the cache
*/
dr->dr_status = DUP_DONE; mutex_exit(bucket_lock); dr_free(head); return; } } atomic_add_64(&cots_cache_lifetime, ddi_get_lbolt() - dr->dr_created); list_remove(bucket_list, dr); mutex_exit(bucket_lock); dr_free(dr); }
/*
* Allocate a new dupreq structure which has enough space to hold the request
* and response data
*/
static dupreq_t * dr_alloc(struct svc_req *req, int resp_size) { dupreq_t *dr = kmem_cache_alloc(dupreq_cache, KM_NOSLEEP); if (dr == NULL) { return (NULL); } if (dr->dr_addr.buf == NULL || dr->dr_addr.maxlen < req->rq_xprt->xp_rtaddr.len) { if (dr->dr_addr.buf != NULL) { kmem_free(dr->dr_addr.buf, dr->dr_addr.maxlen); dr->dr_addr.buf = NULL; } dr->dr_addr.maxlen = req->rq_xprt->xp_rtaddr.len; dr->dr_addr.buf = kmem_alloc(dr->dr_addr.maxlen, KM_NOSLEEP); if (dr->dr_addr.buf == NULL) { dr->dr_addr.maxlen = 0; kmem_cache_free(dupreq_cache, dr); return (NULL); } } if (dr->dr_resp.buf == NULL || dr->dr_resp.maxlen < resp_size) { if (dr->dr_resp.buf != NULL) { kmem_free(dr->dr_resp.buf, dr->dr_resp.maxlen); dr->dr_resp.buf = NULL; } dr->dr_resp.maxlen = (unsigned int)resp_size; dr->dr_resp.buf = kmem_alloc(resp_size, KM_NOSLEEP); if (dr->dr_resp.buf == NULL) { dr->dr_resp.maxlen = 0; kmem_cache_free(dupreq_cache, dr); return (NULL); } } dr->dr_xid = REQTOXID(req); dr->dr_prog = req->rq_prog; dr->dr_vers = req->rq_vers; dr->dr_proc = req->rq_proc; dr->dr_addr.len = req->rq_xprt->xp_rtaddr.len; bcopy(req->rq_xprt->xp_rtaddr.buf, dr->dr_addr.buf, dr->dr_addr.len); dr->dr_status = DUP_INPROGRESS; atomic_inc_32(&cotsndupreqs); return (dr); } /*
* Cache garbage collection. Called to cleanup cache entries periodically.
* Note that entries are also cleaned up in svc_cots_kdup().
*/
static void
dupreq_gc(void *arg) { int i; clock_t now = ddi_get_lbolt(); /*
* Walk through every bucket and free all expired entries
*/
for (i = 0; i < DRHASHSZ; i++) { dupcache_bucket_t *bucket = &cots_duphash[i]; list_t *bucket_list = &bucket->dc_entries; kmutex_t *bucket_lock = &bucket->dc_lock; dupreq_t *dr; mutex_enter(bucket_lock); dr = list_head(bucket_list); while ((dr != NULL) && (dr->dr_created + cots_dupreq_lifetime < now)) { dupreq_t *dr_next = list_next(bucket_list, dr); int status = dr->dr_status; if (status == DUP_DONE) { atomic_add_64(&cots_cache_lifetime, now - dr->dr_created); list_remove(bucket_list, dr); dr_free(dr); atomic_inc_32(&cots_gc); } dr = dr_next; } mutex_exit(bucket_lock); } (void) timeout(dupreq_gc, NULL, cots_dupreq_lifetime); } static void
dr_free(dupreq_t *dr)
{
atomic_dec_32(&cotsndupreqs);
/* If There is custom free routine for the response data call it */
if (dr->dr_resfree != NULL) { (*dr->dr_resfree)(dr->dr_resp.buf); dr->dr_resfree = NULL; } dr->dr_xid = -1; dr->dr_proc = 0; dr->dr_vers = 0; dr->dr_prog = 0; dr->dr_status = DUP_INPROGRESS; kmem_cache_free(dupreq_cache, dr); } /*ARGSUSED*/static int
dupreq_constructor(void *buf, void *cdrarg, int kmflags) { dupreq_t *dr = buf; bzero(dr, sizeof (dupreq_t)); dr->dr_xid = -1; dr->dr_status = DUP_INPROGRESS; return (0); } /*
* Free cached address and response buffers
*/
static void
dupreq_destructor(void *buf, void *cdrarg) { dupreq_t *dr = buf; if (dr->dr_addr.buf != NULL) kmem_free(dr->dr_addr.buf, dr->dr_addr.maxlen); dr->dr_addr.buf = NULL; if (dr->dr_resp.buf != NULL) kmem_free(dr->dr_resp.buf, dr->dr_resp.maxlen); dr->dr_resp.buf = NULL; }
Wednesday, April 20, 2016
More on Cinder volume creation
After my complaints in the previous post I'd like to de-mistify the action of volume creation in Cinder -land.
The Volume object passed to create_volume() has several attributes that are useful:
The Volume object passed to create_volume() has several attributes that are useful:
- size is the expected size of the new volume;
- id is the UUID of the new volume. It is already set on the call to create_volume();
- name is volume name as assigned by Cinder;
So the job of create_volume() is:
- Do whatever is needed on the backend to obtain an NFS share that can host a file of the size specified in the volume 'size' attribute.
- Create actual file of specified size. The filename should be exactly the value of the volume 'name' property and it should be created on the NFS share that you found or created.
- Set file permissions to either 666 or 660 depending on security settings provided in options.
- Return a dictionary of volume attributes that you would like to set. It should at least include 'provider_location' which is a string in 'host:/path/to/share' format (which doesn't include the actual file name!). This is used by other components (mostly Nova) to mount the file and use it as a backing store for the volume. It is very important that the share should be mountable from both the Cinder node (which needs to mount for some operations) and the Nova node (which always mounts it when the volume is attached to a VM instance).
- You can also set 'provider_id' attribute in the return dictionary which is a string that only has a meaning to the driver. In my case, for example, I use the UUID of the newly created share, It can be used later to associate the volume with something that has meaning on your backend.
Tuesday, April 19, 2016
Exploring OpenStack Cinder
Recently I was asked to write an OpenStack cinder driver for a proprietary storage appliance. This gave me a chance to look at the implementation side of OpenStack.
Well, this is rather helpful. Armed with this knowledge, writing an actual implementation is a breeze. A curious reader may wonder what is the 'volume' that is passed to a driver? Some more digging around produces the clear description:
...
fields = {'migration_status': String(default=<class 'oslo_versionedobjects.fields.UnspecifiedDefault'>,nullable=True), 'provider_id': UUID(default=<class 'oslo_versionedobjects.fields.UnspecifiedDefault'>,nullable=True), 'availability_zone': String(default=<class 'oslo_versionedobjects.fields.UnspecifiedDefault'>,nullable=True), 'terminated_at': DateTime(default=<class 'oslo_versionedobjects.fields.UnspecifiedDefault'>,nullable=True)
...
You get the idea. It turns out that Volumes are stored in a database so there is also a matching database schema in models.py which is about as useful.
So forget about documentation, let's dive in the source tree...
The first question I wanted to answer from the source was the semantics of the create_volume() call. The RemoteFsDriver provides some hints: the call returns a dictionary
This provider_location turns out to be a string of the form host:/path/to/remote/share that is used by the mount command to mount the NFS share.
A few NFS drivers that I looked at behaved in the following way:
Getting Around
Browsing OpenStack documentation we can see that the driver must support the following set of features:- Volume Create/Delete
- Volume Attach/Detach
- Snapshot Create/Delete
- Create Volume from Snapshot
- Get Volume Stats
- Copy Image to Volume
- Copy Volume to Image
- Clone Volume
So it is quite natural to start with volume creation. Unfortunately the description above doesn't tell much about the semantics of these operations. A bit more digging round points to the cinder.volume.driver documentation which says:
create_volume(volume) creates a volume. Can optionally return a Dictionary of changes to the volume object to be persisted.
...
fields = {'migration_status': String(default=<class 'oslo_versionedobjects.fields.UnspecifiedDefault'>,nullable=True), 'provider_id': UUID(default=<class 'oslo_versionedobjects.fields.UnspecifiedDefault'>,nullable=True), 'availability_zone': String(default=<class 'oslo_versionedobjects.fields.UnspecifiedDefault'>,nullable=True), 'terminated_at': DateTime(default=<class 'oslo_versionedobjects.fields.UnspecifiedDefault'>,nullable=True)
...
You get the idea. It turns out that Volumes are stored in a database so there is also a matching database schema in models.py which is about as useful.
So forget about documentation, let's dive in the source tree...
Back to the source
Since my goal was to implement NFS-based volume, I examined the existing NfsDriver which can be used by itself or as a base class for many other drivers. It is based on RemoteFsDriver which provides common code for all NFS drivers. Hopefully this should provide enough support for the new driver - I just need to add a few API calls to communicate with the actual appliance...The first question I wanted to answer from the source was the semantics of the create_volume() call. The RemoteFsDriver provides some hints: the call returns a dictionary
volume['provider_location'] = self._find_share(volume['size']) self._do_create_volume(volume) return {'provider_location': volume['provider_location']}
This provider_location turns out to be a string of the form host:/path/to/remote/share that is used by the mount command to mount the NFS share.
A few NFS drivers that I looked at behaved in the following way:
- Configuration provides location of a file that lists available shares;
- Drivers provide some code that selects share suitable for the new volume and stick its NFS path into provider_location attribute;
- The share path contains big files that represent volumes;
- All shares are always kept mounted on the cinder node;
What I wanted to do was somewhat different - I wanted to keep 1:1 relationship between a volume and a share. This means that there is no file describing the share - shares are created on demand as volumes are created. Also since we may have a lot of volumes I didn't want to keep them mounted all the time and only mount them as needed. The benefit is that it is very easy to manage snapshots and clones since they are first class citizens on the actual appliance.
It turned out that in spite of all the existing generic code around NFS drivers all of it was useless in my situation because RemoteFsDriver assumed the wrong model. So I had to do everything from scratch. The only thing I was able to reuse was the RemoteFsClient from remotefs_brick which wasn't particularly useful either but I had to use for reasons that I'll explain in another post. The only service it provides is an ability to run mount command to mount an NFS share.
Conclusions
I was actually quite surprised to see such a dismal quality of the developer documentation and the actual implementation for something as hyped as core part of OpenStack. Compare it for example, with Docker Volume Plugin documentation (and implementations) and you'll see a huge difference. Volume plugins are small, simple, can be implemented in any language and clearly described.
Subscribe to:
Comments (Atom)