ウィンドウサイズに依存しないMin-Max Filterの実装

背景

1次元のデータ列に対して、近傍Wでの最大値/最小値を求めたい。たとえばW=3で以下のようなmin3,max3を計算したい。

input:  9,2,8,3,1,3,2,0
max3:     9,8,8,3,3,3
min3:     2,2,1,1,1,0

画像処理では膨張収縮、オープンクローズフィルタなんかによく使われている。とても基本的な信号処理だけど、naiveに実装するとデータ数N、窓サイズWに対してO(NW)の時間がかかる。

/**
 * naive implementations of min-max filter
 */
template<typename InputIter, typename OutputIter>
void minmax_filter(InputIter first, InputIter last, size_t w,
                   OutputIter min_values, OutputIter max_values)
{
    typedef typename std::iterator_traits<InputIter>::value_type value_t;
    size_t size = std::distance(first, last);

    for (size_t i = 0; i < size - w + 1; i++, ++first, ++min_values, ++max_values) {
        value_t min_value, max_value;
        min_value = max_value = *first;

        for (size_t j = 1; j < w; j++) {
            min_value = std::min(min_value, first[j]);
            max_value = std::max(max_value, first[j]);
        }
        *min_values = min_value;
        *max_values = max_value;
    }
}

うまく実装するとこれをO(N)に落とせるとか、min/maxを得るための比較回数の下限は1.5N回であるとかが分かっているようで、今回は比較回数3N回でシンプルに実装できるものを書いてみた。

実装

参考にしたのはこれ。2006年の論文。
Streaming maximum-minimum filter using no more than three comparisons per element
論文自体に30行くらいのC++コードが添付されているのでありがたい。google codeにも筆者がコードを公開している。
maxminfilters - Fast maximum-minimum filters implemented in C++ - Google Project Hosting

ただし上のコード、Streaming Filterと言いながらランダムアクセスしていたり、double型に限定していたりするので、もうちょっとSTL likeに書き直してみたのが以下。ほぼ論文の疑似コードそのまんま。

nyanp/fast-minmax · GitHub

/**
 * 1D Maximum-Minimum filters
 *
 * it implements fast min-max filter algorithms described in following paper:
 * D. Lemire, "Streaming maximum-minimum filter using no more than three
 * comparisons per element", Nordic Journal of Computing 13 (4) (2006) 328-339.
 *
 * @param first, last            [in]  forward iterators defining the input range
 *                                     [first, last) to process
 * @param w                      [in]  window size of the filter
 * @param min_values, max_values [out] output iterator defining the beginning
 *                                     of the destination range. output size is
 *                                     distance(first, last) - w + 1.
 */
template<typename InputIter, typename OutputIter>
void minmax_filter(InputIter first, InputIter last, size_t w,
                   OutputIter min_values, OutputIter max_values)
{
    typedef typename std::iterator_traits<InputIter>::value_type value_t;
    typedef typename std::iterator_traits<InputIter>::reference ref_t;
    std::deque<std::pair<size_t, ref_t> > U, L;

    U.emplace_back(0, *first);
    L.emplace_back(0, *first);
    value_t prev = *first++;
    size_t i;

    for (i = 1; first != last; ++i, ++first) {
        if (i >= w) {
            *min_values++ = L.front().second;
            *max_values++ = U.front().second;
        }

        if (*first > prev) {
            U.pop_back();
            while (!U.empty() && *first > U.back().second)
                U.pop_back();
        } else {
            L.pop_back();
            while (!L.empty() && *first < L.back().second) 
                L.pop_back();
        }
        U.emplace_back(i, *first);
        L.emplace_back(i, *first);

        if (i == w + U.front().first)
            U.pop_front();
        else if (i == w + L.front().first)
            L.pop_front();

        prev = *first;
    }

    if (i >= w) {
        *max_values = U.front().second;
        *min_values = L.front().second;
    }
}

ベンチマーク

100万要素のdoubleに対するmin/maxを窓サイズ変えながら計測してみる。計測コードはこんな感じ。

void benchmark() {
    vector<double> v;

    std::mt19937 engine;
    std::uniform_real_distribution<> dist;

    for (int i = 0; i < 1000000; i++) {
        v.push_back(dist(engine));
    }

    vector<double> minV, maxV;
    minV.resize(1000000);
    maxV.resize(1000000);

    for (int w = 5; w < 100; w += 5) {
        auto start = chrono::high_resolution_clock::now();
        minmax_filter(v.begin(), v.end(), w, &minV[0], &maxV[0]); 
        auto end = chrono::high_resolution_clock::now();

        auto diff = end - start;
        cout << w << ":" << chrono::duration_cast<chrono::milliseconds>(diff).count() << endl;
    }
}
window-size naive(ms) fast-1(ms) fast-2(ms)
5 12 63 62
10 31 68 66
20 63 66 66
30 91 67 71
40 125 66 68
50 155 65 67

fast-1が今回の実装、fast-2は論文筆者の実装。計測はVS2013 /O2で実施。

window-sizeが小さいうちはnaiveなほうが高速。どちらの実装もまだ最適化の余地がありそうだけど*1、今回の実装ではW=20あたりで逆転している。いろんなWが想定されるケースだと、Wに応じてアルゴリズムを切り替えたりすると良さそう。

所感

1次元のMin/Maxみたいな超基本的っぽいアルゴリズムでも、まだまだ改善されているんだなーと小学生みたいな感想。

*1:naiveなほうはSIMD使うとか、fastなほうはdequeのアロケータを入れ替えるとか