社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
本文梳理一下 Go 的 http.Client 是如何使用(建立、复用)TCP 连接的。
http.Client{
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: 30 * time.Second, //连接超时时间
KeepAlive: 30 * time.Second, //连接保持超时时间
DualStack: true, //
}).DialContext,
MaxIdleConnsPerHost: 2,//每个host 最大空闲链接数
MaxIdleConns: 100, //client对与所有host最大空闲连接数总和
IdleConnTimeout: 90 * time.Second, //空闲连接在连接池中的超时时间
TLSHandshakeTimeout: 10 * time.Second, //TLS安全连接握手超时时间
ExpectContinueTimeout: 1 * time.Second, //发送完请求到接收到响应头的超时时间
},
//总的超时时间
Timeout: serv.GetTotalTimeout(),
}.Do(req)
// Do sends req via c.send and returns the response. It loops so that a
// redirect response can be followed; the loop exits with the response as soon
// as redirectBehavior reports that no redirect is required, or with an error
// as soon as c.send fails.
// (Abbreviated excerpt: elided code is marked with "//...". The variables
// reqs, deadline, resp, redirectMethod and includeBody are declared in the
// elided part.)
func (c *Client) Do(req *Request) (*Response, error) {
//...
for {
// Not entered on the first iteration; entered on later iterations, i.e.
// when a redirect is being followed (reqs is appended to below).
if len(reqs) > 0 {
//...
}
reqs = append(reqs, req)
var err error
var didTimeout func() bool
// Issue the request through c.send.
if resp, didTimeout, err = c.send(req, deadline); err != nil {
// A deadline was set and the failure was caused by it: replace the error
// with an httpError flagged as a timeout.
if !deadline.IsZero() && didTimeout() {
err = &httpError{
err: err.Error() + " (Client.Timeout exceeded while awaiting headers)",
timeout: true,
}
}
return nil, uerr(err)
}
// Decide whether a redirect must be followed; if not, return the response.
var shouldRedirect bool
redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0])
if !shouldRedirect {
return resp, nil
}
// A redirect will be followed: close the current request's body before
// the next iteration.
req.closeBody()
}
}
// send forwards req to the package-level send function, using the
// RoundTripper returned by c.transport() and the given deadline.
// (Abbreviated excerpt: elided code is marked with "//...".)
//
// didTimeout is non-nil only if err != nil.
func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
//...
resp, didTimeout, err = send(req, c.transport(), deadline)
//...
}
// send performs a single round trip of the request on rt.
// It arms request cancellation for the given deadline via setRequestCancel
// and then calls rt.RoundTrip. On error it stops the timer and returns the
// didTimeout function so the caller can tell whether the deadline caused the
// failure; on success didTimeout is returned as nil.
// (Abbreviated excerpt: elided code is marked with "//..."; req is derived
// from ireq in the elided part.)
func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
//...
stopTimer, didTimeout := setRequestCancel(req, rt, deadline)
resp, err = rt.RoundTrip(req)
if err != nil {
// The round trip failed: stop the timer set up by setRequestCancel.
stopTimer()
//...
return nil, didTimeout, err
}
//...
return resp, nil, nil
}
setRequestCancel:负责在 deadline 到期时取消请求。请求的取消方式有三种:
1. Transport.CancelRequest(作用于 Transport 层)
2. Request.Cancel(借助 timer)
3. Request.Context
(后面细说)
RoundTrip:Transport 提供的 API,负责发送 request 并接收 response。
// RoundTrip obtains a connection with t.getConn, performs the request on it,
// and returns the response. It runs in a loop: when the round trip fails and
// pconn.shouldRetryRequest reports that the request may be retried, another
// iteration is attempted; otherwise the error is returned.
// (Abbreviated excerpt: elided code is marked with "..."; treq and cm are
// set up in the elided part.)
func (t *Transport) RoundTrip(req *Request) (*Response, error) {
for {
...
// Get the cached or newly-created connection to either the
// host (for http or https), the http proxy, or the http proxy
// pre-CONNECTed to https server. In any case, we'll be ready
// to send it requests.
pconn, err := t.getConn(treq, cm)
if err != nil {
// No connection could be obtained: clear the request canceler, close
// the request body and give up.
t.setReqCanceler(req, nil)
req.closeBody()
return nil, err
}
var resp *Response
if pconn.alt != nil {
// HTTP/2 path.
t.setReqCanceler(req, nil) // not cancelable with CancelRequest
resp, err = pconn.alt.RoundTrip(req)
} else {
resp, err = pconn.roundTrip(treq)
}
if err == nil {
return resp, nil
}
if !pconn.shouldRetryRequest(req, err) {
// Issue 16465: return underlying net.Conn.Read error from peek,
// as we've historically done.
if e, ok := err.(transportReadFromServerError); ok {
err = e.err
}
return nil, err
}
...
}
}
func (t *Transport) getConn:获取一个 TCP 连接
// getConn returns a persistConn for the connect method cm. It first asks
// t.getIdleConn for an idle connection; if there is none it starts a dial in
// a separate goroutine and waits for whichever happens first: the dial
// finishing, an idle connection arriving on t.getIdleConnCh(cm), or the
// request being canceled (req.Cancel, the request context, or the canceler
// registered with t.setReqCanceler). When it returns before consuming the
// dial result, handlePendingDial takes care of the connection that is still
// being dialed.
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistConn, error) {
req := treq.Request
trace := treq.trace
ctx := req.Context()
if trace != nil && trace.GetConn != nil {
trace.GetConn(cm.addr())
}
// Fast path: an idle connection is already available.
if pc, idleSince := t.getIdleConn(cm); pc != nil {
if trace != nil && trace.GotConn != nil {
trace.GotConn(pc.gotIdleConnTrace(idleSince))
}
// set request canceler to some non-nil function so we
// can detect whether it was cleared between now and when
// we enter roundTrip
t.setReqCanceler(req, func(error) {})
return pc, nil
}
// dialRes carries the outcome of the background dial.
type dialRes struct {
pc *persistConn
err error
}
// dialc is unbuffered: the dialing goroutine blocks on its send until either
// the select below or the goroutine started by handlePendingDial receives.
dialc := make(chan dialRes)
// Copy these hooks so we don't race on the postPendingDial in
// the goroutine we launch. Issue 11136.
testHookPrePendingDial := testHookPrePendingDial
testHookPostPendingDial := testHookPostPendingDial
// handlePendingDial is called when getConn returns without having consumed
// the dial result. It waits for the dial in the background and, if the dial
// succeeded, hands the connection to t.putOrCloseIdleConn.
handlePendingDial := func() {
testHookPrePendingDial()
go func() {
if v := <-dialc; v.err == nil {
t.putOrCloseIdleConn(v.pc)
}
testHookPostPendingDial()
}()
}
// cancelc has capacity 1, so the first send from the canceler below does not
// block even if nothing is receiving at that moment.
cancelc := make(chan error, 1)
t.setReqCanceler(req, func(err error) { cancelc <- err })
// Dial in the background and report the result on dialc.
go func() {
pc, err := t.dialConn(ctx, cm)
dialc <- dialRes{pc, err}
}()
idleConnCh := t.getIdleConnCh(cm)
select {
case v := <-dialc:
// Our dial finished.
if v.pc != nil {
if trace != nil && trace.GotConn != nil && v.pc.alt == nil {
trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn})
}
return v.pc, nil
}
// Our dial failed. See why to return a nicer error
// value.
select {
case <-req.Cancel:
// It was an error due to cancelation, so prioritize that
// error value. (Issue 16049)
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
default:
// It wasn't an error due to cancelation, so
// return the original error message:
return nil, v.err
}
case pc := <-idleConnCh:
// Another request finished first and its net.Conn
// became available before our dial. Or somebody
// else's dial that they didn't use.
// But our dial is still going, so give it away
// when it finishes:
handlePendingDial()
if trace != nil && trace.GotConn != nil {
trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
}
return pc, nil
case <-req.Cancel:
// Canceled via req.Cancel while the dial is still in flight.
handlePendingDial()
return nil, errRequestCanceledConn
case <-req.Context().Done():
// The request context ended while the dial is still in flight.
handlePendingDial()
return nil, req.Context().Err()
case err := <-cancelc:
// Canceled through the canceler registered with t.setReqCanceler above.
handlePendingDial()
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
}
}
getConn 里面的具体实现为:
先尝试从连接池中取空闲连接,取到就直接返回;否则开启一个 goroutine 去建立 TCP 连接,调用方阻塞在 select/case 上:如果先拿到刚建立好的 TCP 连接,就直接返回它;如果在此之前连接池里出现了空闲连接(其他请求归还的),就返回这个空闲连接,同时等刚才那个连接建立完成后再把它放回连接池;如果请求在等待期间被取消,则返回相应的错误。
// dialConn creates a new persistConn for cm and, before returning it, starts
// two goroutines on it: pconn.readLoop and pconn.writeLoop.
// (Abbreviated excerpt: the code that establishes the connection and builds
// pconn is elided and marked with "...".)
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistConn, error) {
...
go pconn.readLoop() // reads responses
go pconn.writeLoop() // writes requests
return pconn, nil
}
再看下 pconn.readLoop 的代码可以知道,在读完响应 body 之后,如果这个连接仍然可用,就会把它放回连接池。
附:
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!