简单的分配算法

现有一个需求:有一组 task,每个 task 有一个 flow 表示要处理的流量,要求把这些 task 分配到 n 个线程中,并保证每个线程处理的流量均衡。

思路是先把 task 按 flow 从大到小排序,然后对每个线程依据已分配的 flow 和 task 数打分,每次把下一个 task 分配给分数最低的线程。打分算法如下:

$$Score_i=weight \times \frac{I_i}{I_{total}}+(1-weight) \times \frac{N_i}{N_{total}}$$

  • ${I_i}$表示 i 线程上已经分配的 flow和; ${I_{total}}$表示所有已经分配的 flow和;
  • ${N_i}$表示 i 线程上已经分配的task数; ${N_{total}}$表示所有已经分配的task数;
  • $weight$表示 flow 所占的权重,取值在 0 到 1 之间(下面的代码中取 0.9)。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import com.google.common.collect.Maps;
import java.util.*;

/**
 * Greedy task-to-worker assignment that balances the total flow handled by each worker.
 *
 * <p>Tasks are processed in descending order of flow. Each task goes to the worker whose
 * current score is lowest, where
 * {@code score = FLOW_WEIGHT * flowShare + (1 - FLOW_WEIGHT) * taskCountShare}.
 */
public class Test {
    /** Weight of the flow share in a worker's score; the task-count share gets the remainder. */
    private static final double FLOW_WEIGHT = 0.9;

    /** A task identified by {@code id} that carries {@code flow} units of traffic. */
    static class TaskInfo {
        int id;
        long flow;

        public TaskInfo(int id, long flow) {
            this.id = id;
            this.flow = flow;
        }
    }

    /** Runs the assignment on a small sample: five tasks spread over three workers. */
    public static void main(String[] args) {
        List<TaskInfo> tasks = new ArrayList<>();
        tasks.add(new TaskInfo(0, 1));
        tasks.add(new TaskInfo(1, 5));
        tasks.add(new TaskInfo(2, 10));
        tasks.add(new TaskInfo(3, 20));
        tasks.add(new TaskInfo(4, 5));

        assign(3, tasks);
    }

    /**
     * Distributes the tasks over {@code workers} workers and prints one line per worker in the
     * form {@code "<workerId>: <flow> <flow> ..."}.
     *
     * <p>The given list is not modified; a sorted copy is used internally.
     *
     * @param workers number of workers, must be positive
     * @param taskInfoList tasks to distribute
     * @throws IllegalArgumentException if {@code workers} is not positive
     */
    public static void assign(int workers, List<TaskInfo> taskInfoList) {
        if (workers <= 0) {
            throw new IllegalArgumentException("workers must be positive: " + workers);
        }

        // Sort a copy in descending flow order. Long.compare avoids the truncation that
        // casting a long difference to int would cause.
        List<TaskInfo> sortedTasks = new ArrayList<>(taskInfoList);
        sortedTasks.sort((o1, o2) -> Long.compare(o2.flow, o1.flow));

        // Insertion-ordered so workers are scored and printed as 0, 1, ..., workers - 1.
        Map<Integer, List<TaskInfo>> assignedMap = new LinkedHashMap<>();
        for (int i = 0; i < workers; i++) {
            assignedMap.put(i, new ArrayList<>());
        }

        for (TaskInfo taskInfo : sortedTasks) {
            int id = getAssignId(assignedMap);
            assignedMap.get(id).add(taskInfo);
        }

        assignedMap.forEach((id, list) -> {
            System.out.print(id + ": ");
            list.forEach(t -> System.out.print(t.flow + " "));
            System.out.println();
        });
    }

    /**
     * Picks the worker with the lowest score. On ties the worker seen first while iterating
     * the score map wins.
     *
     * @param assignedMap current assignment, worker id to its tasks
     * @return id of the worker that should receive the next task; 0 if the map is empty
     */
    private static int getAssignId(Map<Integer, List<TaskInfo>> assignedMap) {
        int assignId = 0;
        double minScore = Double.MAX_VALUE;
        for (Map.Entry<Integer, Double> entry : getScoreMap(assignedMap).entrySet()) {
            double currentScore = entry.getValue();
            if (currentScore < minScore) {
                minScore = currentScore;
                assignId = entry.getKey();
            }
        }
        return assignId;
    }

    /**
     * Computes the score of every worker from the tasks assigned so far.
     *
     * <p>A share whose total is zero (nothing assigned yet, or all flows are zero) counts as 0
     * instead of producing NaN, so scores stay comparable.
     *
     * @param assignedMap current assignment, worker id to its tasks
     * @return worker id to score, in the iteration order of {@code assignedMap}
     */
    public static Map<Integer, Double> getScoreMap(Map<Integer, List<TaskInfo>> assignedMap) {
        // Total up the assigned task count and flow, per worker and overall.
        Map<Integer, Long> flowSumMap = new HashMap<>();
        int totalTaskCount = 0;
        long totalFlow = 0;
        for (Map.Entry<Integer, List<TaskInfo>> entry : assignedMap.entrySet()) {
            long workerFlow = 0;
            for (TaskInfo taskInfo : entry.getValue()) {
                workerFlow += taskInfo.flow;
            }
            flowSumMap.put(entry.getKey(), workerFlow);
            totalTaskCount += entry.getValue().size();
            totalFlow += workerFlow;
        }

        // Score each worker from its flow share and its task-count share.
        Map<Integer, Double> idScoreMap = new LinkedHashMap<>();
        for (Map.Entry<Integer, List<TaskInfo>> entry : assignedMap.entrySet()) {
            int id = entry.getKey();
            double flowPart = totalFlow == 0
                    ? 0.0
                    : FLOW_WEIGHT * flowSumMap.get(id) / totalFlow;
            double countPart = totalTaskCount == 0
                    ? 0.0
                    : (1 - FLOW_WEIGHT) * entry.getValue().size() / totalTaskCount;
            idScoreMap.put(id, flowPart + countPart);
        }
        return idScoreMap;
    }
}