滑动窗口计数

  滑动窗口计数有很多使用场景,比如说限流防止系统雪崩。相比计数实现,滑动窗口实现会更加平滑,能自动消除毛刺。
  滑动窗口原理是在每次有访问进来时,先判断前N个单位时间内的总访问量是否超过了设置的阈值,并对当前时间片上的请求数+1。

  每一个格式表示一个固定的时间(比如1s),每个格子一个计数器,要获取前3s的请求量,就是对当前时间片i ~ i-2的时间片上计数器进行累加。
  这种模式的实现的方式更加契合流控的本质意义,理解较为简单。但由于访问量的不可预见性,会发生单位时间的前半段大量请求涌入,而后半段则拒绝所有请求的情况(通常,需要可以将单位时间切的足够的小来缓解);其次,很难确定这个阈值设置在多少比较合适,只能通过经验或者模拟(如压测)来进行估计,即使是压测也很难估计的准确。集群部署中每台机器的硬件参数不同,可能导致需要对每台机器的阈值设置的都不尽相同。同一台机子在不同的时间点的系统压力也不一样(比如晚上还有一些任务,或其他的一些业务操作的影响),能够承受的最大阈值也不尽相同,无法考虑的周全。
  所以滑窗模式通常适用于对某一资源的保护的需求上,如对db的保护,对某一服务的调用的控制上。

代码实现思路:
  每一个时间片(单位时间)就是一个独立的计数器,用以数组保存。将当前时间以某种方式(比如取模)映射到数组的一项中。每次访问先对当前时间片上的计数器+1,再计算前N个时间片的访问量总合,超过阈值则限流。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import java.util.concurrent.atomic.AtomicInteger;

public class SlidingWindow {

private AtomicInteger[] timeSlices;
/* 队列的总长度 */
private final int timeSliceSize;
/* 每个时间片的时长 */
private final long timeMillisPerSlice;
/* 窗口长度 */
private final int windowSize;

/* 当前所使用的时间片位置 */
private AtomicInteger cursor = new AtomicInteger(0);

public static enum Time {
MILLISECONDS(1),
SECONDS(1000),
MINUTES(SECONDS.getMillis() * 60),
HOURS(MINUTES.getMillis() * 60),
DAYS(HOURS.getMillis() * 24),
WEEKS(DAYS.getMillis() * 7);

private long millis;

Time(long millis) {
this.millis = millis;
}

public long getMillis() {
return millis;
}
}

public SlidingWindow(int windowSize, Time timeSlice) {
this.timeMillisPerSlice = timeSlice.millis;
this.windowSize = windowSize;
// 保证存储在至少两个window
this.timeSliceSize = windowSize * 2 + 1;

init();
}

/**
* 初始化
*/
private void init() {
AtomicInteger[] localTimeSlices = new AtomicInteger[timeSliceSize];
for (int i = 0; i < timeSliceSize; i++) {
localTimeSlices[i] = new AtomicInteger(0);
}
timeSlices = localTimeSlices;
}

private int locationIndex() {
long time = System.currentTimeMillis();
return (int) ((time / timeMillisPerSlice) % timeSliceSize);
}

/**
* <p>对时间片计数+1,并返回窗口中所有的计数总和
* <p>该方法只要调用就一定会对某个时间片进行+1
* @return
*/
public int incrementAndSum() {
int index = locationIndex();
int sum = 0;
// cursor等于index,返回true
// cursor不等于index,返回false,并会将cursor设置为index
int oldCursor = cursor.getAndSet(index);
if (oldCursor == index) {
// 在当前时间片里继续+1
sum += timeSlices[index].incrementAndGet();
} else {
//轮到新的时间片,置0,可能有其它线程也置了该值,容许
timeSlices[index].set(0);
// 清零,访问量不大时会有时间片跳跃的情况
clearBetween(oldCursor, index);

sum += timeSlices[index].incrementAndGet();
}

for (int i = 1; i < windowSize; i++) {
sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
}
return sum;
}

/**
* 判断是否允许进行访问,未超过阈值的话才会对某个时间片+1
* @param threshold
* @return
*/
public boolean allow(int threshold) {
int index = locationIndex();
int sum = 0;
int oldCursor = cursor.getAndSet(index);
if (oldCursor != index) {
timeSlices[index].set(0);
clearBetween(oldCursor, index);
}
for (int i = 0; i < windowSize; i++) {
sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
}

// 阈值判断
if (sum < threshold) {
// 未超过阈值才+1
timeSlices[index].incrementAndGet();
return true;
}
return false;
}

/**
* <p>将fromIndex~toIndex之间的时间片计数都清零
* <p>极端情况下,当循环队列已经走了超过1个timeSliceSize以上,这里的清零并不能如期望的进行
* @param fromIndex 不包含
* @param toIndex 不包含
*/
private void clearBetween(int fromIndex, int toIndex) {
for (int index = (fromIndex + 1) % timeSliceSize; index != toIndex; index = (index + 1) % timeSliceSize) {
timeSlices[index].set(0);
}
}

public static void main(String[] args) {
SlidingWindow window = new SlidingWindow(5, Time.MILLISECONDS);
for (int i = 0; i < 10; i++) {
System.out.println(window.allow(7));
}
}
}

转载自:http://go12345.iteye.com/blog/1744728