最近在开发任务服务,其中有一个任务上报模块。其业务逻辑如下:
1.上报子任务详情,更新数据库
2.根据子任务状态同步总任务状态,实时更新任务进度。
最简单的实现方式就是UpdateSubTask,UpdateTask。
显然,这种方式并发情况下会有问题。
优化一
1.对子任务id进行分布锁,这里记得上锁时要添加超时时间,不然死锁就完犊子了。
func (impl *RedisCache) Lock(ctx context.Context, key string, expiration time.Duration) error {
ok, err := impl.r.SetNX(impl.getKey(key), 1, expiration).Result()
if err != nil {
return err
}
if !ok {
return errors.New("redis lock: already locked")
}
return nil
}
func (impl *RedisCache) Unlock(ctx context.Context, key string) error {
return impl.r.Del(impl.getKey(key)).Err()
}
2.由于任务与子任务是一对多的关系,而上报是以子任务粒度进行的操作的,对于任务状态还是有并发问题的。
一开始想法是直接将任务统计情况放在redis中,但这样做有几点情况:
(1)redis的key不可以设置过期时间,会越来越多
(2)重试后会状态统计不容易操作
(3)任务统计会影响上报性能。
所以对于这点使用的方式是将所以要更新的数据放到redis的list中,并通过redis的阻塞队列来保证任务状态。
func (impl *SchedulerRedisImpl) Consumer(ctx context.Context, key string, p repository.TaskConsumerCallback) {
for {
val := impl.r.BLPop(0, key).Val()[1]
err := p.Process(ctx, val)
if err != nil {
log.ErrorCtx(ctx, "[Consumer] err"+err.Error())
}
}
}
优化二
任务是放到队列中处理了,但是量比较大,所以数据库操作比较多,同一批子任务的任务id相同,这里能想到就是合并操作,由于是放到redis的list中,阻塞获取到后可以使用redis的LRem命令删除队列中相同元素:
func (impl *TaskUpdateStatusQueueImpl) Process(ctx context.Context, val string) error {
// 移除队列中相同元素前100个,以减少数据库操作
count, err := impl.repository.RedisConsumer.RemList(ctx, entity.TaskUpdateStatusConsumer, val, 100)
if err != nil {
return err
}
log.DebugfCtx(ctx, "队列移除相同元素%s:%d个", val, count)
return impl.UpdateTaskStatus(ctx, val)
}
最后
其实在进行第二次优化的时候不知道Lrem这个命令,最先想到的时批量获取list中元素,然后通过索引位置进行删除,然后再通过管道命令实现,然后看了看文档发现Lrem命令可能更加符合,遂用之。
代码还有优化空间,继续发现。
最后的最后
不要造轮子,文档多看几遍可能会有意想不到结果