chore: migrate to gitea
Some checks failed
golangci-lint / lint (push) Failing after 1m30s
Test / test (push) Failing after 2m17s

This commit is contained in:
2026-01-27 00:40:46 +01:00
parent 1e05874160
commit f8df24c29d
3169 changed files with 1216434 additions and 1587 deletions

View File

@@ -0,0 +1,167 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package resolver provides internal resolver-related functionality.
package resolver
import (
"context"
"sync"
"google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
)
// ConfigSelector controls what configuration to use for every RPC.
type ConfigSelector interface {
// Selects the configuration for the RPC, or terminates it using the error.
// This error will be converted by the gRPC library to a status error with
// code UNKNOWN if it is not returned as a status error.
SelectConfig(RPCInfo) (*RPCConfig, error)
}
// RPCInfo contains RPC information needed by a ConfigSelector.
type RPCInfo struct {
// Context is the user's context for the RPC and contains headers and
// application timeout. It is passed for interception purposes and for
// efficiency reasons. SelectConfig should not be blocking.
Context context.Context
Method string // i.e. "/Service/Method"
}
// RPCConfig describes the configuration to use for each RPC.
type RPCConfig struct {
// The context to use for the remainder of the RPC; can pass info to LB
// policy or affect timeout or metadata.
Context context.Context
MethodConfig serviceconfig.MethodConfig // configuration to use for this RPC
OnCommitted func() // Called when the RPC has been committed (retries no longer possible)
Interceptor ClientInterceptor
}
// ClientStream is the same as grpc.ClientStream, but defined here for circular
// dependency reasons.
type ClientStream interface {
// Header returns the header metadata received from the server if there
// is any. It blocks if the metadata is not ready to read.
Header() (metadata.MD, error)
// Trailer returns the trailer metadata from the server, if there is any.
// It must only be called after stream.CloseAndRecv has returned, or
// stream.Recv has returned a non-nil error (including io.EOF).
Trailer() metadata.MD
// CloseSend closes the send direction of the stream. It closes the stream
// when non-nil error is met. It is also not safe to call CloseSend
// concurrently with SendMsg.
CloseSend() error
// Context returns the context for this stream.
//
// It should not be called until after Header or RecvMsg has returned. Once
// called, subsequent client-side retries are disabled.
Context() context.Context
// SendMsg is generally called by generated code. On error, SendMsg aborts
// the stream. If the error was generated by the client, the status is
// returned directly; otherwise, io.EOF is returned and the status of
// the stream may be discovered using RecvMsg.
//
// SendMsg blocks until:
// - There is sufficient flow control to schedule m with the transport, or
// - The stream is done, or
// - The stream breaks.
//
// SendMsg does not wait until the message is received by the server. An
// untimely stream closure may result in lost messages. To ensure delivery,
// users should ensure the RPC completed successfully using RecvMsg.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines. It is also
// not safe to call CloseSend concurrently with SendMsg.
SendMsg(m any) error
// RecvMsg blocks until it receives a message into m or the stream is
// done. It returns io.EOF when the stream completes successfully. On
// any other error, the stream is aborted and the error contains the RPC
// status.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not
// safe to call RecvMsg on the same stream in different goroutines.
RecvMsg(m any) error
}
// ClientInterceptor is an interceptor for gRPC client streams.
type ClientInterceptor interface {
// NewStream produces a ClientStream for an RPC which may optionally use
// the provided function to produce a stream for delegation. Note:
// RPCInfo.Context should not be used (will be nil).
//
// done is invoked when the RPC is finished using its connection, or could
// not be assigned a connection. RPC operations may still occur on
// ClientStream after done is called, since the interceptor is invoked by
// application-layer operations. done must never be nil when called.
NewStream(ctx context.Context, ri RPCInfo, done func(), newStream func(ctx context.Context, done func()) (ClientStream, error)) (ClientStream, error)
}
// ServerInterceptor is an interceptor for incoming RPC's on gRPC server side.
type ServerInterceptor interface {
// AllowRPC checks if an incoming RPC is allowed to proceed based on
// information about connection RPC was received on, and HTTP Headers. This
// information will be piped into context.
AllowRPC(ctx context.Context) error // TODO: Make this a real interceptor for filters such as rate limiting.
}
type csKeyType string
const csKey = csKeyType("grpc.internal.resolver.configSelector")
// SetConfigSelector sets the config selector in state and returns the new
// state.
func SetConfigSelector(state resolver.State, cs ConfigSelector) resolver.State {
state.Attributes = state.Attributes.WithValue(csKey, cs)
return state
}
// GetConfigSelector retrieves the config selector from state, if present, and
// returns it or nil if absent.
func GetConfigSelector(state resolver.State) ConfigSelector {
cs, _ := state.Attributes.Value(csKey).(ConfigSelector)
return cs
}
// SafeConfigSelector allows for safe switching of ConfigSelector
// implementations such that previous values are guaranteed to not be in use
// when UpdateConfigSelector returns.
type SafeConfigSelector struct {
mu sync.RWMutex
cs ConfigSelector
}
// UpdateConfigSelector swaps to the provided ConfigSelector and blocks until
// all uses of the previous ConfigSelector have completed.
func (scs *SafeConfigSelector) UpdateConfigSelector(cs ConfigSelector) {
scs.mu.Lock()
defer scs.mu.Unlock()
scs.cs = cs
}
// SelectConfig defers to the current ConfigSelector in scs.
func (scs *SafeConfigSelector) SelectConfig(r RPCInfo) (*RPCConfig, error) {
scs.mu.RLock()
defer scs.mu.RUnlock()
return scs.cs.SelectConfig(r)
}

View File

@@ -0,0 +1,477 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package delegatingresolver implements a resolver capable of resolving both
// target URIs and proxy addresses.
package delegatingresolver
import (
"fmt"
"net"
"net/http"
"net/url"
"sync"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/proxyattributes"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
var (
logger = grpclog.Component("delegating-resolver")
// HTTPSProxyFromEnvironment will be overwritten in the tests
HTTPSProxyFromEnvironment = http.ProxyFromEnvironment
)
const defaultPort = "443"
// delegatingResolver manages both target URI and proxy address resolution by
// delegating these tasks to separate child resolvers. Essentially, it acts as
// an intermediary between the gRPC ClientConn and the child resolvers.
//
// It implements the [resolver.Resolver] interface.
type delegatingResolver struct {
target resolver.Target // parsed target URI to be resolved
cc resolver.ClientConn // gRPC ClientConn
proxyURL *url.URL // proxy URL, derived from proxy environment and target
// We do not hold both mu and childMu in the same goroutine. Avoid holding
// both locks when calling into the child, as the child resolver may
// synchronously callback into the channel.
mu sync.Mutex // protects all the fields below
targetResolverState *resolver.State // state of the target resolver
proxyAddrs []resolver.Address // resolved proxy addresses; empty if no proxy is configured
// childMu serializes calls into child resolvers. It also protects access to
// the following fields.
childMu sync.Mutex
targetResolver resolver.Resolver // resolver for the target URI, based on its scheme
proxyResolver resolver.Resolver // resolver for the proxy URI; nil if no proxy is configured
}
// nopResolver is a resolver that does nothing.
type nopResolver struct{}
func (nopResolver) ResolveNow(resolver.ResolveNowOptions) {}
func (nopResolver) Close() {}
// proxyURLForTarget determines the proxy URL for the given address based on the
// environment. It can return the following:
// - nil URL, nil error: No proxy is configured or the address is excluded
// using the `NO_PROXY` environment variable or if req.URL.Host is
// "localhost" (with or without // a port number)
// - nil URL, non-nil error: An error occurred while retrieving the proxy URL.
// - non-nil URL, nil error: A proxy is configured, and the proxy URL was
// retrieved successfully without any errors.
func proxyURLForTarget(address string) (*url.URL, error) {
req := &http.Request{URL: &url.URL{
Scheme: "https",
Host: address,
}}
return HTTPSProxyFromEnvironment(req)
}
// New creates a new delegating resolver that can create up to two child
// resolvers:
// - one to resolve the proxy address specified using the supported
// environment variables. This uses the registered resolver for the "dns"
// scheme. It is lazily built when a target resolver update contains at least
// one TCP address.
// - one to resolve the target URI using the resolver specified by the scheme
// in the target URI or specified by the user using the WithResolvers dial
// option. As a special case, if the target URI's scheme is "dns" and a
// proxy is specified using the supported environment variables, the target
// URI's path portion is used as the resolved address unless target
// resolution is enabled using the dial option.
func New(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions, targetResolverBuilder resolver.Builder, targetResolutionEnabled bool) (resolver.Resolver, error) {
r := &delegatingResolver{
target: target,
cc: cc,
proxyResolver: nopResolver{},
targetResolver: nopResolver{},
}
addr := target.Endpoint()
var err error
if target.URL.Scheme == "dns" && !targetResolutionEnabled && envconfig.EnableDefaultPortForProxyTarget {
addr, err = parseTarget(addr)
if err != nil {
return nil, fmt.Errorf("delegating_resolver: invalid target address %q: %v", target.Endpoint(), err)
}
}
r.proxyURL, err = proxyURLForTarget(addr)
if err != nil {
return nil, fmt.Errorf("delegating_resolver: failed to determine proxy URL for target %q: %v", target, err)
}
// proxy is not configured or proxy address excluded using `NO_PROXY` env
// var, so only target resolver is used.
if r.proxyURL == nil {
return targetResolverBuilder.Build(target, cc, opts)
}
if logger.V(2) {
logger.Infof("Proxy URL detected : %s", r.proxyURL)
}
// Resolver updates from one child may trigger calls into the other. Block
// updates until the children are initialized.
r.childMu.Lock()
defer r.childMu.Unlock()
// When the scheme is 'dns' and target resolution on client is not enabled,
// resolution should be handled by the proxy, not the client. Therefore, we
// bypass the target resolver and store the unresolved target address.
if target.URL.Scheme == "dns" && !targetResolutionEnabled {
r.targetResolverState = &resolver.State{
Addresses: []resolver.Address{{Addr: addr}},
Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: addr}}}},
}
r.updateTargetResolverState(*r.targetResolverState)
return r, nil
}
wcc := &wrappingClientConn{
stateListener: r.updateTargetResolverState,
parent: r,
}
if r.targetResolver, err = targetResolverBuilder.Build(target, wcc, opts); err != nil {
return nil, fmt.Errorf("delegating_resolver: unable to build the resolver for target %s: %v", target, err)
}
return r, nil
}
// proxyURIResolver creates a resolver for resolving proxy URIs using the "dns"
// scheme. It adjusts the proxyURL to conform to the "dns:///" format and builds
// a resolver with a wrappingClientConn to capture resolved addresses.
func (r *delegatingResolver) proxyURIResolver(opts resolver.BuildOptions) (resolver.Resolver, error) {
proxyBuilder := resolver.Get("dns")
if proxyBuilder == nil {
panic("delegating_resolver: resolver for proxy not found for scheme dns")
}
url := *r.proxyURL
url.Scheme = "dns"
url.Path = "/" + r.proxyURL.Host
url.Host = "" // Clear the Host field to conform to the "dns:///" format
proxyTarget := resolver.Target{URL: url}
wcc := &wrappingClientConn{
stateListener: r.updateProxyResolverState,
parent: r,
}
return proxyBuilder.Build(proxyTarget, wcc, opts)
}
func (r *delegatingResolver) ResolveNow(o resolver.ResolveNowOptions) {
r.childMu.Lock()
defer r.childMu.Unlock()
r.targetResolver.ResolveNow(o)
r.proxyResolver.ResolveNow(o)
}
func (r *delegatingResolver) Close() {
r.childMu.Lock()
defer r.childMu.Unlock()
r.targetResolver.Close()
r.targetResolver = nil
r.proxyResolver.Close()
r.proxyResolver = nil
}
func needsProxyResolver(state *resolver.State) bool {
for _, addr := range state.Addresses {
if !skipProxy(addr) {
return true
}
}
for _, endpoint := range state.Endpoints {
for _, addr := range endpoint.Addresses {
if !skipProxy(addr) {
return true
}
}
}
return false
}
// parseTarget takes a target string and ensures it is a valid "host:port" target.
//
// It does the following:
// 1. If the target already has a port (e.g., "host:port", "[ipv6]:port"),
// it is returned as is.
// 2. If the host part is empty (e.g., ":80"), it defaults to "localhost",
// returning "localhost:80".
// 3. If the target is missing a port (e.g., "host", "ipv6"), the defaultPort
// is added.
//
// An error is returned for empty targets or targets with a trailing colon
// but no port (e.g., "host:").
func parseTarget(target string) (string, error) {
if target == "" {
return "", fmt.Errorf("missing address")
}
host, port, err := net.SplitHostPort(target)
if err != nil {
// If SplitHostPort fails, it's likely because the port is missing.
// We append the default port and return the result.
return net.JoinHostPort(target, defaultPort), nil
}
// If SplitHostPort succeeds, we check for edge cases.
if port == "" {
// A success with an empty port means the target had a trailing colon,
// e.g., "host:", which is an error.
return "", fmt.Errorf("missing port after port-separator colon")
}
if host == "" {
// A success with an empty host means the target was like ":80".
// We default the host to "localhost".
host = "localhost"
}
return net.JoinHostPort(host, port), nil
}
func skipProxy(address resolver.Address) bool {
// Avoid proxy when network is not tcp.
networkType, ok := networktype.Get(address)
if !ok {
networkType, _ = transport.ParseDialTarget(address.Addr)
}
if networkType != "tcp" {
return true
}
req := &http.Request{URL: &url.URL{
Scheme: "https",
Host: address.Addr,
}}
// Avoid proxy when address included in `NO_PROXY` environment variable or
// fails to get the proxy address.
url, err := HTTPSProxyFromEnvironment(req)
if err != nil || url == nil {
return true
}
return false
}
// updateClientConnStateLocked constructs a combined list of addresses by
// pairing each proxy address with every target address of type TCP. For each
// pair, it creates a new [resolver.Address] using the proxy address and
// attaches the corresponding target address and user info as attributes. Target
// addresses that are not of type TCP are appended to the list as-is. The
// function returns nil if either resolver has not yet provided an update, and
// returns the result of ClientConn.UpdateState once both resolvers have
// provided at least one update.
func (r *delegatingResolver) updateClientConnStateLocked() error {
if r.targetResolverState == nil || r.proxyAddrs == nil {
return nil
}
// If multiple resolved proxy addresses are present, we send only the
// unresolved proxy host and let net.Dial handle the proxy host name
// resolution when creating the transport. Sending all resolved addresses
// would increase the number of addresses passed to the ClientConn and
// subsequently to load balancing (LB) policies like Round Robin, leading
// to additional TCP connections. However, if there's only one resolved
// proxy address, we send it directly, as it doesn't affect the address
// count returned by the target resolver and the address count sent to the
// ClientConn.
var proxyAddr resolver.Address
if len(r.proxyAddrs) == 1 {
proxyAddr = r.proxyAddrs[0]
} else {
proxyAddr = resolver.Address{Addr: r.proxyURL.Host}
}
var addresses []resolver.Address
for _, targetAddr := range (*r.targetResolverState).Addresses {
if skipProxy(targetAddr) {
addresses = append(addresses, targetAddr)
continue
}
addresses = append(addresses, proxyattributes.Set(proxyAddr, proxyattributes.Options{
User: r.proxyURL.User,
ConnectAddr: targetAddr.Addr,
}))
}
// For each target endpoint, construct a new [resolver.Endpoint] that
// includes all addresses from all proxy endpoints and the addresses from
// that target endpoint, preserving the number of target endpoints.
var endpoints []resolver.Endpoint
for _, endpt := range (*r.targetResolverState).Endpoints {
var addrs []resolver.Address
for _, targetAddr := range endpt.Addresses {
// Avoid proxy when network is not tcp.
if skipProxy(targetAddr) {
addrs = append(addrs, targetAddr)
continue
}
for _, proxyAddr := range r.proxyAddrs {
addrs = append(addrs, proxyattributes.Set(proxyAddr, proxyattributes.Options{
User: r.proxyURL.User,
ConnectAddr: targetAddr.Addr,
}))
}
}
endpoints = append(endpoints, resolver.Endpoint{Addresses: addrs})
}
// Use the targetResolverState for its service config and attributes
// contents. The state update is only sent after both the target and proxy
// resolvers have sent their updates, and curState has been updated with the
// combined addresses.
curState := *r.targetResolverState
curState.Addresses = addresses
curState.Endpoints = endpoints
return r.cc.UpdateState(curState)
}
// updateProxyResolverState updates the proxy resolver state by storing proxy
// addresses and endpoints, marking the resolver as ready, and triggering a
// state update if both proxy and target resolvers are ready. If the ClientConn
// returns a non-nil error, it calls `ResolveNow()` on the target resolver. It
// is a StateListener function of wrappingClientConn passed to the proxy
// resolver.
func (r *delegatingResolver) updateProxyResolverState(state resolver.State) error {
r.mu.Lock()
defer r.mu.Unlock()
if logger.V(2) {
logger.Infof("Addresses received from proxy resolver: %s", state.Addresses)
}
if len(state.Endpoints) > 0 {
// We expect exactly one address per endpoint because the proxy resolver
// uses "dns" resolution.
r.proxyAddrs = make([]resolver.Address, 0, len(state.Endpoints))
for _, endpoint := range state.Endpoints {
r.proxyAddrs = append(r.proxyAddrs, endpoint.Addresses...)
}
} else if state.Addresses != nil {
r.proxyAddrs = state.Addresses
} else {
r.proxyAddrs = []resolver.Address{} // ensure proxyAddrs is non-nil to indicate an update has been received
}
err := r.updateClientConnStateLocked()
// Another possible approach was to block until updates are received from
// both resolvers. But this is not used because calling `New()` triggers
// `Build()` for the first resolver, which calls `UpdateState()`. And the
// second resolver hasn't sent an update yet, so it would cause `New()` to
// block indefinitely.
if err != nil {
go func() {
r.childMu.Lock()
defer r.childMu.Unlock()
if r.targetResolver != nil {
r.targetResolver.ResolveNow(resolver.ResolveNowOptions{})
}
}()
}
return err
}
// updateTargetResolverState is the StateListener function provided to the
// target resolver via wrappingClientConn. It updates the resolver state and
// marks the target resolver as ready. If the update includes at least one TCP
// address and the proxy resolver has not yet been constructed, it initializes
// the proxy resolver. A combined state update is triggered once both resolvers
// are ready. If all addresses are non-TCP, it proceeds without waiting for the
// proxy resolver. If ClientConn.UpdateState returns a non-nil error,
// ResolveNow() is called on the proxy resolver.
func (r *delegatingResolver) updateTargetResolverState(state resolver.State) error {
r.mu.Lock()
defer r.mu.Unlock()
if logger.V(2) {
logger.Infof("Addresses received from target resolver: %v", state.Addresses)
}
r.targetResolverState = &state
// If all addresses returned by the target resolver have a non-TCP network
// type, or are listed in the `NO_PROXY` environment variable, do not wait
// for proxy update.
if !needsProxyResolver(r.targetResolverState) {
return r.cc.UpdateState(*r.targetResolverState)
}
// The proxy resolver may be rebuilt multiple times, specifically each time
// the target resolver sends an update, even if the target resolver is built
// successfully but building the proxy resolver fails.
if len(r.proxyAddrs) == 0 {
go func() {
r.childMu.Lock()
defer r.childMu.Unlock()
if _, ok := r.proxyResolver.(nopResolver); !ok {
return
}
proxyResolver, err := r.proxyURIResolver(resolver.BuildOptions{})
if err != nil {
r.cc.ReportError(fmt.Errorf("delegating_resolver: unable to build the proxy resolver: %v", err))
return
}
r.proxyResolver = proxyResolver
}()
}
err := r.updateClientConnStateLocked()
if err != nil {
go func() {
r.childMu.Lock()
defer r.childMu.Unlock()
if r.proxyResolver != nil {
r.proxyResolver.ResolveNow(resolver.ResolveNowOptions{})
}
}()
}
return nil
}
// wrappingClientConn serves as an intermediary between the parent ClientConn
// and the child resolvers created here. It implements the resolver.ClientConn
// interface and is passed in that capacity to the child resolvers.
type wrappingClientConn struct {
// Callback to deliver resolver state updates
stateListener func(state resolver.State) error
parent *delegatingResolver
}
// UpdateState receives resolver state updates and forwards them to the
// appropriate listener function (either for the proxy or target resolver).
func (wcc *wrappingClientConn) UpdateState(state resolver.State) error {
return wcc.stateListener(state)
}
// ReportError intercepts errors from the child resolvers and passes them to
// ClientConn.
func (wcc *wrappingClientConn) ReportError(err error) {
wcc.parent.cc.ReportError(err)
}
// NewAddress intercepts the new resolved address from the child resolvers and
// passes them to ClientConn.
func (wcc *wrappingClientConn) NewAddress(addrs []resolver.Address) {
wcc.UpdateState(resolver.State{Addresses: addrs})
}
// ParseServiceConfig parses the provided service config and returns an object
// that provides the parsed config.
func (wcc *wrappingClientConn) ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult {
return wcc.parent.cc.ParseServiceConfig(serviceConfigJSON)
}

View File

@@ -0,0 +1,461 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package dns implements a dns resolver to be installed as the default resolver
// in grpc.
package dns
import (
"context"
"encoding/json"
"fmt"
rand "math/rand/v2"
"net"
"net/netip"
"os"
"strconv"
"strings"
"sync"
"time"
grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/resolver/dns/internal"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
var (
// EnableSRVLookups controls whether the DNS resolver attempts to fetch gRPCLB
// addresses from SRV records. Must not be changed after init time.
EnableSRVLookups = false
// MinResolutionInterval is the minimum interval at which re-resolutions are
// allowed. This helps to prevent excessive re-resolution.
MinResolutionInterval = 30 * time.Second
// ResolvingTimeout specifies the maximum duration for a DNS resolution request.
// If the timeout expires before a response is received, the request will be canceled.
//
// It is recommended to set this value at application startup. Avoid modifying this variable
// after initialization as it's not thread-safe for concurrent modification.
ResolvingTimeout = 30 * time.Second
logger = grpclog.Component("dns")
)
func init() {
resolver.Register(NewBuilder())
internal.TimeAfterFunc = time.After
internal.TimeNowFunc = time.Now
internal.TimeUntilFunc = time.Until
internal.NewNetResolver = newNetResolver
internal.AddressDialer = addressDialer
}
const (
defaultPort = "443"
defaultDNSSvrPort = "53"
golang = "GO"
// txtPrefix is the prefix string to be prepended to the host name for txt
// record lookup.
txtPrefix = "_grpc_config."
// In DNS, service config is encoded in a TXT record via the mechanism
// described in RFC-1464 using the attribute name grpc_config.
txtAttribute = "grpc_config="
)
var addressDialer = func(address string) func(context.Context, string, string) (net.Conn, error) {
return func(ctx context.Context, network, _ string) (net.Conn, error) {
var dialer net.Dialer
return dialer.DialContext(ctx, network, address)
}
}
var newNetResolver = func(authority string) (internal.NetResolver, error) {
if authority == "" {
return net.DefaultResolver, nil
}
host, port, err := parseTarget(authority, defaultDNSSvrPort)
if err != nil {
return nil, err
}
authorityWithPort := net.JoinHostPort(host, port)
return &net.Resolver{
PreferGo: true,
Dial: internal.AddressDialer(authorityWithPort),
}, nil
}
// NewBuilder creates a dnsBuilder which is used to factory DNS resolvers.
func NewBuilder() resolver.Builder {
return &dnsBuilder{}
}
type dnsBuilder struct{}
// Build creates and starts a DNS resolver that watches the name resolution of
// the target.
func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
host, port, err := parseTarget(target.Endpoint(), defaultPort)
if err != nil {
return nil, err
}
// IP address.
if ipAddr, err := formatIP(host); err == nil {
addr := []resolver.Address{{Addr: ipAddr + ":" + port}}
cc.UpdateState(resolver.State{Addresses: addr})
return deadResolver{}, nil
}
// DNS address (non-IP).
ctx, cancel := context.WithCancel(context.Background())
d := &dnsResolver{
host: host,
port: port,
ctx: ctx,
cancel: cancel,
cc: cc,
rn: make(chan struct{}, 1),
enableServiceConfig: envconfig.EnableTXTServiceConfig && !opts.DisableServiceConfig,
}
d.resolver, err = internal.NewNetResolver(target.URL.Host)
if err != nil {
return nil, err
}
d.wg.Add(1)
go d.watcher()
return d, nil
}
// Scheme returns the naming scheme of this resolver builder, which is "dns".
func (b *dnsBuilder) Scheme() string {
return "dns"
}
// deadResolver is a resolver that does nothing.
type deadResolver struct{}
func (deadResolver) ResolveNow(resolver.ResolveNowOptions) {}
func (deadResolver) Close() {}
// dnsResolver watches for the name resolution update for a non-IP target.
type dnsResolver struct {
host string
port string
resolver internal.NetResolver
ctx context.Context
cancel context.CancelFunc
cc resolver.ClientConn
// rn channel is used by ResolveNow() to force an immediate resolution of the
// target.
rn chan struct{}
// wg is used to enforce Close() to return after the watcher() goroutine has
// finished. Otherwise, data race will be possible. [Race Example] in
// dns_resolver_test we replace the real lookup functions with mocked ones to
// facilitate testing. If Close() doesn't wait for watcher() goroutine
// finishes, race detector sometimes will warn lookup (READ the lookup
// function pointers) inside watcher() goroutine has data race with
// replaceNetFunc (WRITE the lookup function pointers).
wg sync.WaitGroup
enableServiceConfig bool
}
// ResolveNow invoke an immediate resolution of the target that this
// dnsResolver watches.
func (d *dnsResolver) ResolveNow(resolver.ResolveNowOptions) {
select {
case d.rn <- struct{}{}:
default:
}
}
// Close closes the dnsResolver.
func (d *dnsResolver) Close() {
d.cancel()
d.wg.Wait()
}
func (d *dnsResolver) watcher() {
defer d.wg.Done()
backoffIndex := 1
for {
state, err := d.lookup()
if err != nil {
// Report error to the underlying grpc.ClientConn.
d.cc.ReportError(err)
} else {
err = d.cc.UpdateState(*state)
}
var nextResolutionTime time.Time
if err == nil {
// Success resolving, wait for the next ResolveNow. However, also wait 30
// seconds at the very least to prevent constantly re-resolving.
backoffIndex = 1
nextResolutionTime = internal.TimeNowFunc().Add(MinResolutionInterval)
select {
case <-d.ctx.Done():
return
case <-d.rn:
}
} else {
// Poll on an error found in DNS Resolver or an error received from
// ClientConn.
nextResolutionTime = internal.TimeNowFunc().Add(backoff.DefaultExponential.Backoff(backoffIndex))
backoffIndex++
}
select {
case <-d.ctx.Done():
return
case <-internal.TimeAfterFunc(internal.TimeUntilFunc(nextResolutionTime)):
}
}
}
func (d *dnsResolver) lookupSRV(ctx context.Context) ([]resolver.Address, error) {
// Skip this particular host to avoid timeouts with some versions of
// systemd-resolved.
if !EnableSRVLookups || d.host == "metadata.google.internal." {
return nil, nil
}
var newAddrs []resolver.Address
_, srvs, err := d.resolver.LookupSRV(ctx, "grpclb", "tcp", d.host)
if err != nil {
err = handleDNSError(err, "SRV") // may become nil
return nil, err
}
for _, s := range srvs {
lbAddrs, err := d.resolver.LookupHost(ctx, s.Target)
if err != nil {
err = handleDNSError(err, "A") // may become nil
if err == nil {
// If there are other SRV records, look them up and ignore this
// one that does not exist.
continue
}
return nil, err
}
for _, a := range lbAddrs {
ip, err := formatIP(a)
if err != nil {
return nil, fmt.Errorf("dns: error parsing A record IP address %v: %v", a, err)
}
addr := ip + ":" + strconv.Itoa(int(s.Port))
newAddrs = append(newAddrs, resolver.Address{Addr: addr, ServerName: s.Target})
}
}
return newAddrs, nil
}
func handleDNSError(err error, lookupType string) error {
dnsErr, ok := err.(*net.DNSError)
if ok && !dnsErr.IsTimeout && !dnsErr.IsTemporary {
// Timeouts and temporary errors should be communicated to gRPC to
// attempt another DNS query (with backoff). Other errors should be
// suppressed (they may represent the absence of a TXT record).
return nil
}
if err != nil {
err = fmt.Errorf("dns: %v record lookup error: %v", lookupType, err)
logger.Info(err)
}
return err
}
func (d *dnsResolver) lookupTXT(ctx context.Context) *serviceconfig.ParseResult {
ss, err := d.resolver.LookupTXT(ctx, txtPrefix+d.host)
if err != nil {
if envconfig.TXTErrIgnore {
return nil
}
if err = handleDNSError(err, "TXT"); err != nil {
return &serviceconfig.ParseResult{Err: err}
}
return nil
}
var res string
for _, s := range ss {
res += s
}
// TXT record must have "grpc_config=" attribute in order to be used as
// service config.
if !strings.HasPrefix(res, txtAttribute) {
logger.Warningf("dns: TXT record %v missing %v attribute", res, txtAttribute)
// This is not an error; it is the equivalent of not having a service
// config.
return nil
}
sc := canaryingSC(strings.TrimPrefix(res, txtAttribute))
return d.cc.ParseServiceConfig(sc)
}
func (d *dnsResolver) lookupHost(ctx context.Context) ([]resolver.Address, error) {
addrs, err := d.resolver.LookupHost(ctx, d.host)
if err != nil {
err = handleDNSError(err, "A")
return nil, err
}
newAddrs := make([]resolver.Address, 0, len(addrs))
for _, a := range addrs {
ip, err := formatIP(a)
if err != nil {
return nil, fmt.Errorf("dns: error parsing A record IP address %v: %v", a, err)
}
addr := ip + ":" + d.port
newAddrs = append(newAddrs, resolver.Address{Addr: addr})
}
return newAddrs, nil
}
func (d *dnsResolver) lookup() (*resolver.State, error) {
ctx, cancel := context.WithTimeout(d.ctx, ResolvingTimeout)
defer cancel()
srv, srvErr := d.lookupSRV(ctx)
addrs, hostErr := d.lookupHost(ctx)
if hostErr != nil && (srvErr != nil || len(srv) == 0) {
return nil, hostErr
}
state := resolver.State{Addresses: addrs}
if len(srv) > 0 {
state = grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: srv})
}
if d.enableServiceConfig {
state.ServiceConfig = d.lookupTXT(ctx)
}
return &state, nil
}
// formatIP returns an error if addr is not a valid textual representation of
// an IP address. If addr is an IPv4 address, return the addr and error = nil.
// If addr is an IPv6 address, return the addr enclosed in square brackets and
// error = nil.
func formatIP(addr string) (string, error) {
ip, err := netip.ParseAddr(addr)
if err != nil {
return "", err
}
if ip.Is4() {
return addr, nil
}
return "[" + addr + "]", nil
}
// parseTarget takes the user input target string and default port, returns
// formatted host and port info. If target doesn't specify a port, set the port
// to be the defaultPort. If target is in IPv6 format and host-name is enclosed
// in square brackets, brackets are stripped when setting the host.
// examples:
// target: "www.google.com" defaultPort: "443" returns host: "www.google.com", port: "443"
// target: "ipv4-host:80" defaultPort: "443" returns host: "ipv4-host", port: "80"
// target: "[ipv6-host]" defaultPort: "443" returns host: "ipv6-host", port: "443"
// target: ":80" defaultPort: "443" returns host: "localhost", port: "80"
func parseTarget(target, defaultPort string) (host, port string, err error) {
if target == "" {
return "", "", internal.ErrMissingAddr
}
if _, err := netip.ParseAddr(target); err == nil {
// target is an IPv4 or IPv6(without brackets) address
return target, defaultPort, nil
}
if host, port, err = net.SplitHostPort(target); err == nil {
if port == "" {
// If the port field is empty (target ends with colon), e.g. "[::1]:",
// this is an error.
return "", "", internal.ErrEndsWithColon
}
// target has port, i.e ipv4-host:port, [ipv6-host]:port, host-name:port
if host == "" {
// Keep consistent with net.Dial(): If the host is empty, as in ":80",
// the local system is assumed.
host = "localhost"
}
return host, port, nil
}
if host, port, err = net.SplitHostPort(target + ":" + defaultPort); err == nil {
// target doesn't have port
return host, port, nil
}
return "", "", fmt.Errorf("invalid target address %v, error info: %v", target, err)
}
type rawChoice struct {
ClientLanguage *[]string `json:"clientLanguage,omitempty"`
Percentage *int `json:"percentage,omitempty"`
ClientHostName *[]string `json:"clientHostName,omitempty"`
ServiceConfig *json.RawMessage `json:"serviceConfig,omitempty"`
}
func containsString(a *[]string, b string) bool {
if a == nil {
return true
}
for _, c := range *a {
if c == b {
return true
}
}
return false
}
func chosenByPercentage(a *int) bool {
if a == nil {
return true
}
return rand.IntN(100)+1 <= *a
}
func canaryingSC(js string) string {
if js == "" {
return ""
}
var rcs []rawChoice
err := json.Unmarshal([]byte(js), &rcs)
if err != nil {
logger.Warningf("dns: error parsing service config json: %v", err)
return ""
}
cliHostname, err := os.Hostname()
if err != nil {
logger.Warningf("dns: error getting client hostname: %v", err)
return ""
}
var sc string
for _, c := range rcs {
if !containsString(c.ClientLanguage, golang) ||
!chosenByPercentage(c.Percentage) ||
!containsString(c.ClientHostName, cliHostname) ||
c.ServiceConfig == nil {
continue
}
sc = string(*c.ServiceConfig)
break
}
return sc
}

View File

@@ -0,0 +1,77 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package internal contains functionality internal to the dns resolver package.
package internal
import (
"context"
"errors"
"net"
"time"
)
// NetResolver groups the methods on net.Resolver that are used by the DNS
// resolver implementation. This allows the default net.Resolver instance to be
// overridden from tests.
type NetResolver interface {
LookupHost(ctx context.Context, host string) (addrs []string, err error)
LookupSRV(ctx context.Context, service, proto, name string) (cname string, addrs []*net.SRV, err error)
LookupTXT(ctx context.Context, name string) (txts []string, err error)
}
var (
// ErrMissingAddr is the error returned when building a DNS resolver when
// the provided target name is empty.
ErrMissingAddr = errors.New("dns resolver: missing address")
// ErrEndsWithColon is the error returned when building a DNS resolver when
// the provided target name ends with a colon that is supposed to be the
// separator between host and port. E.g. "::" is a valid address as it is
// an IPv6 address (host only) and "[::]:" is invalid as it ends with a
// colon as the host and port separator
ErrEndsWithColon = errors.New("dns resolver: missing port after port-separator colon")
)
// The following vars are overridden from tests.
var (
// TimeAfterFunc is used by the DNS resolver to wait for the given duration
// to elapse. In non-test code, this is implemented by time.After. In test
// code, this can be used to control the amount of time the resolver is
// blocked waiting for the duration to elapse.
TimeAfterFunc func(time.Duration) <-chan time.Time
// TimeNowFunc is used by the DNS resolver to get the current time.
// In non-test code, this is implemented by time.Now. In test code,
// this can be used to control the current time for the resolver.
TimeNowFunc func() time.Time
// TimeUntilFunc is used by the DNS resolver to calculate the remaining
// wait time for re-resolution. In non-test code, this is implemented by
// time.Until. In test code, this can be used to control the remaining
// time for resolver to wait for re-resolution.
TimeUntilFunc func(time.Time) time.Duration
// NewNetResolver returns the net.Resolver instance for the given target.
NewNetResolver func(string) (NetResolver, error)
// AddressDialer is the dialer used to dial the DNS server. It accepts the
// Host portion of the URL corresponding to the user's dial target and
// returns a dial function.
AddressDialer func(address string) func(context.Context, string, string) (net.Conn, error)
)

View File

@@ -0,0 +1,64 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package passthrough implements a pass-through resolver. It sends the target
// name without scheme back to gRPC as resolved address.
package passthrough
import (
"errors"
"google.golang.org/grpc/resolver"
)
const scheme = "passthrough"
type passthroughBuilder struct{}
func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
if target.Endpoint() == "" && opts.Dialer == nil {
return nil, errors.New("passthrough: received empty target in Build()")
}
r := &passthroughResolver{
target: target,
cc: cc,
}
r.start()
return r, nil
}
func (*passthroughBuilder) Scheme() string {
return scheme
}
type passthroughResolver struct {
target resolver.Target
cc resolver.ClientConn
}
func (r *passthroughResolver) start() {
r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint()}}})
}
func (*passthroughResolver) ResolveNow(resolver.ResolveNowOptions) {}
func (*passthroughResolver) Close() {}
func init() {
resolver.Register(&passthroughBuilder{})
}

View File

@@ -0,0 +1,78 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package unix implements a resolver for unix targets.
package unix
import (
"fmt"
"google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/resolver"
)
const unixScheme = "unix"
const unixAbstractScheme = "unix-abstract"
type builder struct {
scheme string
}
func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
if target.URL.Host != "" {
return nil, fmt.Errorf("invalid (non-empty) authority: %v", target.URL.Host)
}
// gRPC was parsing the dial target manually before PR #4817, and we
// switched to using url.Parse() in that PR. To avoid breaking existing
// resolver implementations we ended up stripping the leading "/" from the
// endpoint. This obviously does not work for the "unix" scheme. Hence we
// end up using the parsed URL instead.
endpoint := target.URL.Path
if endpoint == "" {
endpoint = target.URL.Opaque
}
addr := resolver.Address{Addr: endpoint}
if b.scheme == unixAbstractScheme {
// We can not prepend \0 as c++ gRPC does, as in Golang '@' is used to signify we do
// not want trailing \0 in address.
addr.Addr = "@" + addr.Addr
}
cc.UpdateState(resolver.State{Addresses: []resolver.Address{networktype.Set(addr, "unix")}})
return &nopResolver{}, nil
}
func (b *builder) Scheme() string {
return b.scheme
}
func (b *builder) OverrideAuthority(resolver.Target) string {
return "localhost"
}
type nopResolver struct {
}
func (*nopResolver) ResolveNow(resolver.ResolveNowOptions) {}
func (*nopResolver) Close() {}
func init() {
resolver.Register(&builder{scheme: unixScheme})
resolver.Register(&builder{scheme: unixAbstractScheme})
}