任务服务的一次优化

Updated on with 0 views and 0 comments

最近在开发任务服务,其中有一个任务上报模块。其业务逻辑如下:
1.上报子任务详情,更新数据库
2.根据子任务状态同步总任务状态,实时更新任务进度。

最简单的实现方式就是UpdateSubTask,UpdateTask。
显然,这种方式并发情况下会有问题。

优化一
1.对子任务id进行分布锁,这里记得上锁时要添加超时时间,不然死锁就完犊子了。

func (impl *RedisCache) Lock(ctx context.Context, key string, expiration time.Duration) error {
	ok, err := impl.r.SetNX(impl.getKey(key), 1, expiration).Result()
	if err != nil {
		return err
	}
	if !ok {
		return errors.New("redis lock: already locked")
	}
	return nil
}

func (impl *RedisCache) Unlock(ctx context.Context, key string) error {
	return impl.r.Del(impl.getKey(key)).Err()
}

2.由于任务与子任务是一对多的关系,而上报是以子任务粒度进行的操作的,对于任务状态还是有并发问题的。
一开始想法是直接将任务统计情况放在redis中,但这样做有几点情况:
(1)redis的key不可以设置过期时间,会越来越多
(2)重试后会状态统计不容易操作
(3)任务统计会影响上报性能。
所以对于这点使用的方式是将所以要更新的数据放到redis的list中,并通过redis的阻塞队列来保证任务状态。

func (impl *SchedulerRedisImpl) Consumer(ctx context.Context, key string, p repository.TaskConsumerCallback) {
	for {
		val := impl.r.BLPop(0, key).Val()[1]
		err := p.Process(ctx, val)
		if err != nil {
			log.ErrorCtx(ctx, "[Consumer] err"+err.Error())
		}
	}
}

优化二
任务是放到队列中处理了,但是量比较大,所以数据库操作比较多,同一批子任务的任务id相同,这里能想到就是合并操作,由于是放到redis的list中,阻塞获取到后可以使用redis的LRem命令删除队列中相同元素:

func (impl *TaskUpdateStatusQueueImpl) Process(ctx context.Context, val string) error {
	// 移除队列中相同元素前100个,以减少数据库操作
	count, err := impl.repository.RedisConsumer.RemList(ctx, entity.TaskUpdateStatusConsumer, val, 100)
	if err != nil {
		return err
	}
	log.DebugfCtx(ctx, "队列移除相同元素%s:%d个", val, count)
	return impl.UpdateTaskStatus(ctx, val)
}

最后
其实在进行第二次优化的时候不知道Lrem这个命令,最先想到的时批量获取list中元素,然后通过索引位置进行删除,然后再通过管道命令实现,然后看了看文档发现Lrem命令可能更加符合,遂用之。
代码还有优化空间,继续发现。

最后的最后
不要造轮子,文档多看几遍可能会有意想不到结果


标题:任务服务的一次优化
作者:xiongxiaochao
地址:http://solo.fancydigital.com.cn/articles/2021/06/08/1623137293246.html