Tuesday, September 10, 2013

A 1st specialized C++ memory pool

Specialized C++ memory pools have many important applications. They are not C++ memory allocators, but they can be used to implement them. Solaris provides quite good and rich support for this, which I'll try to take advantage of.

Perhaps the most obvious application is for the nodes of varying data structures. Nevertheless, as a particular example of how useful a specialized memory pool is, consider the strategy of reference counting, which is especially useful for smart pointers. The fact is that the counter must be shared by the set of smart pointers pointing to the same object. As I said in another post, this has induced the misnaming of Boost's shared_ptr. But back to the subject: the sharing requirement implies that the counter must be dynamically allocated. The known problem is that the standard operators ::new and ::delete are quite inefficient, especially for an intended industrial-strength version. What's needed is a replacement, such as the slab allocator by Jeff Bonwick or Bjarne Stroustrup's user-defined allocator (The C++ Programming Language, Third/Special Edition, section 19.4.2), both based on the idea of caches for efficient, constant-time, O(1), operations.
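
To make the counter requirement concrete, here's a minimal (and deliberately non-thread-safe) sketch of the shared-counter idea; counted_ptr is just a hypothetical name, and the point is the pair of ::new and ::delete calls per pointee that a specialized pool should replace:

#include <cstddef>

// A sketch only: no thread-safety, no assignment.
template< typename T >
struct counted_ptr
{
    explicit counted_ptr( T * const p ) :
        object( p ),
        counter( ::new std::size_t( 1 ) ) // One ::new per pointee!
    {
    }

    counted_ptr( counted_ptr const & that ) throw() :
        object( that.object ),
        counter( that.counter )
    {
        ++ * counter; // The counter is shared, not copied.
    }

    ~counted_ptr() throw()
    {
        if ( -- * counter == 0 )
        {
            delete object;
            delete counter; // And one ::delete per pointee!
        }
    }

private:

    T * const object;
    std::size_t * const counter;

    counted_ptr & operator = ( counted_ptr const & );
};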

Furthermore, it should be thread-safe, preferably with lock-free atomic arithmetic. I'll see if it's possible to avoid mutexes; I intend to use the atomic operations provided by the Solaris Standard C Library's atomic_ops(3C) collection of functions, as indicated by Darryl Gove in his book Solaris Application Programming, section 12.7.4, Using Atomic Operations. In fact, in Multicore Application Programming: For Windows, Linux, and Oracle® Solaris, chapter 8, listings 8.4 and 8.5, also by Darryl Gove, I may have the solution: instead of mutexes, use a lock-free variant that loops on the CAS of a local variable.
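
As a rough sketch of that lock-free variant, assuming only atomic_cas_ptr(3C) from <atomic.h> (push, node and head are hypothetical names), atomically prepending a node to a shared list without a mutex looks like this:

#include <atomic.h>

struct node
{
    node * next;
};

// Lock-free prepend: snapshot the shared head into a local
// variable, link the new node, and retry until the CAS succeeds.
void push( node * volatile & head, node * const p ) throw()
{
    node * old;

    do
    {
        old = head;    // Local snapshot.
        p->next = old; // Tentative link.
    }
    while ( ::atomic_cas_ptr( & head, old, p ) != old );
}

A failed CAS just means another thread won the race, so the loop retries with a fresh snapshot; no thread ever blocks holding a lock.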

So, inspired by the above references, I'll start my implementation of a thread-safe, always-expanding specialized pool. Internally, it will be composed of several chunks. My intention is to make the size of each chunk fit a certain page size, which ultimately determines how many object slots each chunk will be able to cache. The first hurdle is that this is dynamic, depending on the hardware and its multiple supported page sizes among which to choose. As such, internally, I can't declare an array of objects (whose size must be known at compile time), nor can I declare a pointer to objects (as this would decouple the list of chunks from the chunks themselves, incurring separate dynamic memory management, one for the list and another for the chunks).
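
The way out, used in the implementation below, is to place a small header at the start of each chunk and carve the object slots out of the remaining bytes of the very same allocation. Roughly, and just as an illustration (chunk_header is a hypothetical name):

#include <cstdlib>

struct chunk_header
{
    chunk_header * next; // The intrusive list link.
};

void sketch( std::size_t const page_size )
{
    // One single page-aligned allocation per chunk...
    void * const chunk = ::memalign( page_size, page_size );

    // ...holding both the list node (header)...
    chunk_header * const header =
        static_cast< chunk_header * >( chunk );

    header->next = 0; // In real code, link the previous chunk here.

    // ...and, beyond it, ( page_size - sizeof( chunk_header ) )
    // bytes of payload to be carved into object slots.
    void * const payload =
        static_cast< char * >( chunk ) + sizeof( chunk_header );
}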

The initial (probably incomplete) version of my constant-time, O(1), thread-safe, always-expanding pool has two fundamental high-level operations, request() and recycle(), which make it publicly useful.
The implementation idea is somewhat simple: maintain a free list of data slots over the unused payload areas of the buffers comprising the cache.
But achieving this at the code level isn't as simple, because of the performance, space and concurrency constraints. One notable trade-off is that the free-list pointers share the space of the unused data slots, which implies that the minimum size of a data slot is the size of a pointer (currently 8 bytes on 64-bit Intel platforms). Thus, for instance, if the data type is int (currently 4 bytes on 64-bit Intel platforms), then 50% of the space will be wasted. This kind of waste is known as internal fragmentation. Hence, in terms of space efficiency:
It's best to have the size of the main (pointed to) data type (T)
as a multiple of the platform's pointer (void *) size.
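
For a quick check of how a given T fares, a scratch program along these lines (report is just a hypothetical helper) prints the per-slot waste:

#include <cstdio>
#include <cstddef>

// Each free slot must be able to hold a pointer, so the
// effective slot size is at least sizeof( void * ).
template< typename T >
void report() throw()
{
    std::size_t const slot =
        sizeof( T ) < sizeof( void * )
            ? sizeof( void * )
            : sizeof( T );

    std::printf
    (
        "sizeof( T ) = %lu, slot = %lu, waste = %lu%%\n",
        ( unsigned long ) sizeof( T ),
        ( unsigned long ) slot,
        ( unsigned long ) ( ( slot - sizeof( T ) ) * 100 / slot )
    );
}

int main()
{
    report< int >();    // 50% waste on 64-bit Intel.
    report< void * >(); // 0% waste.

    return 0;
}
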
Here's my 1st implementation attempt:

#include <stdexcept>
#include <cstdlib>
#include <cerrno> 

#include <alloca.h> 
#include <atomic.h> 
#include <unistd.h> 
#include <sys/shm.h>
#include <sys/mman.h> 
#include <sys/types.h>

namespace memory
{

struct bad_alloc
{
    bad_alloc( int const error ) throw() :
        error( error )
    {
    }

    int const error;
};

namespace policy
{

 
enum { locked, ism, pageable, dism };

namespace internal
{

// Base class for policy classes of memory management.
struct control
{
    control( std::size_t const page_size ) throw() :
        page_size( page_size )
    {
    }

    void hat_advise_va( void const * p ) const throw()
    {
        ::memcntl_mha mha;

        mha.mha_cmd = MHA_MAPSIZE_VA;
        mha.mha_flags = 0;
        mha.mha_pagesize = page_size;

        if
        (
            ::memcntl
            (
                static_cast< caddr_t >
                    ( const_cast< void * >( p ) ),
                page_size,
                MC_HAT_ADVISE,
                reinterpret_cast< caddr_t >( & mha ),
                0,
                0
            )
            != 0
        )
        {
            // Log the error.
        }
    }

    void lock( void const * p ) const throw()
    {
        if ( ::mlock( p, page_size ) != 0 )
        {
            // Log the error.
        }
    }

    void unlock( void const * p ) const throw()
    {
        if ( ::munlock( p, page_size ) != 0 )
        {
            // Log the error.
        }
    }

    // The runtime size of buffers.
    std::size_t const page_size;
};

 
// Policy class for low-level shared memory management.
// Use non-type template parameters for additional data.
// Template functions to typecast data members.

template< int F >
struct shared : control
{

    shared( std::size_t const page_size ) throw() :
        control( page_size )
    {
    }
 


    template< typename B >
    void * request() const throw( std::bad_alloc )
    {
        int const handle =
            ::shmget( IPC_PRIVATE, page_size, SHM_R | SHM_W );

        if ( handle == -1 )
        {
            // Log the error.

            throw std::bad_alloc();
        }

        void * p = ::shmat( handle, 0, F );

        // Careful: shmat() returns (void *) -1, not 0, on failure.
        if ( p == ( void * ) -1 )
        {
            // Log the error.

            if ( ::shmctl( handle, IPC_RMID, 0 ) != 0 )
            {
                // Log the error.
            }

            throw std::bad_alloc();
        }

        // Poke the handle into the buffer-to-be even before its
        // constructor runs; the buffer constructor's odd-looking
        // handle( handle ) self-initialization then preserves it.
        * const_cast< int * >
            ( & reinterpret_cast< B * >( p )->handle ) =
                handle;

        return p;
    }

    template< typename B >
    void recycle( B const * const p ) const throw()
    {
        int const handle = p->handle;

        if ( ::shmdt( ( void * ) p ) != 0 )
        {
            // Log the error.
        }

        if ( ::shmctl( handle, IPC_RMID, 0 ) != 0 )
        {
            // Log the error.
        }
    }
};


} // namespace internal

// Policy class for low-level C++ memory management.
struct cxx : internal::control
{
    cxx( std::size_t const page_size ) throw() :
        internal::control( page_size )
    {
    }

 
    template< typename >
    void * request() const throw( std::bad_alloc )
    {
        // Unfortunately, in general, not aligned!
        // No point for locking or setting page size.

        // Unfortunately, all bets are off!
        return ::operator new ( page_size );
    }

 
    template< typename B >
    void recycle( B const * const p ) const throw()
    {
        ::operator delete ( ( void * ) p );
    }
};

// Template policy class for low-level C memory management.
template< int >
struct c;

template<>
struct c< pageable > : internal::control
{

    c( std::size_t const page_size ) throw() :
        internal::control( page_size )
    {
    }

 
    template< typename >
    void * request() const throw( std::bad_alloc )
    {
        // Solaris Standard C library to the rescue!
        void * p = ::memalign( page_size, page_size );

        if ( ! p )
            throw std::bad_alloc();

        // Advise HAT to adopt a corresponding page size.
        hat_advise_va( p );

        return p;
    }


    template< typename B >
    void recycle( B const * const p ) const throw()
    {
        ::free( ( void * ) p );
    }
};

 
template<>
struct c< locked > : c< pageable >
{

    c( std::size_t const page_size ) throw() :
        c< pageable >( page_size )
    {
    }

 
    template< typename B >
    void * request() const throw( std::bad_alloc )
    {
        void * p = c< pageable >::request< B >();

        lock( p );

        return p;
    }
 


    template< typename B >
    void recycle( B const * const p ) const throw()
    {
        unlock( ( void * ) p );

        c< pageable >::recycle< B >( p );
    }
};


template< int >
struct shared;
 

template<>
struct shared< ism > : internal::shared< SHM_SHARE_MMU >
{
    shared( std::size_t const page_size ) throw() :
        internal::shared< SHM_SHARE_MMU >( page_size )
    {
    }
};


template<>
struct shared< dism > : internal::shared< SHM_PAGEABLE >
{
    shared( std::size_t const page_size ) throw() :
        internal::shared< SHM_PAGEABLE >( page_size )
    {
    }
};


template<>
struct shared< pageable > : internal::shared< SHM_RND >
{
    shared( std::size_t const page_size ) throw() :
        internal::shared< SHM_RND >( page_size )
    {
    }

   
    template< typename B >
    void * request() const throw( std::bad_alloc )
    {
        void * p = internal::shared< SHM_RND >::request< B >();

        // Advise HAT to adopt a corresponding page size.
        hat_advise_va( p );

        return p;
    }
};
 


template<>
struct shared< locked > : shared< pageable >
{
    shared( std::size_t const page_size ) throw() :
        shared< pageable >( page_size )
    {
    }

   
    template< typename B >
    void * request() const throw( std::bad_alloc )
    {
        void * p = shared< pageable >::request< B >();

        lock( p );

        return p;
    }

   
    template< typename B >
    void recycle( B const * const p ) const throw()
    {
        unlock( ( void * ) p );

        shared< pageable >::recycle< B >( p );
    }
};
 


} // namespace policy
 

namespace internal
{

// Template for basic-memory based buffers.
template< typename T, typename >
struct buffer
{
    buffer( buffer const * const p ) throw() :
        next( p )
    {
    }

    union
    {
        // The next buffer on the list.
        buffer const * const next;

        // Alignment enforcement for the payload.
        T * align;
    };

    // The cached objects reside beyond this offset.
    // A trick to keep everything within the same chunk.


private:


    // Just allow placement new and explicit destruction.

    static void * operator new ( std::size_t ) throw();
    static void operator delete ( void * ) throw();

    static void * operator new [] ( std::size_t ) throw();
    static void operator delete [] ( void * ) throw();

    buffer( buffer const & );
    buffer & operator = ( buffer const & );
};

// Partial specialization for shared-memory based buffers.
template< typename T, int S >
struct buffer< T, policy::shared< S > >
{
    buffer( buffer const * const p ) throw() :
        // Self-initialization on purpose: shared::request() has
        // already poked the handle's value in (see WATCH OUT below).
        handle( handle ), next( p )
    {
    }

    // The shared memory associated handle.

    //
    // WATCH OUT! 
    //     This will be set in placement new
    //     even before the constructor is called!
    //
    int const handle;

    union
    {
        // The next buffer on the list.
        buffer const * const next;

        // Alignment enforcement for the payload.
        T * align;
    };

    // The cached objects reside beyond this offset.
    // A trick to keep everything within the same chunk.   


private:


   
    // Just allow placement new and explicit destruction.

    static void * operator new ( std::size_t ) throw();
    static void operator delete ( void * ) throw();

    static void * operator new [] ( std::size_t ) throw();
    static void operator delete [] ( void * ) throw();

    buffer( buffer const & );
    buffer & operator = ( buffer const & );
};


} // namespace internal

template< typename >
struct strategy
{
    enum { shared = false };
};

template< int S >
struct strategy< policy::shared< S > >
{
    enum { shared = true };
};

inline std::size_t largest_page() throw()
{
    std::size_t largest = ::sysconf( _SC_PAGESIZE );

    int n = ::getpagesizes( NULL, 0 );

    if ( n > 0 )
    {
        // Note: alloca() must be called directly here; memory
        // obtained from alloca() inside a helper function would
        // be released as soon as that helper returned.
        std::size_t * const size =
            static_cast< std::size_t * >
                ( ::alloca( n * sizeof( std::size_t ) ) );

        if ( ::getpagesizes( size, n ) != -1 )
            while ( --n >= 0 )
                if ( size[ n ] > largest )
                    largest = size[ n ];
    }

    return largest;
}

 

// The specialized memory pool of T objects.
template 
<
    typename T, 
    typename A = policy::c< policy::pageable > 
>
struct pool
{
    // Do not pre-allocate anything.
    // This provides very fast construction.

    pool
    (
        std::size_t const page_size =
            strategy< A >::shared
                ? largest_page()
                : ::sysconf( _SC_PAGESIZE )
    )
        throw() :
            allocator( page_size ),
            segment( 0 ),
            expanding( 0 ),
            available( 0 )
    {
    }

    ~pool() throw()
    {
        // An iterative instead of a recursive deleter.
        // This ensures no stack overflow will ever happen here.

        while ( segment )
        {
           
            buffer const * const p = segment;

            segment = segment->next;

            p->~buffer();

            allocator.recycle( p );
        }
    }

    // The function expand() can be delayed as much as desired.
    // It will be automatically called if absolutely necessary.
    // One thread will do the expansion while the others wait.

    void expand() throw( bad_alloc )
    {
        // Serialize and minimize concurrent expansions.
        if
        (
            ::atomic_cas_ptr( & expanding, 0, ( void * ) 1 )
            ==
            0
        )
        {
            // The modifying thread attempts the expansion.
            // Blocked threads will get notified at end.

            try
            {

                allocate();

                // Release other threads.
                ::atomic_swap_ptr( & expanding, 0 );
            }

            catch ( std::bad_alloc const & )
            {
                // Release other threads.
                ::atomic_swap_ptr( & expanding, 0 );

                // Translate it into this namespace's bad_alloc.
                throw bad_alloc( ENOMEM );
            }
        }
        else
            // Wait on loop before resuming.
            // Better than throwing exceptions.

            while
            (
                ::atomic_cas_ptr( & expanding, 0, 0 )
                ==
                ( void * ) 1
            )
                ;
    }

    void * request() throw( bad_alloc )
    {
        start:

        try
        {
            slot * a;

            do
            {
                if ( ! ( a = available ) )
                    throw bad_alloc( ENOMEM );
            }
            while
            (
                ::atomic_cas_ptr( & available, a, a->next ) != a
            );

            return a;
        }

        catch ( bad_alloc const & )
        {
            try
            {
                // Race for expansion.
                expand();
            }

            catch ( ... )
            {
                // Out of memory.
                throw;
            }
        }

        // The previous expansion succeeded.
        // Try again to fulfill the request for a slot.

        goto start;
    }

    void recycle( void * p ) throw()
    {
        slot * a;

        do
        {
            a = available;
            reinterpret_cast< slot * >( p )->next = a;
        }
        while ( ::atomic_cas_ptr( & available, a, p ) != a );
    }

private:


    // The low-level allocator throws std::bad_alloc, which
    // expand() translates into this namespace's bad_alloc.
    void allocate() throw( std::bad_alloc )
    {
        segment =
            ::new ( allocator.request< buffer >() ) 
            buffer( segment );

        // Skip the buffer's prefix.

        slot * const p =
            reinterpret_cast< slot * >
            (
                reinterpret_cast< intptr_t >( segment )
                +
                sizeof( buffer )
            );

        // Add new slots from the new buffer's payload. 

        // Is it worthwhile to unroll (parallelize) the loop?

        slot const * const limit = 
            p 
            + 
            ( allocator.page_size - sizeof( buffer ) )
            /
            sizeof( slot );

        slot * tail = p;
        slot * tracker = tail++;

        while ( tail < limit )
        {
            tracker->next = tail;

            tracker = tail++;
        }

        (--tail)->next = 0;


        // Prepend the new slots.

        slot * a;

        do
        {
            a = available;
            tail->next = a;
        }
        while ( ::atomic_cas_ptr( & available, a, p ) != a );

    }
 
private:

    // The low-level (OS) memory allocator.
    A const allocator;


    // Convenience.
    typedef internal::buffer< T, A > buffer;

    // The list of buffers.
    // Each node contains the slots of data.
    buffer const * segment;

    // Expansion serialization control.
    // Must be pointer-sized, since atomic_cas_ptr() operates on it.
    void * volatile expanding;

    // The slots of data (T objects)
    // and the list of available (free) slots.
    // Reusing free slots for the list's pointers.

    union slot
    {
        // Just the T size is needed (for space reservation).

        // Avoid further dependencies around the T type.
        unsigned char data[ sizeof( T ) ];

        slot * next;
    }
    * volatile available;


private:

    pool( pool const & );
    pool & operator = ( pool const & );
}; 


// A base class for a very convenient integration
// with the standard C++ memory management operators.
template
<
    typename T,
    typename A = policy::c< policy::pageable >
>
struct operations
{
    // Note: the pool may throw memory::bad_alloc on exhaustion.
    static void * operator new ( std::size_t ) throw( bad_alloc )
    {
        return pool.request();
    }

    static void operator delete ( void * p ) throw()
    {
        pool.recycle( p );
    }


    // The pool must be common (static).
    static memory::pool< T, A > pool;
};


// Template merging across multiple translation units
// avoids manually defining each static pool member.
template< typename T, typename A >
pool< T, A > operations< T, A >::pool;


} // namespace memory

Next are a few raw examples, just for illustration. More realistically, the pool would be internal to some other object, such as an advanced C++ smart pointer, in order to provide more efficient multi-threaded allocations and deallocations.

Example 1:

template< typename T >
struct pointer
{
    ...


    // All instances must share the same pool.

    static memory::pool< std::size_t > pool;

    ...

};

// The shared pool.
template< typename T >
memory::pool< std::size_t > pointer< T >::pool;


Example 2:

void f()
{
    // An ISM cache.
    static memory::pool
    <
        std::size_t,
        memory::policy::shared< memory::policy::ism >
    >
    pool;

    ...
}

Example 3:

// On Intel x86-64, sizeof( S ) = 8 = sizeof( void * )
// So, there's no internal fragmentation.
struct S
{
    char c;
    int i;
};


void f()
{

    memory::pool
    < 
        S, 
        memory::policy::c< memory::policy::locked > 
    > 
    pool( memory::largest_page() );

    try
    {      

        // Manual pre-expand (just for illustration).
        pool.expand();

        S * p = ( S * ) pool.request();

        p->c = 'S';
        p->i = 11;

        pool.recycle( p );
    }

    catch ( ... )
    {
        ...
    }
}


Example 4:

#ifndef S_HXX
#define S_HXX

#include <string.h> // For strlcpy().

#include "memory.hxx"

struct S : memory::operations< S >
{
    int code;
    char text[ 10 ];

    S( int c = 0, char const * t = "" ) : code( c )
    {
        // strlcpy() expects the full destination size
        // and always NUL-terminates.
        ::strlcpy( text, t, sizeof( text ) );
    }
};
  
#endif /* S_HXX */

  

 
#include "s.hxx"

void f()
{
    S * s = new S;


    ...


    delete s;
}

 

 
#include "s.hxx"

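// Careful: the inherited pool hands out slots sized for S,
// so a derived class must not add data members of its own.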
struct derived_S : S
{
   ...
};

extern void f();

void g()
{
    S * s = new S;


    ...


    delete s;

    ...


    derived_S * ds = new derived_S;

    ...


    delete ds;

    ...


    f();
}