65.9K
CodeProject 正在变化。 阅读更多。
Home

设计一个通用且模块化的 C++ 记忆化程序

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.88/5 (8投票s)

2017 年 10 月 16 日

CPOL

9分钟阅读

viewsIcon

19476

downloadIcon

180

本文探讨了记忆化函数的一些用例,并表明紧耦合的实现无法很好地扩展到新应用程序。然后,它为 C++ 记忆化框架提出了一个模块化设计,该框架通用、高效且可扩展到新应用程序。

引言

记忆化函数(Memoization)是一种优化技术,它存储昂贵函数调用的结果,并在再次遇到相同输入时返回缓存的结果(https://en.wikipedia.org/wiki/Memoization)。它最早由 Donald Michie 在其 1968 年发表于《自然》杂志(Nature magazine)的文章《“Memo” Functions and Machine Learning》中描述。他称之为一种“死记硬背”学习机制,可以使程序在执行过程中提高自身的效率。

虽然动态规划是记忆化函数常被引用的应用,但还有许多其他领域可以从这项技术中受益,例如分布式计算、数学建模、金融投资组合估值等。与此类技术通常的情况一样,你使用记忆化函数的次数越多,就越有可能发现它的其他应用。在本文中,我将介绍一个通用的、模块化的记忆化函数,它可以配置以满足特定应用程序的需求。

背景

本文介绍的记忆化函数广泛使用了现代 C++ 特性,如可变参数模板、右值引用、通用引用、完美转发和返回值推导。熟悉这些概念将有助于理解代码示例。

记忆化函数的组件

在开始设计模块化记忆化函数之前,思考记忆化函数的组成部分很有帮助。一个记忆化函数有两个部分:

    1. 记忆化函数(Memoizing function):这是一个高阶函数,它接受另一个函数作为参数,对其进行求值,并将其结果存储在缓存中。

    2. 结果缓存(Results cache):这是记忆化函数存储结果的地方。

尽管记忆化函数被广泛使用,但大多数记忆化函数的实现都是“全有或全无”的,记忆化函数和结果缓存之间存在紧密的耦合。它们对缓存类型有硬编码的依赖,对缓存行为的配置能力有限,并且为了通用性而做出悲观的数据结构选择。这限制了它们的应用范围,只适用于一小部分问题,并导致程序员为每一组新应用程序重新实现记忆化函数。作为用户,我们要么得不到我们想要的,要么最终为我们不需要或不使用的东西付出代价(在空间/时间复杂度方面)。而模块化或松耦合的设计则促进了代码重用,并可以轻松地根据需要适应新应用程序。

记忆化函数

让我们先思考一下我们希望从记忆化函数中获得什么。

    1. 通用性,即接受任何可调用对象:我们应该能够记忆化任何可调用对象,例如自由函数、成员函数、函数对象、lambda 表达式等。

    2. 保留类型:我们不应该被要求将函数包装在类型擦除接口(如 std::function)中。类型擦除接口使用 virtual 函数将调用委托给底层对象。除了增加一个额外的间接层级,virtual 函数还会阻止内联。虽然肯定有些情况我们需要使用类型擦除接口(例如在记忆化递归函数时),但我们不应被迫这样做。

     3. 非侵入性:我们应该能够在不修改函数的情况下对其进行记忆化。但是,对于递归函数,我们将需要做一个例外。

     4. 无硬编码依赖:这一点不言而喻。毕竟,我们正在讨论一个松耦合的记忆化函数。

现在,让我们看看一个满足所有这些要求的记忆化函数。

namespace memo {

template<typename CacheType, typename CallableType, typename... Args>
auto invoke(CacheType&& cache, CallableType&& fn, Args&&... args)
{
    auto&& pY = cache.lookup(std::tie(args...));
    if (!!pY) {
        return *pY;
    }

    auto&& y = fn(std::forward<Args>(args)...);
    cache.remember(std::tie(args...), std::forward<decltype(y)>(y));
    return y;
}

template<typename CacheType, typename CallableType>
class Memoizer {
   
public:
    template<typename C, typename... FnArgs>
    explicit Memoizer(C&& cache, FnArgs&&... args)
    : cache_(std::forward<C>(cache))
    , fn_(std::forward<FnArgs>(args)...)
    {
    }
    
    template<typename... Args>
    auto operator()(Args&&... args) const
    {
        return memo::invoke(cache_, fn_, std::forward<Args>(args)...);
    }

private:
    mutable CacheType cache_;
    mutable CallableType fn_;
};

}

这里涉及很多内容,让我们分解一下。

      1. 首先,memo::invoke 是记忆化函数——它求值给定的函数并将其结果存储在缓存中。它由 CallableTypeCacheType 参数化——因此没有硬编码的依赖。

      2. CacheTypelookup 方法的返回类型必须允许使用 operator!() 测试有效性,并且应该是可解引用的。指针,包括智能指针,都表现出这种行为,std::optional 也是如此。

      3. Memoizer 模板是函数装饰器的生成器,它将记忆化函数和结果缓存结合在一起。将此装饰器应用于函数即可获得其记忆化版本。

结果缓存

现在,让我们思考一下我们希望从结果缓存中获得什么。但在开始之前,让我们看两个基本的缓存类型,它们将 std::map 类数据结构适配到我们的记忆化函数中。

namespace memo {
namespace cache {

struct construct_in_place_t { };

template<typename Container>
class AssociativeContainerAdapter {

public:
    AssociativeContainerAdapter()
    {
    }

    template<typename... ConsArgs>
    explicit AssociativeContainerAdapter(construct_in_place_t, ConsArgs&&... args)
    : data_(std::forward<ConsArgs>(args)...)
    {
    }

    template<typename K>
    auto lookup(K&& key) const
    {
        auto iter = data_.find(std::forward<K>(key));
        if (iter == data_.end()) {
            return (decltype(&iter->second))nullptr;
        }

        return &iter->second;
    }

    template<typename K, typename V>
    void remember(K&& key, V&& value)
    {
        data_.emplace(std::forward<K>(key), std::forward<V>(value));
    }

private:
    Container data_;
};

template<typename K, typename V, typename Comparator = std::less<K> >
using BasicOrderedCache = AssociativeContainerAdapter<std::map<K, V, Comparator> >;

template<typename K, typename V, typename Hash = std::hash<K>, typename KeyEqual = std::equal_to<K> >
using BasicUnorderedCache = AssociativeContainerAdapter<std::unordered_map<K, V, Hash, KeyEqual> >;

}
}

这些缓存对于许多应用程序来说都足够了。它们还支持自定义比较器,这意味着我们可以定义自己的等价关系。这在使用浮点数时尤其有用,我们需要使用“模糊”比较。下面的列表显示了一个模糊的小于比较器。

struct FuzzyLess {

    explicit FuzzyLess(double tolerance)
    : tolerance_(tolerance)
    {
    }

    bool operator()(const std::tuple<double>& arg1, const std::tuple<double>& arg2) const
    {
        return std::get<0>(arg1) < (std::get<0>(arg2) - tolerance_);
    }

private:
    double tolerance_;
};

但是,有些应用程序需要专门的缓存。让我们看看在为这种情况选择最佳缓存类型时需要考虑的一些因素。

      1. 资源限制:应用程序需要能够处理各种资源限制。内存越来越便宜,但它仍然是有限的资源。虽然能够存储每次调用的结果会很好,但由于可用内存和结果的大小,这可能不可行。如果内存使用是一个问题,我们将需要一个带淘汰机制的缓存,例如 LRU 缓存。

      2. 缓存位置:它必须是内存缓存,还是我们需要远程缓存?内存缓存对于大多数应用程序都非常有效。但是,考虑一个具有许多工作节点的分布式计算网格。如果工作节点上执行的任务共享一些昂贵的计算子问题,我们应该对其进行记忆化。但是使用内存缓存意味着其余的工作节点将无法访问记忆化结果。另一方面,共享远程缓存将允许工作节点从彼此的工作中受益。并且请记住,有无数供应商提供远程缓存解决方案。我们希望能够以最小的改动切换供应商吗?或者也许我们想模拟缓存故障并研究其对计算网格整体吞吐量的影响。

      3. 线程安全:我们需要同步数据结构吗?是的,如果记忆化函数从多个线程调用。但是,我们不应该在单线程应用程序中承担这种成本。下面的列表显示了一个 std::map 类数据结构的同步适配器。

namespace memo {
namespace cache {

namespace detail {

template<typename T, typename Mutex>
class ContainerLockGuard {
    // on a cache miss, this allows us to hold the lock until the
    // result is memoized.

public:
    ContainerLockGuard(const T *ptr, Mutex *mutex) throw()
    : ptr_(ptr)
    , mutex_(mutex)
    {
    }

    ContainerLockGuard(const ContainerLockGuard&) = delete;
    ContainerLockGuard& operator=(const ContainerLockGuard&) = delete;

    ContainerLockGuard(ContainerLockGuard&& that)
    : ptr_(nullptr)
    , mutex_(nullptr)
    {
        std::swap(ptr_, that.ptr_);
        std::swap(mutex_, that.mutex_);
    }

    ~ContainerLockGuard()
    {
        if (mutex_) {
            mutex_->unlock();
        }
    }

    bool operator!() const
    {
        return !ptr_;
    }

    const T& operator*()
    {
        return *ptr_;
    }  

private:
    const T *ptr_;
    Mutex *mutex_;
};

}

template<typename Container, typename Mutex = std::recursive_mutex>
class SynchronizedContainerAdapter {

    typedef typename Container::mapped_type MappedType;
    friend class detail::ContainerLockGuard<MappedType, Mutex>;

public:
    SynchronizedContainerAdapter()
    {
    }

    template<typename... ConsArgs>
    explicit SynchronizedContainerAdapter(construct_in_place_t, ConsArgs&&... args)
    : data_(std::forward<ConsArgs>(args)...)
    {
    }

    template<typename K>
    auto lookup(K&& key) const
    {
        std::unique_lock<Mutex> lock(*mutex_);
        auto iter = data_.find(std::forward<K>(key));
        const MappedType *ptr = nullptr;
        if (iter != data_.end()) {
            ptr = &iter->second;
        }

        lock.release(); // dissociates from the mutex without unlocking
        return detail::ContainerLockGuard<MappedType, Mutex>(ptr, mutex_.get());
    }

    template<typename K, typename V>
    void remember(K&& key, V&& value)
    {
        std::unique_lock<Mutex> lock(*mutex_);
        data_.emplace(std::forward<K>(key), std::forward<V>(value));
    }

private:
    Container data_;
    mutable std::unique_ptr<Mutex> mutex_ = std::make_unique<Mutex>();
};

}
}

template<typename K, typename V, typename Comparator = std::less<K> >
using SynchronizedOrderedCache = SynchronizedContainerAdapter<std::map<K, V, Comparator> >;

template<typename K, typename V, typename Hash = std::hash<K>, typename KeyEqual = std::equal_to<K> >
using SynchronizedUnorderedCache = SynchronizedContainerAdapter<std::unordered_map<K, V, Hash, KeyEqual> >     

ContainerLockGuard 确保在缓存未命中时,记忆化函数在函数求值并存储结果之前会一直持有锁。否则可能导致竞态条件,多个线程为相同的输入求值函数。

      4. 利用先验知识:我们对函数了解多少,能否利用这些知识选择最佳的缓存策略?例如,如果有效输入属于一小组整数,我们可以使用数组来获得恒定的插入和查找时间。如果我们知道函数的导数,我们可以使用自定义比较器来避免为(应用程序定义的)可忽略的输入变化重新计算函数,从而有效地过滤掉“噪声”。

在紧耦合的设计中,支持所有这些用例将导致配置选项的爆炸。此外,支持新的用例需要对记忆化函数的实现进行大量更改。显然,这样的设计是脆弱的、容易出错的,并且无法很好地扩展。我们对缓存行为的配置能力有限,/或被迫使用悲观的数据结构(例如,始终使用同步数据结构)。另一方面,模块化记忆化函数使库作者不必预料到所有可能的用例,并让库用户自由地根据特定应用程序进行配置。

整合

是时候将这些组件整合在一起了。Memoizer 不会擦除类型,这意味着模板需要用要记忆化的函数的类型进行实例化。这可能会很快变得复杂,因此有一个辅助函数(memoize)可以推导类型,然后装饰函数并返回其记忆化版本。

namespace memo {

template<typename CacheType, typename CallableType>
auto memoize(CacheType&& cache, CallableType&& f)
{
    typedef Memoizer<std::remove_reference_t<CacheType>,
                     std::remove_reference_t<CallableType> > MemoizerType;
    return MemoizerType(std::forward<CacheType>(cache), std::forward<CallableType>(f));
}

}

 

现在让我们看看如何使用记忆化函数。下面的列表显示了一个简单逻辑函数的多个实现——一个自由函数、一个 lambda 表达式和一个用户定义的函数类型——它们使用 BasicOrderedCache 进行记忆化以存储结果。由于我们处理的是浮点数,因此缓存会与一个模糊的小于比较器一起实例化。

double logisticFn(double x)
{
    return 1.0 / (1.0 + std::exp(-x));
}

struct LogisticFn {

    double operator()(double x) const
    {
        return 1.0 / (1.0 + std::exp(-x));
    }

    double evaluate(double x) const
    {
        return operator()(x);
    }
};

void memoizer_test()
{
    using namespace memo;
    using namespace memo::cache;

    // there is a single input of type double, and the result type is double
    typedef BasicOrderedCache<std::tuple<double>, double, FuzzyLess> CacheType;
    auto cache = CacheType(cache::construct_in_place_t(), FuzzyLess(1e-6));
    
    // memoize a free-function
    auto logistic1 = memo::memoize(cache, &logisticFn);
    std::cout << logistic1(5.0) << std::endl;

    // memoize a lambda
    auto logistic2 = memo::memoize(cache,
                                    [](double x) -> double {
                                        return 1.0 / (1.0 + std::exp(-x));
                                    });
    std::cout << logistic2(5.0) << std::endl;

    // memoize a functor (std::function)
    auto logistic3 = memo::memoize(cache, std::function<double(double)>(logisticFn));
    std::cout << logistic3(5.0) << std::endl;

    // memoize a user-defined callable object
    auto logistic4 = memo::memoize(cache, LogisticFn());
    std::cout << logistic4(5.0) << std::endl;

    // memoize a member that is not a call operator
    auto logistic5 = memo::memoize(cache, std::bind(&LogisticFn::evaluate, 
                                                     LogisticFn(), std::placeholders::_1));
    std::cout << logistic5(5.0) << std::endl;
}

到目前为止一切顺利。但是,具有重载方法的类型会产生一个有趣的情况。让我们看一个例子,然后再深入细节。

struct LogisticFn {

    double operator()(double x) const
    {
        return 1.0 / (1.0 + std::exp(-x));
    }

    double operator()(float x) const
    {
        return 1.0 / (1.0 + std::exp(-x));
    }
};

void memoizerTest()
{
    using namespace memo;
    using namespace memo::cache;

    typedef BasicOrderedCache<std::tuple<double>, double, FuzzyLess> CacheType;
    auto cache = CacheType(construct_in_place_t(), FuzzyLess(1e-6));

    // case 1: selects a different overload based on the type of the argument
    auto logistic1 = memo::memoize(cache, LogisticFn());
    std::cout << logistic1(5.0) << std::endl; // calls double operator()(double)
    std::cout << logistic1(5.0f) << std::endl; // calls double operator()(float)

    // case 2: always calls double operator()(double)
    auto logistic2 = memo::memoize(cache,
                        std::bind((double (LogisticFn::*)(double) const)&LogisticFn::operator(),
                                  LogisticFn(), std::placeholders::_1));
    std::cout << logistic2(5.0) << std::endl; // calls double operator()(double)
    std::cout << logistic2(5.0f) << std::endl; // calls double operator()(double)
}

在第一种情况下,调用哪个重载取决于参数的类型。代码可以编译,因为重载具有相同的返回类型(否则,在 Memoizer 中返回值类型推导将失败),并且 float 可以隐式转换为缓存中输入的类型(double)。但我们不必依赖编译器。第二种情况展示了如何通过转换 operator()() 来选择特定的重载。

记忆化递归函数

好的,我们有了一个通用且模块化的记忆化函数。我们都完成了吗?别高兴太早,因为 Memoizer 不适用于递归函数。为了说明原因,请考虑下面这个尝试记忆化递归 fibonacci 函数的例子。

int fibonacci(int n)
{
    if (n == 0 || n == 1) {
        return 1;
    }
    
    return fibonacci(n - 1) + fibonacci(n - 2);
}

void recursiveTest()
{
    memo::cache::BasicOrderedCache<std::tuple<int>, int> cache;
    auto memFibonacci = memo::memoize(cache, &fibonacci);
    memFibonacci(5);
}

由于记忆化函数是非侵入性的,fibonacci 对它一无所知。当它进行递归调用时,它调用的是自身而不是其记忆化版本。我们仍然得到正确的结果,但没有记忆化的好处。

解决这个问题的方法是使用 Y 组合子。Y 组合子泛化了递归,并可用于在不支持递归的语言中实现递归函数。下面的列表显示了用于递归函数的记忆化函数的实现。

namespace memo {

template<int N>
struct placeholder {};

}

template<int N>
struct std::is_placeholder<memo::placeholder<N> > : public std::integral_constant<int, N> {
};

namespace memo {

template<typename CacheType, typename Signature>
class RecursiveMemoizer;

template<typename CacheType, typename R, typename... Args>
class RecursiveMemoizer<CacheType, R(Args...)> {

public:
    template<typename C, typename F>
    RecursiveMemoizer(C&& cache, F&& fn)
    : cache_(std::forward<C>(cache))
    , fn_(std::forward<F>(fn))
    {
        bindThis(std::make_index_sequence<sizeof...(Args)>());
    }

    RecursiveMemoizer(RecursiveMemoizer&& that)
    : cache_(std::move(that.cache_))
    , fn_(std::move(that.fn_))
    {
        bindThis(std::make_index_sequence<sizeof...(Args)>());
        that.this_ = decltype(that.this_)();
    }

    RecursiveMemoizer& operator=(RecursiveMemoizer&& that)
    {
        if (this != &that) {
            cache_ = std::move(that.cache_);
            fn_ = std::move(that.fn_);

            bindThis(std::make_index_sequence<sizeof...(Args)>());
            that.this_ = decltype(that.this_)();
        }
        return *this;
    }

<span id="cke_bm_97E" style="display: none;"> </span>    RecursiveMemoizer(const RecursiveMemoizer&) = delete;
    RecursiveMemoizer& operator=(const RecursiveMemoizer&) = delete;

    R operator()(Args... args)
    {
        auto&& pY = cache_.lookup(std::tie(args...));
        if (!!pY) {
            return *pY;
        }

        R&& y = fn_(this_, args...);
        cache_.remember(std::tie(args...), std::forward<R>(y));
        return y;
    }

private:
    template<std::size_t... Is>
    void bindThis(std::index_sequence<Is...>)
    {
        this_ = std::bind(&RecursiveMemoizer::operator(), this, memo::placeholder<Is + 1>()...);
    }

private:
    CacheType cache_;
    std::function<R(std::function<R(Args...)>&, Args...)> fn_;
    std::function<R(Args...)> this_;
};
​

在记忆化递归函数时,函数需要了解其记忆化版本,反之亦然。我们不能在编译时存在类型之间的循环依赖。RecursiveMemoizer 使用类型擦除接口(std::function)来打破循环。下面的列表展示了如何记忆化递归的 fibonacci 函数。

int fibonacci(std::function<int(int)>& self, int n)
{
    if (n == 0 || n == 1) {
        return 1;
    }
    
    return self(n - 1) + self(n - 2);
}

void recursiveTest()
{
    memo::cache::BasicOrderedCache<std::tuple<int>, int> cache;
    memo::RecursiveMemoizer<decltype(cache), int(int)> memFibonacci(cache, &fibonacci);
    std::cout << memFibonacci(15) << std::endl;
}

记忆化的先决条件

当我们记忆化一个函数时,我们隐式地对其做出以下假设:

  1. 首先,给定相同的输入,函数每次都返回相同的值,即不存在“隐藏”输入(如全局变量等)。
  2. 其次,返回值完全封装了调用该函数的效果,即没有副作用。

满足这些要求的函数称为纯函数。如果函数不是纯函数,则其记忆化是无效的。在记忆化成员函数时尤其需要小心。非静态成员函数有一个隐式的 this 参数,对象状态的变化可能会使记忆化结果失效。

关注点

  1. 紧耦合的记忆化函数无法轻松地适应新应用程序。它们还做出悲观的数据结构选择,以实现通用性。
  2. 模块化记忆化函数将缓存类型与记忆化函数解耦,并可以配置为满足特定应用程序的需求。
  3. C++ 模板允许我们编写通用的记忆化函数,而无需使用类型擦除接口(递归函数的情况除外)。
  4. 递归函数可以通过使用 Y 组合子进行记忆化。
© . All rights reserved.