高斯混合模型是一种无监督聚类算法,一般使用EM算法进行求解。

聚类随机数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import numpy as np
np.random.seed(None)

class MyGMM(object):
def __init__(self,K=3):
'''
高斯混合模型,用EM算法进行求解
:param K: 超参数,分类类别

涉及到的其它参数:
:param N: 样本量
:param D: 单个样本的维度
:param alpha: 模型参数,高斯函数的系数,决定高斯函数的高度,维度(K)
:param mu: 模型参数,高斯函数的均值,决定高斯函数的中型位置,维度(K, D)
:param Sigma: 模型参数,高斯函数的方差矩阵,决定高斯函数的形状,维度(K, D, D)
:param gamma: 模型隐变量,决定单个样本具体属于哪一个高斯分布,维度(N, K)
'''
self.K=K
self.params={
'alpha':None,
'mu': None,
'Sigma': None,
'gamma': None
}

self.N=None
self.D=None

def __init_params(self):
# alpha 需要满足和为1的约束条件
alpha=np.random.rand(self.K)
alpha=alpha/np.sum(alpha)
mu=np.random.rand(self.K,self.D)
Sigma=np.array([np.identity(self.D) for _ in range(self.K)])
# 虽然gamma有约束条件,但是第一步E步时会对此重新赋值,所以可以随意初始化
gamma=np.random.rand(self.N,self.K)

self.params={
'alpha':alpha,
'mu':mu,
'Sigma':Sigma,
'gamma':gamma
}

def _gaussian_function(self,y_j,mu_k,Sigma_k):
'''
计算高维度高斯函数
:param y_j: 第j个观测值
:param mu_k: 第k个mu值
:param Sigma_k: 第k个Sigma值
:return:
'''
# 先取对数
n_1=self.D*np.log(2*np.pi)
# 计算数组行列式的符号和(自然)对数
_,n_2=np.linalg.slogdet(Sigma_k)
# 计算矩阵的(乘法)逆矩阵
n_3=np.dot(np.dot((y_j-mu_k).T,np.linalg.inv(Sigma_k)),y_j-mu_k)
# 返回时重新取指数抵消前面的取对数操作
return np.exp(-0.5*(n_1+n_2+n_3))

def _E_step(self,y):
alpha=self.params['alpha']
mu=self.params['mu']
Sigma=self.params['Sigma']

for j in range(self.N):
y_j=y[j]
gamma_list=[]
for k in range(self.K):
alpha_k=alpha[k]
mu_k=mu[k]
Sigma_k=Sigma[k]
gamma_list.append(alpha_k*self._gaussian_function(y_j,mu_k,Sigma_k))
# 对隐变量进行迭代更新
self.params['gamma'][j,:]=np.array([v/np.sum(gamma_list) for v in gamma_list])

def _M_step(self,y):
mu=self.params['mu']
gamma=self.params['gamma']
for k in range(self.K):
mu_k=mu[k]
gamma_k=gamma[:,k]
gamma_k_j_list=[]
mu_k_part_list=[]
Sigma_k_part_list=[]
for j in range(self.N):
y_j=y[j]
gamma_k_j=gamma_k[j]
gamma_k_j_list.append(gamma_k_j)
# mu_k的分母的分母列表
mu_k_part_list.append(gamma_k_j*y_j)
# Sigma_k的分母列表
Sigma_k_part_list.append(gamma_k_j*np.outer(y_j-mu_k,(y_j-mu_k).T))
# 对模型参数进行迭代更新
self.params['mu'][k]=np.sum(mu_k_part_list,axis=0)/np.sum(gamma_k_j_list)
self.params['Sigma'][k]=np.sum(Sigma_k_part_list,axis=0)/np.sum(gamma_k_j_list)
self.params['alpha'][k]=np.sum(gamma_k_j_list)/self.N

def fit(self,y,max_iter=100):
y=np.array(y)
self.N,self.D=y.shape
self.__init_params()
for _ in range(max_iter):
self._E_step(y)
self._M_step(y)

def get_samples(n_ex=1000,n_classes=3,n_in=2,seed=None):
# 生成1000个样本,为了能够在二维平面上画出图线表示出来,每个样本的特征维度设置为2
from sklearn.datasets import make_blobs
y,_=make_blobs(n_samples=n_ex,centers=n_classes,n_features=n_in,random_state=seed)
return y

def run_my_model(K=3):
from matplotlib import pyplot as plt
my=MyGMM(K)
y=get_samples()
print(y.shape)
while True:
try:
my.fit(y)
except Exception as e:
print(e)
else:
break

max_index=np.argmax(my.params['gamma'],axis=1)
print(max_index)

k_lists=[]
for _ in range(K):
k_lists.append([])
for y_i,index in zip(y,max_index):
k_lists[index].append(y_i)
for i in range(K):
k_lists[i]=np.array(k_lists[i])
print(k_lists)
cnames=['aliceblue', 'antiquewhite', 'aqua', 'aquamarine', 'azure', 'beige', 'bisque', 'black', 'blanchedalmond', 'blue', 'blueviolet', 'brown', 'burlywood', 'cadetblue', 'chartreuse', 'chocolate', 'coral', 'cornflowerblue', 'cornsilk', 'crimson', 'cyan', 'darkblue', 'darkcyan', 'darkgoldenrod', 'darkgray']
for i in range(K):
if k_lists[i].size==0: # 聚类数据计算过程中,出现奇异矩阵的情况,产生异常,导致k_lists[i]内容为空(或者没有聚到类?我不清楚产生空的列表的原因,但一旦有异常就必会有空列表的产生)
continue
plt.scatter(k_lists[i][:,0],k_lists[i][:,1],c=cnames[i],alpha=0.8)
plt.show()

run_my_model(7)

聚类花朵分类数据集

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import numpy as np
np.random.seed(None)

class MyGMM(object):
def __init__(self,K=3):
'''
高斯混合模型,用EM算法进行求解
:param K: 超参数,分类类别

涉及到的其它参数:
:param N: 样本量
:param D: 单个样本的维度
:param alpha: 模型参数,高斯函数的系数,决定高斯函数的高度,维度(K)
:param mu: 模型参数,高斯函数的均值,决定高斯函数的中型位置,维度(K, D)
:param Sigma: 模型参数,高斯函数的方差矩阵,决定高斯函数的形状,维度(K, D, D)
:param gamma: 模型隐变量,决定单个样本具体属于哪一个高斯分布,维度(N, K)
'''
self.K=K
self.params={
'alpha':None,
'mu': None,
'Sigma': None,
'gamma': None
}

self.N=None
self.D=None

def __init_params(self):
# alpha 需要满足和为1的约束条件
alpha=np.random.rand(self.K)
alpha=alpha/np.sum(alpha)
mu=np.random.rand(self.K,self.D)
Sigma=np.array([np.identity(self.D) for _ in range(self.K)])
# 虽然gamma有约束条件,但是第一步E步时会对此重新赋值,所以可以随意初始化
gamma=np.random.rand(self.N,self.K)

self.params={
'alpha':alpha,
'mu':mu,
'Sigma':Sigma,
'gamma':gamma
}

def _gaussian_function(self,y_j,mu_k,Sigma_k):
'''
计算高维度高斯函数
:param y_j: 第j个观测值
:param mu_k: 第k个mu值
:param Sigma_k: 第k个Sigma值
:return:
'''
# 先取对数
n_1=self.D*np.log(2*np.pi)
# 计算数组行列式的符号和(自然)对数
_,n_2=np.linalg.slogdet(Sigma_k)
# 计算矩阵的(乘法)逆矩阵
n_3=np.dot(np.dot((y_j-mu_k).T,np.linalg.inv(Sigma_k)),y_j-mu_k)
# 返回时重新取指数抵消前面的取对数操作
return np.exp(-0.5*(n_1+n_2+n_3))

def _E_step(self,y):
alpha=self.params['alpha']
mu=self.params['mu']
Sigma=self.params['Sigma']
gamma=self.params['gamma'].copy()

loss=float(0)
for j in range(self.N):
y_j=y[j]
gamma_list=[]
for k in range(self.K):
alpha_k=alpha[k]
mu_k=mu[k]
Sigma_k=Sigma[k]
gamma_list.append(alpha_k*self._gaussian_function(y_j,mu_k,Sigma_k))
# 对隐变量进行迭代更新
self.params['gamma'][j,:]=np.array([v/np.sum(gamma_list) for v in gamma_list])
loss+=np.linalg.norm(self.params['gamma'][j,:]-gamma[j,:])
return loss

def _M_step(self,y):
mu=self.params['mu'].copy()
gamma=self.params['gamma']
Sigma=self.params['Sigma'].copy()
alpha=self.params['alpha'].copy()
loss=float(0)
for k in range(self.K):
mu_k=mu[k]
gamma_k=gamma[:,k]
gamma_k_j_list=[]
mu_k_part_list=[]
Sigma_k_part_list=[]
for j in range(self.N):
y_j=y[j]
gamma_k_j=gamma_k[j]
gamma_k_j_list.append(gamma_k_j)
# mu_k的分母的分母列表
mu_k_part_list.append(gamma_k_j*y_j)
# Sigma_k的分母列表
Sigma_k_part_list.append(gamma_k_j*np.outer(y_j-mu_k,(y_j-mu_k).T))
# 对模型参数进行迭代更新
self.params['mu'][k]=np.sum(mu_k_part_list,axis=0)/np.sum(gamma_k_j_list)
self.params['Sigma'][k]=np.sum(Sigma_k_part_list,axis=0)/np.sum(gamma_k_j_list)
self.params['alpha'][k]=np.sum(gamma_k_j_list)/self.N
loss+=np.linalg.norm(self.params['mu'][k]-mu[k])
loss+=np.linalg.norm(self.params['Sigma'][k]-Sigma[k])
loss+=np.linalg.norm(self.params['alpha'][k]-alpha[k])
return loss

def fit(self,y,max_iter=100):
y=np.array(y)
self.N,self.D=y.shape
self.__init_params()
for _ in range(max_iter):
var1=self._E_step(y)
var2=self._M_step(y)
print(var1,var2)
if var1<1e-6 and var2<1e-6:
break

def get_samples(n_ex=1000,n_classes=3,n_in=2,seed=None):
# 生成1000个样本,为了能够在二维平面上画出图线表示出来,每个样本的特征维度设置为2
from sklearn.datasets import make_blobs
y,_=make_blobs(n_samples=n_ex,centers=n_classes,n_features=n_in,random_state=seed)
return y

def run_my_model(K=3,max_iter=100):
from matplotlib import pyplot as plt
my=MyGMM(K)
import tensorflow as tf
import pandas as pd
TRAIN_URL='http://download.tensorflow.org/data/iris_training.csv'
train_path=tf.keras.utils.get_file(TRAIN_URL.split('/')[-1],TRAIN_URL)
print('Download location',train_path)
data=pd.read_csv(train_path,skiprows=1,names=['a','b','c','d','label'])
print(data)
y=data.iloc[:,:-1]
print(y)
while True:
try:
my.fit(y,max_iter)
except Exception as e:
print(e)
else:
break

max_index=np.argmax(my.params['gamma'],axis=1)
print(max_index)
from collections import defaultdict
d=defaultdict(int)
for yi in range(data.shape[0]):
d[f'{data.iloc[yi,-1]}{max_index[yi]}']+=1
d=sorted(d.items(),key=lambda x:x[1],reverse=True)
print(d)


run_my_model(3,1000)

使用高斯混合模型聚类花朵识别,数据处理来自knn算法进行数据的分类,聚类结果和分类标签的匹配结果时好时坏,最好的情况也是一种花的识别准确率较低,这就是监督学习带来的强大进步吧。除了算了参数的变化大小,做了迭代停止的判断条件,其他内容和上面的代码没什么区别

参考链接:机器学习之高斯混合模型(GMM)及python实现

Python Numpy计算各类距离的方法

创建于2023.3.18/18.27,修改于2023.3.18/23.50