-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Second draft: configmiddleware and extensionlimiter #12633
Conversation
// LimiterID specifies the name of a limiter extension to be used. | ||
LimiterID component.ID `mapstructure:"limiter,omitempty"` | ||
|
||
// InterceptorID specifies the name of an interceptor extension to be used. | ||
InterceptorID component.ID `mapstructure:"interceptor,omitempty"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to distinguish between them? Can we have only one ID, and when we check for that extension, if it is a limiter we "convert" it to interceptor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking about the case of a scraper-type receiver, which is able to call a limiter in various ways but cannot be configured with interceptors. Then, I would expect to see
receivers:
filescraper: # it reads and re-reads a file, so neither gRPC or HTTP interceptors apply
limiters: # config type is []configmiddleware.Middleware
- limiter: xyz
- limiter: abc
However, it would be an error to specify an interceptor-type (e.g., - interceptor: decorator
) in this context. That's why I was thinking the distinction would be helpful.
The scraper would be able to request from limiter(s) for request count, item count, and byte-size quantities.
Yes, if we request an interceptor and get a limiter, there can be an automatic conversion. However, if you request a limiter and get an interceptor, it can simply be an error.
|
||
// Settings describe additional details of the request. | ||
type Settings struct { | ||
// Kind describes which signal is being used. This field permits |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
then use pipeline.Signal
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right! I did mean pipeline.Signal
and I'm not sure if component.Kind
will be useful. For limiters, I would encourage receivers to perform limiting. For interceptors, it could be meaningful: e.g., a Prometheus receiver uses HTTP clients for receiving, while an OTLP exporter uses HTTP clients for exporting, maybe. I'll leave it at Signal for now.
// set; if a field is zero, no limit will apply. Receivers are meant | ||
// to call a limiter one or more times as this information becomes | ||
// available. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A bit confused about the "one or more times". How do a limiter deal with that? Let's say we want a limiter to do N concurrent requests, how do I know that is the same request? Do you expect to call Done before calling second time?
// is known, the receiver should call the limiter again: | ||
// | ||
// rel, err := Acquire(ctx, Settings{...}, Weight{ | ||
// Requests: 0, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why request is 0?
// rel, err := Acquire(ctx, Settings{...}, Weight{ | ||
// Requests: 0, | ||
// Records: recordNum, | ||
// Bytes: len(uncompressedData) - len(compressedData), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems to be a sort of "sum" involved here, which is not clear from the comments.
// of the two calls. On the other hand, for this example, if the | ||
// limiter is based on bytes there will be two real calls to the | ||
// limiter, to account for the two non-zero Bytes fields. | ||
type Weight struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm finding it hard to reason about this. Are they all part of the same limit? It seems to me we're looking at multiple distinct limits here, which would be applied in order:
- HTTP/gRPC requests
- HTTP/gRPC request body size, i.e. potentially-compressed bytes
- Signal-specific record count
- Estimated size of decoded signals
Would it be simpler to have a single numeric weight, and distinct limiters for each unit? e.g.
type Limiter interface {
// Acquire requests admission of a predefined unit, such as "requests", "records", or "bytes".
Acquire(ctx context.Context, settings Settings, n int64) (ReleaseFunc, error)
}
Then you would configure a separate limiter for each unit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@axw Thank you. Between your comment and the three above @sfc-gh-bdrutu (cc @bogdandrutu 🤣), I realize that more work is needed.
@axw as you pointed out here, there are 4 ways we can limit a pipeline at the receiver, including compressed body size, request number, item number, and "resident memory size". I am excluding "(uncompressed) body size" for reasons discussed below: mainly that "resident memory size" is better measurement. Instead of having a Weight
with multiple fields, following your suggestion, we can have a single-valued weight with an enum indicating which kind of weight it is. What I would like to preserve, is that callers can call the limiter fewer than 4 times.
@bogdandrutu About your questions "why 0?", "why one or more times?". I think in most cases a limiter will be called once or twice. In some cases, we can derive all four values from a single interceptor (especially OTLP signals); in other cases, the interceptor may report compressed bytes and request count, but a later stage computes the item-count and resident-memory-size.
In pseudo-code, I see this looking like:
type Variety int
const (
CompressedBytesCount Variety = iota
RequestCount
ItemCount
ResidentBytesCount
)
type Weight struct {
Variety Variety
Value uint64
}
type Limiter interface {
Acquire(ctx context.Context, settings Settings, weights []Weight) (ReleaseFunc, error)
}
Some details, as I understand them:
An interceptor middleware can be used to measure and limit compressed network bytes or request count.
A limiter extension can be used to measure and limit item count and resident memory size.
Note: Uncompressed network bytes, as calculated by stream interceptors in particular, is not always useful, especially not in the otelarrow
case I've mentioned in the past. For a streaming protocol, the "uncompressed" size as seen by gRPC is not a good estimate for the uncompressed size in an OTLP representation.
As an aside, OTel-Arrow exporter and receiver have integrated middleware used for monitoring compression rate, which requires computing a ratio of compressed to uncompressed bytes. For the reason stated above, we use different middleware configuration for the unary and stream cases (i.e., OTLP vs. OTAP); in some cases, uncompressed bytes is reported from gRPC, in other cases it is reported explicitly using resident memory size. Roughly speaking, this demonstrates how we can derive a compression-rate signal from the same list of weights presented to the limiter: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/2042f437c45db1f53694bd574c2bfa33ec9dae94/internal/otelarrow/netstats/netstats.go#L65. I see a way to address #6638 using the limitermiddlewareextension
, consequently: the limiter sees both compressed bytes and resident memory size signals, i.e. the necessary inputs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can have a single-valued weight with an enum indicating which kind of weight it is
Is it important to have an enum with predefined/restricted units? Would it make sense to allow arbitrary units, specified by the caller? Then you could have a processor that rate limits by some arbitrary properties of the data. This may be a bit contrived though.
Anyway, essentially what I mean is change Weight to look like:
type Weight struct {
Variety string
Value uint64
}
There may be some constants like "bytes" or "records" defined for convenience, but limiter implementations would be expected to handle arbitrary units/weight varieties. I think it make make it a little simpler to grok: there's nothing special about the unit, it exists only to distinguish one rate limit from another.
FWIW in our rate limiter processor we use the processor component ID and client metadata keys for the identity of the rate limit: https://github.com/elastic/opentelemetry-collector-components/blob/9cf1b1ae7ee37042c5dc5a5c74c10fe205be9f23/processor/ratelimitprocessor/gubernator.go#L92-L93
You can then have a separate instance of the processor for each unit (e.g. "ratelimit/records"), so the unit effectively becomes part of the rate limit identity. That won't work if you want to allow acquiring multiple resources at once though of course.
} | ||
|
||
// GetLimiter requests to locate a named limiter extension. | ||
func (m Middleware) GetLimiter(_ context.Context, extensions map[component.ID]component.Component) (extensionlimiter.Limiter, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you consider creating a middleware implementation that uses a limiter, rather than coupling middleware to the limiter interface? I mean, as an alternative, we could introduce a package like extension/limitermiddlewareextension
. Then you would configure the collector like:
extensions:
# Used with gRPC traffic, consults GC statistics.
memory_limiter/cold
request_limit_mib: 100
waiting_limit_mib: 10
# Used with HTTP traffic, counts request bytes in flight.
admission_limiter/warm:
request_limit_mib: 10
waiting_limit_mib: 10
limitermiddleware/memory:
limiter: memory_limiter_cold
limitermiddleware/admission:
limiter: admission_limiter/warm
receivers:
otlp:
protocols:
http:
# ...
middleware:
- limitermiddleware/admission
grpc:
# ...
middleware:
- limitermiddleware/memory
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@axw I like this idea for the way it separates limiters and middleware.
I've referred to the case of otelarrow
receiver, which uses both gRPC unary (for OTLP) and gRPC stream (for OTAP). I would use a limitermiddleware
at the gRPC level for the unary case, and for the stream case I would call the configured limiter directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you reviewers. I will close this PR in 24 hours. I will open another with the idea described first by @axw below, which is (1) to separate limiters from middleware, with a dedicated limitermiddleware
extension to apply limiters as middleware, (2) to use a slice of weights in the Acquire function(), giving receivers flexibility to handle some compressed-bytes and request-count limits in interceptors while counting items and resident memory size at the receiver level.
// LimiterID specifies the name of a limiter extension to be used. | ||
LimiterID component.ID `mapstructure:"limiter,omitempty"` | ||
|
||
// InterceptorID specifies the name of an interceptor extension to be used. | ||
InterceptorID component.ID `mapstructure:"interceptor,omitempty"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking about the case of a scraper-type receiver, which is able to call a limiter in various ways but cannot be configured with interceptors. Then, I would expect to see
receivers:
filescraper: # it reads and re-reads a file, so neither gRPC or HTTP interceptors apply
limiters: # config type is []configmiddleware.Middleware
- limiter: xyz
- limiter: abc
However, it would be an error to specify an interceptor-type (e.g., - interceptor: decorator
) in this context. That's why I was thinking the distinction would be helpful.
The scraper would be able to request from limiter(s) for request count, item count, and byte-size quantities.
Yes, if we request an interceptor and get a limiter, there can be an automatic conversion. However, if you request a limiter and get an interceptor, it can simply be an error.
} | ||
|
||
// GetLimiter requests to locate a named limiter extension. | ||
func (m Middleware) GetLimiter(_ context.Context, extensions map[component.ID]component.Component) (extensionlimiter.Limiter, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@axw I like this idea for the way it separates limiters and middleware.
I've referred to the case of otelarrow
receiver, which uses both gRPC unary (for OTLP) and gRPC stream (for OTAP). I would use a limitermiddleware
at the gRPC level for the unary case, and for the stream case I would call the configured limiter directly.
|
||
// Settings describe additional details of the request. | ||
type Settings struct { | ||
// Kind describes which signal is being used. This field permits |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right! I did mean pipeline.Signal
and I'm not sure if component.Kind
will be useful. For limiters, I would encourage receivers to perform limiting. For interceptors, it could be meaningful: e.g., a Prometheus receiver uses HTTP clients for receiving, while an OTLP exporter uses HTTP clients for exporting, maybe. I'll leave it at Signal for now.
// of the two calls. On the other hand, for this example, if the | ||
// limiter is based on bytes there will be two real calls to the | ||
// limiter, to account for the two non-zero Bytes fields. | ||
type Weight struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@axw Thank you. Between your comment and the three above @sfc-gh-bdrutu (cc @bogdandrutu 🤣), I realize that more work is needed.
@axw as you pointed out here, there are 4 ways we can limit a pipeline at the receiver, including compressed body size, request number, item number, and "resident memory size". I am excluding "(uncompressed) body size" for reasons discussed below: mainly that "resident memory size" is better measurement. Instead of having a Weight
with multiple fields, following your suggestion, we can have a single-valued weight with an enum indicating which kind of weight it is. What I would like to preserve, is that callers can call the limiter fewer than 4 times.
@bogdandrutu About your questions "why 0?", "why one or more times?". I think in most cases a limiter will be called once or twice. In some cases, we can derive all four values from a single interceptor (especially OTLP signals); in other cases, the interceptor may report compressed bytes and request count, but a later stage computes the item-count and resident-memory-size.
In pseudo-code, I see this looking like:
type Variety int
const (
CompressedBytesCount Variety = iota
RequestCount
ItemCount
ResidentBytesCount
)
type Weight struct {
Variety Variety
Value uint64
}
type Limiter interface {
Acquire(ctx context.Context, settings Settings, weights []Weight) (ReleaseFunc, error)
}
Some details, as I understand them:
An interceptor middleware can be used to measure and limit compressed network bytes or request count.
A limiter extension can be used to measure and limit item count and resident memory size.
Note: Uncompressed network bytes, as calculated by stream interceptors in particular, is not always useful, especially not in the otelarrow
case I've mentioned in the past. For a streaming protocol, the "uncompressed" size as seen by gRPC is not a good estimate for the uncompressed size in an OTLP representation.
As an aside, OTel-Arrow exporter and receiver have integrated middleware used for monitoring compression rate, which requires computing a ratio of compressed to uncompressed bytes. For the reason stated above, we use different middleware configuration for the unary and stream cases (i.e., OTLP vs. OTAP); in some cases, uncompressed bytes is reported from gRPC, in other cases it is reported explicitly using resident memory size. Roughly speaking, this demonstrates how we can derive a compression-rate signal from the same list of weights presented to the limiter: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/2042f437c45db1f53694bd574c2bfa33ec9dae94/internal/otelarrow/netstats/netstats.go#L65. I see a way to address #6638 using the limitermiddlewareextension
, consequently: the limiter sees both compressed bytes and resident memory size signals, i.e. the necessary inputs.
Description
Second draft for Limiter extension w/ Middleware config as discussed in #12603 incorporating feedback from @axw and @bogdandrutu and following @dmitryax's example in #12574.
Relative to the first draft, this adds:
As an example (and to emphasize that previous examples were incorrect), note that to follow the
configauth
pattern here means each entry in a middleware list has a nested field.If we don't follow this example, each of the "limiter" and "interecptor" fields above will become "middleware" which feels not right.
Link to tracking issue
Part of #7441
Part of #9591
Part of #12603
Testing
TODO
Documentation
👍