粒子フィルターの実装 | Particles Package

NumPySciPy で粒子フィルターを実装する

Particles
Python
Author

司馬博文

Published

12/11/2023

概要
Pythonを用いて粒子フィルターを実装する方法を,Nicolas Chopinによるparticlesパッケージを参考に解説する.

Nicolas Chopin による逐次モンテカルロ法のための Python パッケージ particles の実装を参考に,NumPy, SciPy のみを用いて1から粒子フィルターを実装することで,その仕組みを理解することを目指す.

1 リサンプリングの実装

まずリサンプリング法を実装する.今回は系統的リサンプリング法 (Carpenter et al., 1999) を用いることとする.1

1.1 システマティックリサンプリング

系統的リサンプリングとは,粒子数 \(N\) から \(M\) 個のサンプルを復元抽出する方法であって,次の2段階からなる.

  1. 各区間 \(\left[\frac{n-1}{M},\frac{n}{M}\right]\subset[0,1]\;(n\in[M])\) での変動の由来をただ一つのサンプル \(U\sim\mathrm{U}([0,1])\) から取ってしまい, \[ U^{(n)}:=\frac{n-1+U}{N}\quad(n\in[M]) \] と乱数列を定める.
  2. 正規化荷重 \(\{w^{(n)}\}_{n=1}^N\) が定める累積和 \[ F(n):=\sum_{i=1}^nw^{(i)} \] に対して,この一般化逆関数 \(F^-:[0,1]\to[N]\) を通じて,\(A^{n}:=F^{-1}(U^{(n)})\;(n\in[M])\) をサンプルとする.

これにより,粒子の添字の対応 \((1,\cdots,N)\mapsto A^{1:M}\) が得られる.

Code
import numpy as np
from numba import jit

@jit(nopython=True)
def inverse_cdf(su, W):
    """Inverse CDF algorithm for a finite distribution.
    su: (M,) ndarray of sorted uniform variables
    W: (N,) ndarray of normalized weights"""
    j = 0
    s = W[0]
    M = su.shape[0]
    A = np.empty(M, dtype=np.int64)
    for n in range(M):
        while su[n] > s:
            j += 1
            s += W[j]
        A[n] = j
    return A

def systematic(W, M):
    """Systematic resampling
    W: (N,) ndarray of normalized weights
    M : number of resampled points"""
    su = (random.rand(1) + np.arange(M)) / M
    return inverse_cdf(su, W)

1.2 荷重を保持するWeightsクラス

次に,SMC に繋げるために,粒子の荷重を保持するためのクラスを定義する.粒子の荷重は極めて小さくなり得るため,対数によって保持する.このクラス内の属性として,正規化荷重もESSも得られるようにする:

  • lw:正規化されていない荷重を対数で保持
  • w:正規化された荷重
  • ESS:有効サンプル数2

これらの属性を __init__(lw) 内で計算する.加えて add(delta) メソッドで,incremental weightsを乗じるルーチンを用意する.

Code
class Weights:
    """A class to hold the N weights of the particles"""
    def __init__(self, lw=None):
        self.lw = lw  # t=0で呼ばれた際はNoneである
        if lw is not None:
            self.lw[np.isnan(self.lw)] = -np.inf  # 欠損値処理
            m = self.lw.max()
            w = np.exp(self.lw - m)  # 大きすぎる値にならないように
            s = w.sum()
            self.W = w / s  # 正規化荷重
            self.ESS = 1.0 / np.sum(self.W ** 2)
            self.log_mean = m + np.log(s / self.N)
    
    @property
    def N(self):
        """Number of particles"""
        return 0 if self.lw is None else self.lw.shape[0]

    def add(self, delta):
        """Add increment weights delta to the log weights"""
        if self.lw is None:
            return self.__class__(lw=delta)
        else:
            return self.__class__(lw=self.lw + delta)

初期化は \[ W^i=\frac{e^{\log w^i-m}}{\sum_{j=1}^Ne^{\log w^j-m}} \] \[ m:=\log\left(\max_{i\in[N]}w^i\right) \] に基づいて計算されている.log_mean\[ \begin{align*} &\log\left(\max_{i\in[N]}w^i\right)+\log\left(\frac{\sum_{j=1}^Ne^{\log w^j-m}}{N}\right)\\ &=\log\left(\frac{1}{N}\sum_{j=1}^Nw^j\right) \end{align*} \] という値である.

2 粒子の情報保持:ParticleHistoryクラス

2.1 情報を収集するCollectorクラス

SMCの結果をプロットするために,各時間で粒子の標本統計量を SMC クラス( Section 3 )から適宜抜き出して保存しておくためのクラス Summaries を作成する.抜き出すためのメソッドを Collector クラスの継承クラスとして定義する.

Code
class Collector:
    """Base class for collectors"""
    def __init__(self, **kwargs):
        self.summary = []

    def collect(self, smc):
        self.summary.append(self.fetch(smc))

class ESSs(Collector):
    summary_name = "ESSs"
    def fetch(self, smc):
        return smc.wgts.ESS

class LogLts(Collector):
    summary_name = "LogLts"
    def fetch(self, smc):
        return smc.logLt

class Rs_flags(Collector):
    summary_name = "Rs_flags"
    def fetch(self, smc):
        return smc.rs_flag

class Moments(Collector):
    """Collects empirical moments of the particles"""
    summary_name = "Moments"
    def fetch(self, smc):
        m = np.average(smc.X, weights=smc.wgts.W, axis=0)
        m2 = np.average(smc.X ** 2, weights=smc.wgts.W, axis=0)
        v = m2 - m ** 2
        return {"mean": m, "var": v}

default_collector_cls = [ESSs, LogLts, Rs_flags]

2.2 標本統計量を保持するSummariesクラス

このクラスはデフォルトで用意されている default_collector_cls に加えて,cols引数で指定されたメソッドを追加し,collect() メソッドが呼ばれるとこれらを集めて属性として保持する.

Code
class Summaries:
    """A class to hold the summaries of the SMC algorithm"""
    def __init__(self, cols):
        self._collectors = [cls() for cls in default_collector_cls]
        if cols is not None:
            self._collectors.extend(col() for col in cols)
        for col in self._collectors:
            setattr(self, col.summary_name, col.summary)

    def collect(self, smc):
        for col in self._collectors:
            col.collect(smc)

2.3 ヒストリを保持するParticleHistoryクラス

dequeオブジェクト としてヒストリを格納するためのクラスParticleHistory実装する.これにより直前 \(k\) ステップの情報だけを保持出来るように作れるが,今回はプロットのために全履歴を保持する.

Code
class ParticleHistory:
    """History of the particles
    Full history that keeps all the particle systems based on lists.
    """
    def __init__(self, fk):
        self.X, self.A, self.wgts = [], [], []
        self.fk = fk

    def save(self, smc):
        self.X.append(smc.X)
        self.A.append(smc.A)
        self.wgts.append(smc.wgts)
Code
def generate_hist_obj(option, smc):
    if option is True:
        return ParticleHistory(smc.fk)
    else:
        return None

3 実行部分:SMCクラス

このクラスがやるべきことは多い.Feynman-Kacモデル fkSection 4.1 で後述),粒子数 N,リサンプリング法 resampling を引数に取り,粒子フィルターを実行する.

最も大事なこととして,本クラスはイテレータとして定義し,__next__ メソッドを実装する.そして run() メソッドで __next__ を終了するまで繰り返し呼び出すことでイテレータプロトコルを実行する.

__next__メソッドでは,次のような処理を行う:

  1. 終了フラッグ fk.done(self) が立っているかどうかを確認する.
  2. \(t=0\) の場合,最初の粒子を初期分布 \(M_0\) から \(N\) 個サンプリングする.
  3. \(t>0\) の場合は,リサンプリングと粒子移動を行う.これは resample_move() メソッドで行う.
    1. リサンプリングフラッグ fk.time_to_resample(self) が立っている場合にリサンプリングを systematic メソッド( Section 1.1 )により行う.これにより,移動(変異)する粒子 \(A^{1:N}_t\) を確定させる.
    2. 確率核 \(M_t(X_{t-1}^{A_t^{1:N}},-)\) に従って,粒子 \(X_t^{1:N}\) をサンプリングする.
  4. 粒子の荷重を更新する.これは reweight_particles() メソッドで行う.
  5. compute_summariesメソッドを呼び出して,粒子の標本統計量を Summaries クラスに,ヒストリを Particle History クラスに追記する.
  6. 時刻 \(t\) を進めて 3.に戻る.
Code
class SMC:
    """Metaclass for SMC algorithms"""

    def __init__(
        self,
        fk=None,
        N=100,
        resampling="systematic",
        ESSrmin=0.5,
        store_history=False,
        collect=None,
    ):

        self.fk = fk
        self.N = N
        self.resampling = resampling
        self.ESSrmin = ESSrmin

        # initialisation
        self.t = 0
        self.rs_flag = False  # no resampling at time 0, by construction
        self.logLt = 0.0
        self.wgts = Weights()
        self.X, self.Xp, self.A = None, None, None

        self.summaries = Summaries(collect)
        self.hist = generate_hist_obj(store_history, self)

    def generate_particles(self):
        """Generate particles at time t=0"""
        self.X = self.fk.M0(self.N)
    
    def reset_weights(self):
        """Reset weights to uniform after a resamping step"""
        self.wgts = Weights()
    
    def resample_move(self):
        """Adaptively resample and move particles at time t"""
        self.rs_flag = self.fk.time_to_resample(self)
        if self.rs_flag:
            self.A  = systematic(self.wgts.W, M=self.N)
            self.Xp = self.X[self.A]
            self.reset_weights()
        else:
            self.A = np.arange(self.N)
            self.Xp = self.X
        self.X = self.fk.M(self.t, self.Xp)

    def reweight_particles(self):
        """Reweight particles at time t"""
        self.wgts = self.wgts.add(self.fk.logG(self.t, self.Xp, self.X))

    def compute_summaries(self):
        """Compute summaries at time t"""
        if self.t > 0:  # なぜかこれを前におかないとUnboundLocalErrorが出る
            prec_log_mean_w = self.log_mean_w
        self.log_mean_w = self.wgts.log_mean
        if self.t == 0 or self.rs_flag:
            self.loglt = self.log_mean_w
        else:
            self.loglt = self.log_mean_w - prec_log_mean_w
        self.logLt += self.loglt

        self.hist.save(self)
        self.summaries.collect(self)

    def __next__(self):
        """One step of the SMC algorithm"""
        if self.fk.done(self):
            raise StopIteration
        if self.t == 0:
            self.generate_particles()
        else:
            self.resample_move()
        self.reweight_particles()
        self.compute_summaries()
        self.t += 1

    def __iter__(self):
        return self

    def run(self):
        """Run the SMC algorithm until completion"""
        for _ in self:
            pass

4 粒子フィルタの実行:東京の年別気温データ

4.1 Feynman-Kacモデルの枠組み

particle パッケージの抽象クラス FeynmanKac は次のメソッドを持つ.3

  • M0(N): 初期分布 \(M_0\) から \(N\) 個のサンプルを生成する.
  • M(t, xp): カーネル \(M_t(x_{t-1}|-)\) から \(X_t\) をサイズ xp.shape[0] で生成する.
  • logG(t, xp, x): ポテンシャル \(G_t(x_{t-1},x_t)\) の対数を返す.

加えて,粒子フィルターの実行時に必要なフラグも用意する.

  • time_to_resample(smc): smc オブジェクトを引数に取り,その属性 smc.aux.ESS, smc.ESSrmin からリサンプリングが必要かどうかを判定する.
  • done(smc): smc オブジェクトを引数に取り,その属性 smc.t, smc.T からアルゴリズムを終了すべきかどうかを判定する.

particle パッケージを使うときは FeynmanKac クラスを継承して用いることになるが,ここでは自分で定義していく.

4.2 使用するデータ

気象庁が HP にて公開している1876年から2022年までの計147年分の東京の年別気温データを用いる.

Code
import pandas as pd

data = pd.read_csv("TemperatureDataAtTokyo.csv")
print(data.describe())
                年度         日平均         日最高        日最低          最高          最低
count   147.000000  147.000000  147.000000  147.00000  147.000000  147.000000
mean   1949.000000   14.963946   19.337415   11.12517   35.098639   -4.317687
std      42.579338    1.132396    0.875794    1.46614    1.674956    2.439366
min    1876.000000   12.900000   17.500000    8.30000   31.600000   -9.200000
25%    1912.500000   14.000000   18.700000    9.90000   34.000000   -6.150000
50%    1949.000000   14.800000   19.300000   10.80000   34.900000   -4.700000
75%    1985.500000   15.800000   19.900000   12.30000   36.200000   -2.250000
max    2022.000000   17.300000   21.300000   13.90000   39.500000    0.900000
Code
import matplotlib.pyplot as plt

plt.figure(figsize=(3.5, 3))

plt.title("temperature in Tokyo")
plt.xlabel("Year")
plt.ylabel("Temperature (Celsius)")

plt.scatter(data['年度'], data['日平均'], s=2)
plt.show()

4.3 気温の1次のトレンドモデル

気温の観測値 \(\{y_k\}\) に対して,1次元の線型Gauss状態空間モデル \[ \begin{cases} x_k=x_{k-1}+v_k,\\ y_k=x_k+w_k. \end{cases} \tag{1}\] \[ v_k\overset{\text{iid}}{\sim}\mathrm{N}(0,Q^2),\quad w_k\overset{\text{iid}}{\sim}\mathrm{N}(0,R^2), \] を想定する.このモデルを1次のトレンドモデルという (北川, 2005, p. 第11章)

これをSMCメソッド( Section 3 )に渡せるように実装するには次のようにする:

Code
from numpy import random
from scipy import stats

class Bootstrap:
    """Abstract base class for Feynman-Kac models derived from State Space Model (1).
    """

    def __init__(self, data, T, R, Q):
        self.data = data
        self.T = T
        self.R = R
        self.Q = Q
    
    def M0(self, N):
        """Sample N times from initial distribution M0 of the FK model"""
        return random.normal(loc=13.6, scale=self.Q, size=N)
    
    def M(self, t, xp):  # xp: resampled previous state
        """Sample Xt from kernel Mt conditioned on Xt-1=xp"""
        return random.normal(loc=xp, scale=self.Q, size=xp.shape[0])
    
    def logG(self, t, xp, x):  # x: current state
        """Evaluate the log potential Gt(xt-1,xt)"""
        return stats.norm.logpdf(self.data[t], loc=x, scale=self.R)
    
    def time_to_resample(self, smc):
        """Return True if resampling is needed"""
        return smc.wgts.ESS < smc.N * smc.ESSrmin
    
    def done(self, smc):
        """Return True if the algorithm is done"""
        return smc.t >= self.T

4.4 \((R,Q)=(0.2,0.1)\) の場合

仮に \((R,Q)=(0.2,0.1)\) としてみる.すなわち,システムノイズ \(Q^2=0.1\) が小さく,観測ノイズ \(R^2=0.4\) はそれよりは大きいとしている.

Code
model1 = Bootstrap(data=data['日平均'], T=data.shape[0], R=0.2, Q=0.1)
PF1 = SMC(fk=model1, N=1000, resampling="systematic", ESSrmin=0.5, collect=[Moments], store_history=True)
PF1.run()
Code
plt.figure(figsize=(3.5, 3))
plt.plot(data['日平均'], label='data', linestyle='', marker='.')
plt.plot([m['mean'] for m in PF1.summaries.Moments], label='filtered temperature trend')
plt.show()
図 1: (R,Q)=(0.2,0.1) の場合の粒子フィルターの実行結果

少し揺らぎながらも,トレンドとして気温が上昇していく様子が見られる.

4.5 \((R,Q)=(0.7,0.1)\) の場合

濾波して得たトレンドの揺らぎが少し大きいと思われたため,観測誤差はもう少し大きいものとして \((R,Q)=(0.7,0.1)\) としてみる.

Code
model4 = Bootstrap(data=data['日平均'], T=data.shape[0], R=0.7, Q=0.1)
PF4 = SMC(fk=model4, N=1000, resampling="systematic", ESSrmin=0.5, collect=[Moments], store_history=True)
PF4.run()
Code
plt.figure(figsize=(3.5, 3))
plt.plot(data['日平均'], label='data', linestyle='', marker='.')
plt.plot([m['mean'] for m in PF4.summaries.Moments], label='filtered temperature trend')
plt.show()

(R,Q)=(0.7,0.1) の場合の粒子フィルターの実行結果

こうしてトレンドとして少しばかり直線的なものが得られた.やはり上昇トレンドが見られる.

4.6 \((R,Q)=(0.2,0.01)\) の場合

\(Q^2=10^{-4}\) としてシステムノイズは極めて小さいと想定してみる.「トレンドは殆ど変化しない」という仮定を置いたことになる.

Code
model2 = Bootstrap(data=data['日平均'], T=data.shape[0], R=0.2, Q=0.01)
PF2 = SMC(fk=model2, N=1000, resampling="systematic", ESSrmin=0.5, collect=[Moments], store_history=True)
PF2.run()
Code
plt.figure(figsize=(3.5, 3))
plt.plot(data['日平均'], label='data', linestyle='', marker='.')
plt.plot([m['mean'] for m in PF2.summaries.Moments], label='filtered temperature trend')
plt.show()

(R,Q)=(0.2,0.01) の場合の粒子フィルターの実行結果

あまり良い当てはまりを見せないため,この気温の時系列を全てが観測誤差によるものだと理解するのは妥当ではないと考えられる.

4.7 \((R,Q)=(0.2,1)\) の場合

逆にシステムノイズを極めて大きい値 \(Q^2=1\) と設定する.トレンドは年別の揺らぎが大きいと想定したことになる.

Code
model3 = Bootstrap(data=data['日平均'], T=data.shape[0], R=0.2, Q=1.0)
PF3 = SMC(fk=model3, N=1000, resampling="systematic", ESSrmin=0.5, collect=[Moments], store_history=True)
PF3.run()
Code
plt.figure(figsize=(3.5, 3))
plt.plot(data['日平均'], label='data', linestyle='', marker='.')
plt.plot([m['mean'] for m in PF3.summaries.Moments], label='filtered temperature trend')
plt.show()

(R,Q)=(0.2,1) の場合の粒子フィルターの実行結果

とんでもない過適応を見せて,全てをトレンドとして説明してしまっており,これもまた妥当ではないと考えられる.

4.8 カルマンフィルタとの比較

線型Gaussモデルを想定しているため,粒子フィルターは \(N\to\infty\) の極限で最適フィルターであるカルマンフィルターに一致するはずである.そこで,pykalman パッケージを用いてこれを実装する.\((R,Q)=(0.2,0.1)\) とする.

Code
from pykalman import KalmanFilter
KF1 = KalmanFilter(initial_state_mean=13.6, initial_state_covariance=0.1,
                   transition_matrices=1, observation_matrices=1,
                   transition_covariance=0.1, observation_covariance=0.2, n_dim_state=1, n_dim_obs=1)
KF1 = KF1.em(data['日平均'], n_iter=5)  # EMアルゴリズムの過適応回避のため
(filtered_state_means, filtered_state_covariances) = KF1.filter(data['日平均'])
Code
plt.figure(figsize=(3.5, 3))
plt.plot(data['日平均'], label='data', linestyle='', marker='.')
plt.plot(filtered_state_means, label='filtered temperature trend')
plt.show()

(R,Q)=(0.2,0.1) の場合のKalmanフィルターの実行結果

たしかに 図 1 と極めて似通った結果になっている.

4.9 カルマン平滑化の結果

Code
(smoothed_state_means, smoothed_state_covariances) = KF1.smooth(data['日平均'])
Code
plt.figure(figsize=(3.5, 3))
plt.plot(data['日平均'], label='data', linestyle='', marker='.')
plt.plot(smoothed_state_means, label='smoothed temperature trend')
plt.show()

(R,Q)=(0.2,0.1) の場合のKalman平滑化の実行結果

より滑らかなトレンドが得られている.

References

Carpenter, J., Clifford, P., and Fearnhead, P. (1999). Improved particle filter for nonlinear problems. IEE Proceedings - Rader, Sonar and Navigation, 146(1), 2–7.
Chopin, N., and Papaspiliopoulos, O. (2020). An introduction to sequential monte carlo. Springer Cham.
Kitagawa, G. (1996). Monte carlo filter and smoother for non-gaussian nonlinear state space models. Journal of Computational and Graphical Statistics, 5(1), 1–25.
北川. (2005). 時系列解析入門. 岩波書店.

Footnotes

  1. 実はこのリサンプリング法は (Kitagawa, 1996) の付録で原型が(全く決定論的なアルゴリズムとして)提案されている↩︎

  2. 有効サンプル数の定義については (Chopin and Papaspiliopoulos, 2020) 参照.↩︎

  3. Feynman-Kacモデルなどの用語については (Chopin and Papaspiliopoulos, 2020) 参照.↩︎