qtrack/scripts/libs/models.py

517 lines
20 KiB
Python
Raw Permalink Normal View History

2021-11-23 15:07:32 +04:00
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from libs.region_loss import RegionLoss
from libs.yolo_layer import YoloLayer
from libs.config import *
from libs.torch_utils import *
class Mish(torch.nn.Module):
def __init__(self):
super().__init__()
def forward(self, x):
x = x * (torch.tanh(F.softplus(x)))
return x
class MaxPoolDark(nn.Module):
def __init__(self, size=2, stride=1):
super(MaxPoolDark, self).__init__()
self.size = size
self.stride = stride
def forward(self, x):
'''
darknet output_size = (input_size + p - k) / s +1
p : padding = k - 1
k : size
s : stride
torch output_size = (input_size + 2*p -k) / s +1
p : padding = k//2
'''
p = self.size // 2
if ((x.shape[2] - 1) // self.stride) != ((x.shape[2] + 2 * p - self.size) // self.stride):
padding1 = (self.size - 1) // 2
padding2 = padding1 + 1
else:
padding1 = (self.size - 1) // 2
padding2 = padding1
if ((x.shape[3] - 1) // self.stride) != ((x.shape[3] + 2 * p - self.size) // self.stride):
padding3 = (self.size - 1) // 2
padding4 = padding3 + 1
else:
padding3 = (self.size - 1) // 2
padding4 = padding3
x = F.max_pool2d(F.pad(x, (padding3, padding4, padding1, padding2), mode='replicate'),
self.size, stride=self.stride)
return x
class Upsample_expand(nn.Module):
def __init__(self, stride=2):
super(Upsample_expand, self).__init__()
self.stride = stride
def forward(self, x):
assert (x.data.dim() == 4)
x = x.view(x.size(0), x.size(1), x.size(2), 1, x.size(3), 1).\
expand(x.size(0), x.size(1), x.size(2), self.stride, x.size(3), self.stride).contiguous().\
view(x.size(0), x.size(1), x.size(2) * self.stride, x.size(3) * self.stride)
return x
class Upsample_interpolate(nn.Module):
def __init__(self, stride):
super(Upsample_interpolate, self).__init__()
self.stride = stride
def forward(self, x):
assert (x.data.dim() == 4)
out = F.interpolate(x, size=(x.size(2) * self.stride, x.size(3) * self.stride), mode='nearest')
return out
class Reorg(nn.Module):
def __init__(self, stride=2):
super(Reorg, self).__init__()
self.stride = stride
def forward(self, x):
stride = self.stride
assert (x.data.dim() == 4)
B = x.data.size(0)
C = x.data.size(1)
H = x.data.size(2)
W = x.data.size(3)
assert (H % stride == 0)
assert (W % stride == 0)
ws = stride
hs = stride
x = x.view(B, C, H / hs, hs, W / ws, ws).transpose(3, 4).contiguous()
x = x.view(B, C, H / hs * W / ws, hs * ws).transpose(2, 3).contiguous()
x = x.view(B, C, hs * ws, H / hs, W / ws).transpose(1, 2).contiguous()
x = x.view(B, hs * ws * C, H / hs, W / ws)
return x
class GlobalAvgPool2d(nn.Module):
def __init__(self):
super(GlobalAvgPool2d, self).__init__()
def forward(self, x):
N = x.data.size(0)
C = x.data.size(1)
H = x.data.size(2)
W = x.data.size(3)
x = F.avg_pool2d(x, (H, W))
x = x.view(N, C)
return x
# for route and shortcut
class EmptyModule(nn.Module):
def __init__(self):
super(EmptyModule, self).__init__()
def forward(self, x):
return x
# support route shortcut and reorg
class Darknet(nn.Module):
def __init__(self, cfgfile, inference=False):
super(Darknet, self).__init__()
self.inference = inference
self.training = not self.inference
self.blocks = parse_cfg(cfgfile)
self.width = int(self.blocks[0]['width'])
self.height = int(self.blocks[0]['height'])
self.models = self.create_network(self.blocks) # merge conv, bn,leaky
self.loss = self.models[len(self.models) - 1]
if self.blocks[(len(self.blocks) - 1)]['type'] == 'region':
self.anchors = self.loss.anchors
self.num_anchors = self.loss.num_anchors
self.anchor_step = self.loss.anchor_step
self.num_classes = self.loss.num_classes
self.header = torch.IntTensor([0, 0, 0, 0])
self.seen = 0
def forward(self, x):
ind = -2
self.loss = None
outputs = dict()
out_boxes = []
for block in self.blocks:
ind = ind + 1
# if ind > 0:
# return x
if block['type'] == 'net':
continue
elif block['type'] in ['convolutional', 'maxpool', 'reorg', 'upsample', 'avgpool', 'softmax', 'connected']:
x = self.models[ind](x)
outputs[ind] = x
elif block['type'] == 'route':
layers = block['layers'].split(',')
layers = [int(i) if int(i) > 0 else int(i) + ind for i in layers]
if len(layers) == 1:
if 'groups' not in block.keys() or int(block['groups']) == 1:
x = outputs[layers[0]]
outputs[ind] = x
else:
groups = int(block['groups'])
group_id = int(block['group_id'])
_, b, _, _ = outputs[layers[0]].shape
x = outputs[layers[0]][:, b // groups * group_id:b // groups * (group_id + 1)]
outputs[ind] = x
elif len(layers) == 2:
x1 = outputs[layers[0]]
x2 = outputs[layers[1]]
x = torch.cat((x1, x2), 1)
outputs[ind] = x
elif len(layers) == 4:
x1 = outputs[layers[0]]
x2 = outputs[layers[1]]
x3 = outputs[layers[2]]
x4 = outputs[layers[3]]
x = torch.cat((x1, x2, x3, x4), 1)
outputs[ind] = x
else:
print("rounte number > 2 ,is {}".format(len(layers)))
elif block['type'] == 'shortcut':
from_layer = int(block['from'])
activation = block['activation']
from_layer = from_layer if from_layer > 0 else from_layer + ind
x1 = outputs[from_layer]
x2 = outputs[ind - 1]
x = x1 + x2
if activation == 'leaky':
x = F.leaky_relu(x, 0.1, inplace=True)
elif activation == 'relu':
x = F.relu(x, inplace=True)
outputs[ind] = x
elif block['type'] == 'region':
continue
if self.loss:
self.loss = self.loss + self.models[ind](x)
else:
self.loss = self.models[ind](x)
outputs[ind] = None
elif block['type'] == 'yolo':
# if self.training:
# pass
# else:
# boxes = self.models[ind](x)
# out_boxes.append(boxes)
boxes = self.models[ind](x)
out_boxes.append(boxes)
elif block['type'] == 'cost':
continue
else:
print('unknown type %s' % (block['type']))
if self.training:
return out_boxes
else:
return get_region_boxes(out_boxes)
def print_network(self):
print_cfg(self.blocks)
def create_network(self, blocks):
models = nn.ModuleList()
prev_filters = 3
out_filters = []
prev_stride = 1
out_strides = []
conv_id = 0
for block in blocks:
if block['type'] == 'net':
prev_filters = int(block['channels'])
continue
elif block['type'] == 'convolutional':
conv_id = conv_id + 1
batch_normalize = int(block['batch_normalize'])
filters = int(block['filters'])
kernel_size = int(block['size'])
stride = int(block['stride'])
is_pad = int(block['pad'])
pad = (kernel_size - 1) // 2 if is_pad else 0
activation = block['activation']
model = nn.Sequential()
if batch_normalize:
model.add_module('conv{0}'.format(conv_id),
nn.Conv2d(prev_filters, filters, kernel_size, stride, pad, bias=False))
model.add_module('bn{0}'.format(conv_id), nn.BatchNorm2d(filters))
# model.add_module('bn{0}'.format(conv_id), BN2d(filters))
else:
model.add_module('conv{0}'.format(conv_id),
nn.Conv2d(prev_filters, filters, kernel_size, stride, pad))
if activation == 'leaky':
model.add_module('leaky{0}'.format(conv_id), nn.LeakyReLU(0.1, inplace=True))
elif activation == 'relu':
model.add_module('relu{0}'.format(conv_id), nn.ReLU(inplace=True))
elif activation == 'mish':
model.add_module('mish{0}'.format(conv_id), Mish())
else:
print("convalution havn't activate {}".format(activation))
prev_filters = filters
out_filters.append(prev_filters)
prev_stride = stride * prev_stride
out_strides.append(prev_stride)
models.append(model)
elif block['type'] == 'maxpool':
pool_size = int(block['size'])
stride = int(block['stride'])
if stride == 1 and pool_size % 2:
# You can use Maxpooldark instead, here is convenient to convert onnx.
# Example: [maxpool] size=3 stride=1
model = nn.MaxPool2d(kernel_size=pool_size, stride=stride, padding=pool_size // 2)
elif stride == pool_size:
# You can use Maxpooldark instead, here is convenient to convert onnx.
# Example: [maxpool] size=2 stride=2
model = nn.MaxPool2d(kernel_size=pool_size, stride=stride, padding=0)
else:
model = MaxPoolDark(pool_size, stride)
out_filters.append(prev_filters)
prev_stride = stride * prev_stride
out_strides.append(prev_stride)
models.append(model)
elif block['type'] == 'avgpool':
model = GlobalAvgPool2d()
out_filters.append(prev_filters)
models.append(model)
elif block['type'] == 'softmax':
model = nn.Softmax()
out_strides.append(prev_stride)
out_filters.append(prev_filters)
models.append(model)
elif block['type'] == 'cost':
if block['_type'] == 'sse':
model = nn.MSELoss(reduction='mean')
elif block['_type'] == 'L1':
model = nn.L1Loss(reduction='mean')
elif block['_type'] == 'smooth':
model = nn.SmoothL1Loss(reduction='mean')
out_filters.append(1)
out_strides.append(prev_stride)
models.append(model)
elif block['type'] == 'reorg':
stride = int(block['stride'])
prev_filters = stride * stride * prev_filters
out_filters.append(prev_filters)
prev_stride = prev_stride * stride
out_strides.append(prev_stride)
models.append(Reorg(stride))
elif block['type'] == 'upsample':
stride = int(block['stride'])
out_filters.append(prev_filters)
prev_stride = prev_stride // stride
out_strides.append(prev_stride)
models.append(Upsample_expand(stride))
# models.append(Upsample_interpolate(stride))
elif block['type'] == 'route':
layers = block['layers'].split(',')
ind = len(models)
layers = [int(i) if int(i) > 0 else int(i) + ind for i in layers]
if len(layers) == 1:
if 'groups' not in block.keys() or int(block['groups']) == 1:
prev_filters = out_filters[layers[0]]
prev_stride = out_strides[layers[0]]
else:
prev_filters = out_filters[layers[0]] // int(block['groups'])
prev_stride = out_strides[layers[0]] // int(block['groups'])
elif len(layers) == 2:
assert (layers[0] == ind - 1 or layers[1] == ind - 1)
prev_filters = out_filters[layers[0]] + out_filters[layers[1]]
prev_stride = out_strides[layers[0]]
elif len(layers) == 4:
assert (layers[0] == ind - 1)
prev_filters = out_filters[layers[0]] + out_filters[layers[1]] + out_filters[layers[2]] + \
out_filters[layers[3]]
prev_stride = out_strides[layers[0]]
else:
print("route error!!!")
out_filters.append(prev_filters)
out_strides.append(prev_stride)
models.append(EmptyModule())
elif block['type'] == 'shortcut':
ind = len(models)
prev_filters = out_filters[ind - 1]
out_filters.append(prev_filters)
prev_stride = out_strides[ind - 1]
out_strides.append(prev_stride)
models.append(EmptyModule())
elif block['type'] == 'connected':
filters = int(block['output'])
if block['activation'] == 'linear':
model = nn.Linear(prev_filters, filters)
elif block['activation'] == 'leaky':
model = nn.Sequential(
nn.Linear(prev_filters, filters),
nn.LeakyReLU(0.1, inplace=True))
elif block['activation'] == 'relu':
model = nn.Sequential(
nn.Linear(prev_filters, filters),
nn.ReLU(inplace=True))
prev_filters = filters
out_filters.append(prev_filters)
out_strides.append(prev_stride)
models.append(model)
elif block['type'] == 'region':
loss = RegionLoss()
anchors = block['anchors'].split(',')
loss.anchors = [float(i) for i in anchors]
loss.num_classes = int(block['classes'])
loss.num_anchors = int(block['num'])
loss.anchor_step = len(loss.anchors) // loss.num_anchors
loss.object_scale = float(block['object_scale'])
loss.noobject_scale = float(block['noobject_scale'])
loss.class_scale = float(block['class_scale'])
loss.coord_scale = float(block['coord_scale'])
out_filters.append(prev_filters)
out_strides.append(prev_stride)
models.append(loss)
elif block['type'] == 'yolo':
yolo_layer = YoloLayer()
anchors = block['anchors'].split(',')
anchor_mask = block['mask'].split(',')
yolo_layer.anchor_mask = [int(i) for i in anchor_mask]
yolo_layer.anchors = [float(i) for i in anchors]
yolo_layer.num_classes = int(block['classes'])
self.num_classes = yolo_layer.num_classes
yolo_layer.num_anchors = int(block['num'])
yolo_layer.anchor_step = len(yolo_layer.anchors) // yolo_layer.num_anchors
yolo_layer.stride = prev_stride
yolo_layer.scale_x_y = float(block['scale_x_y'])
# yolo_layer.object_scale = float(block['object_scale'])
# yolo_layer.noobject_scale = float(block['noobject_scale'])
# yolo_layer.class_scale = float(block['class_scale'])
# yolo_layer.coord_scale = float(block['coord_scale'])
out_filters.append(prev_filters)
out_strides.append(prev_stride)
models.append(yolo_layer)
else:
print('unknown type %s' % (block['type']))
return models
def load_weights(self, weightfile):
fp = open(weightfile, 'rb')
header = np.fromfile(fp, count=5, dtype=np.int32)
self.header = torch.from_numpy(header)
self.seen = self.header[3]
buf = np.fromfile(fp, dtype=np.float32)
fp.close()
start = 0
ind = -2
for block in self.blocks:
if start >= buf.size:
break
ind = ind + 1
if block['type'] == 'net':
continue
elif block['type'] == 'convolutional':
model = self.models[ind]
batch_normalize = int(block['batch_normalize'])
if batch_normalize:
start = load_conv_bn(buf, start, model[0], model[1])
else:
start = load_conv(buf, start, model[0])
elif block['type'] == 'connected':
model = self.models[ind]
if block['activation'] != 'linear':
start = load_fc(buf, start, model[0])
else:
start = load_fc(buf, start, model)
elif block['type'] == 'maxpool':
pass
elif block['type'] == 'reorg':
pass
elif block['type'] == 'upsample':
pass
elif block['type'] == 'route':
pass
elif block['type'] == 'shortcut':
pass
elif block['type'] == 'region':
pass
elif block['type'] == 'yolo':
pass
elif block['type'] == 'avgpool':
pass
elif block['type'] == 'softmax':
pass
elif block['type'] == 'cost':
pass
else:
print('unknown type %s' % (block['type']))
# def save_weights(self, outfile, cutoff=0):
# if cutoff <= 0:
# cutoff = len(self.blocks) - 1
#
# fp = open(outfile, 'wb')
# self.header[3] = self.seen
# header = self.header
# header.numpy().tofile(fp)
#
# ind = -1
# for blockId in range(1, cutoff + 1):
# ind = ind + 1
# block = self.blocks[blockId]
# if block['type'] == 'convolutional':
# model = self.models[ind]
# batch_normalize = int(block['batch_normalize'])
# if batch_normalize:
# save_conv_bn(fp, model[0], model[1])
# else:
# save_conv(fp, model[0])
# elif block['type'] == 'connected':
# model = self.models[ind]
# if block['activation'] != 'linear':
# save_fc(fc, model)
# else:
# save_fc(fc, model[0])
# elif block['type'] == 'maxpool':
# pass
# elif block['type'] == 'reorg':
# pass
# elif block['type'] == 'upsample':
# pass
# elif block['type'] == 'route':
# pass
# elif block['type'] == 'shortcut':
# pass
# elif block['type'] == 'region':
# pass
# elif block['type'] == 'yolo':
# pass
# elif block['type'] == 'avgpool':
# pass
# elif block['type'] == 'softmax':
# pass
# elif block['type'] == 'cost':
# pass
# else:
# print('unknown type %s' % (block['type']))
# fp.close()