Documentation ¶
Overview ¶
Package openSearchClient provides a client for interacting with OpenSearch, designed to allow easy mocking in tests.
Example Usage:
	clientConfig, err := config.ReadOpensearchClientConfig()
	if err != nil {
		return err
	}

	// The third argument is the token receiver; nil assumes basic authentication.
	opensearchProjectClient, err := NewOpenSearchProjectClient(context.Background(), clientConfig, nil)
	if err != nil {
		return err
	}

	client := NewClient(opensearchProjectClient, 10, 1)

	query := `{"query":{"bool":{"filter":[{"term":{"oid":{"value":"1.3.6.1.4.1.25623.1.0.117842"}}}]}}}`
	responseBody, err := client.Search(indexName, []byte(query))
	if err != nil {
		return err
	}

	searchResponse, err := UnmarshalSearchResponse[*Vulnerability](responseBody)
	if err != nil {
		return err
	}
For further usage examples see ./client_test.go.
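The query in the example above is a hand-written JSON string. As a minimal sketch, the same request body can be built with encoding/json, which avoids quoting mistakes when the OID comes from a variable. The helper name buildOIDTermQuery is hypothetical and not part of this package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// buildOIDTermQuery is a hypothetical helper (not part of this package).
// It builds a bool/filter/term query on the "oid" field, matching the
// query string used in the usage example.
func buildOIDTermQuery(oid string) ([]byte, error) {
	query := map[string]any{
		"query": map[string]any{
			"bool": map[string]any{
				"filter": []any{
					map[string]any{
						"term": map[string]any{
							"oid": map[string]any{"value": oid},
						},
					},
				},
			},
		},
	}
	return json.Marshal(query)
}

func main() {
	body, err := buildOIDTermQuery("1.3.6.1.4.1.25623.1.0.117842")
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	// body has the type expected by the requestBody parameter of Client.Search.
	fmt.Println(string(body))
}
```

The resulting byte slice can be passed as requestBody to Search, Count, or DeleteByQuery, all of which take the request body as []byte.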
Index ¶
- func InitializeJson(timeFormats []string)
- func InjectAuthenticationIntoClient(client *opensearchapi.Client, config config.OpensearchClientConfig, ...) error
- func NewOpenSearchProjectClient(ctx context.Context, config config.OpensearchClientConfig, ...) (*opensearchapi.Client, error)
- func SerializeDocumentsForBulkUpdate[T any](indexName string, documents []T) ([]byte, error)
- func StartOpensearchTestContainer(ctx context.Context) (testcontainers.Container, config.OpensearchClientConfig, error)
- func Unmarshal(data []byte, v any) error
- func UnmarshalWithoutValidation(data []byte, v any) error
- type Authenticator
- type Bucket
- type BulkResponse
- type ByCreationDate
- type Client
- func (c *Client) AsyncDeleteByQuery(indexName string, requestBody []byte) error
- func (c *Client) BulkUpdate(indexName string, requestBody []byte) error
- func (c *Client) Close()
- func (c *Client) CompositeAggStream(indexName string, requestBody []byte, ctx context.Context) (io.Reader, error)
- func (c *Client) Count(indexName string, requestBody []byte) (count int64, err error)
- func (c *Client) DeleteByQuery(indexName string, requestBody []byte) error
- func (c *Client) Search(indexName string, requestBody []byte) (responseBody []byte, err error)
- func (c *Client) SearchStream(indexName string, requestBody []byte, scrollTimeout time.Duration, ...) (io.Reader, error)
- func (c *Client) SyncUpdate(indexName string, requestBody []byte) (responseBody []byte, err error)
- func (c *Client) Update(indexName string, requestBody []byte) (responseBody []byte, err error)
- type CountReq
- type CountResp
- type CreatedResponse
- type DocumentError
- type DocumentErrorType
- type DynamicAggregation
- type DynamicAggregationHits
- type IndexError
- type IndexFunction
- func (i *IndexFunction) AliasExists(aliasName string) (bool, error)
- func (i *IndexFunction) CreateIndex(indexName string, indexSchema []byte) error
- func (i *IndexFunction) CreateOrPutAlias(aliasName string, indexNames ...string) error
- func (i *IndexFunction) DeleteAliasFromIndex(indexName string, aliasName string) error
- func (i *IndexFunction) DeleteIndex(indexName string) error
- func (i *IndexFunction) ForceMerge(index string, maximumNumberOfSegments int) error
- func (i *IndexFunction) GetIndexSettings(index string) (map[string]interface{}, error)
- func (i *IndexFunction) GetIndexes(pattern string) ([]string, error)
- func (i *IndexFunction) GetIndexesForAlias(aliasName string) ([]string, error)
- func (i *IndexFunction) IndexExists(indexName string) (bool, error)
- func (i *IndexFunction) IndexHasAlias(indexNames []string, aliasNames []string) (bool, error)
- func (i *IndexFunction) RefreshIndex(index string) error
- func (i *IndexFunction) RemoveIndexesFromAlias(indexesToRemove []string, aliasName string) error
- func (i *IndexFunction) SetIndexSettings(index string, settingsBody io.Reader) error
- type IndexInfo
- type KeepJsonAsString
- type OpenSearchError
- type OpenSearchErrorResponse
- type OpenSearchErrors
- type OpenSearchHealth
- type OpenSearchResourceAlreadyExists
- type OpenSearchResourceNotFound
- type OpenSearchRootCause
- type OpensearchTestContainer
- type Request
- type Response
- type SearchResponse
- type SearchResponseAggregation
- type SearchResponseAggregations
- type SearchResponseHit
- type SearchResponseHits
- type SearchResponseHitsTotal
- type ShardStats
- type SyncUpdateClient
- type TokenReceiver
- type UpdateQueue
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func InitializeJson ¶
func InitializeJson(timeFormats []string)
func InjectAuthenticationIntoClient ¶ added in v1.7.4
func InjectAuthenticationIntoClient(client *opensearchapi.Client, config config.OpensearchClientConfig, tokenReceiver TokenReceiver) error
InjectAuthenticationIntoClient sets up the authentication method for the OpenSearch client.
client is the OpenSearch client to inject the authentication into. config is the configuration for the OpenSearch client. tokenReceiver is the token receiver for OpenID authentication and must implement the GetClientAccessToken function. It can be nil for basic authentication.
func NewOpenSearchProjectClient ¶
func NewOpenSearchProjectClient(ctx context.Context, config config.OpensearchClientConfig, tokenReceiver TokenReceiver) (*opensearchapi.Client, error)
NewOpenSearchProjectClient creates a new official OpenSearch client (package github.com/opensearch-project/opensearch-go) for use with NewClient. It returns an error if the client couldn't be created or the connection couldn't be established.
ctx is the context to use for the connection. config is the configuration for the client. tokenReceiver is the token receiver; see InjectAuthenticationIntoClient for its role.
func SerializeDocumentsForBulkUpdate ¶
SerializeDocumentsForBulkUpdate serializes documents for bulk update. Can be used in conjunction with BulkUpdate. It returns the serialized documents or an error in case something went wrong.
indexName is the name of the index to update. documents are the documents to update.
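The OpenSearch bulk API expects newline-delimited JSON: one action line followed by one document line, each terminated by a newline. The sketch below shows that format for the "index" action. It is an illustration under that assumption, not the package's actual implementation; serializeForBulk and the document type are hypothetical names.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// serializeForBulk is a hypothetical stand-in, not the package's code.
// For every document it writes an "index" action line followed by the
// document itself, each terminated by a newline.
func serializeForBulk[T any](indexName string, documents []T) ([]byte, error) {
	var buf bytes.Buffer
	for _, doc := range documents {
		action, err := json.Marshal(map[string]any{
			"index": map[string]any{"_index": indexName},
		})
		if err != nil {
			return nil, err
		}
		source, err := json.Marshal(doc)
		if err != nil {
			return nil, err
		}
		buf.Write(action)
		buf.WriteByte('\n')
		buf.Write(source)
		buf.WriteByte('\n')
	}
	return buf.Bytes(), nil
}

// document is a hypothetical document type used only for this example.
type document struct {
	OID string `json:"oid"`
}

func main() {
	body, err := serializeForBulk("vulnerabilities", []document{{OID: "1.2.3"}, {OID: "4.5.6"}})
	if err != nil {
		fmt.Println("serialization failed:", err)
		return
	}
	fmt.Print(string(body))
}
```

A body in this shape is what a bulk request carries; with this package, the output of SerializeDocumentsForBulkUpdate is the value to hand to BulkUpdate.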
func StartOpensearchTestContainer ¶
func StartOpensearchTestContainer(ctx context.Context) (testcontainers.Container, config.OpensearchClientConfig, error)
StartOpensearchTestContainer starts a test container with OpenSearch and returns the container and the config for the OpenSearch client. It returns an error if the container couldn't be created or started.
ctx is the context to use for the container.
func UnmarshalWithoutValidation ¶
UnmarshalWithoutValidation unmarshals data into v. It returns an error if the data cannot be parsed.
Types ¶
type Authenticator ¶ added in v1.7.4
type Authenticator struct {
// contains filtered or unexported fields
}
Authenticator is a struct that holds the necessary information for authenticating with OpenSearch.
type Bucket ¶
type Bucket struct {
	Key         any    `json:"key"`
	KeyAsString string `json:"key_as_string"`
	DocCount    uint   `json:"doc_count"`
	Aggs        map[string]DynamicAggregation
}
func (*Bucket) UnmarshalJSON ¶
type BulkResponse ¶
type BulkResponse struct {
	Took     uint         `json:"took"`
	HasError bool         `json:"errors"`
	Errors   []IndexError `json:"items"`
}
BulkResponse represents the response to a bulk request.
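As a rough sketch of how a response of this shape can be inspected, the code below decodes a bulk response body and collects the IDs of the documents that failed. The lower-case types are local stand-ins that mirror the documented JSON tags; the fields of the error object (type, reason) are an assumption based on the OpenSearch bulk API, since DocumentErrorType is not spelled out here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-ins mirroring the documented JSON tags of BulkResponse,
// IndexError and DocumentError. The fields of documentErrorType are assumed.
type documentErrorType struct {
	Type   string `json:"type"`
	Reason string `json:"reason"`
}

type documentError struct {
	IndexName  string            `json:"_index"`
	DocumentId string            `json:"_id"`
	StatusCode uint              `json:"status"`
	Error      documentErrorType `json:"error"`
}

type indexError struct {
	Index documentError `json:"index"`
}

type bulkResponse struct {
	Took     uint         `json:"took"`
	HasError bool         `json:"errors"`
	Errors   []indexError `json:"items"`
}

// sampleBulkResponse is made-up data for illustration only.
const sampleBulkResponse = `{"took":3,"errors":true,"items":[` +
	`{"index":{"_index":"vulns","_id":"a","status":201}},` +
	`{"index":{"_index":"vulns","_id":"b","status":400,` +
	`"error":{"type":"mapper_parsing_exception","reason":"failed to parse"}}}]}`

// failedDocumentIDs returns the IDs of all items with a status of 300 or above.
// It returns nil if the response reports no errors.
func failedDocumentIDs(responseBody []byte) ([]string, error) {
	var resp bulkResponse
	if err := json.Unmarshal(responseBody, &resp); err != nil {
		return nil, err
	}
	if !resp.HasError {
		return nil, nil
	}
	var failed []string
	for _, item := range resp.Errors {
		if item.Index.StatusCode >= 300 {
			failed = append(failed, item.Index.DocumentId)
		}
	}
	return failed, nil
}

func main() {
	ids, err := failedDocumentIDs([]byte(sampleBulkResponse))
	fmt.Println(ids, err)
}
```

Note that the "items" array lists every item of the request, successful or not, which is why the sketch filters by status code rather than treating each entry as an error.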
type ByCreationDate ¶ added in v1.12.0
type ByCreationDate []IndexInfo
func (ByCreationDate) Len ¶ added in v1.12.0
func (a ByCreationDate) Len() int
func (ByCreationDate) Less ¶ added in v1.12.0
func (a ByCreationDate) Less(i, j int) bool
func (ByCreationDate) Swap ¶ added in v1.12.0
func (a ByCreationDate) Swap(i, j int)
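Len, Less and Swap together form the standard library's sort.Interface, so a ByCreationDate value can be passed to sort.Sort. The sketch below shows the pattern with a local stand-in type. The fields of indexInfo and the ascending (oldest first) order are assumptions, because IndexInfo's fields and the ordering used by Less are not shown in this documentation.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// indexInfo is a hypothetical stand-in for IndexInfo; its fields are assumed.
type indexInfo struct {
	Name         string
	CreationDate time.Time
}

// newIndexInfo builds an indexInfo from a Unix timestamp in seconds.
func newIndexInfo(name string, unixSeconds int64) indexInfo {
	return indexInfo{Name: name, CreationDate: time.Unix(unixSeconds, 0)}
}

// byCreationDate implements sort.Interface, ordering oldest first (assumed).
type byCreationDate []indexInfo

func (a byCreationDate) Len() int           { return len(a) }
func (a byCreationDate) Less(i, j int) bool { return a[i].CreationDate.Before(a[j].CreationDate) }
func (a byCreationDate) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }

// oldestFirst returns the index names sorted by creation date without
// modifying the input slice.
func oldestFirst(indexes []indexInfo) []string {
	sorted := make([]indexInfo, len(indexes))
	copy(sorted, indexes)
	sort.Sort(byCreationDate(sorted))

	names := make([]string, 0, len(sorted))
	for _, idx := range sorted {
		names = append(names, idx.Name)
	}
	return names
}

func main() {
	names := oldestFirst([]indexInfo{
		newIndexInfo("index-c", 300),
		newIndexInfo("index-a", 100),
		newIndexInfo("index-b", 200),
	})
	fmt.Println(names)
}
```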
type Client ¶ added in v1.0.1
type Client struct {
// contains filtered or unexported fields
}
Client is a client for OpenSearch designed to allow easy mocking in tests. It is a wrapper around the official OpenSearch client github.com/opensearch-project/opensearch-go .
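One common way to mock such a client in Go is a small interface declared by the calling code. The sketch below assumes that approach; it is not necessarily the pattern the package authors intend. The interface searcher, the fake fakeSearcher and the function searchAll are hypothetical names. Only the Search method signature is taken from this documentation, so *Client would satisfy the interface.

```go
package main

import (
	"errors"
	"fmt"
)

// searcher is a hypothetical consumer-side interface. Its single method has
// the same signature as (*Client).Search, so *Client satisfies it.
type searcher interface {
	Search(indexName string, requestBody []byte) (responseBody []byte, err error)
}

// fakeSearcher is a test double that returns canned values and records
// the index names it was called with.
type fakeSearcher struct {
	response []byte
	err      error
	calls    []string
}

func (f *fakeSearcher) Search(indexName string, requestBody []byte) ([]byte, error) {
	f.calls = append(f.calls, indexName)
	return f.response, f.err
}

// errFake is a canned error for exercising the failure path.
var errFake = errors.New("simulated OpenSearch failure")

// searchAll is example application code that depends only on the interface.
func searchAll(s searcher, indexName string) (string, error) {
	body, err := s.Search(indexName, []byte(`{"query":{"match_all":{}}}`))
	if err != nil {
		return "", fmt.Errorf("search in %q failed: %w", indexName, err)
	}
	return string(body), nil
}

func main() {
	fake := &fakeSearcher{response: []byte(`{"hits":{"hits":[]}}`)}
	body, err := searchAll(fake, "vulnerabilities")
	fmt.Println(body, err, fake.calls)
}
```

Because searchAll accepts the interface rather than *Client, a test can exercise both the success and the failure path without a running OpenSearch instance.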
func NewClient ¶
func NewClient(openSearchProjectClient *opensearchapi.Client, updateMaxRetries int, updateRetryDelay time.Duration) *Client
NewClient creates a new OpenSearch client.
openSearchProjectClient is the official OpenSearch client to wrap. Use NewOpenSearchProjectClient to create it. updateMaxRetries is the number of retries for update requests. updateRetryDelay is the delay between retries.
func (*Client) AsyncDeleteByQuery ¶ added in v1.0.1
AsyncDeleteByQuery deletes documents in the given index asynchronously. It does not wait for the deletion to finish before returning. It returns an error in case something went wrong.
indexName is the name of the index to delete from. requestBody is the request body to send to OpenSearch to identify the documents to be deleted.
func (*Client) BulkUpdate ¶ added in v1.0.1
BulkUpdate performs a bulk update in the given index. It returns an error in case something went wrong.
indexName is the name of the index to update. requestBody is the request body to send to OpenSearch specifying the bulk update.
func (*Client) Close ¶ added in v1.0.1
func (c *Client) Close()
Close stops the underlying UpdateQueue allowing a graceful shutdown.
func (*Client) CompositeAggStream ¶ added in v1.11.0
func (*Client) DeleteByQuery ¶ added in v1.0.1
DeleteByQuery deletes documents in the given index. It waits for the deletion to finish before returning. It returns an error in case something went wrong.
indexName is the name of the index to delete from. requestBody is the request body to send to OpenSearch to identify the documents to be deleted.
func (*Client) Search ¶ added in v1.0.1
Search searches for documents in the given index.
indexName is the name of the index to search in. requestBody is the request body to send to OpenSearch. It returns the response body or an error in case something went wrong.
func (*Client) SearchStream ¶ added in v1.9.2
func (*Client) SyncUpdate ¶ added in v1.15.2
SyncUpdate updates documents in the given index synchronously.
func (*Client) Update ¶ added in v1.0.1
Update updates documents in the given index using UpdateQueue (which is also part of this package). It does not wait for the update to finish before returning. It returns the response body or an error in case something went wrong.
indexName is the name of the index to update. requestBody is the request body to send to OpenSearch specifying the update.
type CountReq ¶ added in v1.9.4
type CountResp ¶ added in v1.9.4
type CountResp struct {
	Count  int64      `json:"count"`
	Shards ShardStats `json:"_shards"`
}
func (CountResp) Inspect ¶ added in v1.9.4
func (r CountResp) Inspect() opensearchapi.Inspect
type CreatedResponse ¶
type DocumentError ¶
type DocumentError struct {
	IndexName  string            `json:"_index"`
	IndexType  string            `json:"_type"`
	DocumentId string            `json:"_id"`
	StatusCode uint              `json:"status"`
	Error      DocumentErrorType `json:"error"`
}
type DocumentErrorType ¶
type DynamicAggregation ¶
type DynamicAggregationHits ¶
type DynamicAggregationHits struct {
	Total      SearchResponseHitsTotal `json:"total"`
	SearchHits KeepJsonAsString        `json:"hits"`
}
type IndexError ¶
type IndexError struct {
Index DocumentError `json:"index"`
}
type IndexFunction ¶ added in v1.7.7
type IndexFunction struct {
// contains filtered or unexported fields
}
func NewIndexFunction ¶
func NewIndexFunction(openSearchProjectClient *opensearchapi.Client) *IndexFunction
func (*IndexFunction) AliasExists ¶ added in v1.7.7
func (i *IndexFunction) AliasExists(aliasName string) (bool, error)
func (*IndexFunction) CreateIndex ¶ added in v1.7.7
func (i *IndexFunction) CreateIndex(indexName string, indexSchema []byte) error
CreateIndex creates an index
func (*IndexFunction) CreateOrPutAlias ¶ added in v1.7.7
func (i *IndexFunction) CreateOrPutAlias(aliasName string, indexNames ...string) error
func (*IndexFunction) DeleteAliasFromIndex ¶ added in v1.7.7
func (i *IndexFunction) DeleteAliasFromIndex(indexName string, aliasName string) error
func (*IndexFunction) DeleteIndex ¶ added in v1.7.7
func (i *IndexFunction) DeleteIndex(indexName string) error
func (*IndexFunction) ForceMerge ¶ added in v1.9.1
func (i *IndexFunction) ForceMerge(index string, maximumNumberOfSegments int) error
func (*IndexFunction) GetIndexSettings ¶ added in v1.9.1
func (i *IndexFunction) GetIndexSettings(index string) (map[string]interface{}, error)
func (*IndexFunction) GetIndexes ¶ added in v1.7.7
func (i *IndexFunction) GetIndexes(pattern string) ([]string, error)
func (*IndexFunction) GetIndexesForAlias ¶ added in v1.7.7
func (i *IndexFunction) GetIndexesForAlias(aliasName string) ([]string, error)
GetIndexesForAlias was previously named AliasPointsToIndex.
func (*IndexFunction) IndexExists ¶ added in v1.7.7
func (i *IndexFunction) IndexExists(indexName string) (bool, error)
func (*IndexFunction) IndexHasAlias ¶ added in v1.7.7
func (i *IndexFunction) IndexHasAlias(indexNames []string, aliasNames []string) (bool, error)
func (*IndexFunction) RefreshIndex ¶ added in v1.9.1
func (i *IndexFunction) RefreshIndex(index string) error
func (*IndexFunction) RemoveIndexesFromAlias ¶ added in v1.7.7
func (i *IndexFunction) RemoveIndexesFromAlias(indexesToRemove []string, aliasName string) error
func (*IndexFunction) SetIndexSettings ¶ added in v1.9.1
func (i *IndexFunction) SetIndexSettings(index string, settingsBody io.Reader) error
type IndexInfo ¶ added in v1.12.0
func ConvertToIndexInfo ¶ added in v1.12.0
func ConvertToIndexInfo(indices []opensearchapi.CatIndexResp) []IndexInfo
func SortIndexInfoByCreationDate ¶ added in v1.12.0
type KeepJsonAsString ¶
type KeepJsonAsString []byte
func (*KeepJsonAsString) UnmarshalJSON ¶
func (k *KeepJsonAsString) UnmarshalJSON(data []byte) error
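A byte-slice type with a custom UnmarshalJSON is a common way to keep part of a JSON document undecoded, much like json.RawMessage in the standard library. The sketch below shows that technique with a local stand-in type rawJSON. It is an assumption about the general approach, not a copy of the package's implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rawJSON is a hypothetical stand-in for KeepJsonAsString.
type rawJSON []byte

// UnmarshalJSON stores a copy of the raw value instead of decoding it.
// The copy is required because the decoder may reuse the data slice
// after this method returns.
func (r *rawJSON) UnmarshalJSON(data []byte) error {
	*r = append((*r)[:0], data...)
	return nil
}

// aggregationHits mirrors the "hits" field of DynamicAggregationHits.
type aggregationHits struct {
	SearchHits rawJSON `json:"hits"`
}

// rawHits decodes body and returns the "hits" value as unparsed JSON text.
func rawHits(body []byte) (string, error) {
	var hits aggregationHits
	if err := json.Unmarshal(body, &hits); err != nil {
		return "", err
	}
	return string(hits.SearchHits), nil
}

func main() {
	raw, err := rawHits([]byte(`{"total":{"value":1},"hits":[{"_id":"1"}]}`))
	fmt.Println(raw, err)
}
```

Keeping the hits as raw JSON defers the choice of target type to the caller, which is useful when aggregations return documents of different shapes.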
type OpenSearchError ¶
type OpenSearchError struct {
Message string
}
OpenSearchError represents an error reported by OpenSearch.
func NewOpenSearchError ¶
func NewOpenSearchError(message string) *OpenSearchError
func (*OpenSearchError) Error ¶
func (o *OpenSearchError) Error() string
type OpenSearchErrorResponse ¶
type OpenSearchErrorResponse struct {
	Error  OpenSearchErrors
	Status int
}
type OpenSearchErrors ¶
type OpenSearchErrors struct {
	Reasons []OpenSearchRootCause
	Type    string
	Reason  string
}
type OpenSearchHealth ¶ added in v1.12.0
type OpenSearchHealth struct {
// contains filtered or unexported fields
}
func NewOpenSearchHealth ¶ added in v1.12.0
func NewOpenSearchHealth(openSearchProjectClient *opensearchapi.Client) *OpenSearchHealth
func (*OpenSearchHealth) GetDiskAllocationPercentage ¶ added in v1.12.0
func (h *OpenSearchHealth) GetDiskAllocationPercentage() (int, error)
func (*OpenSearchHealth) GetIndexesWithCreationDate ¶ added in v1.12.0
func (h *OpenSearchHealth) GetIndexesWithCreationDate(pattern string) ([]IndexInfo, error)
type OpenSearchResourceAlreadyExists ¶
type OpenSearchResourceAlreadyExists struct {
Message string
}
OpenSearchResourceAlreadyExists is the error for an OpenSearch resource that already exists.
func NewOpenSearchResourceAlreadyExists ¶
func NewOpenSearchResourceAlreadyExists(message string) *OpenSearchResourceAlreadyExists
func (*OpenSearchResourceAlreadyExists) Error ¶
func (o *OpenSearchResourceAlreadyExists) Error() string
type OpenSearchResourceNotFound ¶
type OpenSearchResourceNotFound struct {
Message string
}
OpenSearchResourceNotFound is the error for an OpenSearch resource that was not found.
func NewOpenSearchResourceNotFound ¶
func NewOpenSearchResourceNotFound(message string) *OpenSearchResourceNotFound
func (*OpenSearchResourceNotFound) Error ¶
func (o *OpenSearchResourceNotFound) Error() string
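Because these error types implement Error on a pointer receiver, callers can tell them apart with errors.As and a pointer-typed target. The sketch below uses a local stand-in type resourceNotFound with the documented shape. Which functions of this package actually return these errors is not stated here, so lookupIndex is a purely hypothetical operation.

```go
package main

import (
	"errors"
	"fmt"
)

// resourceNotFound mirrors the documented shape of OpenSearchResourceNotFound.
type resourceNotFound struct {
	Message string
}

func (e *resourceNotFound) Error() string { return e.Message }

// errOther is an unrelated error used to show the negative case.
var errOther = errors.New("connection refused")

// lookupIndex is a hypothetical operation that fails with a wrapped
// not-found error for the index name "missing".
func lookupIndex(name string) error {
	if name == "missing" {
		return fmt.Errorf("lookup %q: %w", name, &resourceNotFound{Message: "index not found"})
	}
	return nil
}

// isNotFound reports whether err is, or wraps, a *resourceNotFound.
func isNotFound(err error) bool {
	var notFound *resourceNotFound
	return errors.As(err, &notFound)
}

func main() {
	fmt.Println(isNotFound(lookupIndex("missing")))
	fmt.Println(isNotFound(lookupIndex("present")))
	fmt.Println(isNotFound(errOther))
}
```

errors.As also matches errors wrapped with %w, so the check keeps working when intermediate layers add context to the error.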
type OpenSearchRootCause ¶
type OpensearchTestContainer ¶
type OpensearchTestContainer struct {
testcontainers.Container
}
OpensearchTestContainer represents the opensearch container
type SearchResponse ¶
type SearchResponse[T any] struct {
	Took         uint                       `json:"took"`
	TimedOut     bool                       `json:"timed_out"`
	Hits         SearchResponseHits[T]      `json:"hits"`
	Aggregations SearchResponseAggregations `json:"aggregations"`
}
func UnmarshalSearchResponse ¶
func UnmarshalSearchResponse[T any](data []byte) (*SearchResponse[T], error)
func (SearchResponse[T]) GetResults ¶
func (s SearchResponse[T]) GetResults() []T
GetResults returns the list of documents.
func (SearchResponse[T]) GetSearchHits ¶
func (s SearchResponse[T]) GetSearchHits() []SearchResponseHit[T]
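To illustrate how a generic search response can be decoded into typed documents, the sketch below uses local stand-in types. The fields of searchHit (in particular "_source") follow the OpenSearch search response format and are an assumption, since SearchResponseHit is not spelled out in this documentation; results plays the role that GetResults appears to have.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// searchHit is a hypothetical stand-in for SearchResponseHit; fields assumed.
type searchHit[T any] struct {
	ID     string `json:"_id"`
	Source T      `json:"_source"`
}

type searchHits[T any] struct {
	SearchHits []searchHit[T] `json:"hits"`
}

type searchResponse[T any] struct {
	Took     uint          `json:"took"`
	TimedOut bool          `json:"timed_out"`
	Hits     searchHits[T] `json:"hits"`
}

// results returns the decoded documents of all hits.
func (s searchResponse[T]) results() []T {
	out := make([]T, 0, len(s.Hits.SearchHits))
	for _, hit := range s.Hits.SearchHits {
		out = append(out, hit.Source)
	}
	return out
}

// unmarshalSearchResponse decodes a raw response body into a typed response.
func unmarshalSearchResponse[T any](data []byte) (*searchResponse[T], error) {
	var resp searchResponse[T]
	if err := json.Unmarshal(data, &resp); err != nil {
		return nil, err
	}
	return &resp, nil
}

// vulnerability is a hypothetical document type for this example.
type vulnerability struct {
	OID string `json:"oid"`
}

// sampleSearchResponse is made-up data for illustration only.
const sampleSearchResponse = `{"took":5,"timed_out":false,"hits":{` +
	`"total":{"value":2,"relation":"eq"},"hits":[` +
	`{"_id":"a","_source":{"oid":"1.2.3"}},` +
	`{"_id":"b","_source":{"oid":"4.5.6"}}]}}`

func main() {
	resp, err := unmarshalSearchResponse[vulnerability]([]byte(sampleSearchResponse))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	for _, doc := range resp.results() {
		fmt.Println(doc.OID)
	}
}
```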
type SearchResponseAggregations ¶
type SearchResponseAggregations map[string]SearchResponseAggregation
type SearchResponseHit ¶
type SearchResponseHits ¶
type SearchResponseHits[T any] struct {
	Total      SearchResponseHitsTotal
	SearchHits []SearchResponseHit[T] `json:"hits"`
}
type SearchResponseHitsTotal ¶
type ShardStats ¶ added in v1.9.4
type SyncUpdateClient ¶ added in v1.15.2
type SyncUpdateClient struct {
// contains filtered or unexported fields
}
func NewSyncUpdateClient ¶ added in v1.15.2
func NewSyncUpdateClient(osClient *opensearchapi.Client, maxRetries int, retryDelay time.Duration) *SyncUpdateClient
type TokenReceiver ¶ added in v1.7.7
type TokenReceiver interface {
	GetClientAccessToken(clientName, clientSecret string) (string, error)
	ClearClientAccessToken()
}
TokenReceiver is an interface for receiving client access tokens.
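A minimal sketch of a type with this method set is shown below. It caches a token until it is cleared. The caching behaviour and the meaning of ClearClientAccessToken (dropping the cached token) are assumptions derived from the method names; tokenReceiver is a local copy of the interface and cachingTokenReceiver is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync"
)

// tokenReceiver is a local copy of the documented TokenReceiver method set.
type tokenReceiver interface {
	GetClientAccessToken(clientName, clientSecret string) (string, error)
	ClearClientAccessToken()
}

// cachingTokenReceiver fetches a token on first use and reuses it until
// ClearClientAccessToken is called. The fetch function is supplied by the
// caller, for example a call to an identity provider.
type cachingTokenReceiver struct {
	mu    sync.Mutex
	token string
	fetch func(clientName, clientSecret string) (string, error)
}

func (c *cachingTokenReceiver) GetClientAccessToken(clientName, clientSecret string) (string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.token != "" {
		return c.token, nil
	}
	token, err := c.fetch(clientName, clientSecret)
	if err != nil {
		return "", err
	}
	c.token = token
	return token, nil
}

// ClearClientAccessToken drops the cached token so the next call fetches anew.
func (c *cachingTokenReceiver) ClearClientAccessToken() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.token = ""
}

// Compile-time check that the sketch satisfies the interface.
var _ tokenReceiver = (*cachingTokenReceiver)(nil)

func main() {
	receiver := &cachingTokenReceiver{
		fetch: func(clientName, clientSecret string) (string, error) {
			return "token-for-" + clientName, nil
		},
	}
	token, err := receiver.GetClientAccessToken("my-client", "secret")
	fmt.Println(token, err)
}
```

In tests, a fetch function that returns a fixed string is usually enough to exercise code paths that require a token receiver.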
type UpdateQueue ¶ added in v1.0.1
type UpdateQueue struct {
// contains filtered or unexported fields
}
UpdateQueue is a queue for OpenSearch update requests.
func NewRequestQueue ¶
func NewRequestQueue(openSearchClient *opensearchapi.Client, updateMaxRetries int, updateRetryDelay time.Duration) *UpdateQueue
NewRequestQueue creates a new update queue.
openSearchClient is the official OpenSearch client. Use NewOpenSearchProjectClient to create it. updateMaxRetries is the number of retries for update requests. updateRetryDelay is the delay between retries.
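The two retry parameters can be pictured with a simple loop: try once, then retry up to a maximum number of times with a fixed delay in between. The sketch below is a generic illustration of that idea and not the queue's actual implementation, which is unexported; retry and errTemporary are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTemporary is a canned error used to simulate a failing request.
var errTemporary = errors.New("temporary failure")

// retry runs op once and, while it fails, up to maxRetries more times,
// sleeping for delay between attempts. It returns the number of attempts
// made and the last error (nil on success).
func retry(maxRetries int, delay time.Duration, op func() error) (attempts int, err error) {
	for attempts = 1; ; attempts++ {
		if err = op(); err == nil {
			return attempts, nil
		}
		if attempts > maxRetries {
			return attempts, err
		}
		time.Sleep(delay)
	}
}

func main() {
	failuresLeft := 2
	attempts, err := retry(10, 10*time.Millisecond, func() error {
		if failuresLeft > 0 {
			failuresLeft--
			return errTemporary
		}
		return nil
	})
	fmt.Println(attempts, err)
}
```

Since updateRetryDelay is a time.Duration, a bare integer such as 1 means one nanosecond; spelling the unit out, as in 10*time.Millisecond or time.Second, makes the intended delay explicit.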
func (*UpdateQueue) Stop ¶ added in v1.0.1
func (q *UpdateQueue) Stop()
func (*UpdateQueue) Update ¶ added in v1.0.1
func (q *UpdateQueue) Update(indexName string, requestBody []byte) ([]byte, error)
Update queues an update for an index and returns the response body or an error.
It is called from func (c *Client) Update(indexName string, requestBody []byte) (responseBody []byte, err error) in pkg/openSearch/open_search_client/client.go and tested in pkg/openSearch/open_search_client/client_test.go.
indexName is the name of the index to update. requestBody is the request body to send to the index.