5月27日 22:53

TensorFlow 自定义层和自定义损失函数怎么实现

为什么需要自定义层和损失函数

TensorFlow 内置的层(Dense、Conv2D 等)和损失函数(MSE、CrossEntropy 等)覆盖了大多数常见场景,但实际工作中经常会遇到内置组件无法满足需求的情况:比如你要实现论文中提出的一种新的注意力机制,或者针对极度不平衡的数据集设计专属的损失函数。这时候就需要自己动手写自定义层和自定义损失函数。

面试中被问到这个话题,面试官通常想考察的是你对 TensorFlow 底层机制的理解程度,而不是让你背代码。所以下面不光写代码,更重要的是讲清楚每一步为什么这么做。

自定义层

核心机制:__init__、build、call 三件套

自定义层的标准做法是继承 tf.keras.layers.Layer,然后实现三个关键方法:

  • __init__:存放和输入形状无关的配置,比如神经元数量、激活函数名称。这里不要创建权重,因为此时还不知道输入维度。
  • build:在第一次调用时自动触发,此时已经拿到了输入形状 input_shape,可以据此创建权重。用 self.add_weight() 创建的变量会被 TensorFlow 自动追踪,训练时更新、保存时序列化。
  • call:定义前向传播逻辑,也就是输入到输出之间的计算过程。

为什么要把权重创建放在 build 而不是 __init__ 里?因为很多层的权重维度取决于输入——比如全连接层的权重矩阵是 (输入维度, 输出维度),而输入维度只有在真正喂入数据时才能确定。build 方法推迟了权重的创建时机,让层能够自动适配不同维度的输入。

最基本的自定义全连接层

python
import tensorflow as tf from tensorflow.keras import layers class MyDenseLayer(layers.Layer): def __init__(self, units=32, **kwargs): super(MyDenseLayer, self).__init__(**kwargs) self.units = units def build(self, input_shape): self.w = self.add_weight( shape=(input_shape[-1], self.units), initializer='random_normal', trainable=True, name='kernel' ) self.b = self.add_weight( shape=(self.units,), initializer='zeros', trainable=True, name='bias' ) def call(self, inputs): return tf.matmul(inputs, self.w) + self.b def get_config(self): config = super(MyDenseLayer, self).get_config() config.update({'units': self.units}) return config

get_config 不是必须的,但如果你希望模型能被保存为 HDF5 格式并正确加载回来,就必须实现它。它返回一个字典,记录层初始化时需要的参数,from_config 方法会自动根据这个字典重建层实例。

把自定义层放进模型里用:

python
model = tf.keras.Sequential([ MyDenseLayer(units=64, input_shape=(10,)), layers.Activation('relu'), MyDenseLayer(units=10), layers.Activation('softmax') ]) model.compile(optimizer='adam', loss='sparse_categorical_crossentropy') model.fit(x_train, y_train, epochs=10)

和内置层完全一样的用法,不需要额外处理。

带激活函数的层

把激活函数内嵌到层里,省得每次都单独套一个 Activation 层:

python
class DenseWithActivation(layers.Layer): def __init__(self, units=32, activation='relu', **kwargs): super(DenseWithActivation, self).__init__(**kwargs) self.units = units self.activation = tf.keras.activations.get(activation) def build(self, input_shape): self.w = self.add_weight( shape=(input_shape[-1], self.units), initializer='glorot_uniform', trainable=True ) self.b = self.add_weight( shape=(self.units,), initializer='zeros', trainable=True ) def call(self, inputs): output = tf.matmul(inputs, self.w) + self.b return self.activation(output)

tf.keras.activations.get() 是个很方便的函数,传入字符串(如 'relu')或可调用对象都能正常工作,不需要自己写 if-else 判断。

带正则化的层

给权重加上 L2 正则化,训练时会自动把正则项加到总损失里:

python
class RegularizedDense(layers.Layer): def __init__(self, units=32, l2_reg=0.01, **kwargs): super(RegularizedDense, self).__init__(**kwargs) self.units = units self.l2_reg = l2_reg def build(self, input_shape): self.w = self.add_weight( shape=(input_shape[-1], self.units), initializer='glorot_uniform', regularizer=tf.keras.regularizers.l2(self.l2_reg), trainable=True ) self.b = self.add_weight( shape=(self.units,), initializer='zeros', trainable=True ) def call(self, inputs): return tf.matmul(inputs, self.w) + self.b

关键点在 regularizer=tf.keras.regularizers.l2(self.l2_reg) 这一行。设置之后,TensorFlow 在每次前向传播时会自动收集层上的正则化损失(通过 self.losses 属性访问),并在计算总损失时累加上去。你不需要手动把正则项加到损失函数里。

自定义卷积层

卷积层的权重形状是 (kernel_h, kernel_w, input_channels, output_channels),比全连接层稍复杂:

python
class CustomConv2D(layers.Layer): def __init__(self, filters=32, kernel_size=(3, 3), **kwargs): super(CustomConv2D, self).__init__(**kwargs) self.filters = filters self.kernel_size = kernel_size def build(self, input_shape): input_channels = input_shape[-1] kernel_shape = (*self.kernel_size, input_channels, self.filters) self.kernel = self.add_weight( shape=kernel_shape, initializer='glorot_uniform', trainable=True ) self.bias = self.add_weight( shape=(self.filters,), initializer='zeros', trainable=True ) def call(self, inputs): conv = tf.nn.conv2d( inputs, self.kernel, strides=[1, 1, 1, 1], padding='SAME' ) return conv + self.bias

这里用 tf.nn.conv2d 而不是 layers.Conv2D,因为后者本身就是一个完整的层实现,包含了自己内部的权重管理,不适合在自定义层中再套一层。tf.nn.conv2d 是纯计算函数,权重由我们自己管理,这才是自定义层的正确姿势。

自定义注意力层

注意力机制是面试高频考点。下面实现的是一个加性注意力(也叫 Bahdanau 注意力)的简化版:

python
class AttentionLayer(layers.Layer): def __init__(self, units=64, **kwargs): super(AttentionLayer, self).__init__(**kwargs) self.units = units def build(self, input_shape): self.W = self.add_weight( shape=(input_shape[-1], self.units), initializer='glorot_uniform', trainable=True ) self.b = self.add_weight( shape=(self.units,), initializer='zeros', trainable=True ) self.u = self.add_weight( shape=(self.units,), initializer='glorot_uniform', trainable=True ) def call(self, inputs): # uit = tanh(W * input + b),得到隐藏表示 uit = tf.nn.tanh(tf.tensordot(inputs, self.W, axes=1) + self.b) # ait = softmax(uit * u),计算每个时间步的注意力权重 ait = tf.tensordot(uit, self.u, axes=1) ait = tf.nn.softmax(ait, axis=1) # 用注意力权重对输入做加权求和 weighted_input = inputs * tf.expand_dims(ait, -1) output = tf.reduce_sum(weighted_input, axis=1) return output

这段代码的思路是:先把输入映射到一个隐藏空间(通过 W 和 b),再用一个可学习的向量 u 和隐藏表示做点积来打分,分数归一化后就是注意力权重,最后对原始输入做加权求和。这种方式的好处是 u、W、b 都参与训练,能自动学到"哪些位置更值得关注"。

自定义残差块

残差连接的核心思想是让梯度可以直接流过网络,缓解深层网络的梯度消失问题:

python
class ResidualBlock(layers.Layer): def __init__(self, filters=64, **kwargs): super(ResidualBlock, self).__init__(**kwargs) self.filters = filters def build(self, input_shape): self.conv1 = layers.Conv2D( self.filters, (3, 3), padding='same', activation='relu' ) self.conv2 = layers.Conv2D( self.filters, (3, 3), padding='same' ) self.bn1 = layers.BatchNormalization() self.bn2 = layers.BatchNormalization() def call(self, inputs): x = self.bn1(inputs) x = self.conv1(x) x = self.bn2(x) x = self.conv2(x) # 残差连接:输出 = 卷积结果 + 原始输入 output = layers.add([x, inputs]) output = layers.Activation('relu')(output) return output

注意这里的残差连接 layers.add([x, inputs])——之所以能直接相加,是因为卷积用了 padding='same' 且 filter 数量和输入通道数一致,保证维度匹配。如果维度不一致,需要在跳连上加一个 1x1 卷积做投影。

自定义损失函数

函数式写法

最简单的方式就是写一个接受 y_truey_pred 的普通函数:

python
def custom_mse_with_l2(y_true, y_pred): mse = tf.reduce_mean(tf.square(y_true - y_pred)) return mse

注意:损失函数内部必须使用 TensorFlow 的运算(tf.reduce_meantf.square 等),不能用 NumPy。原因有二:一是 TensorFlow 需要构建计算图来做自动求导,NumPy 运算不在图中,梯度无法回传;二是 GPU 上跑的也是 TensorFlow 运算,混用 NumPy 会导致数据在 CPU 和 GPU 之间反复搬运,拖慢训练。

python
model.compile(optimizer='adam', loss=custom_mse_with_l2, metrics=['accuracy']) model.fit(x_train, y_train, epochs=10)

带额外参数的损失函数

有些损失函数需要超参数(比如类别权重、margin 等),但 model.compile(loss=...) 只接受签名为 (y_true, y_pred) 的函数。解决办法是用 functools.partial 固定额外参数:

python
def weighted_binary_crossentropy(y_true, y_pred, weight=1.0): bce = tf.keras.losses.binary_crossentropy(y_true, y_pred) weight_vector = y_true * weight + (1.0 - y_true) weighted_bce = weight_vector * bce return tf.reduce_mean(weighted_bce) from functools import partial loss_fn = partial(weighted_binary_crossentropy, weight=2.0) model.compile(optimizer='adam', loss=loss_fn)

当正样本占比很小(比如欺诈检测中只有 1% 是正样本),就需要给正样本更大的权重,让模型不会倾向于全部预测为负。这里的 weight 就是正样本的权重倍数。

Focal Loss:解决类别不平衡的利器

Focal Loss 来自 2017 年的 RetinaNet 论文,核心思想是降低"容易分类的样本"对损失的贡献,让模型集中注意力在"难分类的样本"上。gamma 参数控制衰减程度——gamma 越大,简单样本的权重被压得越低:

python
def focal_loss(y_true, y_pred, alpha=0.25, gamma=2.0): y_pred = tf.clip_by_value(y_pred, 1e-7, 1.0 - 1e-7) logit = tf.math.log(y_pred / (1 - y_pred)) loss = -alpha * y_true * tf.math.pow(1 - y_pred, gamma) * logit - (1 - alpha) * (1 - y_true) * tf.math.pow(y_pred, gamma) * tf.math.log(1 - y_pred) return tf.reduce_mean(loss)
  • alpha:正负样本的平衡因子,默认 0.25 表示正样本权重略低(因为正样本通常较少)
  • gamma:聚焦参数,论文中推荐 2.0。当 gamma=0 时退化为标准交叉熵

Dice Loss:图像分割的常用损失

Dice 系数衡量两个集合的重叠程度,值域 [0, 1],1 表示完全重叠。Dice Loss = 1 - Dice 系数,在医学图像分割等正负样本极度不平衡的场景中表现优于交叉熵:

python
def dice_loss(y_true, y_pred, smooth=1.0): y_true_f = tf.reshape(y_true, [-1]) y_pred_f = tf.reshape(y_pred, [-1]) intersection = tf.reduce_sum(y_true_f * y_pred_f) union = tf.reduce_sum(y_true_f) + tf.reduce_sum(y_pred_f) dice = (2.0 * intersection + smooth) / (union + smooth) return 1 - dice

smooth 是一个很小的数(通常取 1),防止分母为 0。这在预测值和真实值都接近全 0 的情况下尤为重要。

Contrastive Loss:度量学习的基础

对比损失用于训练孪生网络(Siamese Network),目标是让相似样本的距离更近、不相似样本的距离更远。margin 是不相似样本对之间的距离下界——当不相似对的距离已经大于 margin 时,损失为 0,不再优化:

python
def contrastive_loss(y_true, y_pred, margin=1.0): square_pred = tf.square(y_pred) margin_square = tf.square(tf.maximum(margin - y_pred, 0)) return tf.reduce_mean( y_true * square_pred + (1 - y_true) * margin_square )

y_true 为 1 表示两个样本相似,为 0 表示不相似;y_pred 是两个样本的欧氏距离。

Triplet Loss:人脸识别的经典损失

Triplet Loss 同时考虑三个样本:锚点(anchor)、正样本(positive,和锚点同类)、负样本(negative,和锚点不同类)。目标是让锚点和正样本的距离小于锚点和负样本的距离,且差距至少为 margin:

python
def triplet_loss(y_true, y_pred, margin=0.5): anchor = y_pred[:, 0] positive = y_pred[:, 1] negative = y_pred[:, 2] pos_dist = tf.reduce_sum(tf.square(anchor - positive), axis=1) neg_dist = tf.reduce_sum(tf.square(anchor - negative), axis=1) basic_loss = pos_dist - neg_dist + margin loss = tf.reduce_mean(tf.maximum(basic_loss, 0.0)) return loss

如果负样本距离已经比正样本距离大 margin 以上,basic_loss 为负,tf.maximum(..., 0.0) 会把损失截断为 0,不再施加优化压力。

Huber Loss:对异常值更鲁棒

Huber Loss 在误差较小时等价于 MSE(平方损失),误差较大时等价于 MAE(绝对值损失)。delta 是切换阈值——误差小于 delta 时用平方损失(梯度随误差缩小,收敛更精确),误差大于 delta 时用线性损失(梯度恒定,不会被异常值牵着走):

python
def huber_loss(y_true, y_pred, delta=1.0): error = y_true - y_pred abs_error = tf.abs(error) quadratic = tf.minimum(abs_error, delta) linear = abs_error - quadratic loss = 0.5 * tf.square(quadratic) + delta * linear return tf.reduce_mean(loss)

实际上 TensorFlow 已经内置了 tf.keras.losses.Huber,但在面试中手写实现能体现你对损失函数特性的理解。

用类的形式定义损失函数

函数式写法简单直接,但有一个局限:model.compile(loss=...) 只能传 (y_true, y_pred) 两个参数。如果你的损失函数需要额外的配置(比如正则化系数),而且这些配置也要被保存到模型文件中,就应该用类的形式:

python
class CustomLoss(tf.keras.losses.Loss): def __init__(self, regularization_factor=0.1, **kwargs): super(CustomLoss, self).__init__(**kwargs) self.regularization_factor = regularization_factor def call(self, y_true, y_pred): loss = tf.keras.losses.mean_squared_error(y_true, y_pred) regularization = tf.reduce_sum([ tf.reduce_sum(tf.square(w)) for w in self.model.trainable_weights ]) return loss + self.regularization_factor * regularization def get_config(self): base_config = super(CustomLoss, self).get_config() base_config['regularization_factor'] = self.regularization_factor return base_config

和自定义层的套路一样:__init__ 保存配置,call 定义计算逻辑,get_config 支持序列化。self.model 会在损失函数被绑定到模型后自动可用。

自定义评估指标

有些场景下准确率(Accuracy)不够用,比如你可能需要精确率(Precision)、召回率(Recall)或者某个业务自定义的指标。自定义指标继承 tf.keras.metrics.Metric,核心是维护跨 batch 的累积状态:

python
class CustomPrecision(tf.keras.metrics.Metric): def __init__(self, name='custom_precision', **kwargs): super(CustomPrecision, self).__init__(name=name, **kwargs) self.true_positives = self.add_weight(name='tp', initializer='zeros') self.false_positives = self.add_weight(name='fp', initializer='zeros') def update_state(self, y_true, y_pred, sample_weight=None): y_pred_labels = tf.argmax(y_pred, axis=1) y_true = tf.cast(y_true, tf.int64) tp = tf.reduce_sum( tf.cast(tf.logical_and(y_true == y_pred_labels, y_pred_labels == 1), tf.float32) ) fp = tf.reduce_sum( tf.cast(tf.logical_and(y_true != y_pred_labels, y_pred_labels == 1), tf.float32) ) self.true_positives.assign_add(tp) self.false_positives.assign_add(fp) def result(self): return self.true_positives / (self.true_positives + self.false_positives + 1e-7) def reset_states(self): self.true_positives.assign(0.0) self.false_positives.assign(0.0)

update_state 在每个 batch 调用,累积 TP 和 FP;result 返回当前的精确率;reset_states 在每个 epoch 开始时清零。这样就能跨 batch 正确计算指标,而不是每个 batch 独立算再取平均。

python
model.compile( optimizer='adam', loss='sparse_categorical_crossentropy', metrics=[CustomPrecision()] )

完整实战示例

把自定义层、自定义损失和自定义指标组合在一起,构建一个完整可训练的模型:

python
import tensorflow as tf from tensorflow.keras import layers, models class MyCustomLayer(layers.Layer): def __init__(self, units=64, **kwargs): super(MyCustomLayer, self).__init__(**kwargs) self.units = units def build(self, input_shape): self.w = self.add_weight( shape=(input_shape[-1], self.units), initializer='glorot_uniform', trainable=True ) self.b = self.add_weight( shape=(self.units,), initializer='zeros', trainable=True ) def call(self, inputs): return tf.matmul(inputs, self.w) + self.b def my_custom_loss(y_true, y_pred): mse = tf.keras.losses.mean_squared_error(y_true, y_pred) return mse model = models.Sequential([ MyCustomLayer(units=128, input_shape=(10,)), layers.Activation('relu'), layers.Dropout(0.5), MyCustomLayer(units=64), layers.Activation('relu'), MyCustomLayer(units=1) ]) model.compile( optimizer='adam', loss=my_custom_loss, metrics=['mae'] ) model.fit(x_train, y_train, epochs=10, validation_split=0.2)

实践中容易踩的坑

  1. 权重创建位置搞错:在 __init__ 里用 tf.Variable 创建权重,虽然也能跑,但绕过了 TensorFlow 的权重追踪机制,保存模型时容易出问题。正确做法是 build 中用 self.add_weight()

  2. 损失函数里用了 NumPynp.mean()np.square() 这些在 Eager Mode 下看似正常,但一旦开启图模式(@tf.function)或部署到生产环境就会报错,必须全部替换为 tf.reduce_mean()tf.square() 等。

  3. 忘记实现 get_config:如果你的自定义层或损失不实现 get_config,用 model.save() 保存后 tf.keras.models.load_model() 会加载失败。调试这种问题非常耗时。

  4. 残差连接维度不匹配:当卷积的 filter 数量不等于输入通道数,或者用了 stride > 1 的卷积时,残差 x + inputs 会因为形状不同而报错。解决办法是在跳连上加一个 1x1 卷积做维度对齐。

  5. 自定义指标在多 GPU 下状态不同步add_weight 创建的变量默认不会跨 GPU 同步。分布式训练时需要使用 tf.keras.metrics.Metric 的内置同步机制,或者显式指定同步策略。

掌握自定义层和损失函数的实现,是从"会调 API"到"能根据需求定制模型"的关键一步。面试中能把 build/call 的设计意图、损失函数必须用 tf 运算的原因、以及序列化的注意事项讲清楚,基本就能拿高分。

标签:Tensorflow