1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
| import numpy as np np.random.seed(None)
class MyGMM(object): def __init__(self,K=3): ''' 高斯混合模型,用EM算法进行求解 :param K: 超参数,分类类别
涉及到的其它参数: :param N: 样本量 :param D: 单个样本的维度 :param alpha: 模型参数,高斯函数的系数,决定高斯函数的高度,维度(K) :param mu: 模型参数,高斯函数的均值,决定高斯函数的中型位置,维度(K, D) :param Sigma: 模型参数,高斯函数的方差矩阵,决定高斯函数的形状,维度(K, D, D) :param gamma: 模型隐变量,决定单个样本具体属于哪一个高斯分布,维度(N, K) ''' self.K=K self.params={ 'alpha':None, 'mu': None, 'Sigma': None, 'gamma': None }
self.N=None self.D=None
def __init_params(self): # alpha 需要满足和为1的约束条件 alpha=np.random.rand(self.K) alpha=alpha/np.sum(alpha) mu=np.random.rand(self.K,self.D) Sigma=np.array([np.identity(self.D) for _ in range(self.K)]) # 虽然gamma有约束条件,但是第一步E步时会对此重新赋值,所以可以随意初始化 gamma=np.random.rand(self.N,self.K)
self.params={ 'alpha':alpha, 'mu':mu, 'Sigma':Sigma, 'gamma':gamma }
def _gaussian_function(self,y_j,mu_k,Sigma_k): ''' 计算高维度高斯函数 :param y_j: 第j个观测值 :param mu_k: 第k个mu值 :param Sigma_k: 第k个Sigma值 :return: ''' # 先取对数 n_1=self.D*np.log(2*np.pi) # 计算数组行列式的符号和(自然)对数 _,n_2=np.linalg.slogdet(Sigma_k) # 计算矩阵的(乘法)逆矩阵 n_3=np.dot(np.dot((y_j-mu_k).T,np.linalg.inv(Sigma_k)),y_j-mu_k) # 返回时重新取指数抵消前面的取对数操作 return np.exp(-0.5*(n_1+n_2+n_3))
def _E_step(self,y): alpha=self.params['alpha'] mu=self.params['mu'] Sigma=self.params['Sigma']
for j in range(self.N): y_j=y[j] gamma_list=[] for k in range(self.K): alpha_k=alpha[k] mu_k=mu[k] Sigma_k=Sigma[k] gamma_list.append(alpha_k*self._gaussian_function(y_j,mu_k,Sigma_k)) # 对隐变量进行迭代更新 self.params['gamma'][j,:]=np.array([v/np.sum(gamma_list) for v in gamma_list])
def _M_step(self,y): mu=self.params['mu'] gamma=self.params['gamma'] for k in range(self.K): mu_k=mu[k] gamma_k=gamma[:,k] gamma_k_j_list=[] mu_k_part_list=[] Sigma_k_part_list=[] for j in range(self.N): y_j=y[j] gamma_k_j=gamma_k[j] gamma_k_j_list.append(gamma_k_j) # mu_k的分母的分母列表 mu_k_part_list.append(gamma_k_j*y_j) # Sigma_k的分母列表 Sigma_k_part_list.append(gamma_k_j*np.outer(y_j-mu_k,(y_j-mu_k).T)) # 对模型参数进行迭代更新 self.params['mu'][k]=np.sum(mu_k_part_list,axis=0)/np.sum(gamma_k_j_list) self.params['Sigma'][k]=np.sum(Sigma_k_part_list,axis=0)/np.sum(gamma_k_j_list) self.params['alpha'][k]=np.sum(gamma_k_j_list)/self.N
def fit(self,y,max_iter=100): y=np.array(y) self.N,self.D=y.shape self.__init_params() for _ in range(max_iter): self._E_step(y) self._M_step(y)
def get_samples(n_ex=1000,n_classes=3,n_in=2,seed=None): # 生成1000个样本,为了能够在二维平面上画出图线表示出来,每个样本的特征维度设置为2 from sklearn.datasets import make_blobs y,_=make_blobs(n_samples=n_ex,centers=n_classes,n_features=n_in,random_state=seed) return y
def run_my_model(K=3): from matplotlib import pyplot as plt my=MyGMM(K) y=get_samples() print(y.shape) while True: try: my.fit(y) except Exception as e: print(e) else: break
max_index=np.argmax(my.params['gamma'],axis=1) print(max_index)
k_lists=[] for _ in range(K): k_lists.append([]) for y_i,index in zip(y,max_index): k_lists[index].append(y_i) for i in range(K): k_lists[i]=np.array(k_lists[i]) print(k_lists) cnames=['aliceblue', 'antiquewhite', 'aqua', 'aquamarine', 'azure', 'beige', 'bisque', 'black', 'blanchedalmond', 'blue', 'blueviolet', 'brown', 'burlywood', 'cadetblue', 'chartreuse', 'chocolate', 'coral', 'cornflowerblue', 'cornsilk', 'crimson', 'cyan', 'darkblue', 'darkcyan', 'darkgoldenrod', 'darkgray'] for i in range(K): if k_lists[i].size==0: # 聚类数据计算过程中,出现奇异矩阵的情况,产生异常,导致k_lists[i]内容为空(或者没有聚到类?我不清楚产生空的列表的原因,但一旦有异常就必会有空列表的产生) continue plt.scatter(k_lists[i][:,0],k_lists[i][:,1],c=cnames[i],alpha=0.8) plt.show()
run_my_model(7)
|