From Attention to Transformer, and Self-Attention in CV

Tags: paper reading, pytorch, transformer, attention

I. Overall structure

Because RNNs and other recurrent networks have step-by-step temporal dependencies, they cannot be parallelized. The Transformer keeps the overall encoder-decoder framework but drops the recurrent structure entirely and is built only on attention and fully connected layers. To compensate for the lost word-order information, each word's position is embedded into a vector and fed into the model.

II. Step-by-step breakdown

1. Padding mask

Input sequences are generally padded to a common length: we pick a fixed length N and fill the shorter sequences with 0 at the end. The attention mechanism should not put any attention on those padded positions, so they need special handling. Concretely, a very large negative number (negative infinity) is added to the scores at those positions, so that after the softmax their weights become close to 0. The Transformer's padding mask is therefore a Boolean tensor: the positions whose value is True (the pads) are the ones that get filled with negative infinity.
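A minimal numeric sketch (the scores are made up purely for illustration) of why putting negative infinity at a position drives its softmax weight to zero:

import torch

scores = torch.tensor([2.0, 1.0, float('-inf')])  # pretend the third position is padding
print(torch.softmax(scores, dim=-1))
# tensor([0.7311, 0.2689, 0.0000])  -> the padded position receives zero attention weight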

# imports shared by all code snippets in this post
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F


def padding_mask(seq_k, seq_q):
    len_q = seq_q.size(1)
    print('=len_q:', len_q)
    # `PAD` is 0
    pad_mask_ = seq_k.eq(0)  # per-sentence pad mask, shape [B, L_k]
    print('==pad_mask_:', pad_mask_)
    pad_mask = pad_mask_.unsqueeze(1).expand(-1, len_q, -1)  # shape [B, L_q, L_k], the mask applied to attention
    print('==pad_mask', pad_mask)
    return pad_mask


def debug_padding_mask():
    Bs = 2
    inputs_len = np.random.randint(1, 5, Bs).reshape(Bs, 1)  # number of real tokens in each sentence
    print('==inputs_len:', inputs_len)
    vocab_size = 6000  # vocabulary size (not used in this demo)
    max_seq_len = int(max(inputs_len))
    x = np.zeros((Bs, max_seq_len), dtype=int)
    for s in range(Bs):
        for j in range(inputs_len[s][0]):
            x[s][j] = j + 1
    x = torch.LongTensor(torch.from_numpy(x))
    print('x.shape', x.shape)
    mask = padding_mask(seq_k=x, seq_q=x)
    print('==mask:', mask.shape)


if __name__ == '__main__':
    debug_padding_mask()

2. Positional encoding

Also called positional embedding. Since the Transformer has no recurrence, positional encoding (PE) exists to inject the order (position) information of the token sequence into the model.
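For reference, the sinusoidal formula from the paper (Attention Is All You Need), which the class below reproduces row by row; pos is the token position and i indexes the embedding dimension:

PE(pos, 2i)   = sin(pos / 10000^(2i / d_model))
PE(pos, 2i+1) = cos(pos / 10000^(2i / d_model))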

The implementation below takes the sentence lengths of a batch as input, builds the position index of every token, and outputs the positional embedding vector of each token in the batch.



class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_seq_len):
        """
        Args:
            d_model: a scalar, the model dimension (512 by default in the paper)
            max_seq_len: a scalar, the maximum length of the text sequences
        """
        super(PositionalEncoding, self).__init__()
        # Build the PE matrix according to the formula in the paper
        position_encoding = np.array([
            [pos / np.power(10000, 2.0 * (j // 2) / d_model) for j in range(d_model)]
            for pos in range(max_seq_len)]).astype(np.float32)
        # sin on even columns, cos on odd columns
        position_encoding[:, 0::2] = np.sin(position_encoding[:, 0::2])
        position_encoding[:, 1::2] = np.cos(position_encoding[:, 1::2])
        # Prepend a row of zeros to the PE matrix as the positional encoding of `PAD`.
        # Word embeddings often add a `UNK` token in the same spirit.
        # Why is this extra PAD encoding needed? Because sequences have different lengths
        # and must be aligned: shorter sequences are padded with 0 at the end, and those
        # padded positions also need an encoding, namely the one for `PAD`.
        position_encoding = torch.from_numpy(position_encoding)  # [max_seq_len, model_dim]
        pad_row = torch.zeros([1, d_model])
        position_encoding = torch.cat((pad_row, position_encoding))  # [max_seq_len+1, model_dim]
        # The embedding table has max_seq_len + 1 rows because of the extra `PAD` position,
        # just like a word-embedding table grows by 1 when `UNK` is added to the vocabulary.
        self.position_encoding = nn.Embedding(max_seq_len + 1, d_model)
        self.position_encoding.weight = nn.Parameter(position_encoding,
                                                     requires_grad=False)

    def forward(self, input_len):
        """Forward pass.
        Args:
          input_len: a tensor of shape [BATCH_SIZE, 1]; each value is the length of the
            corresponding sequence in the batch.
        Returns:
          The positional encodings of this batch of sequences, aligned to the same length.
        """
        # maximum length in this batch
        max_len = torch.max(input_len)
        tensor = torch.cuda.LongTensor if input_len.is_cuda else torch.LongTensor
        # Align every sequence of positions by padding 0 after the real positions.
        # range starts from 1 to keep 0 reserved for the PAD position.
        input_pos = tensor(
            [list(range(1, int(length) + 1)) + [0] * (int(max_len) - int(length)) for length in input_len])
        # input_pos: [bs, max_len], padded with 0
        return self.position_encoding(input_pos)

def debug_posion():
    """d_model: model dimension"""
    bs = 16
    x_sclar = np.random.randint(1, 30, bs).reshape(bs, 1)
    model = PositionalEncoding(d_model=512, max_seq_len=int(max(x_sclar)))
    x = torch.from_numpy(x_sclar)  # [bs, 1]
    print('===x:', x)
    print('====x.shape', x.shape)
    out = model(x)
    print('==out.shape:', out.shape)  # [bs, max_seq_len, model_dim]
if __name__ == '__main__':
    debug_posion()

3. Scaled dot-product attention

Q, K, V can be viewed as the word embeddings of a batch multiplied by three matrices, and those matrices are what gets learned. Q and K produce the attention scores, which are applied to V to obtain a weighted V; in this way different words of a sentence receive different amounts of attention. Note that the Q, K, V vectors are usually shorter than the original word vector: say their length is 64 while the original word vector (and the final output vector) has length 512; the output length is an integer multiple of the Q/K/V length (here 512 = 8 × 64, the number of heads).

In the classic illustration (see "The Illustrated Transformer" in the references) there are two word vectors: x1 for "Thinking" and x2 for "Machines". Taking x1 as an example: x1 multiplied by WQ gives q1, the Query vector corresponding to x1; likewise x1 multiplied by WK gives k1, its Key vector, and x1 multiplied by WV gives v1, its Value vector.
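For reference, the formula the class below implements, taken from the paper; d_k is the key dimension and the 1/sqrt(d_k) factor corresponds to the `scale` argument in the code:

Attention(Q, K, V) = softmax(Q · K^T / sqrt(d_k)) · V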

The corresponding code:


class ScaledDotProductAttention(nn.Module):
    """Scaled dot-product attention mechanism."""

    def __init__(self, attention_dropout=0.5):
        super(ScaledDotProductAttention, self).__init__()
        self.dropout = nn.Dropout(attention_dropout)
        self.softmax = nn.Softmax(dim=2)

    def forward(self, q, k, v, scale=None, attn_mask=None):
        """Forward pass.
        Args:
          q: Queries tensor, shape [B, L_q, D_q]
          k: Keys tensor, shape [B, L_k, D_k]
          v: Values tensor, shape [B, L_v, D_v]; usually the same as k
          scale: scaling factor, a float scalar
          attn_mask: masking tensor, shape [B, L_q, L_k]
        Returns:
          The context tensor and the attention tensor.
        """
        attention = torch.bmm(q, k.transpose(1, 2))  # [B, L_q, L_k]
        print('===attention before mask:', attention)
        if scale:
            attention = attention * scale

        if attn_mask is not None:
            # set the masked positions to negative infinity
            attention = attention.masked_fill_(attn_mask, -np.inf)
        print('===attention after mask:', attention)

        attention = self.softmax(attention)  # [B, L_q, L_k]
        attention = self.dropout(attention)  # [B, L_q, L_k]
        context = torch.bmm(attention, v)  # [B, L_q, D_v]
        return context, attention

def debug_scale_attention():
    model = ScaledDotProductAttention()
    # B, L_q, D_q = 32, 100, 128
    B, L_q, D_q = 2, 4, 10
    pading_mask = torch.tensor([[[False, False, False, False],
                                 [False, False, False, False],
                                 [False, False, False, False],
                                 [False, False, False, False]],

                                [[False, False,  True,  True],
                                 [False, False,  True,  True],
                                 [False, False,  True,  True],
                                 [False, False,  True,  True]]])
    q, k, v = torch.rand(B, L_q, D_q), torch.rand(B, L_q, D_q), torch.rand(B, L_q, D_q)
    print('==q.shape:', q.shape)
    print('====k.shape', k.shape)
    print('==v.shape:', v.shape)
    out = model(q, k, v, attn_mask=pading_mask)
if __name__ == '__main__':
    debug_scale_attention()

Note that q can have a different sequence length from k and v. The snippet below first calls nn.MultiheadAttention directly, and then walks through the core of that computation by hand (head splitting, padding mask, softmax, weighted sum), without the learned input/output projections:


import torch
import torch.nn as nn
import torch.nn.functional as F

d_model = 256
nhead = 8
multihead_attn1 = nn.MultiheadAttention(d_model, nhead, dropout=0.1)
src1 = torch.rand((256, 1, 256))   # query: [L_q, B, E]
src2 = torch.rand((1024, 1, 256))  # key/value: [L_k, B, E]
src2_key_padding_mask = torch.zeros((1, 1024), dtype=torch.bool)  # [B, L_k], True marks padding
src12 = multihead_attn1(query=src1,
                        key=src2,
                        value=src2, attn_mask=None,
                        key_padding_mask=src2_key_padding_mask)[0]

print('=src12.shape:', src12.shape)

# the same computation done by hand (without the learned input/output projections)
key_padding_mask = torch.zeros((1, 1024), dtype=torch.bool)
num_heads = 8
q = torch.rand((256, 1, 256))
tgt_len, bsz, embed_dim = q.size()
head_dim = embed_dim // num_heads
q = q.contiguous().view(tgt_len, bsz * num_heads, head_dim).transpose(0, 1)
print('==q.shape:', q.shape)
k = torch.rand((1024, 1, 256))
v = torch.rand((1024, 1, 256))
k = k.contiguous().view(-1, bsz * num_heads, head_dim).transpose(0, 1)
src_len = k.size(1)
v = v.contiguous().view(-1, bsz * num_heads, head_dim).transpose(0, 1)
print('==k.shape:', k.shape)
print('==v.shape:', v.shape)
attn_output_weights = torch.bmm(q, k.transpose(1, 2))
print('==attn_output_weights.shape:', attn_output_weights.shape)
if key_padding_mask is not None:
    attn_output_weights = attn_output_weights.view(bsz, num_heads, tgt_len, src_len)
    attn_output_weights = attn_output_weights.masked_fill(
        key_padding_mask.unsqueeze(1).unsqueeze(2),
        float('-inf'),
    )
    attn_output_weights = attn_output_weights.view(bsz * num_heads, tgt_len, src_len)
attn_output_weights = F.softmax(attn_output_weights, dim=-1)
print('==attn_output_weights.shape:', attn_output_weights.shape)
attn_output = torch.bmm(attn_output_weights, v)
print('==attn_output.shape:', attn_output.shape)
attn_output = attn_output.transpose(0, 1).contiguous().view(tgt_len, bsz, embed_dim)
print('==attn_output.shape:', attn_output.shape)


4. Multi-Head Attention

Here h is the number of heads: Q, K and V first go through a linear projection and are then split into h heads; scaled dot-product attention is applied to each head, and the results are concatenated and projected once more. It has a flavour of channel-wise weighting.
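For reference, the formulas from the paper that the class below implements; W_i^Q, W_i^K, W_i^V are the per-head projections and W^O is the final output projection:

head_i = Attention(Q · W_i^Q, K · W_i^K, V · W_i^V)
MultiHead(Q, K, V) = Concat(head_1, ..., head_h) · W^O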

The corresponding code:


class MultiHeadAttention(nn.Module):
    def __init__(self, model_dim=512, num_heads=8, dropout=0.0):
        """model_dim: word-vector dimension
           num_heads: number of heads
        """
        super(MultiHeadAttention, self).__init__()
        self.dim_per_head = model_dim // num_heads  # dimension handled by each head after the split
        self.num_heads = num_heads
        self.linear_k = nn.Linear(model_dim, self.dim_per_head * num_heads)
        self.linear_v = nn.Linear(model_dim, self.dim_per_head * num_heads)
        self.linear_q = nn.Linear(model_dim, self.dim_per_head * num_heads)

        self.dot_product_attention = ScaledDotProductAttention(dropout)
        self.linear_final = nn.Linear(model_dim, model_dim)
        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(model_dim)

    def forward(self, key, value, query, attn_mask=None):
        residual = query  # [B, sequence, model_dim]

        dim_per_head = self.dim_per_head
        num_heads = self.num_heads
        batch_size = key.size(0)

        # linear projection
        key = self.linear_k(key)      # [B, sequence, model_dim]
        value = self.linear_v(value)  # [B, sequence, model_dim]
        query = self.linear_q(query)  # [B, sequence, model_dim]

        # split by heads
        key = key.view(batch_size * num_heads, -1, dim_per_head)      # [B*num_heads, sequence, dim_per_head]
        value = value.view(batch_size * num_heads, -1, dim_per_head)  # [B*num_heads, sequence, dim_per_head]
        query = query.view(batch_size * num_heads, -1, dim_per_head)  # [B*num_heads, sequence, dim_per_head]

        if attn_mask is not None:
            attn_mask = attn_mask.repeat(num_heads, 1, 1)
        # scaled dot-product attention
        scale = dim_per_head ** -0.5  # 1 / sqrt(d_k)
        context, attention = self.dot_product_attention(
            query, key, value, scale, attn_mask)
        # context:   [B*num_heads, sequence, dim_per_head]
        # attention: [B*num_heads, sequence, sequence]

        # concat heads
        context = context.view(batch_size, -1, dim_per_head * num_heads)  # [B, sequence, model_dim]
        # final linear projection
        output = self.linear_final(context)  # [B, sequence, model_dim]
        # dropout
        output = self.dropout(output)
        # add residual and norm layer
        output = self.layer_norm(residual + output)  # [B, sequence, model_dim]
        return output, attention
def debug_mutil_head_attention():
    model = MultiHeadAttention()
    B, L_q, D_q = 32, 100, 512
    q, k, v = torch.rand(B, L_q, D_q), torch.rand(B, L_q, D_q), torch.rand(B, L_q, D_q)
    # print('==q.shape:', q.shape)# [B, sequence, model_dim]
    # print('====k.shape', k.shape)# [B, sequence, model_dim]
    # print('==v.shape:', v.shape)# [B, sequence, model_dim]
    out, _ = model(q, k, v)# [B, sequence, model_dim]
    print('==out.shape:', out.shape)
if __name__ == '__main__':
    debug_mutil_head_attention()

5. Position-wise feed-forward network

This is the boxed block in the architecture figure: a small two-layer network applied to every position independently (hence the 1×1 convolutions in the code below).
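For reference, the formula from the paper; in the implementation below the two linear maps are realised as 1x1 convolutions, with inner dimension ffn_dim = 2048 and outer dimension model_dim = 512:

FFN(x) = max(0, x · W_1 + b_1) · W_2 + b_2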

Code:


#Position-wise Feed Forward Networks
class PositionalWiseFeedForward(nn.Module):
    def __init__(self, model_dim=512, ffn_dim=2048, dropout=0.0):
        """model_dim: word-vector dimension
           ffn_dim: dimension of the inner layer (the 1x1 conv output)
        """
        super(PositionalWiseFeedForward, self).__init__()
        self.w1 = nn.Conv1d(model_dim, ffn_dim, 1)
        self.w2 = nn.Conv1d(ffn_dim, model_dim, 1)
        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(model_dim)

    def forward(self, x):#[B, sequence, model_dim]
        output = x.transpose(1, 2)#[B, model_dim, sequence]
        # print('===output.shape:', output.shape)
        output = self.w2(F.relu(self.w1(output)))#[B, model_dim, sequence]
        output = self.dropout(output.transpose(1, 2))#[B, sequence, model_dim]

        # add residual and norm layer
        output = self.layer_norm(x + output)
        return output

def debug_PositionalWiseFeedForward():
    B, L_q, D_q = 32, 100, 512
    x = torch.rand(B, L_q, D_q)
    model = PositionalWiseFeedForward()
    out = model(x)
    print('==out.shape:', out.shape)
if __name__ == '__main__':
    debug_PositionalWiseFeedForward()

6. Encoder

The encoder stacks 6 layers of the blocks from steps 4 and 5; note that q, k and v all come from the same text, i.e. self-attention.


def sequence_mask(seq):
    batch_size, seq_len = seq.size()
    mask = torch.triu(torch.ones((seq_len, seq_len), dtype=torch.uint8),
                    diagonal=1)
    mask = mask.unsqueeze(0).expand(batch_size, -1, -1)  # [B, L, L]
    return mask


def padding_mask(seq_k, seq_q):
    len_q = seq_q.size(1)
    # `PAD` is 0
    pad_mask = seq_k.eq(0)
    pad_mask = pad_mask.unsqueeze(1).expand(-1, len_q, -1)  # shape [B, L_q, L_k]
    return pad_mask

class EncoderLayer(nn.Module):
    """A single encoder layer."""

    def __init__(self, model_dim=512, num_heads=8, ffn_dim=2048, dropout=0.0):
        super(EncoderLayer, self).__init__()
        self.attention = MultiHeadAttention(model_dim, num_heads, dropout)
        self.feed_forward = PositionalWiseFeedForward(model_dim, ffn_dim, dropout)

    def forward(self, inputs, attn_mask=None):
        # self attention
        # [B, sequence, model_dim]  [B* num_heads, sequence, sequence]
        context, attention = self.attention(inputs, inputs, inputs, attn_mask)
        # feed forward network
        output = self.feed_forward(context)  # [B, sequence, model_dim]
        return output, attention


class Encoder(nn.Module):
    """The encoder: a stack of num_layers (6 by default) encoder layers."""

    def __init__(self,
                 vocab_size,
                 max_seq_len,
                 num_layers=6,
                 model_dim=512,
                 num_heads=8,
                 ffn_dim=2048,
                 dropout=0.0):
        super(Encoder, self).__init__()

        self.encoder_layers = nn.ModuleList(
            [EncoderLayer(model_dim, num_heads, ffn_dim, dropout) for _ in range(num_layers)])

        self.seq_embedding = nn.Embedding(vocab_size + 1, model_dim, padding_idx=0)
        self.pos_embedding = PositionalEncoding(model_dim, max_seq_len)

    #   inputs: [bs, max_seq_len]   inputs_len: [bs, 1]
    def forward(self, inputs, inputs_len):
        output = self.seq_embedding(inputs)  # [bs, max_seq_len, model_dim]
        print('========output.shape', output.shape)
        # add the positional embeddings
        output += self.pos_embedding(inputs_len)  # [bs, max_seq_len, model_dim]
        print('========output.shape', output.shape)

        self_attention_mask = padding_mask(inputs, inputs)

        attentions = []
        for encoder in self.encoder_layers:
            output, attention = encoder(output, attn_mask=None)
            # output, attention = encoder(output, self_attention_mask)
            attentions.append(attention)

        return output, attentions

def debug_encoder():
    Bs = 16
    inputs_len = np.random.randint(1, 30, Bs).reshape(Bs, 1)  # simulated length of each sentence
    vocab_size = 6000  # vocabulary size
    max_seq_len = int(max(inputs_len))
    x = np.zeros((Bs, max_seq_len), dtype=int)
    for s in range(Bs):
        for j in range(inputs_len[s][0]):
            x[s][j] = j + 1
    x = torch.LongTensor(torch.from_numpy(x))  # simulated token ids
    inputs_len = torch.from_numpy(inputs_len)  # [Bs, 1]
    model = Encoder(vocab_size=vocab_size, max_seq_len=max_seq_len)
    print('==x.shape:', x.shape)
    print(x)
    model(x, inputs_len=inputs_len)

if __name__ == '__main__':
    debug_encoder()

7. Sequence mask

Sample: "我/爱/机器/学习" and "i / love / machine / learning".

Training:
7.1. Embed "我/爱/机器/学习" and feed it into the encoder; the last encoder layer produces outputs of shape [10, 512] (assuming an embedding length of 512 and batch size = 1). Multiplying these outputs by new parameter matrices gives the K and V used by every decoder layer.

7.2. Feed <bos> as the decoder's initial input, take the decoder's highest-probability output word A1, and compute the cross-entropy error against "i".

7.3. Feed <bos>, "i" into the decoder, take the highest-probability output word A2, and compute the cross-entropy error against "love".

7.4. Feed <bos>, "i", "love" into the decoder, take the highest-probability output word A3, and compute the cross-entropy error against "machine".

7.5. Feed <bos>, "i", "love", "machine" into the decoder, take the highest-probability output word A4, and compute the cross-entropy error against "learning".

7.6. Feed <bos>, "i", "love", "machine", "learning" into the decoder, take the highest-probability output word A5, and compute the cross-entropy error against the end-of-sequence token </s>.

As you can see, this procedure processes the target words one by one, serially. The sequence mask is introduced precisely so that all of these steps can be trained in parallel.

Its effect: the generated mask is a strictly upper-triangular matrix, so position i can only attend to positions up to i and every future (not-yet-generated) position is masked out; this is exactly what the sequence_mask function in section 6 builds.
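A minimal sketch (repeating the sequence_mask helper exactly as defined in section 6) showing the mask produced for a single length-4 sequence:

import torch

def sequence_mask(seq):
    batch_size, seq_len = seq.size()
    mask = torch.triu(torch.ones((seq_len, seq_len), dtype=torch.uint8), diagonal=1)
    mask = mask.unsqueeze(0).expand(batch_size, -1, -1)  # [B, L, L]
    return mask

seq = torch.tensor([[1, 2, 3, 4]])  # one sequence of 4 token ids
print(sequence_mask(seq))
# tensor([[[0, 1, 1, 1],
#          [0, 0, 1, 1],
#          [0, 0, 0, 1],
#          [0, 0, 0, 0]]], dtype=torch.uint8)
# in row i, the 1s mark the future positions that query i must not attend to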

8. Decoder

The decoder also stacks 6 layers. Note that in the decoder's cross-attention, q comes from the decoder while k and v come from the encoder: it expresses the encoder's weighted contribution to the decoder.


class DecoderLayer(nn.Module):
    """A single decoder layer."""

    def __init__(self, model_dim, num_heads=8, ffn_dim=2048, dropout=0.0):
        super(DecoderLayer, self).__init__()

        self.attention = MultiHeadAttention(model_dim, num_heads, dropout)
        self.feed_forward = PositionalWiseFeedForward(model_dim, ffn_dim, dropout)

    # [B, sequence, model_dim] [B, sequence, model_dim]
    def forward(self,
                dec_inputs,
                enc_outputs,
                self_attn_mask=None,
                context_attn_mask=None):
        # self attention, all inputs are decoder inputs
        # [B, sequence, model_dim]  [B* num_heads, sequence, sequence]
        dec_output, self_attention = self.attention(
            key=dec_inputs, value=dec_inputs, query=dec_inputs, attn_mask=self_attn_mask)

        # context attention
        # query is the decoder's output, key and value are the encoder's outputs
        # [B, sequence, model_dim]  [B* num_heads, sequence, sequence]
        dec_output, context_attention = self.attention(
            key=enc_outputs, value=enc_outputs, query=dec_output, attn_mask=context_attn_mask)

        # decoder's output, or context
        dec_output = self.feed_forward(dec_output)  # [B, sequence, model_dim]

        return dec_output, self_attention, context_attention

class Decoder(nn.Module):
    """The decoder: a stack of num_layers (6 by default) decoder layers."""
    def __init__(self,
                 vocab_size,
                 max_seq_len,
                 num_layers=6,
                 model_dim=512,
                 num_heads=8,
                 ffn_dim=2048,
                 dropout=0.0):
        super(Decoder, self).__init__()

        self.num_layers = num_layers

        self.decoder_layers = nn.ModuleList(
            [DecoderLayer(model_dim, num_heads, ffn_dim, dropout) for _ in
             range(num_layers)])

        self.seq_embedding = nn.Embedding(vocab_size + 1, model_dim, padding_idx=0)
        self.pos_embedding = PositionalEncoding(model_dim, max_seq_len)

    def forward(self, inputs, inputs_len, enc_output, context_attn_mask=None):
        output = self.seq_embedding(inputs)
        output += self.pos_embedding(inputs_len)
        print('==output.shape:', output.shape)
        self_attention_padding_mask = padding_mask(inputs, inputs)
        seq_mask = sequence_mask(inputs)
        self_attn_mask = torch.gt((self_attention_padding_mask + seq_mask), 0)

        self_attentions = []
        context_attentions = []
        for decoder in self.decoder_layers:
            # [B, sequence, model_dim]  [B* num_heads, sequence, sequence] [B* num_heads, sequence, sequence]
            output, self_attn, context_attn = decoder(
                output, enc_output, self_attn_mask=None, context_attn_mask=None)
            self_attentions.append(self_attn)
            context_attentions.append(context_attn)

        return output, self_attentions, context_attentions


def debug_decoder():
    Bs = 2
    model_dim = 512
    vocab_size = 6000  # vocabulary size
    inputs_len = np.random.randint(1, 5, Bs).reshape(Bs, 1)  # number of words in each sentence of the batch
    inputs_len = torch.from_numpy(inputs_len)  # [Bs, 1]
    max_seq_len = int(max(inputs_len))
    x = np.zeros((Bs, max_seq_len), dtype=int)
    for s in range(Bs):
        for j in range(inputs_len[s][0]):
            x[s][j] = j + 1
    x = torch.LongTensor(torch.from_numpy(x))  # simulated token ids
    print('==x:', x)
    print('==x.shape:', x.shape)
    model = Decoder(vocab_size=vocab_size, max_seq_len=max_seq_len, model_dim=model_dim)
    enc_output = torch.rand(Bs, max_seq_len, model_dim)  # [B, sequence, model_dim]
    print('==enc_output.shape:', enc_output.shape)
    out, self_attentions, context_attentions = model(inputs=x, inputs_len=inputs_len, enc_output=enc_output)
    print('==out.shape:', out.shape)  # [B, sequence, model_dim]
    print('==len(self_attentions):', len(self_attentions), self_attentions[0].shape)
    print('==len(context_attentions):', len(context_attentions), context_attentions[0].shape)

if __name__ == '__main__':
    debug_decoder()

9. Transformer

Combining the encoder and decoder gives the full model.


class Transformer(nn.Module):

    def __init__(self,
               src_vocab_size,
               src_max_len,
               tgt_vocab_size,
               tgt_max_len,
               num_layers=6,
               model_dim=512,
               num_heads=8,
               ffn_dim=2048,
               dropout=0.2):
        super(Transformer, self).__init__()

        self.encoder = Encoder(src_vocab_size, src_max_len, num_layers, model_dim,
                               num_heads, ffn_dim, dropout)
        self.decoder = Decoder(tgt_vocab_size, tgt_max_len, num_layers, model_dim,
                               num_heads, ffn_dim, dropout)

        self.linear = nn.Linear(model_dim, tgt_vocab_size, bias=False)
        self.softmax = nn.Softmax(dim=2)

    def forward(self, src_seq, src_len, tgt_seq, tgt_len):
        context_attn_mask = padding_mask(seq_k=src_seq, seq_q=tgt_seq)  # [B, L_tgt, L_src]: mask the src pad positions seen by the tgt queries
        print('==context_attn_mask.shape', context_attn_mask.shape)
        output, enc_self_attn = self.encoder(src_seq, src_len)

        output, dec_self_attn, ctx_attn = self.decoder(
          tgt_seq, tgt_len, output, context_attn_mask)

        output = self.linear(output)
        output = self.softmax(output)

        return output, enc_self_attn, dec_self_attn, ctx_attn
def debug_transoform():
    Bs = 4
    # source side (the text to be translated)
    encode_inputs_len = np.random.randint(1, 10, Bs).reshape(Bs, 1)
    src_vocab_size = 6000  # source vocabulary size
    encode_max_seq_len = int(max(encode_inputs_len))
    encode_x = np.zeros((Bs, encode_max_seq_len), dtype=int)
    for s in range(Bs):
        for j in range(encode_inputs_len[s][0]):
            encode_x[s][j] = j + 1
    encode_x = torch.LongTensor(torch.from_numpy(encode_x))

    # target side (the translation)
    decode_inputs_len = np.random.randint(1, 10, Bs).reshape(Bs, 1)
    target_vocab_size = 5000  # target vocabulary size
    decode_max_seq_len = int(max(decode_inputs_len))
    decode_x = np.zeros((Bs, decode_max_seq_len), dtype=int)
    for s in range(Bs):
        for j in range(decode_inputs_len[s][0]):
            decode_x[s][j] = j + 1
    decode_x = torch.LongTensor(torch.from_numpy(decode_x))

    encode_inputs_len = torch.from_numpy(encode_inputs_len)  # [Bs, 1]
    decode_inputs_len = torch.from_numpy(decode_inputs_len)  # [Bs, 1]
    model = Transformer(src_vocab_size=src_vocab_size, src_max_len=encode_max_seq_len,
                        tgt_vocab_size=target_vocab_size, tgt_max_len=decode_max_seq_len)
    print('==encode_x.shape:', encode_x.shape)
    print('==decode_x.shape:', decode_x.shape)

    model(encode_x, encode_inputs_len, decode_x, decode_inputs_len)
if __name__ == '__main__':
    debug_transoform()

10. Summary

(1) Unlike an LSTM, the Transformer can be parallelized; an LSTM depends on the previous time step and can only run serially.
(2) Self-attention reduces the distance between any two words to 1, which greatly alleviates the long-range dependency problem, so the network can be stacked deeper than an LSTM.
(3) The Transformer fuses information from both directions at once, whereas a bidirectional LSTM merely adds the results of the two directions and, strictly speaking, is still unidirectional.
(4) Being built entirely on attention, the Transformer expresses the pairwise relevance between tokens and is therefore more interpretable.
(5) The Transformer's only source of positional information is the positional encoding, so on short sentences it is not necessarily better than an LSTM.
(6) Attention costs O(n^2), where n is the text length, which is expensive for long inputs.
(7) Compared with a CNN it captures global rather than only local information; a CNN lacks this holistic view of the data.

III. Self-attention in CV

Having covered self-attention in NLP, we now turn to its use in CV, as illustrated below.

1. The feature map is passed through 1×1 convolutions to produce the q, k and v tensors; q multiplied by the transpose of k gives the attention matrix.

2. Softmax normalizes the attention matrix to the range 0-1.

3. The normalized attention is applied to v (a weighted sum), giving a weight for every pixel.


import torch
import torch.nn as nn
import torch.nn.functional as F

class Self_Attn(nn.Module):
    """ Self attention Layer"""

    def __init__(self, in_dim):
        super(Self_Attn, self).__init__()
        self.chanel_in = in_dim

        self.query_conv = nn.Conv2d(in_channels=in_dim, out_channels=in_dim // 8, kernel_size=1)
        self.key_conv = nn.Conv2d(in_channels=in_dim, out_channels=in_dim // 8, kernel_size=1)
        self.value_conv = nn.Conv2d(in_channels=in_dim, out_channels=in_dim, kernel_size=1)
        self.gamma = nn.Parameter(torch.zeros(1))

        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        """
            inputs :
                x : input feature maps (B, C, H, W)
            returns :
                out : self attention value + input feature
                attention: B * N * N (N = H * W)
        """
        m_batchsize, C, height, width = x.size()
        proj_query = self.query_conv(x).view(m_batchsize, -1, width * height).permute(0, 2, 1)  # B*N*C'
        proj_key = self.key_conv(x).view(m_batchsize, -1, width * height)  # B*C'*N
        energy = torch.bmm(proj_query, proj_key)  # batched matmul: B*N*N
        attention = self.softmax(energy)  # B*N*N
        proj_value = self.value_conv(x).view(m_batchsize, -1, width * height)  # B*C*N

        out = torch.bmm(proj_value, attention.permute(0, 2, 1))  # B*C*N
        out = out.view(m_batchsize, C, height, width)  # B*C*H*W

        out = self.gamma * out + x
        return out, attention


def debug_attention():
    attention_module = Self_Attn(in_dim=128)
    #B,C,H,W
    x = torch.rand((2, 128, 100, 100))
    attention_module(x)


if __name__ == '__main__':
    debug_attention()

References:

举个例子讲下transformer的输入输出细节及其他 - 知乎

The Illustrated Transformer – Jay Alammar – Visualizing machine learning one concept at a time.

machine-learning-notes/transformer_pytorch.ipynb at master · luozhouyang/machine-learning-notes · GitHub

Copyright notice: this is an original article by the author, released under the CC 4.0 BY-SA license; please include the original link and this notice when reposting.
Original link: https://blog.csdn.net/fanzonghao/article/details/109240938
