k8s源码解析(4)--apiserver请求处理

apiserver请求处理

当apiserver启动后,就可以接受客户端的请求了。

  • 认证:客户端是否合法

  • 鉴权:客户端是否具备当前请求资源的权限

  • 准入控制器:提供回调钩子,资源持久化前对资源的值做改动或者验证等操作

  • 持久化:持久化到ETCD

1. 认证

请求到达apiserver后,首先需要进行认证,以辨别请求来源的身份。

前面讲过在buildGenericConfig调用了 genericapiserver.NewConfig

NewConfig创建Config结构时给BuildHandlerChainFunc字段传入DefaultBuildHandlerChain这个函数


// DefaultBuildHandlerChain (abridged excerpt) wraps apiHandler with filters;
// only the line that installs the authentication filter is shown here.
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
    ...
    // Wrap the handler with the authentication filter.
    handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
    ...
}

vendor/k8s.io/apiserver/pkg/endpoints/filters/authentication.go


// withAuthentication wraps handler with an authentication step.
//
// If auth is nil, authentication is disabled: a warning is logged and handler
// is returned unwrapped. Otherwise each request is passed to
// auth.AuthenticateRequest. On error, on a negative result, or when the
// response audiences are not acceptable for apiAuds, the request is handed to
// failed. On success the Authorization header is removed, the authenticated
// user is stored in the request context, and handler is invoked.
// metrics is called through a defer with the outcome and the start/finish
// timestamps of the authentication call.
func withAuthentication(handler http.Handler, auth authenticator.Request, failed http.Handler, apiAuds authenticator.Audiences, metrics recordMetrics) http.Handler {
 if auth == nil {
  klog.Warning("Authentication is disabled")
  return handler
 }
 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
  authenticationStart := time.Now()

  // Attach the configured API audiences to the request context, if any.
  if len(apiAuds) > 0 {
   req = req.WithContext(authenticator.WithAudiences(req.Context(), apiAuds))
  }
  // Run the authentication logic. On failure the failed handler answers the
  // request; on success the Authorization header is dropped and the inner
  // handler is called via handler.ServeHTTP(w, req).
  resp, ok, err := auth.AuthenticateRequest(req)
  authenticationFinish := time.Now()
  // Report the result when this handler function returns.
  defer func() {
   metrics(req.Context(), resp, ok, err, apiAuds, authenticationStart, authenticationFinish)
  }()
  if err != nil || !ok {
   if err != nil {
    klog.ErrorS(err, "Unable to authenticate the request")
   }
   failed.ServeHTTP(w, req)
   return
  }

  // The authenticator succeeded, but the audiences it returned must also
  // be acceptable for the configured API audiences.
  if !audiencesAreAcceptable(apiAuds, resp.Audiences) {
   err = fmt.Errorf("unable to match the audience: %v , accepted: %v", resp.Audiences, apiAuds)
   klog.Error(err)
   failed.ServeHTTP(w, req)
   return
  }

  // authorization header is not required anymore in case of a successful authentication.
  req.Header.Del("Authorization")

  // Make the authenticated user available to the handlers further down the chain.
  req = req.WithContext(genericapirequest.WithUser(req.Context(), resp.User))
  handler.ServeHTTP(w, req)
 })
}

1.1 认证接口

// Request is the authentication interface: an implementation inspects an
// HTTP request and reports who sent it.
type Request interface {
 // AuthenticateRequest receives the client request. When authentication
 // fails the bool result is false; when it succeeds the bool result is true
 // and the Response carries the authenticated user's information, for
 // example Name, UID, Groups and Extra.
 AuthenticateRequest(req *http.Request) (*Response, bool, error)
}

1.2 认证实现

// AuthenticateRequest authenticates the request using a chain of authenticator.Request objects.
func (authHandler *unionAuthRequestHandler) AuthenticateRequest(req *http.Request) (*authenticator.Response, bool, error) {
 var errlist []error
    //遍历所有启用的认证方式,只有一个成功了就可以了
 for _, currAuthRequestHandler := range authHandler.Handlers {
  resp, ok, err := currAuthRequestHandler.AuthenticateRequest(req)
  if err != nil {
   if authHandler.FailOnError {
    return resp, ok, err
   }
   errlist = append(errlist, err)
   continue
  }

  if ok {
   return resp, ok, err
  }
 }

 return nil, false, utilerrors.NewAggregate(errlist)
}

各种认证方式文档:https://kubernetes.io/zh-cn/docs/reference/access-authn-authz/authentication/

2. 鉴权

授权和认证的方式类似

buildGenericConfig中通过以下代码完成Authorizer的创建

// Excerpt from buildGenericConfig: build the authorizer and the rule resolver
// and store them on genericConfig; on failure the error is recorded in lastErr.
genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
 if err != nil {
  lastErr = fmt.Errorf("invalid authorization config: %v", err)
  return
 }
// New builds one authorizer and one rule resolver per entry of
// config.AuthorizationModes, in the listed order, and returns them combined
// through union.New and union.NewRuleResolvers.
//
// It returns an error when no mode is configured, when a mode is unknown, or
// when a mode-specific authorizer (ABAC policy file, webhook) cannot be built.
func (config Config) New() (authorizer.Authorizer, authorizer.RuleResolver, error) {
 if len(config.AuthorizationModes) == 0 {
  return nil, nil, fmt.Errorf("at least one authorization mode must be passed")
 }

 var (
  authorizers   []authorizer.Authorizer
  ruleResolvers []authorizer.RuleResolver
 )

 for _, authorizationMode := range config.AuthorizationModes {
  // Keep cases in sync with constant list in k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes/modes.go.
  switch authorizationMode {
  case modes.ModeNode:
   // Node mode: the authorizer works on a graph that is fed by the
   // Node, Pod, PersistentVolume and VolumeAttachment informers.
   node.RegisterMetrics()
   graph := node.NewGraph()
   node.AddGraphEventHandlers(
    graph,
    config.VersionedInformerFactory.Core().V1().Nodes(),
    config.VersionedInformerFactory.Core().V1().Pods(),
    config.VersionedInformerFactory.Core().V1().PersistentVolumes(),
    config.VersionedInformerFactory.Storage().V1().VolumeAttachments(),
   )
   nodeAuthorizer := node.NewAuthorizer(graph, nodeidentifier.NewDefaultNodeIdentifier(), bootstrappolicy.NodeRules())
   authorizers = append(authorizers, nodeAuthorizer)
   ruleResolvers = append(ruleResolvers, nodeAuthorizer)

  case modes.ModeAlwaysAllow:
   alwaysAllowAuthorizer := authorizerfactory.NewAlwaysAllowAuthorizer()
   authorizers = append(authorizers, alwaysAllowAuthorizer)
   ruleResolvers = append(ruleResolvers, alwaysAllowAuthorizer)
  case modes.ModeAlwaysDeny:
   alwaysDenyAuthorizer := authorizerfactory.NewAlwaysDenyAuthorizer()
   authorizers = append(authorizers, alwaysDenyAuthorizer)
   ruleResolvers = append(ruleResolvers, alwaysDenyAuthorizer)
  case modes.ModeABAC:
   // ABAC mode: the policy is loaded from config.PolicyFile.
   abacAuthorizer, err := abac.NewFromFile(config.PolicyFile)
   if err != nil {
    return nil, nil, err
   }
   authorizers = append(authorizers, abacAuthorizer)
   ruleResolvers = append(ruleResolvers, abacAuthorizer)
  case modes.ModeWebhook:
   // Webhook mode: the retry backoff is mandatory because it is
   // dereferenced below when the webhook authorizer is created.
   if config.WebhookRetryBackoff == nil {
    return nil, nil, errors.New("retry backoff parameters for authorization webhook has not been specified")
   }
   clientConfig, err := webhookutil.LoadKubeconfig(config.WebhookConfigFile, config.CustomDial)
   if err != nil {
    return nil, nil, err
   }
   webhookAuthorizer, err := webhook.New(clientConfig,
    config.WebhookVersion,
    config.WebhookCacheAuthorizedTTL,
    config.WebhookCacheUnauthorizedTTL,
    *config.WebhookRetryBackoff,
   )
   if err != nil {
    return nil, nil, err
   }
   authorizers = append(authorizers, webhookAuthorizer)
   ruleResolvers = append(ruleResolvers, webhookAuthorizer)
  case modes.ModeRBAC:
   // RBAC mode: roles and bindings are read through informer listers.
   rbacAuthorizer := rbac.New(
    &rbac.RoleGetter{Lister: config.VersionedInformerFactory.Rbac().V1().Roles().Lister()},
    &rbac.RoleBindingLister{Lister: config.VersionedInformerFactory.Rbac().V1().RoleBindings().Lister()},
    &rbac.ClusterRoleGetter{Lister: config.VersionedInformerFactory.Rbac().V1().ClusterRoles().Lister()},
    &rbac.ClusterRoleBindingLister{Lister: config.VersionedInformerFactory.Rbac().V1().ClusterRoleBindings().Lister()},
   )
   authorizers = append(authorizers, rbacAuthorizer)
   ruleResolvers = append(ruleResolvers, rbacAuthorizer)
  default:
   return nil, nil, fmt.Errorf("unknown authorization mode %s specified", authorizationMode)
  }
 }

 return union.New(authorizers...), union.NewRuleResolvers(ruleResolvers...), nil
}

2.1 鉴权小知识

kube-apiserver 目前提供了 6 种鉴权机制:

  • AlwaysAllow:总是允许

  • AlwaysDeny:总是拒绝

  • ABAC:基于属性的访问控制(Attribute-Based Access Control)

  • Node:节点鉴权,专门鉴权给 kubelet 发出的 API 请求

  • RBAC:基于角色的访问控制(Role-Based Access Control)

  • Webhook:基于 webhook 的一种 HTTP 回调机制,可以进行远程鉴权管理

鉴权中有三个概念:

  • Decision:决策状态

  • Authorizer:鉴权接口

  • RuleResolver:规则解析器

2.1.1 Decision:决策状态

Decision 决策状态类似于认证中的 true 和 false,用于决定是否鉴权成功。 鉴权支持三种 Decision 决策状态,例如鉴权成功,则返回 DecisionAllow,代码如下:

//staging/src/k8s.io/apiserver/pkg/authorization/authorizer/interfaces.go
// Decision is the outcome an authorizer reports for a request.
type Decision int

const (
 // DecisionDeny means the authorizer denies the action.
 // It is the zero value of Decision.
 DecisionDeny Decision = iota
 // DecisionAllow means the authorizer allows the action.
 DecisionAllow
 // DecisionNoOpinion means the authorizer has no opinion on whether to
 // allow or deny; the next authorizer is consulted.
 DecisionNoOpinion
)
  • DecisionDeny:拒绝该操作

  • DecisionAllow:允许该操作

  • DecisionNoOpinion:表示无明显意见允许或拒绝,继续执行下一个鉴权模块

2.1.2 Authorizer:鉴权接口

每个鉴权模块都要实现该接口的方法,代码如下:

//vendor/k8s.io/apiserver/pkg/authorization/authorizer/interfaces.go
// Authorizer is the interface every authorization module implements.
type Authorizer interface {
 // Authorize evaluates the request described by a and returns the decision,
 // a human-readable reason, and an error if the evaluation itself failed.
 Authorize(ctx context.Context, a Attributes) (authorized Decision, reason string, err error)
}

Authorizer 接口定义了 Authorize 方法,该方法接收请求上下文 ctx 和一个 Attributes 参数。

Attributes 是决定鉴权模块从 HTTP 请求中获取鉴权信息方法的参数,它是一个方法集合的接口, 例如 GetUser、GetVerb、GetNamespace、GetResource 等鉴权信息方法。 如果鉴权成功,Decision 状态变成 DecisionAllow, 如果鉴权失败,Decision 状态变成 DecisionDeny,并返回失败的原因。

// Attributes is the set of accessors an Authorizer uses to read the
// authorization-relevant information of a request (user, verb, namespace,
// resource, path, and so on).
type Attributes interface {
 // GetUser returns the user.Info object to authorize
 GetUser() user.Info

 // GetVerb returns the kube verb associated with API requests (this includes get, list, watch, create, update, patch, delete, deletecollection, and proxy),
 // or the lowercased HTTP verb associated with non-API requests (this includes get, put, post, patch, and delete)
 GetVerb() string

 // When IsReadOnly() == true, the request has no side effects, other than
 // caching, logging, and other incidentals.
 IsReadOnly() bool

 // The namespace of the object, if a request is for a REST object.
 GetNamespace() string

 // The kind of object, if a request is for a REST object.
 GetResource() string

 // GetSubresource returns the subresource being requested, if present
 GetSubresource() string

 // GetName returns the name of the object as parsed off the request.  This will not be present for all request types, but
 // will be present for: get, update, delete
 GetName() string

 // The group of the resource, if a request is for a REST object.
 GetAPIGroup() string

 // GetAPIVersion returns the version of the group requested, if a request is for a REST object.
 GetAPIVersion() string

 // IsResourceRequest returns true for requests to API resources, like /api/v1/nodes,
 // and false for non-resource endpoints like /api, /healthz
 IsResourceRequest() bool

 // GetPath returns the path of the request
 GetPath() string
}

2.1.3 RuleResolver:规则解析器

鉴权模块通过 RuleResolver 解析规则,定义如下:

//staging/src/k8s.io/apiserver/pkg/authorization/authorizer/interfaces.go
// RuleResolver resolves the authorization rules that apply to a user in a
// namespace.
type RuleResolver interface {
 // RulesFor get the list of cluster wide rules, the list of rules in the specific namespace, incomplete status and errors.
 RulesFor(user user.Info, namespace string) ([]ResourceRuleInfo, []NonResourceRuleInfo, bool, error)
}

RuleResolver 接口定义的 RulesFor 方法,所有鉴权模块都要实现。 RulesFor 方法接受 user 用户信息和 namespace 命名空间参数,解析出规则列表并返回。

规则列表分成两种:

  • ResourceRuleInfo:资源类型的规则列表,例如 /api/v1/pods 的资源接口

  • NonResourceRuleInfo:非资源类型的规则列表,例如 /healthz 的非资源接口

以 ResourceRuleInfo 资源类型为例,定义如下:

//staging/src/k8s.io/apiserver/pkg/authorization/authorizer/rule.go
// DefaultResourceRuleInfo describes one rule for resource requests as four
// string lists.
type DefaultResourceRuleInfo struct {
 // Verbs lists the verbs covered by the rule.
 Verbs         []string
 // APIGroups lists the API groups covered by the rule.
 APIGroups     []string
 // Resources lists the resources covered by the rule, e.g. "pods".
 Resources     []string
 // ResourceNames presumably restricts the rule to specific object names — TODO confirm.
 ResourceNames []string
}

比如:

resourcesRules: []authorizer.ResourceRuleInfo {
    &authorizer.DefaultResourceRuleInfo {
        // 通配符(*)表示匹配所有,该规则表示用户对所有资源版本的 Pod 拥有所有操作权限, 即:get、list、watch、create、update、patch、delete、deletecollection
        Verbs:     []string{"*"}
        APIGroups: []string{"*"}
        Resources: []string{"pods"}
    }
}

2.2 鉴权分析


// DefaultBuildHandlerChain (abridged excerpt): only the line that installs
// the authorization filter is shown here.
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
    ...
    // Wrap the handler with the authorization filter.
   handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
 
    ...
}

// WithAuthorization passes all authorized requests on to handler, and returns a forbidden error otherwise.
//
// If a is nil, authorization is disabled: a warning is logged and handler is
// returned unwrapped. A failure to build the authorizer attributes, or an
// authorizer error without an allow decision, is answered with an internal
// error. The decision and reason are recorded as audit annotations.
func WithAuthorization(handler http.Handler, a authorizer.Authorizer, s runtime.NegotiatedSerializer) http.Handler {
 if a == nil {
  klog.Warning("Authorization is disabled")
  return handler
 }
 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
  ctx := req.Context()

  // Collect the authorization attributes from the request context.
  attributes, err := GetAuthorizerAttributes(ctx)
  if err != nil {
   responsewriters.InternalError(w, req, err)
   return
  }
  // Perform the authorization check.
  authorized, reason, err := a.Authorize(ctx, attributes)
  // an authorizer like RBAC could encounter evaluation errors and still allow the request, so authorizer decision is checked before error here.
  if authorized == authorizer.DecisionAllow {
   audit.AddAuditAnnotations(ctx,
    decisionAnnotationKey, decisionAllow,
    reasonAnnotationKey, reason)
   handler.ServeHTTP(w, req)
   return
  }
  // Not allowed and the authorizer reported an error: answer with an internal error.
  if err != nil {
   audit.AddAuditAnnotation(ctx, reasonAnnotationKey, reasonError)
   responsewriters.InternalError(w, req, err)
   return
  }

  // Not allowed and no error: the request is forbidden.
  klog.V(4).InfoS("Forbidden", "URI", req.RequestURI, "Reason", reason)
  audit.AddAuditAnnotations(ctx,
   decisionAnnotationKey, decisionForbid,
   reasonAnnotationKey, reason)
  responsewriters.Forbidden(ctx, attributes, w, req, reason, s)
 })
}

// Authorizes against a chain of authorizer.Authorizer objects and returns nil if successful and returns error if unsuccessful
func (authzHandler unionAuthzHandler) Authorize(ctx context.Context, a authorizer.Attributes) (authorizer.Decision, string, error) {
 var (
  errlist    []error
  reasonlist []string
 )
 //同样的道理,如果kube-apiserver 启用了多种鉴权方式,则会遍历所有鉴权模块,进行鉴权
 for _, currAuthzHandler := range authzHandler {
  decision, reason, err := currAuthzHandler.Authorize(ctx, a)

  if err != nil {
   errlist = append(errlist, err)
  }
  if len(reason) != 0 {
   reasonlist = append(reasonlist, reason)
  }
  switch decision {
  case authorizer.DecisionAllow, authorizer.DecisionDeny:
   return decision, reason, err
            //这里我们可以看出 上一个鉴权失败会继续走下一个
  case authorizer.DecisionNoOpinion:
   // continue to the next authorizer
  }
 }

 return authorizer.DecisionNoOpinion, strings.Join(reasonlist, "\n"), utilerrors.NewAggregate(errlist)
}

比如:

  • --authorization-mode=ABAC:启用 ABAC 鉴权

  • --authorization-mode=RBAC: 启用RBAC鉴权

  • --authorization-mode=Node:启用 Node 鉴权

  • --authorization-mode=Webhook:启用 webhook 鉴权

具体的鉴权详情,通过官方文档了解即可: https://kubernetes.io/zh-cn/docs/reference/access-authn-authz/authorization/

3. 准入控制器

准入控制器是在对象持久化之前用于对 Kubernetes API Server 的请求进行拦截的代码段。

在 Kubernetes apiserver 中包含两个特殊的准入控制器:MutatingAdmissionWebhook和 ValidatingAdmissionWebhook。

  • Mutate,可修改提交上来的资源(Mutate里面也可以包含验证操作)

  • Validate, 是对提交上来的资源进行验证

buildGenericConfig通过以下代码创建:

// buildGenericConfig (abridged excerpt): only the admission setup is shown.
func buildGenericConfig(
 s *options.ServerRunOptions,
 proxyTransport *http.Transport,
){
    ...
 // Apply the admission options to genericConfig; a failure is recorded in lastErr.
 err = s.Admission.ApplyTo(
  genericConfig,
  versionedInformers,
  kubeClientConfig,
  utilfeature.DefaultFeatureGate,
  pluginInitializers...)
 if err != nil {
  lastErr = fmt.Errorf("failed to initialize admission: %v", err)
  return
 }
    ...
}

最终调用:

// ApplyTo builds the admission chain from the enabled plugins and stores it,
// wrapped with step metrics, in c.AdmissionControl.
//
// It is a no-op for a nil receiver. It returns an error when informers is
// nil, when the plugin configuration cannot be read, when the clientset
// cannot be created, or when the plugin chain cannot be built.
func (a *AdmissionOptions) ApplyTo(
 c *server.Config,
 informers informers.SharedInformerFactory,
 kubeAPIServerClientConfig *rest.Config,
 features featuregate.FeatureGate,
 pluginInitializers ...admission.PluginInitializer,
) error {
 if a == nil {
  return nil
 }

 // Admission depends on CoreAPI to set SharedInformerFactory and ClientConfig.
 if informers == nil {
  return fmt.Errorf("admission depends on a Kubernetes core API shared informer, it cannot be nil")
 }

 pluginNames := a.enabledPluginNames()
 // Read the configuration provider for the enabled admission plugins.
 pluginsConfigProvider, err := admission.ReadAdmissionConfiguration(pluginNames, a.ConfigFile, configScheme)
 if err != nil {
  return fmt.Errorf("failed to read plugin config: %v", err)
 }

 clientset, err := kubernetes.NewForConfig(kubeAPIServerClientConfig)
 if err != nil {
  return err
 }
 // The generic initializer is appended after the caller-supplied initializers.
 genericInitializer := initializer.New(clientset, informers, c.Authorization.Authorizer, features)
 initializersChain := admission.PluginInitializers{}
 pluginInitializers = append(pluginInitializers, genericInitializer)
 initializersChain = append(initializersChain, pluginInitializers...)
 // Chain the admission plugins into a single admissionChain, similar to how
 // the authenticators and authorizers are combined.
 admissionChain, err := a.Plugins.NewFromPlugins(pluginNames, pluginsConfigProvider, initializersChain, a.Decorators)
 if err != nil {
  return err
 }
 // Wrap the chain in a wrapper that can record metrics.
 c.AdmissionControl = admissionmetrics.WithStepMetrics(admissionChain)
 return nil
}

func (ps *Plugins) NewFromPlugins(pluginNames []string, configProvider ConfigProvider, pluginInitializer PluginInitializer, decorator Decorator) (Interface, error) {
 handlers := []Interface{}
 mutationPlugins := []string{}
 validationPlugins := []string{}
 for _, pluginName := range pluginNames {
  pluginConfig, err := configProvider.ConfigFor(pluginName)
  if err != nil {
   return nil, err
  }
  //初始化插件
  plugin, err := ps.InitPlugin(pluginName, pluginConfig, pluginInitializer)
  if err != nil {
   return nil, err
  }
  if plugin != nil {
   if decorator != nil {
    handlers = append(handlers, decorator.Decorate(plugin, pluginName))
   } else {
    handlers = append(handlers, plugin)
   }

   if _, ok := plugin.(MutationInterface); ok {
    mutationPlugins = append(mutationPlugins, pluginName)
   }
   if _, ok := plugin.(ValidationInterface); ok {
    validationPlugins = append(validationPlugins, pluginName)
   }
  }
 }
 if len(mutationPlugins) != 0 {
  klog.Infof("Loaded %d mutating admission controller(s) successfully in the following order: %s.", len(mutationPlugins), strings.Join(mutationPlugins, ","))
 }
 if len(validationPlugins) != 0 {
  klog.Infof("Loaded %d validating admission controller(s) successfully in the following order: %s.", len(validationPlugins), strings.Join(validationPlugins, ","))
 }
 return newReinvocationHandler(chainAdmissionHandler(handlers)), nil
}

// InitPlugin creates an instance of the named interface.
func (ps *Plugins) InitPlugin(name string, config io.Reader, pluginInitializer PluginInitializer) (Interface, error) {
 if name == "" {
  klog.Info("No admission plugin specified.")
  return nil, nil
 }
 //获取插件
 plugin, found, err := ps.getPlugin(name, config)
 if err != nil {
  return nil, fmt.Errorf("couldn't init admission plugin %q: %v", name, err)
 }
 if !found {
  return nil, fmt.Errorf("unknown admission plugin: %s", name)
 }

 pluginInitializer.Initialize(plugin)
 // ensure that plugins have been properly initialized
 if err := ValidateInitialization(plugin); err != nil {
  return nil, fmt.Errorf("failed to initialize admission plugin %q: %v", name, err)
 }

 return plugin, nil
}
func (ps *Plugins) getPlugin(name string, config io.Reader) (Interface, bool, error) {
 ps.lock.Lock()
 defer ps.lock.Unlock()
    //获取插件
 f, found := ps.registry[name]
 if !found {
  return nil, false, nil
 }

 config1, config2, err := splitStream(config)
 if err != nil {
  return nil, true, err
 }
 if !PluginEnabledFn(name, config1) {
  return nil, true, nil
 }

 ret, err := f(config2)
 return ret, true, err
}

3.1 初始化

ps.registry这个是在哪初始化的呢?

//cmd/kube-apiserver/app/server.go
// NewAPIServerCommand creates a *cobra.Command object with default parameters
// (abridged excerpt: only the creation of the server run options is shown).
func NewAPIServerCommand() *cobra.Command {
 // Build the default run options, which include the admission options.
 s := options.NewServerRunOptions()
    ...
}

// NewServerRunOptions creates a new ServerRunOptions object with default parameters
// (abridged excerpt: the remaining fields and the return are elided).
func NewServerRunOptions() *ServerRunOptions {
 s := ServerRunOptions{
  GenericServerRunOptions: genericoptions.NewServerRunOptions(),
  Etcd:                    genericoptions.NewEtcdOptions(storagebackend.NewDefaultConfig(kubeoptions.DefaultEtcdPathPrefix, nil)),
  SecureServing:           kubeoptions.NewSecureServingOptions(),
  Audit:                   genericoptions.NewAuditOptions(),
  Features:                genericoptions.NewFeatureOptions(),
  // Create the admission options.
  Admission:               kubeoptions.NewAdmissionOptions(),
    }
    ...
}
func NewAdmissionOptions() *AdmissionOptions {
 options := genericoptions.NewAdmissionOptions()
 // register all admission plugins
 RegisterAllAdmissionPlugins(options.Plugins)
}
// RegisterAllAdmissionPlugins registers the admission plugins on plugins
// (body elided in this excerpt).
func RegisterAllAdmissionPlugins(plugins *admission.Plugins) {
    // Many plugins are registered here.
}
// AllOrderedPlugins is the ordered list of all admission plugin names; the
// trailing comment on each entry gives the plugin's name.
var AllOrderedPlugins = []string{
 admit.PluginName,                        // AlwaysAdmit
 autoprovision.PluginName,                // NamespaceAutoProvision
 lifecycle.PluginName,                    // NamespaceLifecycle
 exists.PluginName,                       // NamespaceExists
 scdeny.PluginName,                       // SecurityContextDeny
 antiaffinity.PluginName,                 // LimitPodHardAntiAffinityTopology
 limitranger.PluginName,                  // LimitRanger
 serviceaccount.PluginName,               // ServiceAccount
 noderestriction.PluginName,              // NodeRestriction
 nodetaint.PluginName,                    // TaintNodesByCondition
 alwayspullimages.PluginName,             // AlwaysPullImages
 imagepolicy.PluginName,                  // ImagePolicyWebhook
 podsecurity.PluginName,                  // PodSecurity - before PodSecurityPolicy so audit/warn get exercised even if PodSecurityPolicy denies
 podsecuritypolicy.PluginName,            // PodSecurityPolicy
 podnodeselector.PluginName,              // PodNodeSelector
 podpriority.PluginName,                  // Priority
 defaulttolerationseconds.PluginName,     // DefaultTolerationSeconds
 podtolerationrestriction.PluginName,     // PodTolerationRestriction
 eventratelimit.PluginName,               // EventRateLimit
 extendedresourcetoleration.PluginName,   // ExtendedResourceToleration
 label.PluginName,                        // PersistentVolumeLabel
 setdefault.PluginName,                   // DefaultStorageClass
 storageobjectinuseprotection.PluginName, // StorageObjectInUseProtection
 gc.PluginName,                           // OwnerReferencesPermissionEnforcement
 resize.PluginName,                       // PersistentVolumeClaimResize
 runtimeclass.PluginName,                 // RuntimeClass
 certapproval.PluginName,                 // CertificateApproval
 certsigning.PluginName,                  // CertificateSigning
 certsubjectrestriction.PluginName,       // CertificateSubjectRestriction
 defaultingressclass.PluginName,          // DefaultIngressClass
 denyserviceexternalips.PluginName,       // DenyServiceExternalIPs

 // new admission plugins should generally be inserted above here
 // webhook, resourcequota, and deny plugins must go at the end

 mutatingwebhook.PluginName,   // MutatingAdmissionWebhook
 validatingwebhook.PluginName, // ValidatingAdmissionWebhook
 resourcequota.PluginName,     // ResourceQuota
 deny.PluginName,              // AlwaysDeny
}

4. handler

前面我们讲过代码最终是通过installer.Install将handler和web服务进行绑定,最终实现访问rest api可以请求到对应的handler。

在其中有个registerResourceHandlers方法,通过此方法来进行的注册。

//这个方法比较长

// registerResourceHandlers registers the handlers of one resource (body
// elided in this excerpt). path is the URL path of the resource, storage is
// its storage implementation, and ws is the go-restful WebService that
// receives the routes.
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
 ...
}

这个方法有三个入参

  • 代表URL的path

  • 资源存储相关的类storage

  • 用于存放路由的go-restful对象WebService

以pod举例:

// Pod is a namespace-scoped resource.
// Excerpt from registerResourceHandlers: choose the create handler depending
// on whether the storage is a named creater.
case "POST": // Create a resource.
   var handler restful.RouteFunction
   if isNamedCreater {
    handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
   } else {
    handler = restfulCreateResource(creater, reqScope, admit)
   }
func restfulCreateNamedResource(r rest.NamedCreater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
 return func(req *restful.Request, res *restful.Response) {
  handlers.CreateNamedResource(r, &scope, admit)(res.ResponseWriter, req.Request)
 }
}
// CreateNamedResource returns a function that will handle a resource creation with name.
func CreateNamedResource(r rest.NamedCreater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
 return createHandler(r, scope, admission, true)
}

// createHandler returns the HTTP handler that creates a resource.
// NOTE(review): this is an abridged excerpt; several identifiers used below
// (s, body, ctx, options, admissionAttributes, ...) are declared in elided code.
func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
    return func(w http.ResponseWriter, req *http.Request) {
        // Read the namespace, name and group/version of the resource from the request.
        namespace, name, err := scope.Namer.Name(req)
        gv := scope.Kind.GroupVersion()
        // Obtain the decoder.
        decodeSerializer := s.Serializer
  decoder := scope.Serializer.DecoderToVersion(decodeSerializer, scope.HubGroupVersion)
        // Decode the request body into a runtime.Object.
        obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
        // Call the storage's Create, passing the validating admission as the
        // validation function, to persist the object to etcd.
        requestFunc := func() (runtime.Object, error) {
   return r.Create(
    ctx,
    name,
    obj,
    rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
    options,
   )
  }
        result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
   if scope.FieldManager != nil {
    liveObj, err := scope.Creater.New(scope.Kind)
    if err != nil {
     return nil, fmt.Errorf("failed to create new object (Create for %v): %v", scope.Kind, err)
    }
    obj = scope.FieldManager.UpdateNoErrors(liveObj, obj, managerOrUserAgent(options.FieldManager, req.UserAgent()))
    admit = fieldmanager.NewManagedFieldsValidatingAdmissionController(admit)
   }
   // Run the mutating admission, if admit implements it and handles Create.
   if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
    if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil {
     return nil, err
    }
   }
   // Dedup owner references again after mutating admission happens
   dedupOwnerReferencesAndAddWarning(obj, req.Context(), true)
   result, err := requestFunc()
   // If the object wasn't committed to storage because it's serialized size was too large,
   // it is safe to remove managedFields (which can be large) and try again.
   if isTooLargeError(err) {
    if accessor, accessorErr := meta.Accessor(obj); accessorErr == nil {
     accessor.SetManagedFields(nil)
     result, err = requestFunc()
    }
   }
   return result, err
  })
    }
}

5. 持久化到ETCD

通过r.Create 我们最终能找到这个接口:

// Creater is implemented by storage objects that can create a resource.
type Creater interface {
 // New returns an empty object that can be used with Create after request data has been put into it.
 // This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
 New() runtime.Object

 // Create creates a new version of a resource.
 Create(ctx context.Context, obj runtime.Object, createValidation ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error)
}

看其实现,我们可以找到比如pod的存储实现:

//pkg/registry/core/pod/storage/storage.go
// REST implements a RESTStorage for pods
type REST struct {
 // The embedded generic Store provides the storage methods.
 *genericregistry.Store
 // proxyTransport is an HTTP round tripper; its use is not shown in this excerpt.
 proxyTransport http.RoundTripper
}

// Create inserts a new object into storage and returns the stored object.
//
// It runs the optional BeginCreate hook, rest.BeforeCreate with the create
// strategy, and the supplied createValidation (if non-nil), then derives the
// object's name and key and writes it through e.Storage.Create. On success
// the finish function is called with true, followed by the optional
// AfterCreate and Decorator hooks. On any failure after BeginCreate, the
// deferred call invokes the finish function with false.
func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
 var finishCreate FinishFunc = finishNothing

 if e.BeginCreate != nil {
  fn, err := e.BeginCreate(ctx, obj, options)
  if err != nil {
   return nil, err
  }
  finishCreate = fn
  // Runs whatever finishCreate holds at return time; the success path
  // below swaps it for finishNothing first.
  defer func() {
   finishCreate(ctx, false)
  }()
 }

 if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
  return nil, err
 }
 // at this point we have a fully formed object.  It is time to call the validators that the apiserver
 // handling chain wants to enforce.
 if createValidation != nil {
  // Run the validating admission on a deep copy of the object.
  if err := createValidation(ctx, obj.DeepCopyObject()); err != nil {
   return nil, err
  }
 }
 // Derive the name and key used to persist the object to etcd.
 name, err := e.ObjectNameFunc(obj)
 if err != nil {
  return nil, err
 }
 key, err := e.KeyFunc(ctx, name)
 if err != nil {
  return nil, err
 }
 qualifiedResource := e.qualifiedResourceFromContext(ctx)
 ttl, err := e.calculateTTL(obj, 0, false)
 if err != nil {
  return nil, err
 }
 // Create a new empty object that is returned as the result on success.
 out := e.NewFunc()
 // Call the storage's Create to persist the object to etcd.
 // On success, out is filled with the persisted object.
 if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil {
  err = storeerr.InterpretCreateError(err, qualifiedResource, name)
  err = rest.CheckGeneratedNameError(ctx, e.CreateStrategy, err, obj)
  if !apierrors.IsAlreadyExists(err) {
   return nil, err
  }
  // The object already exists: fetch it to check whether it is being
  // deleted. Failures of this lookup still report the original error.
  if errGet := e.Storage.Get(ctx, key, storage.GetOptions{}, out); errGet != nil {
   return nil, err
  }
  accessor, errGetAcc := meta.Accessor(out)
  if errGetAcc != nil {
   return nil, err
  }
  if accessor.GetDeletionTimestamp() != nil {
   // Prefix the status message to say the existing object is being deleted.
   msg := &err.(*apierrors.StatusError).ErrStatus.Message
   *msg = fmt.Sprintf("object is being deleted: %s", *msg)
  }
  return nil, err
 }
 // The operation has succeeded.  Call the finish function if there is one,
 // and then make sure the defer doesn't call it again.
 fn := finishCreate
 finishCreate = finishNothing
 fn(ctx, true)

 if e.AfterCreate != nil {
  e.AfterCreate(out, options)
 }
 if e.Decorator != nil {
  e.Decorator(out)
 }
 return out, nil
}

// Create implements storage.Interface.Create.
//
// It encodes obj, transforms the bytes for storage, and writes them under the
// prepared key in a transaction that only succeeds when the key does not
// exist yet. It returns an error when obj already carries a resourceVersion,
// and a key-exists error when the transaction does not succeed. When out is
// non-nil, the encoded data is decoded into it with the put revision.
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
 preparedKey, err := s.prepareKey(key)
 if err != nil {
  return err
 }

 // An object that is about to be created must not have a resourceVersion.
 if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
  return errors.New("resourceVersion should not be set on objects to be created")
 }
 if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
  return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
 }
 data, err := runtime.Encode(s.codec, obj)
 if err != nil {
  return err
 }

 opts, err := s.ttlOpts(ctx, int64(ttl))
 if err != nil {
  return err
 }
 // Transform the encoded object into the form that is written to storage.
 newData, err := s.transformer.TransformToStorage(ctx, data, authenticatedDataString(preparedKey))
 if err != nil {
  return storage.NewInternalError(err.Error())
 }

 startTime := time.Now()
 // Check whether the key already exists.
 txnResp, err := s.client.KV.Txn(ctx).If(
  notFound(preparedKey),
 ).Then(
  // Put the object only when the key was not found.
  clientv3.OpPut(preparedKey, string(newData), opts...),
 ).Commit()
 metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
 if err != nil {
  return err
 }
 if !txnResp.Succeeded {
  return storage.NewKeyExistsError(preparedKey, 0)
 }

 if out != nil {
  // Decode the result into out, using the revision of the put response.
  putResp := txnResp.Responses[0].GetResponsePut()
  return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
 }
 return nil
}


k8s源码解析(4)--apiserver请求处理
http://47.123.5.226:8090//archives/k8syuan-ma-jie-xi-4---apiserverqing-qiu-chu-li
作者
pony
发布于
2024年05月09日
许可协议