pester源码阅读笔记

当初写玩具的时候,因为技术菜,所以不会超时重试。当时在github上发现了这个项目,现在想阅读下源码,学习下

https://github.com/sethgrid/pester

首先是常规操作,import库,这里都是内置的库,所以不用特地装

1
2
3
4
5
6
7
8
// Sentinel errors returned by the pester client.
var (
	// ErrUnexpectedMethod occurs when an http.Client method is unable to be
	// mapped from a calling method in the pester client.
	ErrUnexpectedMethod = errors.New("unexpected client method, must be one of Do, Get, Head, Post, or PostForm")

	// ErrReadingBody happens when we cannot read the body bytes.
	ErrReadingBody = errors.New("error reading body")

	// ErrReadingRequestBody happens when we cannot read the request body bytes.
	ErrReadingRequestBody = errors.New("error reading request body")
)

其次是定义错误,这里注释就十分明确了。

接下来就是定义各种类型了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Client wraps the http client and exposes all the functionality of the http.Client.
// Additionally, Client provides pester specific values for handling resiliency.
type Client struct {
// wrap it to provide access to http built ins
hc *http.Client

// settings copied onto the internal http.Client when pester has to create one
Transport http.RoundTripper
CheckRedirect func(req *http.Request, via []*http.Request) error
Jar http.CookieJar
Timeout time.Duration

// pester specific
Concurrency int // number of concurrent requests (only applied to GET calls)
MaxRetries int // maximum number of attempts per request; values <= 0 mean one attempt
Backoff BackoffStrategy // computes how long to wait before the next attempt
KeepLog bool // when true, failed attempts are appended to ErrLog
LogHook LogHook // called for each failed attempt when KeepLog is false

// request index and attempt number of the result most recently returned by pester
SuccessReqNum int
SuccessRetryNum int

// counts in-flight request goroutines; Wait blocks on it
wg *sync.WaitGroup

// guards ErrLog, the lazy creation of hc, and the Success* fields
sync.Mutex
ErrLog []ErrEntry
}

错误类型

1
2
3
4
5
6
7
8
9
10
11
12
13
// ErrEntry is used to provide the LogString() data and is populated
// each time an error happens if KeepLog is set.
// ErrEntry.Retry is deprecated in favor of ErrEntry.Attempt
type ErrEntry struct {
Time time.Time // when the failed attempt was logged
Method string // pester client method used for the call ("Do", "Get", ...)
URL string // target URL of the call
Verb string // HTTP verb of the call
Request int // index of the concurrent request goroutine
Retry int // attempt number plus one; kept for backward compatibility
Attempt int // attempt number, starting at 1
Err error // error of the attempt; nil when the attempt failed with a 5xx response
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// result simplifies the channel communication for concurrent request handling
type result struct {
resp *http.Response // response of the attempt, may be nil on error
err error // error of the attempt, nil on success
req int // index of the request goroutine that produced the result
retry int // attempt number that produced the result
}

// params represents all the params needed to run http client calls and pester errors.
// The exported Do/Get/Head/Post/PostForm wrappers fill it in and hand it to pester.
type params struct {
method string // pester client method to route to ("Do", "Get", "Head", "Post", "PostForm")
verb string // HTTP verb; only "GET" calls may run concurrently
req *http.Request // request for "Do" calls
url string // target URL
bodyType string // content type for "Post" calls
body io.Reader // body for "Post" calls
data url.Values // form values for "PostForm" calls
}

为了避免多个客户端在同一时刻同步重试(即代码注释里说的 prevent synchronized requests),重试等待时间加入了随机抖动(jitter),所以这里先初始化一个随机数生成器

1
2
3
4
5
// random is the package-wide random source used to compute backoff jitter.
var random *rand.Rand

// init seeds random from the current wall-clock time.
func init() {
	seed := time.Now().UnixNano()
	random = rand.New(rand.NewSource(seed))
}

以默认参数初始化

1
2
3
4
5
6
7
8
9
10
// New returns a fresh Client whose concurrency, retry count, backoff strategy
// and error log are taken from DefaultClient, with its own WaitGroup.
func New() *Client {
	c := &Client{wg: &sync.WaitGroup{}}
	c.Concurrency = DefaultClient.Concurrency
	c.MaxRetries = DefaultClient.MaxRetries
	c.Backoff = DefaultClient.Backoff
	c.ErrLog = DefaultClient.ErrLog
	return c
}

传进一个先前已经设置好的http.client,使他拥有并发和重试

1
2
3
4
5
6
7
// NewExtendedClient builds a default pester Client around an http.Client that
// the caller has already configured, adding Pester's concurrency and retries.
func NewExtendedClient(hc *http.Client) *Client {
	client := New()
	client.hc = hc
	return client
}

他注释写的蛮清楚的,见注释(逃

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// LogHook is used to log attempts as they happen. It receives the ErrEntry
// describing each failed attempt. This function is never called,
// however, if KeepLog is set to true.
type LogHook func(e ErrEntry)

// BackoffStrategy is used to determine how long a retry request should wait until attempted.
// The argument is the number of the attempt that just failed.
type BackoffStrategy func(retry int) time.Duration

// DefaultClient holds the default settings that New copies into every new
// Client: one request at a time, three attempts, a fixed one-second backoff.
var DefaultClient = &Client{
	Concurrency: 1,
	MaxRetries:  3,
	Backoff:     DefaultBackoff,
	ErrLog:      []ErrEntry{},
}

// DefaultBackoff ignores the attempt number and always waits one second.
func DefaultBackoff(_ int) time.Duration {
	const wait = time.Second
	return wait
}

// ExponentialBackoff doubles the wait with every attempt: 2^i seconds for attempt i.
func ExponentialBackoff(i int) time.Duration {
	factor := time.Duration(1) << uint(i)
	return factor * time.Second
}

// ExponentialJitterBackoff waits 2^i seconds for attempt i, adjusted by
// +/- 0-33% to prevent synchronized requests.
func ExponentialJitterBackoff(i int) time.Duration {
	seconds := int(1) << uint(i)
	return jitter(seconds)
}

// LinearBackoff waits i seconds for attempt i, so each wait is one second
// longer than the previous one.
func LinearBackoff(i int) time.Duration {
	seconds := time.Duration(i)
	return seconds * time.Second
}

// LinearJitterBackoff waits i seconds for attempt i, adjusted by
// +/- 0-33% to prevent synchronized requests.
func LinearJitterBackoff(i int) time.Duration {
	wait := jitter(i)
	return wait
}

// jitter keeps the +/- 0-33% logic in one place. It treats i as a number of
// seconds and returns that duration shifted by a random offset of up to a
// third of its length in either direction. The result is never shorter than
// one millisecond.
func jitter(i int) time.Duration {
	ms := i * 1000

	maxJitter := ms / 3

	// rand.Intn panics when its argument is not positive, so only draw an
	// offset when there is a non-empty window (i.e. i >= 1). For i <= 0 the
	// clamp below yields the 1ms minimum.
	if maxJitter > 0 {
		// ms ± rand
		ms += random.Intn(2*maxJitter) - maxJitter
	}

	// a jitter of 0 messes up the time.Tick chan
	if ms <= 0 {
		ms = 1
	}

	return time.Duration(ms) * time.Millisecond
}

// Wait returns once every request goroutine tracked by the client's
// WaitGroup has finished. It is mostly useful in tests.
func (c *Client) Wait() {
	pending := c.wg
	pending.Wait()
}

接下来就是代码的主要部分了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
// pester provides all the logic of retries, concurrency, backoff, and logging.
// It fans the call described by p out to one or more goroutines, retries each
// one up to the attempt limit, and returns the first result that is delivered.
func (c *Client) pester(p params) (*http.Response, error) {
// goroutines are involved, so channels carry results and signals between them
resultCh := make(chan result)
multiplexCh := make(chan result)
finishCh := make(chan struct{})

// track all requests that go out so we can close the late listener routine that closes late incoming response bodies
totalSentRequests := &sync.WaitGroup{}
totalSentRequests.Add(1)
defer totalSentRequests.Done()
allRequestsBackCh := make(chan struct{})
// once every tracked request (and this call itself) is done, close allRequestsBackCh
go func() {
totalSentRequests.Wait()
close(allRequestsBackCh)
}()

// GET calls should be idempotent and can make use
// of concurrency. Other verbs can mutate and should not
// make use of the concurrency feature
// (every non-GET verb is therefore limited to a single request goroutine)
concurrency := c.Concurrency
if p.verb != "GET" {
concurrency = 1
}

// c.hc is written here, so take the lock to avoid a race
// if no http.Client was supplied, build one from the exported fields
c.Lock()
if c.hc == nil {
c.hc = &http.Client{}
c.hc.Transport = c.Transport
c.hc.CheckRedirect = c.CheckRedirect
c.hc.Jar = c.Jar
c.hc.Timeout = c.Timeout
}
c.Unlock()

// re-create the http client so we can leverage the std lib
// (the settings are copied into a fresh http.Client value used for this call)
httpClient := http.Client{
Transport: c.hc.Transport,
CheckRedirect: c.hc.CheckRedirect,
Jar: c.hc.Jar,
Timeout: c.hc.Timeout,
}

// if we have a request body, we need to save it for later
// (the bytes are kept so the body can be rebuilt before every attempt)
var originalRequestBody []byte
var originalBody []byte
var err error
if p.req != nil && p.req.Body != nil {
originalRequestBody, err = ioutil.ReadAll(p.req.Body)
if err != nil {
return nil, ErrReadingRequestBody
}
p.req.Body.Close()
}
if p.body != nil {
originalBody, err = ioutil.ReadAll(p.body)
if err != nil {
return nil, ErrReadingBody
}
}

// number of attempts per request goroutine; values <= 0 fall back to a single attempt
AttemptLimit := c.MaxRetries
if AttemptLimit <= 0 {
AttemptLimit = 1
}

for req := 0; req < concurrency; req++ {
c.wg.Add(1)
totalSentRequests.Add(1)
// start one goroutine per concurrent request
go func(n int, p params) {
defer c.wg.Done()
defer totalSentRequests.Done()

var err error
for i := 1; i <= AttemptLimit; i++ {
// each iteration is one attempt; after a failure the loop sleeps for
// c.Backoff(i) and tries again, up to AttemptLimit attempts
// NOTE(review): the deferred Done below only runs when the goroutine
// returns, not at the end of the iteration
c.wg.Add(1)
defer c.wg.Done()
select {
case <-finishCh: // closed once the first result has been received; stop retrying
return
default:
}

// rehydrate the body (it is drained each read)
// (the bodies are rebuilt from the saved bytes before every attempt)
if len(originalRequestBody) > 0 {
p.req.Body = ioutil.NopCloser(bytes.NewBuffer(originalRequestBody))
}
if len(originalBody) > 0 {
p.body = bytes.NewBuffer(originalBody)
}

var resp *http.Response // response of this attempt
// route the calls
// (dispatch on p.method to the matching http.Client call)
switch p.method {
case "Do":
resp, err = httpClient.Do(p.req)
case "Get":
resp, err = httpClient.Get(p.url)
case "Head":
resp, err = httpClient.Head(p.url)
case "Post":
resp, err = httpClient.Post(p.url, p.bodyType, p.body)
case "PostForm":
resp, err = httpClient.PostForm(p.url, p.data)
default:
err = ErrUnexpectedMethod
}

// Early return if we have a valid result
// Only retry (ie, continue the loop) on 5xx status codes
// a nil error with a status code below 500 is handed back as the result;
// errors and 5xx responses fall through to logging and retry
if err == nil && resp.StatusCode < 500 {
multiplexCh <- result{resp: resp, err: err, req: n, retry: i}
return
}

// record the failed attempt
c.log(ErrEntry{
Time: time.Now(),
Method: p.method,
Verb: p.verb,
URL: p.url,
Request: n,
Retry: i + 1, // would remove, but would break backward compatibility
Attempt: i,
Err: err,
})

// if it is the last iteration, grab the result (which is an error at this point)
// (once the attempt limit is reached, the last response/error is delivered and the goroutine stops)
if i == AttemptLimit {
multiplexCh <- result{resp: resp, err: err}
return
}

//If the request has been cancelled, skip retries
// (the request's context reports cancellation through ctx.Done())
if p.req != nil {
ctx := p.req.Context()
select {
case <-ctx.Done():
multiplexCh <- result{resp: resp, err: ctx.Err()}
return
default:
}
}

// if we are retrying, we should close this response body to free the fd
// (reaching this point means another attempt will follow)
if resp != nil {
resp.Body.Close()
}

// prevent a 0 from causing the tick to block, pass additional microsecond
// (sleep for the backoff duration before the next attempt)
<-time.After(c.Backoff(i) + 1*time.Microsecond)
}
}(req, p)
}

// spin off the go routine so it can continually listen in on late results and close the response bodies
go func() {
gotFirstResult := false
for {
select {
case res := <-multiplexCh: // a request goroutine delivered a result (success or failure)
// only the first result is forwarded to the caller
if !gotFirstResult {
gotFirstResult = true
close(finishCh)
resultCh <- res
} else if res.resp != nil {
// we only return one result to the caller; close all other response bodies that come back
// drain the body before close as to not prevent keepalive. see https://gist.github.com/mholt/eba0f2cc96658be0f717
// (later results are discarded: their bodies are drained and closed)
io.Copy(ioutil.Discard, res.resp.Body)
res.resp.Body.Close()
}
case <-allRequestsBackCh: // closed once all request goroutines have finished
// don't leave this goroutine running
return
}
}
}()

// block until the first result arrives, then record which request and attempt produced it
res := <-resultCh
c.Lock()
defer c.Unlock()
c.SuccessReqNum = res.req
c.SuccessRetryNum = res.retry
return res.resp, res.err

}

日志处理函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// LogString provides a string representation of the errors the client has
// seen: every entry of ErrLog formatted with FormatError, in log order.
// The client lock is held while the log is read.
func (c *Client) LogString() string {
	c.Lock()
	defer c.Unlock()

	// Accumulate into a buffer rather than concatenating strings, which
	// would re-copy the whole result for every entry.
	var buf bytes.Buffer
	for _, e := range c.ErrLog {
		buf.WriteString(c.FormatError(e))
	}
	return buf.String()
}

// FormatError renders e as a single human readable, newline-terminated line
// containing the Unix timestamp, client method, verb, URL, request index,
// retry number and error.
//
// The error is printed with %v rather than %s: attempts that failed with a
// 5xx response are logged with a nil Err, which %s would render as
// "%!s(<nil>)". Non-nil errors format the same with either verb.
func (c *Client) FormatError(e ErrEntry) string {
	return fmt.Sprintf("%d %s [%s] %s request-%d retry-%d error: %v\n",
		e.Time.Unix(), e.Method, e.Verb, e.URL, e.Request, e.Retry, e.Err)
}

// LogErrCount reports how many entries ErrLog currently holds. It takes the
// client lock while reading and is mainly a helper for test validation.
func (c *Client) LogErrCount() int {
	c.Lock()
	count := len(c.ErrLog)
	c.Unlock()
	return count
}

// log records a failed attempt. With KeepLog set, the entry is appended to
// ErrLog under the client lock; otherwise it is passed to LogHook, if any.
func (c *Client) log(e ErrEntry) {
	if !c.KeepLog {
		// NOTE: There is a possibility that Log Printing hook slows it down.
		// but the consumer can always do the Job in a go-routine.
		if hook := c.LogHook; hook != nil {
			hook(e)
		}
		return
	}

	c.Lock()
	defer c.Unlock()
	c.ErrLog = append(c.ErrLog, e)
}

替换内部使用的http.Client(把已有的http.Client嵌入到pester的Client中)

1
2
3
4
5
// EmbedHTTPClient replaces the http.Client that this Pester client wraps,
// so an existing client such as
// https://godoc.org/golang.org/x/oauth2/google#DefaultClient can be extended.
func (c *Client) EmbedHTTPClient(hc *http.Client) {
	underlying := hc
	c.hc = underlying
}

用自定义属性的Client去请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Do provides the same functionality as http.Client.Do
func (c *Client) Do(req *http.Request) (resp *http.Response, err error) {
return c.pester(params{method: "Do", req: req, verb: req.Method, url: req.URL.String()})
}

// Get provides the same functionality as http.Client.Get
func (c *Client) Get(url string) (resp *http.Response, err error) {
return c.pester(params{method: "Get", url: url, verb: "GET"})
}

// Head provides the same functionality as http.Client.Head
func (c *Client) Head(url string) (resp *http.Response, err error) {
return c.pester(params{method: "Head", url: url, verb: "HEAD"})
}

// Post provides the same functionality as http.Client.Post
func (c *Client) Post(url string, bodyType string, body io.Reader) (resp *http.Response, err error) {
return c.pester(params{method: "Post", url: url, bodyType: bodyType, body: body, verb: "POST"})
}

// PostForm provides the same functionality as http.Client.PostForm
func (c *Client) PostForm(url string, data url.Values) (resp *http.Response, err error) {
return c.pester(params{method: "PostForm", url: url, data: data, verb: "POST"})
}

用默认属性的Client去请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
////////////////////////////////////////
// Provide self-constructing variants //
////////////////////////////////////////

// Do provides the same functionality as http.Client.Do and creates its own constructor
func Do(req *http.Request) (resp *http.Response, err error) {
c := New()
return c.Do(req)
}

// Get provides the same functionality as http.Client.Get and creates its own constructor
func Get(url string) (resp *http.Response, err error) {
c := New()
return c.Get(url)
}

// Head provides the same functionality as http.Client.Head and creates its own constructor
func Head(url string) (resp *http.Response, err error) {
c := New()
return c.Head(url)
}

// Post provides the same functionality as http.Client.Post and creates its own constructor
func Post(url string, bodyType string, body io.Reader) (resp *http.Response, err error) {
c := New()
return c.Post(url, bodyType, body)
}

// PostForm provides the same functionality as http.Client.PostForm and creates its own constructor
func PostForm(url string, data url.Values) (resp *http.Response, err error) {
c := New()
return c.PostForm(url, data)
}