ウィンドウサイズに依存しないMin-Max Filterの実装
背景
1次元のデータ列に対して、近傍Wでの最大値/最小値を求めたい。たとえばW=3で以下のようなmin3,max3を計算したい。
input: 9,2,8,3,1,3,2,0 max3: 9,8,8,3,3,3 min3: 2,2,1,1,1,0
画像処理では膨張収縮、オープンクローズフィルタなんかによく使われている。とても基本的な信号処理だけど、naiveに実装するとデータ数N、窓サイズWに対してO(NW)の時間がかかる。
/** * naive implementations of min-max filter */ template<typename InputIter, typename OutputIter> void minmax_filter(InputIter first, InputIter last, size_t w, OutputIter min_values, OutputIter max_values) { typedef typename std::iterator_traits<InputIter>::value_type value_t; size_t size = std::distance(first, last); for (size_t i = 0; i < size - w + 1; i++, ++first, ++min_values, ++max_values) { value_t min_value, max_value; min_value = max_value = *first; for (size_t j = 1; j < w; j++) { min_value = std::min(min_value, first[j]); max_value = std::max(max_value, first[j]); } *min_values = min_value; *max_values = max_value; } }
うまく実装するとこれをO(N)に落とせるとか、min/maxを得るための比較回数の下限は1.5N回であるとかが分かっているようで、今回は比較回数3N回でシンプルに実装できるものを書いてみた。
実装
参考にしたのはこれ。2006年の論文。
Streaming maximum-minimum filter using no more than three comparisons per element
論文自体に30行くらいのC++コードが添付されているのでありがたい。google codeにも筆者がコードを公開している。
maxminfilters - Fast maximum-minimum filters implemented in C++ - Google Project Hosting
ただし上のコード、Streaming Filterと言いながらランダムアクセスしていたり、double型に限定していたりするので、もうちょっとSTL likeに書き直してみたのが以下。ほぼ論文の疑似コードそのまんま。
/** * 1D Maximum-Minimum filters * * it implements fast min-max filter algorithms described in following paper: * D. Lemire, "Streaming maximum-minimum filter using no more than three * comparisons per element", Nordic Journal of Computing 13 (4) (2006) 328-339. * * @param first, last [in] forward iterators defining the input range * [first, last) to process * @param w [in] window size of the filter * @param min_values, max_values [out] output iterator defining the beginning * of the destination range. output size is * distance(first, last) - w + 1. */ template<typename InputIter, typename OutputIter> void minmax_filter(InputIter first, InputIter last, size_t w, OutputIter min_values, OutputIter max_values) { typedef typename std::iterator_traits<InputIter>::value_type value_t; typedef typename std::iterator_traits<InputIter>::reference ref_t; std::deque<std::pair<size_t, ref_t> > U, L; U.emplace_back(0, *first); L.emplace_back(0, *first); value_t prev = *first++; size_t i; for (i = 1; first != last; ++i, ++first) { if (i >= w) { *min_values++ = L.front().second; *max_values++ = U.front().second; } if (*first > prev) { U.pop_back(); while (!U.empty() && *first > U.back().second) U.pop_back(); } else { L.pop_back(); while (!L.empty() && *first < L.back().second) L.pop_back(); } U.emplace_back(i, *first); L.emplace_back(i, *first); if (i == w + U.front().first) U.pop_front(); else if (i == w + L.front().first) L.pop_front(); prev = *first; } if (i >= w) { *max_values = U.front().second; *min_values = L.front().second; } }
ベンチマーク
100万要素のdoubleに対するmin/maxを窓サイズ変えながら計測してみる。計測コードはこんな感じ。
void benchmark() { vector<double> v; std::mt19937 engine; std::uniform_real_distribution<> dist; for (int i = 0; i < 1000000; i++) { v.push_back(dist(engine)); } vector<double> minV, maxV; minV.resize(1000000); maxV.resize(1000000); for (int w = 5; w < 100; w += 5) { auto start = chrono::high_resolution_clock::now(); minmax_filter(v.begin(), v.end(), w, &minV[0], &maxV[0]); auto end = chrono::high_resolution_clock::now(); auto diff = end - start; cout << w << ":" << chrono::duration_cast<chrono::milliseconds>(diff).count() << endl; } }
window-size | naive(ms) | fast-1(ms) | fast-2(ms) |
5 | 12 | 63 | 62 |
10 | 31 | 68 | 66 |
20 | 63 | 66 | 66 |
30 | 91 | 67 | 71 |
40 | 125 | 66 | 68 |
50 | 155 | 65 | 67 |
fast-1が今回の実装、fast-2は論文筆者の実装。計測はVS2013 /O2で実施。
window-sizeが小さいうちはnaiveなほうが高速。どちらの実装もまだ最適化の余地がありそうだけど*1、今回の実装ではW=20あたりで逆転している。いろんなWが想定されるケースだと、Wに応じてアルゴリズムを切り替えたりすると良さそう。