扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
这期内容当中小编将会给大家带来有关基于环状队列和迭代器如何实现分布式任务RR分配策略,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
网站建设哪家好,找成都创新互联!专注于网页设计、网站建设、微信开发、重庆小程序开发、集团企业网站建设等服务项目。为回馈新老客户创新互联还提供了根河免费建站欢迎大家使用!
# 背景
## 分布式任务分配
在很多运维场景下,我们都会执行一些长时间的任务,比如装机、部署环境、打包镜像等长时间任务, 而通常我们的任务节点数量通常是有限的(排除基于k8s的hpa、或者knative等自动伸缩场景)。
那么当我们有一个任务如何根据当前的worker和corrdinator和任务来进行合理的分配,分配其实也比较复杂,往复杂里面做,可以根据当前系统的负载、每个任务的执行资源消耗、当前集群的任务数量等, 这里我们就搞一个最简单的,基于任务和当前worker的RR算法
## 系统架构
在worker和任务队列之间,添加一层协调调度层Coordinator, 由它来根据当前集群任务的状态来进行任务的分配,同时感知当前集群worker和task的状态,协调整个集群任务的执行、终止等操作
# 单机实现
## 整体设计
members: 表示当前集群中所有的worker
tasks: 就是当前的任务
Coordinator: 就是我们的协调者, 负责根据members和tasks进行任务的分配
result: 就是分配的结果
## CircularIterator
CircularIterator就是我们的环状对立迭代器, 拥有两个方法, 一个是add添加member, 一个Next返回基于rr的下一个member
```go
// CircularIterator 环状迭代器
type CircularIterator struct {
list []interface{} // 保存所有的成员变量
next int
}
// Next 返回下一个元素
func (c *CircularIterator) Next() interface{} {
item := c.list[c.next]
c.next = (c.next + 1) % len(c.list)
return item
}
// Add 添加任务
func (c *CircularIterator) Add(v interface{}) bool {
for _, item := range c.list {
if v == item {
return false
}
}
c.list = append(c.list, v)
return true
}
```
## Member&Task
Member就是负责执行任务的worker, 有一个AddTask方法和Execute方法负责任务的执行和添加任务
Task标识一个任务
```go
// Member 任务组成员
type Member struct {
id int
tasks []*Task
}
// ID 返回当前memberID
func (m *Member) ID() int {
return m.id
}
// AddTask 为member添加任务
func (m *Member) AddTask(t *Task) bool {
for _, task := range m.tasks {
if task == t {
return false
}
}
m.tasks = append(m.tasks, t)
return true
}
// Execute 执行任务
func (m *Member) Execute() {
for _, task := range m.tasks {
fmt.Printf("Member %d run task %s\n", m.ID(), task.Execute())
}
}
// Task 任务
type Task struct {
name string
}
// Execute 执行task返回结果
func (t *Task) Execute() string {
return "Task " + t.name + " run success"
}
```
## Coordinator
Coordinator是协调器,负责根据 Member和task进行集群任务的协调调度
```go
// Task 任务
type Task struct {
name string
}
// Execute 执行task返回结果
func (t *Task) Execute() string {
return "Task " + t.name + " run success"
}
// Coordinator 协调者
type Coordinator struct {
members []*Member
tasks []*Task
}
// TaskAssignments 为member分配任务
func (c *Coordinator) TaskAssignments() map[int]*Member {
taskAssignments := make(map[int]*Member)
// 构建迭代器
memberIt := c.getMemberIterator()
for _, task := range c.tasks {
member := memberIt.Next().(*Member)
_, err := taskAssignments[member.ID()]
if err == false {
taskAssignments[member.ID()] = member
}
member.AddTask(task)
}
return taskAssignments
}
func (c *Coordinator) getMemberIterator() *CircularIterator {
// 通过当前成员, 构造成员队列
members := make([]interface{}, len(c.members))
for index, member := range c.members {
members[index] = member
}
return NewCircularIterftor(members)
}
// AddMember 添加member组成员
func (c *Coordinator) AddMember(m *Member) bool {
for _, member := range c.members {
if member == m {
return false
}
}
c.members = append(c.members, m)
return true
}
// AddTask 添加任务
func (c *Coordinator) AddTask(t *Task) bool {
for _, task := range c.tasks {
if task == t {
return false
}
}
c.tasks = append(c.tasks, t)
return true
}
```
## 测试
我们首先创建一堆member和task, 然后调用coordinator进行任务分配,执行任务结果
```go
coordinator := NewCoordinator()
for i := 0; i < 10; i++ {
m := &Member{id: i}
coordinator.AddMember(m)
}
for i := 0; i < 30; i++ {
t := &Task{name: fmt.Sprintf("task %d", i)}
coordinator.AddTask(t)
}
result := coordinator.TaskAssignments()
for _, member := range result {
member.Execute()
}
```
## 结果
可以看到每个worker均匀的得到任务分配
```bash
Member 6 run task Task task 6 run success
Member 6 run task Task task 16 run success
Member 6 run task Task task 26 run success
Member 8 run task Task task 8 run success
Member 8 run task Task task 18 run success
Member 8 run task Task task 28 run success
Member 0 run task Task task 0 run success
Member 0 run task Task task 10 run success
Member 0 run task Task task 20 run success
Member 3 run task Task task 3 run success
Member 3 run task Task task 13 run success
Member 3 run task Task task 23 run success
Member 4 run task Task task 4 run success
Member 4 run task Task task 14 run success
Member 4 run task Task task 24 run success
Member 7 run task Task task 7 run success
Member 7 run task Task task 17 run success
Member 7 run task Task task 27 run success
Member 9 run task Task task 9 run success
Member 9 run task Task task 19 run success
Member 9 run task Task task 29 run success
Member 1 run task Task task 1 run success
Member 1 run task Task task 11 run success
Member 1 run task Task task 21 run success
Member 2 run task Task task 2 run success
Member 2 run task Task task 12 run success
Member 2 run task Task task 22 run success
Member 5 run task Task task 5 run success
Member 5 run task Task task 15 run success
Member 5 run task Task task 25 run success
```
## 完整代码
```go
package main
import "fmt"
// CircularIterator 环状迭代器
type CircularIterator struct {
list []interface{}
next int
}
// Next 返回下一个元素
func (c *CircularIterator) Next() interface{} {
item := c.list[c.next]
c.next = (c.next + 1) % len(c.list)
return item
}
// Add 添加任务
func (c *CircularIterator) Add(v interface{}) bool {
for _, item := range c.list {
if v == item {
return false
}
}
c.list = append(c.list, v)
return true
}
// Member 任务组成员
type Member struct {
id int
tasks []*Task
}
// ID 返回当前memberID
func (m *Member) ID() int {
return m.id
}
// AddTask 为member添加任务
func (m *Member) AddTask(t *Task) bool {
for _, task := range m.tasks {
if task == t {
return false
}
}
m.tasks = append(m.tasks, t)
return true
}
// Execute 执行任务
func (m *Member) Execute() {
for _, task := range m.tasks {
fmt.Printf("Member %d run task %s\n", m.ID(), task.Execute())
}
}
// Task 任务
type Task struct {
name string
}
// Execute 执行task返回结果
func (t *Task) Execute() string {
return "Task " + t.name + " run success"
}
// Coordinator 协调者
type Coordinator struct {
members []*Member
tasks []*Task
}
// TaskAssignments 为member分配任务
func (c *Coordinator) TaskAssignments() map[int]*Member {
taskAssignments := make(map[int]*Member)
// 构建迭代器
memberIt := c.getMemberIterator()
for _, task := range c.tasks {
member := memberIt.Next().(*Member)
_, err := taskAssignments[member.ID()]
if err == false {
taskAssignments[member.ID()] = member
}
member.AddTask(task)
}
return taskAssignments
}
func (c *Coordinator) getMemberIterator() *CircularIterator {
// 通过当前成员, 构造成员队列
members := make([]interface{}, len(c.members))
for index, member := range c.members {
members[index] = member
}
return NewCircularIterftor(members)
}
// AddMember 添加member组成员
func (c *Coordinator) AddMember(m *Member) bool {
for _, member := range c.members {
if member == m {
return false
}
}
c.members = append(c.members, m)
return true
}
// AddTask 添加任务
func (c *Coordinator) AddTask(t *Task) bool {
for _, task := range c.tasks {
if task == t {
return false
}
}
c.tasks = append(c.tasks, t)
return true
}
// NewCircularIterftor 返回迭代器
func NewCircularIterftor(list []interface{}) *CircularIterator {
iterator := CircularIterator{}
for _, item := range list {
iterator.Add(item)
}
return &iterator
}
// NewCoordinator 返回协调器
func NewCoordinator() *Coordinator {
c := Coordinator{}
return &c
}
func main() {
coordinator := NewCoordinator()
for i := 0; i < 10; i++ {
m := &Member{id: i}
coordinator.AddMember(m)
}
for i := 0; i < 30; i++ {
t := &Task{name: fmt.Sprintf("task %d", i)}
coordinator.AddTask(t)
}
result := coordinator.TaskAssignments()
for _, member := range result {
member.Execute()
}
}
```
任务协调是一个非常复杂的事情, 内部的任务平台,虽然实现了基于任务的组合和app化,但是任务调度分配着一块,仍然没有去做,只是简单的根据树形任务去简单的做一些分支任务的执行,未来有时间再做吧,要继续研究下一个模块了。
上述就是小编为大家分享的基于环状队列和迭代器如何实现分布式任务RR分配策略了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注创新互联行业资讯频道。
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流