Documentation ¶
Index ¶
- Constants
- Variables
- func AddPipelineSpanToAnyTrace(ctx context.Context, c *etcd.Client, pipeline, operation string, ...) (opentracing.Span, context.Context)
- func StartAnyExtendedTrace(ctx context.Context, operation string, pipeline string) (newCtx context.Context, ok bool)
- func TraceIn2Out(ctx context.Context) context.Context
- func TracesCol(c *etcd.Client) col.Collection
- type TraceProto
- func (*TraceProto) Descriptor() ([]byte, []int)
- func (m *TraceProto) GetPipeline() string
- func (m *TraceProto) GetSerializedTrace() map[string]string
- func (m *TraceProto) Marshal() (dAtA []byte, err error)
- func (m *TraceProto) MarshalTo(dAtA []byte) (int, error)
- func (*TraceProto) ProtoMessage()
- func (m *TraceProto) Reset()
- func (m *TraceProto) Size() (n int)
- func (m *TraceProto) String() string
- func (m *TraceProto) Unmarshal(dAtA []byte) error
- func (m *TraceProto) XXX_DiscardUnknown()
- func (m *TraceProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *TraceProto) XXX_Merge(src proto.Message)
- func (m *TraceProto) XXX_Size() int
- func (m *TraceProto) XXX_Unmarshal(b []byte) error
Constants ¶
const ( // TracesCollectionPrefix is the prefix associated with the 'traces' // collection in etcd (which maps pipelines and commits to extended traces) TracesCollectionPrefix = "commit_traces" // ExtendedTraceEnvVar determines how long extended traces are updated until // they're deleted from the cluster ExtendedTraceEnvVar = "PACH_EXTENDED_TRACE" )
Variables ¶
var ( // CommitIDIndex is a secondary index for extended traces by the set of // commit IDs watched by the trace CommitIDIndex = &col.Index{ Field: "CommitIDs", Multi: true, } // PipelineIndex is a secondary index for extended traces by the pipeline // watched by the trace PipelineIndex = &col.Index{ Field: "Pipeline", } // TraceGetOpts are the default options for retrieving a trace from // 'TracesCol' TraceGetOpts = &col.Options{Target: etcd.SortByKey, Order: etcd.SortNone, SelfSort: false} )
var ( ErrInvalidLengthExtendedTrace = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowExtendedTrace = fmt.Errorf("proto: integer overflow") )
Functions ¶
func AddPipelineSpanToAnyTrace ¶
func AddPipelineSpanToAnyTrace(ctx context.Context, c *etcd.Client, pipeline, operation string, kvs ...interface{}) (opentracing.Span, context.Context)
AddPipelineSpanToAnyTrace finds any extended traces associated with 'pipeline', and if any such trace exists, it creates a new span associated with that trace and returns it
func StartAnyExtendedTrace ¶ added in v1.9.0
func StartAnyExtendedTrace(ctx context.Context, operation string, pipeline string) (newCtx context.Context, ok bool)
StartAnyExtendedTrace adds a new trace to 'ctx' (and returns an augmented context) based on whether the environment variable in 'targetRepoEnvVar' is set. Returns a context that may have the new span attached, and 'true' if an extended trace was created, or 'false' otherwise
func TraceIn2Out ¶
TraceIn2Out copies any extended traces from the incoming RPC context in 'ctx' into the outgoing RPC context in 'ctx'. Currently, this is only called by CreatePipeline, when it forwards extended contexts to the PutFile RPC with the new commit info.
Types ¶
type TraceProto ¶
type TraceProto struct { // serialized_trace contains the info identifying a trace in Jaeger (a // (trace ID, span ID, sampled) tuple, basically) SerializedTrace map[string]string `` /* 194-byte string literal not displayed */ // pipeline specifies the target pipeline of this trace; this would be set for // a trace created by 'pachctl create-pipeline' or 'pachctl update-pipeline' // and would include the kubernetes RPCs involved in creating a pipeline Pipeline string `protobuf:"bytes,2,opt,name=pipeline,proto3" json:"pipeline,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
TraceProto contains information identifying a Jaeger trace. It's used to propagate traces that follow the lifetime of a long operation (e.g. creating a pipeline or running a job), and which live longer than any single RPC.
func GetTraceFromCtx ¶
func GetTraceFromCtx(ctx context.Context) (*TraceProto, error)
GetTraceFromCtx extracts any extended trace embedded in 'ctx's metadata
func (*TraceProto) Descriptor ¶
func (*TraceProto) Descriptor() ([]byte, []int)
func (*TraceProto) GetPipeline ¶
func (m *TraceProto) GetPipeline() string
func (*TraceProto) GetSerializedTrace ¶
func (m *TraceProto) GetSerializedTrace() map[string]string
func (*TraceProto) Marshal ¶
func (m *TraceProto) Marshal() (dAtA []byte, err error)
func (*TraceProto) ProtoMessage ¶
func (*TraceProto) ProtoMessage()
func (*TraceProto) Reset ¶
func (m *TraceProto) Reset()
func (*TraceProto) Size ¶
func (m *TraceProto) Size() (n int)
func (*TraceProto) String ¶
func (m *TraceProto) String() string
func (*TraceProto) Unmarshal ¶
func (m *TraceProto) Unmarshal(dAtA []byte) error
func (*TraceProto) XXX_DiscardUnknown ¶
func (m *TraceProto) XXX_DiscardUnknown()
func (*TraceProto) XXX_Marshal ¶
func (m *TraceProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*TraceProto) XXX_Merge ¶
func (m *TraceProto) XXX_Merge(src proto.Message)
func (*TraceProto) XXX_Size ¶
func (m *TraceProto) XXX_Size() int
func (*TraceProto) XXX_Unmarshal ¶
func (m *TraceProto) XXX_Unmarshal(b []byte) error