mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-02-08 05:11:56 -05:00
203 lines
6.1 KiB
Go
203 lines
6.1 KiB
Go
package kafka
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/segmentio/kafka-go/protocol/createacls"
|
|
)
|
|
|
|
// CreateACLsRequest represents a request sent to a kafka broker to add
|
|
// new ACLs.
|
|
type CreateACLsRequest struct {
|
|
// Address of the kafka broker to send the request to.
|
|
Addr net.Addr
|
|
|
|
// List of ACL to create.
|
|
ACLs []ACLEntry
|
|
}
|
|
|
|
// CreateACLsResponse represents a response from a kafka broker to an ACL
|
|
// creation request.
|
|
type CreateACLsResponse struct {
|
|
// The amount of time that the broker throttled the request.
|
|
Throttle time.Duration
|
|
|
|
// List of errors that occurred while attempting to create
|
|
// the ACLs.
|
|
//
|
|
// The errors contain the kafka error code. Programs may use the standard
|
|
// errors.Is function to test the error against kafka error codes.
|
|
Errors []error
|
|
}
|
|
|
|
type ACLPermissionType int8
|
|
|
|
const (
|
|
ACLPermissionTypeUnknown ACLPermissionType = 0
|
|
ACLPermissionTypeAny ACLPermissionType = 1
|
|
ACLPermissionTypeDeny ACLPermissionType = 2
|
|
ACLPermissionTypeAllow ACLPermissionType = 3
|
|
)
|
|
|
|
func (apt ACLPermissionType) String() string {
|
|
mapping := map[ACLPermissionType]string{
|
|
ACLPermissionTypeUnknown: "Unknown",
|
|
ACLPermissionTypeAny: "Any",
|
|
ACLPermissionTypeDeny: "Deny",
|
|
ACLPermissionTypeAllow: "Allow",
|
|
}
|
|
s, ok := mapping[apt]
|
|
if !ok {
|
|
s = mapping[ACLPermissionTypeUnknown]
|
|
}
|
|
return s
|
|
}
|
|
|
|
// MarshalText transforms an ACLPermissionType into its string representation.
|
|
func (apt ACLPermissionType) MarshalText() ([]byte, error) {
|
|
return []byte(apt.String()), nil
|
|
}
|
|
|
|
// UnmarshalText takes a string representation of the resource type and converts it to an ACLPermissionType.
|
|
func (apt *ACLPermissionType) UnmarshalText(text []byte) error {
|
|
normalized := strings.ToLower(string(text))
|
|
mapping := map[string]ACLPermissionType{
|
|
"unknown": ACLPermissionTypeUnknown,
|
|
"any": ACLPermissionTypeAny,
|
|
"deny": ACLPermissionTypeDeny,
|
|
"allow": ACLPermissionTypeAllow,
|
|
}
|
|
parsed, ok := mapping[normalized]
|
|
if !ok {
|
|
*apt = ACLPermissionTypeUnknown
|
|
return fmt.Errorf("cannot parse %s as an ACLPermissionType", normalized)
|
|
}
|
|
*apt = parsed
|
|
return nil
|
|
}
|
|
|
|
type ACLOperationType int8
|
|
|
|
const (
|
|
ACLOperationTypeUnknown ACLOperationType = 0
|
|
ACLOperationTypeAny ACLOperationType = 1
|
|
ACLOperationTypeAll ACLOperationType = 2
|
|
ACLOperationTypeRead ACLOperationType = 3
|
|
ACLOperationTypeWrite ACLOperationType = 4
|
|
ACLOperationTypeCreate ACLOperationType = 5
|
|
ACLOperationTypeDelete ACLOperationType = 6
|
|
ACLOperationTypeAlter ACLOperationType = 7
|
|
ACLOperationTypeDescribe ACLOperationType = 8
|
|
ACLOperationTypeClusterAction ACLOperationType = 9
|
|
ACLOperationTypeDescribeConfigs ACLOperationType = 10
|
|
ACLOperationTypeAlterConfigs ACLOperationType = 11
|
|
ACLOperationTypeIdempotentWrite ACLOperationType = 12
|
|
)
|
|
|
|
func (aot ACLOperationType) String() string {
|
|
mapping := map[ACLOperationType]string{
|
|
ACLOperationTypeUnknown: "Unknown",
|
|
ACLOperationTypeAny: "Any",
|
|
ACLOperationTypeAll: "All",
|
|
ACLOperationTypeRead: "Read",
|
|
ACLOperationTypeWrite: "Write",
|
|
ACLOperationTypeCreate: "Create",
|
|
ACLOperationTypeDelete: "Delete",
|
|
ACLOperationTypeAlter: "Alter",
|
|
ACLOperationTypeDescribe: "Describe",
|
|
ACLOperationTypeClusterAction: "ClusterAction",
|
|
ACLOperationTypeDescribeConfigs: "DescribeConfigs",
|
|
ACLOperationTypeAlterConfigs: "AlterConfigs",
|
|
ACLOperationTypeIdempotentWrite: "IdempotentWrite",
|
|
}
|
|
s, ok := mapping[aot]
|
|
if !ok {
|
|
s = mapping[ACLOperationTypeUnknown]
|
|
}
|
|
return s
|
|
}
|
|
|
|
// MarshalText transforms an ACLOperationType into its string representation.
|
|
func (aot ACLOperationType) MarshalText() ([]byte, error) {
|
|
return []byte(aot.String()), nil
|
|
}
|
|
|
|
// UnmarshalText takes a string representation of the resource type and converts it to an ACLPermissionType.
|
|
func (aot *ACLOperationType) UnmarshalText(text []byte) error {
|
|
normalized := strings.ToLower(string(text))
|
|
mapping := map[string]ACLOperationType{
|
|
"unknown": ACLOperationTypeUnknown,
|
|
"any": ACLOperationTypeAny,
|
|
"all": ACLOperationTypeAll,
|
|
"read": ACLOperationTypeRead,
|
|
"write": ACLOperationTypeWrite,
|
|
"create": ACLOperationTypeCreate,
|
|
"delete": ACLOperationTypeDelete,
|
|
"alter": ACLOperationTypeAlter,
|
|
"describe": ACLOperationTypeDescribe,
|
|
"clusteraction": ACLOperationTypeClusterAction,
|
|
"describeconfigs": ACLOperationTypeDescribeConfigs,
|
|
"alterconfigs": ACLOperationTypeAlterConfigs,
|
|
"idempotentwrite": ACLOperationTypeIdempotentWrite,
|
|
}
|
|
parsed, ok := mapping[normalized]
|
|
if !ok {
|
|
*aot = ACLOperationTypeUnknown
|
|
return fmt.Errorf("cannot parse %s as an ACLOperationType", normalized)
|
|
}
|
|
*aot = parsed
|
|
return nil
|
|
|
|
}
|
|
|
|
type ACLEntry struct {
|
|
ResourceType ResourceType
|
|
ResourceName string
|
|
ResourcePatternType PatternType
|
|
Principal string
|
|
Host string
|
|
Operation ACLOperationType
|
|
PermissionType ACLPermissionType
|
|
}
|
|
|
|
// CreateACLs sends ACLs creation request to a kafka broker and returns the
|
|
// response.
|
|
func (c *Client) CreateACLs(ctx context.Context, req *CreateACLsRequest) (*CreateACLsResponse, error) {
|
|
acls := make([]createacls.RequestACLs, 0, len(req.ACLs))
|
|
|
|
for _, acl := range req.ACLs {
|
|
acls = append(acls, createacls.RequestACLs{
|
|
ResourceType: int8(acl.ResourceType),
|
|
ResourceName: acl.ResourceName,
|
|
ResourcePatternType: int8(acl.ResourcePatternType),
|
|
Principal: acl.Principal,
|
|
Host: acl.Host,
|
|
Operation: int8(acl.Operation),
|
|
PermissionType: int8(acl.PermissionType),
|
|
})
|
|
}
|
|
|
|
m, err := c.roundTrip(ctx, req.Addr, &createacls.Request{
|
|
Creations: acls,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("kafka.(*Client).CreateACLs: %w", err)
|
|
}
|
|
|
|
res := m.(*createacls.Response)
|
|
ret := &CreateACLsResponse{
|
|
Throttle: makeDuration(res.ThrottleTimeMs),
|
|
Errors: make([]error, 0, len(res.Results)),
|
|
}
|
|
|
|
for _, t := range res.Results {
|
|
ret.Errors = append(ret.Errors, makeError(t.ErrorCode, t.ErrorMessage))
|
|
}
|
|
|
|
return ret, nil
|
|
}
|