
Second draft: configmiddleware and extensionlimiter #12633

Closed

Conversation

@jmacd jmacd commented Mar 13, 2025

Description

Second draft for the Limiter extension with Middleware config, as discussed in #12603, incorporating feedback from @axw and @bogdandrutu and following @dmitryax's example in #12574.

Relative to the first draft, this adds:

  • A Settings field passed to Acquire() carries the signal kind and can be extended with the caller's component ID
  • A Weight field passed to Acquire() supports limiting by request count, item count, and byte count
  • Middleware entries have distinct types indicating what they do (see the sketch after this list)
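For orientation, here is a minimal sketch of roughly what this draft's limiter call looks like, assembled from the description above and the code excerpts quoted later in this thread; package paths and field names are illustrative, not the exact draft API.

package extensionlimiter

import (
	"context"

	"go.opentelemetry.io/collector/pipeline"
)

// Settings describes the caller of Acquire: which signal is being limited.
// It could later be extended with the caller's component ID.
type Settings struct {
	Signal pipeline.Signal
}

// Weight carries the quantities being admitted; a zero field means no limit
// is requested for that dimension.
type Weight struct {
	Requests int64
	Records  int64
	Bytes    int64
}

// ReleaseFunc returns the acquired weight to the limiter when the request
// completes.
type ReleaseFunc func()

// Limiter is the interface receivers call, possibly more than once per
// request as the weights become known.
type Limiter interface {
	Acquire(ctx context.Context, settings Settings, weight Weight) (ReleaseFunc, error)
}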

As an example (and to emphasize that previous examples were incorrect), note that following the configauth pattern here means each entry in a middleware list has a nested field:

receivers:
  otlp:
    protocols:
      grpc:
        # ...
        middleware:
        - interceptor: blockinterceptor
        - limiter: ratelimiter
        - limiter: admissionlimiter
        - interceptor: decoratorinterceptor

If we don't follow this example, each of the "limiter" and "interceptor" fields above would instead be named "middleware", which does not feel right.

Link to tracking issue

Part of #7441
Part of #9591
Part of #12603

Testing

TODO

Documentation


Comment on lines +41 to +45
// LimiterID specifies the name of a limiter extension to be used.
LimiterID component.ID `mapstructure:"limiter,omitempty"`

// InterceptorID specifies the name of an interceptor extension to be used.
InterceptorID component.ID `mapstructure:"interceptor,omitempty"`


Do we need to distinguish between them? Can we have only one ID, and when we check for that extension, if it is a limiter, we "convert" it to an interceptor?

jmacd (Contributor Author) replied:

I was thinking about the case of a scraper-type receiver, which is able to call a limiter in various ways but cannot be configured with interceptors. Then, I would expect to see

receivers:
  filescraper: # it reads and re-reads a file, so neither gRPC nor HTTP interceptors apply
    limiters: # config type is []configmiddleware.Middleware
      - limiter: xyz
      - limiter: abc

However, it would be an error to specify an interceptor-type (e.g., - interceptor: decorator) in this context. That's why I was thinking the distinction would be helpful.

The scraper would be able to request admission from the limiter(s) for request-count, item-count, and byte-size quantities.

Yes, if we request an interceptor and get a limiter, there can be an automatic conversion. However, if we request a limiter and get an interceptor, it can simply be an error.
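As a rough illustration of that behavior (a hypothetical sketch, not the draft's exact implementation; it assumes the Middleware struct quoted above and the usual component, extensionlimiter, and fmt imports), GetLimiter could reject a mismatched extension like this:

// GetLimiter looks up the configured limiter extension by ID. Finding an
// interceptor (or any other non-limiter extension) under that ID is an error,
// with no automatic conversion in this direction.
func (m Middleware) GetLimiter(_ context.Context, extensions map[component.ID]component.Component) (extensionlimiter.Limiter, error) {
	ext, ok := extensions[m.LimiterID]
	if !ok {
		return nil, fmt.Errorf("limiter extension %q not found", m.LimiterID)
	}
	limiter, ok := ext.(extensionlimiter.Limiter)
	if !ok {
		return nil, fmt.Errorf("extension %q is not a limiter", m.LimiterID)
	}
	return limiter, nil
}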


// Settings describe additional details of the request.
type Settings struct {
// Kind describes which signal is being used. This field permits


then use pipeline.Signal?

jmacd (Contributor Author) replied:

Right! I did mean pipeline.Signal, and I'm not sure component.Kind will be useful. For limiters, I would encourage receivers to perform the limiting. For interceptors, it could be meaningful: e.g., a Prometheus receiver uses HTTP clients for receiving, while an OTLP exporter uses HTTP clients for exporting, perhaps. I'll leave it at Signal for now.

Comment on lines +34 to +36
// set; if a field is zero, no limit will apply. Receivers are meant
// to call a limiter one or more times as this information becomes
// available.


A bit confused about the "one or more times". How does a limiter deal with that? Let's say we want a limiter to allow N concurrent requests; how do I know that it is the same request? Do you expect the caller to call Done before calling a second time?

// is known, the receiver should call the limiter again:
//
// rel, err := Acquire(ctx, Settings{...}, Weight{
// Requests: 0,


Why is Requests 0?

// rel, err := Acquire(ctx, Settings{...}, Weight{
// Requests: 0,
// Records: recordNum,
// Bytes: len(uncompressedData) - len(compressedData),


There seems to be a sort of "sum" involved here, which is not clear from the comments.

// of the two calls. On the other hand, for this example, if the
// limiter is based on bytes there will be two real calls to the
// limiter, to account for the two non-zero Bytes fields.
type Weight struct {
Contributor:

I'm finding it hard to reason about this. Are they all part of the same limit? It seems to me we're looking at multiple distinct limits here, which would be applied in order:

  1. HTTP/gRPC requests
  2. HTTP/gRPC request body size, i.e. potentially-compressed bytes
  3. Signal-specific record count
  4. Estimated size of decoded signals

Would it be simpler to have a single numeric weight, and distinct limiters for each unit? e.g.

type Limiter interface {
	// Acquire requests admission of a predefined unit, such as "requests", "records", or "bytes".
	Acquire(ctx context.Context, settings Settings, n int64) (ReleaseFunc, error)
}

Then you would configure a separate limiter for each unit.
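For instance, under this proposal a receiver holding one limiter per unit might acquire from each in turn; a hypothetical helper built on the Limiter, Settings, and ReleaseFunc types above (assuming a context import) could look like:

// admit acquires one request plus recordNum records from two separately
// configured limiters and returns a combined release function. If the second
// acquire fails, the first admission is rolled back. (Hypothetical helper.)
func admit(ctx context.Context, settings Settings, requestLimiter, recordLimiter Limiter, recordNum int) (ReleaseFunc, error) {
	relReq, err := requestLimiter.Acquire(ctx, settings, 1)
	if err != nil {
		return nil, err
	}
	relRec, err := recordLimiter.Acquire(ctx, settings, int64(recordNum))
	if err != nil {
		relReq() // roll back the request admission
		return nil, err
	}
	return func() { relRec(); relReq() }, nil
}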

jmacd (Contributor Author) replied:

@axw Thank you. Between your comment and the three above from @sfc-gh-bdrutu (cc @bogdandrutu 🤣), I realize that more work is needed.

@axw as you pointed out here, there are four ways we can limit a pipeline at the receiver: compressed body size, request count, item count, and "resident memory size". I am excluding "(uncompressed) body size" for reasons discussed below, mainly that "resident memory size" is a better measurement. Instead of having a Weight with multiple fields, following your suggestion, we can have a single-valued weight with an enum indicating which kind of weight it is. What I would like to preserve is that callers can call the limiter fewer than four times.

@bogdandrutu About your questions "why 0?" and "why one or more times?": I think in most cases a limiter will be called once or twice. In some cases, we can derive all four values from a single interceptor (especially for OTLP signals); in other cases, the interceptor may report compressed bytes and request count, while a later stage computes the item count and resident memory size.

In pseudo-code, I see this looking like:

type Variety int

const (
    CompressedBytesCount Variety = iota
    RequestCount
    ItemCount
    ResidentBytesCount
)

type Weight struct {
    Variety Variety
    Value uint64
}

type Limiter interface {
    Acquire(ctx context.Context, settings Settings, weights []Weight) (ReleaseFunc, error)
}
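A hypothetical two-stage usage of this sketch, matching the split described above (variable names such as compressedBody, recordNum, and residentSize are illustrative): the interceptor acquires the network-level weights, and the receiver acquires the data-level weights after decoding.

// Stage 1: in the interceptor, network-level weights are known.
relNet, err := limiter.Acquire(ctx, settings, []Weight{
	{Variety: RequestCount, Value: 1},
	{Variety: CompressedBytesCount, Value: uint64(len(compressedBody))},
})
if err != nil {
	return err
}
defer relNet()

// Stage 2: after decoding, data-level weights are known.
relData, err := limiter.Acquire(ctx, settings, []Weight{
	{Variety: ItemCount, Value: uint64(recordNum)},
	{Variety: ResidentBytesCount, Value: residentSize},
})
if err != nil {
	return err
}
defer relData()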

Some details, as I understand them:

An interceptor middleware can be used to measure and limit compressed network bytes or request count.

A limiter extension can be used to measure and limit item count and resident memory size.

Note: Uncompressed network bytes, as calculated by stream interceptors in particular, is not always useful, especially not in the otelarrow case I've mentioned in the past. For a streaming protocol, the "uncompressed" size as seen by gRPC is not a good estimate of the uncompressed size in an OTLP representation.

As an aside, the OTel-Arrow exporter and receiver have integrated middleware used for monitoring compression rate, which requires computing a ratio of compressed to uncompressed bytes. For the reason stated above, we use different middleware configuration for the unary and stream cases (i.e., OTLP vs. OTAP): in some cases, uncompressed bytes is reported from gRPC; in other cases, it is reported explicitly using resident memory size. Roughly speaking, this demonstrates how we can derive a compression-rate signal from the same list of weights presented to the limiter: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/2042f437c45db1f53694bd574c2bfa33ec9dae94/internal/otelarrow/netstats/netstats.go#L65. Consequently, I see a way to address #6638 using the limitermiddlewareextension: the limiter sees both the compressed-bytes and resident-memory-size signals, i.e. the necessary inputs.

Contributor:

> we can have a single-valued weight with an enum indicating which kind of weight it is

Is it important to have an enum with predefined/restricted units? Would it make sense to allow arbitrary units, specified by the caller? Then you could have a processor that rate limits by some arbitrary properties of the data. This may be a bit contrived though.

Anyway, essentially what I mean is to change Weight to look like:

type Weight struct {
    Variety string
    Value uint64
}

There may be some constants like "bytes" or "records" defined for convenience, but limiter implementations would be expected to handle arbitrary units/weight varieties. I think it may make it a little simpler to grok: there's nothing special about the unit; it exists only to distinguish one rate limit from another.

FWIW in our rate limiter processor we use the processor component ID and client metadata keys for the identity of the rate limit: https://github.com/elastic/opentelemetry-collector-components/blob/9cf1b1ae7ee37042c5dc5a5c74c10fe205be9f23/processor/ratelimitprocessor/gubernator.go#L92-L93

You can then have a separate instance of the processor for each unit (e.g. "ratelimit/records"), so the unit effectively becomes part of the rate limit identity. That won't work if you want to allow acquiring multiple resources at once though of course.

}

// GetLimiter requests to locate a named limiter extension.
func (m Middleware) GetLimiter(_ context.Context, extensions map[component.ID]component.Component) (extensionlimiter.Limiter, error) {
Contributor:

Did you consider creating a middleware implementation that uses a limiter, rather than coupling middleware to the limiter interface? I mean, as an alternative, we could introduce a package like extension/limitermiddlewareextension. Then you would configure the collector like:

extensions:
  # Used with gRPC traffic, consults GC statistics.
  memory_limiter/cold:
    request_limit_mib: 100
    waiting_limit_mib: 10

  # Used with HTTP traffic, counts request bytes in flight.
  admission_limiter/warm:
    request_limit_mib: 10
    waiting_limit_mib: 10

  limitermiddleware/memory:
    limiter: memory_limiter/cold

  limitermiddleware/admission:
    limiter: admission_limiter/warm

receivers:
  otlp:
    protocols:
      http:
        # ...
        middleware:
           - limitermiddleware/admission
      grpc:
        # ...
        middleware:
          - limitermiddleware/memory
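As a rough sketch of what such a limitermiddleware extension might do internally (hypothetical code, not the actual collector API; the Weight and Limiter shapes here follow the string-variety variant discussed earlier in this thread), the HTTP side could wrap handlers like this:

package limitermiddleware

import (
	"context"
	"net/http"
)

// Weight and Limiter mirror the earlier pseudo-code so this sketch is
// self-contained; they are illustrative, not the real extension interfaces.
type Weight struct {
	Variety string
	Value   uint64
}

type Limiter interface {
	Acquire(ctx context.Context, weights []Weight) (release func(), err error)
}

// wrapHandler admits each HTTP request through the configured limiter,
// counting one request plus the declared (possibly compressed) body size,
// and releases the acquired weight when the handler returns.
func wrapHandler(limiter Limiter, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		var bodyBytes uint64
		if r.ContentLength > 0 {
			bodyBytes = uint64(r.ContentLength)
		}
		release, err := limiter.Acquire(r.Context(), []Weight{
			{Variety: "requests", Value: 1},
			{Variety: "compressed_bytes", Value: bodyBytes},
		})
		if err != nil {
			http.Error(w, err.Error(), http.StatusTooManyRequests)
			return
		}
		defer release()
		next.ServeHTTP(w, r)
	})
}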

jmacd (Contributor Author) replied:

@axw I like this idea for the way it separates limiters and middleware.

I've referred to the case of the otelarrow receiver, which uses both gRPC unary (for OTLP) and gRPC streaming (for OTAP). I would use a limitermiddleware at the gRPC level for the unary case, and for the stream case I would call the configured limiter directly.

@jmacd (Contributor Author) left a comment:

Thank you, reviewers. I will close this PR in 24 hours. I will open another with the idea described first by @axw above, which is (1) to separate limiters from middleware, with a dedicated limitermiddleware extension to apply limiters as middleware, and (2) to use a slice of weights in the Acquire() function, giving receivers flexibility to handle some compressed-bytes and request-count limits in interceptors while counting items and resident memory size at the receiver level.

@jmacd jmacd closed this Mar 22, 2025