X7ROOT File Manager
Current Path:
/opt/hc_python/lib/python3.12/site-packages/sentry_sdk/integrations
opt
/
hc_python
/
lib
/
python3.12
/
site-packages
/
sentry_sdk
/
integrations
/
??
..
??
__init__.py
(12.44 KB)
??
__pycache__
??
_asgi_common.py
(3.11 KB)
??
_wsgi_common.py
(7.38 KB)
??
aiohttp.py
(12.7 KB)
??
anthropic.py
(14.17 KB)
??
argv.py
(911 B)
??
ariadne.py
(5.7 KB)
??
arq.py
(7.7 KB)
??
asgi.py
(12.5 KB)
??
asyncio.py
(4.5 KB)
??
asyncpg.py
(6.37 KB)
??
atexit.py
(1.61 KB)
??
aws_lambda.py
(17.65 KB)
??
beam.py
(5.06 KB)
??
boto3.py
(4.31 KB)
??
bottle.py
(6.46 KB)
??
celery
??
chalice.py
(4.59 KB)
??
clickhouse_driver.py
(5.96 KB)
??
cloud_resource_context.py
(7.6 KB)
??
cohere.py
(9.18 KB)
??
dedupe.py
(1.93 KB)
??
django
??
dramatiq.py
(7.28 KB)
??
excepthook.py
(2.35 KB)
??
executing.py
(1.95 KB)
??
falcon.py
(9.28 KB)
??
fastapi.py
(4.48 KB)
??
flask.py
(8.54 KB)
??
gcp.py
(8.26 KB)
??
gnu_backtrace.py
(2.75 KB)
??
google_genai
??
gql.py
(4.69 KB)
??
graphene.py
(4.92 KB)
??
grpc
??
httpx.py
(5.94 KB)
??
huey.py
(5.32 KB)
??
huggingface_hub.py
(14.6 KB)
??
langchain.py
(39.02 KB)
??
langgraph.py
(11.56 KB)
??
launchdarkly.py
(1.89 KB)
??
litellm.py
(9.99 KB)
??
litestar.py
(11.55 KB)
??
logging.py
(13.57 KB)
??
loguru.py
(6.38 KB)
??
mcp.py
(19.58 KB)
??
modules.py
(820 B)
??
openai.py
(24.54 KB)
??
openai_agents
??
openfeature.py
(1.1 KB)
??
opentelemetry
??
otlp.py
(3.06 KB)
??
pure_eval.py
(4.5 KB)
??
pydantic_ai
??
pymongo.py
(6.23 KB)
??
pyramid.py
(7.19 KB)
??
quart.py
(7.24 KB)
??
ray.py
(5.2 KB)
??
redis
??
rq.py
(5.18 KB)
??
rust_tracing.py
(8.87 KB)
??
sanic.py
(12.66 KB)
??
serverless.py
(1.76 KB)
??
socket.py
(3.09 KB)
??
spark
??
sqlalchemy.py
(4.24 KB)
??
starlette.py
(25.62 KB)
??
starlite.py
(10.31 KB)
??
statsig.py
(1.2 KB)
??
stdlib.py
(8.76 KB)
??
strawberry.py
(13.82 KB)
??
sys_exit.py
(2.43 KB)
??
threading.py
(6.94 KB)
??
tornado.py
(7.04 KB)
??
trytond.py
(1.61 KB)
??
typer.py
(1.77 KB)
??
unleash.py
(1.03 KB)
??
unraisablehook.py
(1.71 KB)
??
wsgi.py
(10.56 KB)
Editing: langchain.py
import contextvars import itertools import warnings from collections import OrderedDict from functools import wraps import sys import sentry_sdk from sentry_sdk.ai.monitoring import set_ai_pipeline_name from sentry_sdk.ai.utils import ( GEN_AI_ALLOWED_MESSAGE_ROLES, normalize_message_roles, set_data_normalized, get_start_span_function, truncate_and_annotate_messages, ) from sentry_sdk.consts import OP, SPANDATA from sentry_sdk.integrations import DidNotEnable, Integration from sentry_sdk.scope import should_send_default_pii from sentry_sdk.tracing_utils import _get_value, set_span_errored from sentry_sdk.utils import logger, capture_internal_exceptions from typing import TYPE_CHECKING if TYPE_CHECKING: from typing import ( Any, AsyncIterator, Callable, Dict, Iterator, List, Optional, Union, ) from uuid import UUID from sentry_sdk.tracing import Span try: from langchain_core.agents import AgentFinish from langchain_core.callbacks import ( BaseCallbackHandler, BaseCallbackManager, Callbacks, manager, ) from langchain_core.messages import BaseMessage from langchain_core.outputs import LLMResult except ImportError: raise DidNotEnable("langchain not installed") try: # >=v1 from langchain_classic.agents import AgentExecutor # type: ignore[import-not-found] except ImportError: try: # <v1 from langchain.agents import AgentExecutor except ImportError: AgentExecutor = None # Conditional imports for embeddings providers try: from langchain_openai import OpenAIEmbeddings # type: ignore[import-not-found] except ImportError: OpenAIEmbeddings = None try: from langchain_openai import AzureOpenAIEmbeddings except ImportError: AzureOpenAIEmbeddings = None try: from langchain_google_vertexai import VertexAIEmbeddings # type: ignore[import-not-found] except ImportError: VertexAIEmbeddings = None try: from langchain_aws import BedrockEmbeddings # type: ignore[import-not-found] except ImportError: BedrockEmbeddings = None try: from langchain_cohere import CohereEmbeddings # type: ignore[import-not-found] except ImportError: CohereEmbeddings = None try: from langchain_mistralai import MistralAIEmbeddings # type: ignore[import-not-found] except ImportError: MistralAIEmbeddings = None try: from langchain_huggingface import HuggingFaceEmbeddings # type: ignore[import-not-found] except ImportError: HuggingFaceEmbeddings = None try: from langchain_ollama import OllamaEmbeddings # type: ignore[import-not-found] except ImportError: OllamaEmbeddings = None DATA_FIELDS = { "frequency_penalty": SPANDATA.GEN_AI_REQUEST_FREQUENCY_PENALTY, "function_call": SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS, "max_tokens": SPANDATA.GEN_AI_REQUEST_MAX_TOKENS, "presence_penalty": SPANDATA.GEN_AI_REQUEST_PRESENCE_PENALTY, "temperature": SPANDATA.GEN_AI_REQUEST_TEMPERATURE, "tool_calls": SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS, "top_k": SPANDATA.GEN_AI_REQUEST_TOP_K, "top_p": SPANDATA.GEN_AI_REQUEST_TOP_P, } # Contextvar to track agent names in a stack for re-entrant agent support _agent_stack = contextvars.ContextVar("langchain_agent_stack", default=None) # type: contextvars.ContextVar[Optional[List[Optional[str]]]] def _push_agent(agent_name): # type: (Optional[str]) -> None """Push an agent name onto the stack.""" stack = _agent_stack.get() if stack is None: stack = [] else: # Copy the list to maintain contextvar isolation across async contexts stack = stack.copy() stack.append(agent_name) _agent_stack.set(stack) def _pop_agent(): # type: () -> Optional[str] """Pop an agent name from the stack and return it.""" stack = _agent_stack.get() if stack: # Copy the list to maintain contextvar isolation across async contexts stack = stack.copy() agent_name = stack.pop() _agent_stack.set(stack) return agent_name return None def _get_current_agent(): # type: () -> Optional[str] """Get the current agent name (top of stack) without removing it.""" stack = _agent_stack.get() if stack: return stack[-1] return None class LangchainIntegration(Integration): identifier = "langchain" origin = f"auto.ai.{identifier}" def __init__(self, include_prompts=True, max_spans=None): # type: (LangchainIntegration, bool, Optional[int]) -> None self.include_prompts = include_prompts self.max_spans = max_spans if max_spans is not None: warnings.warn( "The `max_spans` parameter of `LangchainIntegration` is " "deprecated and will be removed in version 3.0 of sentry-sdk.", DeprecationWarning, stacklevel=2, ) @staticmethod def setup_once(): # type: () -> None manager._configure = _wrap_configure(manager._configure) if AgentExecutor is not None: AgentExecutor.invoke = _wrap_agent_executor_invoke(AgentExecutor.invoke) AgentExecutor.stream = _wrap_agent_executor_stream(AgentExecutor.stream) # Patch embeddings providers _patch_embeddings_provider(OpenAIEmbeddings) _patch_embeddings_provider(AzureOpenAIEmbeddings) _patch_embeddings_provider(VertexAIEmbeddings) _patch_embeddings_provider(BedrockEmbeddings) _patch_embeddings_provider(CohereEmbeddings) _patch_embeddings_provider(MistralAIEmbeddings) _patch_embeddings_provider(HuggingFaceEmbeddings) _patch_embeddings_provider(OllamaEmbeddings) class WatchedSpan: span = None # type: Span children = [] # type: List[WatchedSpan] is_pipeline = False # type: bool def __init__(self, span): # type: (Span) -> None self.span = span class SentryLangchainCallback(BaseCallbackHandler): # type: ignore[misc] """Callback handler that creates Sentry spans.""" def __init__(self, max_span_map_size, include_prompts): # type: (Optional[int], bool) -> None self.span_map = OrderedDict() # type: OrderedDict[UUID, WatchedSpan] self.max_span_map_size = max_span_map_size self.include_prompts = include_prompts def gc_span_map(self): # type: () -> None if self.max_span_map_size is not None: while len(self.span_map) > self.max_span_map_size: run_id, watched_span = self.span_map.popitem(last=False) self._exit_span(watched_span, run_id) def _handle_error(self, run_id, error): # type: (UUID, Any) -> None with capture_internal_exceptions(): if not run_id or run_id not in self.span_map: return span_data = self.span_map[run_id] span = span_data.span set_span_errored(span) sentry_sdk.capture_exception(error, span.scope) span.__exit__(None, None, None) del self.span_map[run_id] def _normalize_langchain_message(self, message): # type: (BaseMessage) -> Any parsed = {"role": message.type, "content": message.content} parsed.update(message.additional_kwargs) return parsed def _create_span(self, run_id, parent_id, **kwargs): # type: (SentryLangchainCallback, UUID, Optional[Any], Any) -> WatchedSpan watched_span = None # type: Optional[WatchedSpan] if parent_id: parent_span = self.span_map.get(parent_id) # type: Optional[WatchedSpan] if parent_span: watched_span = WatchedSpan(parent_span.span.start_child(**kwargs)) parent_span.children.append(watched_span) if watched_span is None: watched_span = WatchedSpan(sentry_sdk.start_span(**kwargs)) watched_span.span.__enter__() self.span_map[run_id] = watched_span self.gc_span_map() return watched_span def _exit_span(self, span_data, run_id): # type: (SentryLangchainCallback, WatchedSpan, UUID) -> None if span_data.is_pipeline: set_ai_pipeline_name(None) span_data.span.__exit__(None, None, None) del self.span_map[run_id] def on_llm_start( self, serialized, prompts, *, run_id, tags=None, parent_run_id=None, metadata=None, **kwargs, ): # type: (SentryLangchainCallback, Dict[str, Any], List[str], UUID, Optional[List[str]], Optional[UUID], Optional[Dict[str, Any]], Any) -> Any """Run when LLM starts running.""" with capture_internal_exceptions(): if not run_id: return all_params = kwargs.get("invocation_params", {}) all_params.update(serialized.get("kwargs", {})) model = ( all_params.get("model") or all_params.get("model_name") or all_params.get("model_id") or "" ) watched_span = self._create_span( run_id, parent_run_id, op=OP.GEN_AI_PIPELINE, name=kwargs.get("name") or "Langchain LLM call", origin=LangchainIntegration.origin, ) span = watched_span.span if model: span.set_data( SPANDATA.GEN_AI_REQUEST_MODEL, model, ) ai_type = all_params.get("_type", "") if "anthropic" in ai_type: span.set_data(SPANDATA.GEN_AI_SYSTEM, "anthropic") elif "openai" in ai_type: span.set_data(SPANDATA.GEN_AI_SYSTEM, "openai") for key, attribute in DATA_FIELDS.items(): if key in all_params and all_params[key] is not None: set_data_normalized(span, attribute, all_params[key], unpack=False) _set_tools_on_span(span, all_params.get("tools")) if should_send_default_pii() and self.include_prompts: normalized_messages = [ { "role": GEN_AI_ALLOWED_MESSAGE_ROLES.USER, "content": {"type": "text", "text": prompt}, } for prompt in prompts ] scope = sentry_sdk.get_current_scope() messages_data = truncate_and_annotate_messages( normalized_messages, span, scope ) if messages_data is not None: set_data_normalized( span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False, ) def on_chat_model_start(self, serialized, messages, *, run_id, **kwargs): # type: (SentryLangchainCallback, Dict[str, Any], List[List[BaseMessage]], UUID, Any) -> Any """Run when Chat Model starts running.""" with capture_internal_exceptions(): if not run_id: return all_params = kwargs.get("invocation_params", {}) all_params.update(serialized.get("kwargs", {})) model = ( all_params.get("model") or all_params.get("model_name") or all_params.get("model_id") or "" ) watched_span = self._create_span( run_id, kwargs.get("parent_run_id"), op=OP.GEN_AI_CHAT, name=f"chat {model}".strip(), origin=LangchainIntegration.origin, ) span = watched_span.span span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "chat") if model: span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model) ai_type = all_params.get("_type", "") if "anthropic" in ai_type: span.set_data(SPANDATA.GEN_AI_SYSTEM, "anthropic") elif "openai" in ai_type: span.set_data(SPANDATA.GEN_AI_SYSTEM, "openai") agent_name = _get_current_agent() if agent_name: span.set_data(SPANDATA.GEN_AI_AGENT_NAME, agent_name) for key, attribute in DATA_FIELDS.items(): if key in all_params and all_params[key] is not None: set_data_normalized(span, attribute, all_params[key], unpack=False) _set_tools_on_span(span, all_params.get("tools")) if should_send_default_pii() and self.include_prompts: normalized_messages = [] for list_ in messages: for message in list_: normalized_messages.append( self._normalize_langchain_message(message) ) normalized_messages = normalize_message_roles(normalized_messages) scope = sentry_sdk.get_current_scope() messages_data = truncate_and_annotate_messages( normalized_messages, span, scope ) if messages_data is not None: set_data_normalized( span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False, ) def on_chat_model_end(self, response, *, run_id, **kwargs): # type: (SentryLangchainCallback, LLMResult, UUID, Any) -> Any """Run when Chat Model ends running.""" with capture_internal_exceptions(): if not run_id or run_id not in self.span_map: return span_data = self.span_map[run_id] span = span_data.span if should_send_default_pii() and self.include_prompts: set_data_normalized( span, SPANDATA.GEN_AI_RESPONSE_TEXT, [[x.text for x in list_] for list_ in response.generations], ) _record_token_usage(span, response) self._exit_span(span_data, run_id) def on_llm_end(self, response, *, run_id, **kwargs): # type: (SentryLangchainCallback, LLMResult, UUID, Any) -> Any """Run when LLM ends running.""" with capture_internal_exceptions(): if not run_id or run_id not in self.span_map: return span_data = self.span_map[run_id] span = span_data.span try: generation = response.generations[0][0] except IndexError: generation = None if generation is not None: try: response_model = generation.generation_info.get("model_name") if response_model is not None: span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, response_model) except AttributeError: pass try: finish_reason = generation.generation_info.get("finish_reason") if finish_reason is not None: span.set_data( SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS, finish_reason ) except AttributeError: pass try: if should_send_default_pii() and self.include_prompts: tool_calls = getattr(generation.message, "tool_calls", None) if tool_calls is not None and tool_calls != []: set_data_normalized( span, SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS, tool_calls, unpack=False, ) except AttributeError: pass if should_send_default_pii() and self.include_prompts: set_data_normalized( span, SPANDATA.GEN_AI_RESPONSE_TEXT, [[x.text for x in list_] for list_ in response.generations], ) _record_token_usage(span, response) self._exit_span(span_data, run_id) def on_llm_error(self, error, *, run_id, **kwargs): # type: (SentryLangchainCallback, Union[Exception, KeyboardInterrupt], UUID, Any) -> Any """Run when LLM errors.""" self._handle_error(run_id, error) def on_chat_model_error(self, error, *, run_id, **kwargs): # type: (SentryLangchainCallback, Union[Exception, KeyboardInterrupt], UUID, Any) -> Any """Run when Chat Model errors.""" self._handle_error(run_id, error) def on_agent_finish(self, finish, *, run_id, **kwargs): # type: (SentryLangchainCallback, AgentFinish, UUID, Any) -> Any with capture_internal_exceptions(): if not run_id or run_id not in self.span_map: return span_data = self.span_map[run_id] span = span_data.span if should_send_default_pii() and self.include_prompts: set_data_normalized( span, SPANDATA.GEN_AI_RESPONSE_TEXT, finish.return_values.items() ) self._exit_span(span_data, run_id) def on_tool_start(self, serialized, input_str, *, run_id, **kwargs): # type: (SentryLangchainCallback, Dict[str, Any], str, UUID, Any) -> Any """Run when tool starts running.""" with capture_internal_exceptions(): if not run_id: return tool_name = serialized.get("name") or kwargs.get("name") or "" watched_span = self._create_span( run_id, kwargs.get("parent_run_id"), op=OP.GEN_AI_EXECUTE_TOOL, name=f"execute_tool {tool_name}".strip(), origin=LangchainIntegration.origin, ) span = watched_span.span span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "execute_tool") span.set_data(SPANDATA.GEN_AI_TOOL_NAME, tool_name) tool_description = serialized.get("description") if tool_description is not None: span.set_data(SPANDATA.GEN_AI_TOOL_DESCRIPTION, tool_description) agent_name = _get_current_agent() if agent_name: span.set_data(SPANDATA.GEN_AI_AGENT_NAME, agent_name) if should_send_default_pii() and self.include_prompts: set_data_normalized( span, SPANDATA.GEN_AI_TOOL_INPUT, kwargs.get("inputs", [input_str]), ) def on_tool_end(self, output, *, run_id, **kwargs): # type: (SentryLangchainCallback, str, UUID, Any) -> Any """Run when tool ends running.""" with capture_internal_exceptions(): if not run_id or run_id not in self.span_map: return span_data = self.span_map[run_id] span = span_data.span if should_send_default_pii() and self.include_prompts: set_data_normalized(span, SPANDATA.GEN_AI_TOOL_OUTPUT, output) self._exit_span(span_data, run_id) def on_tool_error(self, error, *args, run_id, **kwargs): # type: (SentryLangchainCallback, Union[Exception, KeyboardInterrupt], UUID, Any) -> Any """Run when tool errors.""" self._handle_error(run_id, error) def _extract_tokens(token_usage): # type: (Any) -> tuple[Optional[int], Optional[int], Optional[int]] if not token_usage: return None, None, None input_tokens = _get_value(token_usage, "prompt_tokens") or _get_value( token_usage, "input_tokens" ) output_tokens = _get_value(token_usage, "completion_tokens") or _get_value( token_usage, "output_tokens" ) total_tokens = _get_value(token_usage, "total_tokens") return input_tokens, output_tokens, total_tokens def _extract_tokens_from_generations(generations): # type: (Any) -> tuple[Optional[int], Optional[int], Optional[int]] """Extract token usage from response.generations structure.""" if not generations: return None, None, None total_input = 0 total_output = 0 total_total = 0 for gen_list in generations: for gen in gen_list: token_usage = _get_token_usage(gen) input_tokens, output_tokens, total_tokens = _extract_tokens(token_usage) total_input += input_tokens if input_tokens is not None else 0 total_output += output_tokens if output_tokens is not None else 0 total_total += total_tokens if total_tokens is not None else 0 return ( total_input if total_input > 0 else None, total_output if total_output > 0 else None, total_total if total_total > 0 else None, ) def _get_token_usage(obj): # type: (Any) -> Optional[Dict[str, Any]] """ Check multiple paths to extract token usage from different objects. """ possible_names = ("usage", "token_usage", "usage_metadata") message = _get_value(obj, "message") if message is not None: for name in possible_names: usage = _get_value(message, name) if usage is not None: return usage llm_output = _get_value(obj, "llm_output") if llm_output is not None: for name in possible_names: usage = _get_value(llm_output, name) if usage is not None: return usage for name in possible_names: usage = _get_value(obj, name) if usage is not None: return usage return None def _record_token_usage(span, response): # type: (Span, Any) -> None token_usage = _get_token_usage(response) if token_usage: input_tokens, output_tokens, total_tokens = _extract_tokens(token_usage) else: input_tokens, output_tokens, total_tokens = _extract_tokens_from_generations( response.generations ) if input_tokens is not None: span.set_data(SPANDATA.GEN_AI_USAGE_INPUT_TOKENS, input_tokens) if output_tokens is not None: span.set_data(SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens) if total_tokens is not None: span.set_data(SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS, total_tokens) def _get_request_data(obj, args, kwargs): # type: (Any, Any, Any) -> tuple[Optional[str], Optional[List[Any]]] """ Get the agent name and available tools for the agent. """ agent = getattr(obj, "agent", None) runnable = getattr(agent, "runnable", None) runnable_config = getattr(runnable, "config", {}) tools = ( getattr(obj, "tools", None) or getattr(agent, "tools", None) or runnable_config.get("tools") or runnable_config.get("available_tools") ) tools = tools if tools and len(tools) > 0 else None try: agent_name = None if len(args) > 1: agent_name = args[1].get("run_name") if agent_name is None: agent_name = runnable_config.get("run_name") except Exception: pass return (agent_name, tools) def _simplify_langchain_tools(tools): # type: (Any) -> Optional[List[Any]] """Parse and simplify tools into a cleaner format.""" if not tools: return None if not isinstance(tools, (list, tuple)): return None simplified_tools = [] for tool in tools: try: if isinstance(tool, dict): if "function" in tool and isinstance(tool["function"], dict): func = tool["function"] simplified_tool = { "name": func.get("name"), "description": func.get("description"), } if simplified_tool["name"]: simplified_tools.append(simplified_tool) elif "name" in tool: simplified_tool = { "name": tool.get("name"), "description": tool.get("description"), } simplified_tools.append(simplified_tool) else: name = ( tool.get("name") or tool.get("tool_name") or tool.get("function_name") ) if name: simplified_tools.append( { "name": name, "description": tool.get("description") or tool.get("desc"), } ) elif hasattr(tool, "name"): simplified_tool = { "name": getattr(tool, "name", None), "description": getattr(tool, "description", None) or getattr(tool, "desc", None), } if simplified_tool["name"]: simplified_tools.append(simplified_tool) elif hasattr(tool, "__name__"): simplified_tools.append( { "name": tool.__name__, "description": getattr(tool, "__doc__", None), } ) else: tool_str = str(tool) if tool_str and tool_str != "": simplified_tools.append({"name": tool_str, "description": None}) except Exception: continue return simplified_tools if simplified_tools else None def _set_tools_on_span(span, tools): # type: (Span, Any) -> None """Set available tools data on a span if tools are provided.""" if tools is not None: simplified_tools = _simplify_langchain_tools(tools) if simplified_tools: set_data_normalized( span, SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, simplified_tools, unpack=False, ) def _wrap_configure(f): # type: (Callable[..., Any]) -> Callable[..., Any] @wraps(f) def new_configure( callback_manager_cls, # type: type inheritable_callbacks=None, # type: Callbacks local_callbacks=None, # type: Callbacks *args, # type: Any **kwargs, # type: Any ): # type: (...) -> Any integration = sentry_sdk.get_client().get_integration(LangchainIntegration) if integration is None: return f( callback_manager_cls, inheritable_callbacks, local_callbacks, *args, **kwargs, ) local_callbacks = local_callbacks or [] # Handle each possible type of local_callbacks. For each type, we # extract the list of callbacks to check for SentryLangchainCallback, # and define a function that would add the SentryLangchainCallback # to the existing callbacks list. if isinstance(local_callbacks, BaseCallbackManager): callbacks_list = local_callbacks.handlers elif isinstance(local_callbacks, BaseCallbackHandler): callbacks_list = [local_callbacks] elif isinstance(local_callbacks, list): callbacks_list = local_callbacks else: logger.debug("Unknown callback type: %s", local_callbacks) # Just proceed with original function call return f( callback_manager_cls, inheritable_callbacks, local_callbacks, *args, **kwargs, ) # Handle each possible type of inheritable_callbacks. if isinstance(inheritable_callbacks, BaseCallbackManager): inheritable_callbacks_list = inheritable_callbacks.handlers elif isinstance(inheritable_callbacks, list): inheritable_callbacks_list = inheritable_callbacks else: inheritable_callbacks_list = [] if not any( isinstance(cb, SentryLangchainCallback) for cb in itertools.chain(callbacks_list, inheritable_callbacks_list) ): sentry_handler = SentryLangchainCallback( integration.max_spans, integration.include_prompts, ) if isinstance(local_callbacks, BaseCallbackManager): local_callbacks = local_callbacks.copy() local_callbacks.handlers = [ *local_callbacks.handlers, sentry_handler, ] elif isinstance(local_callbacks, BaseCallbackHandler): local_callbacks = [local_callbacks, sentry_handler] else: local_callbacks = [*local_callbacks, sentry_handler] return f( callback_manager_cls, inheritable_callbacks, local_callbacks, *args, **kwargs, ) return new_configure def _wrap_agent_executor_invoke(f): # type: (Callable[..., Any]) -> Callable[..., Any] @wraps(f) def new_invoke(self, *args, **kwargs): # type: (Any, Any, Any) -> Any integration = sentry_sdk.get_client().get_integration(LangchainIntegration) if integration is None: return f(self, *args, **kwargs) agent_name, tools = _get_request_data(self, args, kwargs) start_span_function = get_start_span_function() with start_span_function( op=OP.GEN_AI_INVOKE_AGENT, name=f"invoke_agent {agent_name}" if agent_name else "invoke_agent", origin=LangchainIntegration.origin, ) as span: _push_agent(agent_name) try: if agent_name: span.set_data(SPANDATA.GEN_AI_AGENT_NAME, agent_name) span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, False) _set_tools_on_span(span, tools) # Run the agent result = f(self, *args, **kwargs) input = result.get("input") if ( input is not None and should_send_default_pii() and integration.include_prompts ): normalized_messages = normalize_message_roles([input]) scope = sentry_sdk.get_current_scope() messages_data = truncate_and_annotate_messages( normalized_messages, span, scope ) if messages_data is not None: set_data_normalized( span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False, ) output = result.get("output") if ( output is not None and should_send_default_pii() and integration.include_prompts ): set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output) return result finally: # Ensure agent is popped even if an exception occurs _pop_agent() return new_invoke def _wrap_agent_executor_stream(f): # type: (Callable[..., Any]) -> Callable[..., Any] @wraps(f) def new_stream(self, *args, **kwargs): # type: (Any, Any, Any) -> Any integration = sentry_sdk.get_client().get_integration(LangchainIntegration) if integration is None: return f(self, *args, **kwargs) agent_name, tools = _get_request_data(self, args, kwargs) start_span_function = get_start_span_function() span = start_span_function( op=OP.GEN_AI_INVOKE_AGENT, name=f"invoke_agent {agent_name}" if agent_name else "invoke_agent", origin=LangchainIntegration.origin, ) span.__enter__() _push_agent(agent_name) if agent_name: span.set_data(SPANDATA.GEN_AI_AGENT_NAME, agent_name) span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent") span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) _set_tools_on_span(span, tools) input = args[0].get("input") if len(args) >= 1 else None if ( input is not None and should_send_default_pii() and integration.include_prompts ): normalized_messages = normalize_message_roles([input]) scope = sentry_sdk.get_current_scope() messages_data = truncate_and_annotate_messages( normalized_messages, span, scope ) if messages_data is not None: set_data_normalized( span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False, ) # Run the agent result = f(self, *args, **kwargs) old_iterator = result def new_iterator(): # type: () -> Iterator[Any] exc_info = (None, None, None) # type: tuple[Any, Any, Any] try: for event in old_iterator: yield event try: output = event.get("output") except Exception: output = None if ( output is not None and should_send_default_pii() and integration.include_prompts ): set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output) except Exception: exc_info = sys.exc_info() set_span_errored(span) raise finally: # Ensure cleanup happens even if iterator is abandoned or fails _pop_agent() span.__exit__(*exc_info) async def new_iterator_async(): # type: () -> AsyncIterator[Any] exc_info = (None, None, None) # type: tuple[Any, Any, Any] try: async for event in old_iterator: yield event try: output = event.get("output") except Exception: output = None if ( output is not None and should_send_default_pii() and integration.include_prompts ): set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output) except Exception: exc_info = sys.exc_info() set_span_errored(span) raise finally: # Ensure cleanup happens even if iterator is abandoned or fails _pop_agent() span.__exit__(*exc_info) if str(type(result)) == "<class 'async_generator'>": result = new_iterator_async() else: result = new_iterator() return result return new_stream def _patch_embeddings_provider(provider_class): # type: (Any) -> None """Patch an embeddings provider class with monitoring wrappers.""" if provider_class is None: return if hasattr(provider_class, "embed_documents"): provider_class.embed_documents = _wrap_embedding_method( provider_class.embed_documents ) if hasattr(provider_class, "embed_query"): provider_class.embed_query = _wrap_embedding_method(provider_class.embed_query) if hasattr(provider_class, "aembed_documents"): provider_class.aembed_documents = _wrap_async_embedding_method( provider_class.aembed_documents ) if hasattr(provider_class, "aembed_query"): provider_class.aembed_query = _wrap_async_embedding_method( provider_class.aembed_query ) def _wrap_embedding_method(f): # type: (Callable[..., Any]) -> Callable[..., Any] """Wrap sync embedding methods (embed_documents and embed_query).""" @wraps(f) def new_embedding_method(self, *args, **kwargs): # type: (Any, Any, Any) -> Any integration = sentry_sdk.get_client().get_integration(LangchainIntegration) if integration is None: return f(self, *args, **kwargs) model_name = getattr(self, "model", None) or getattr(self, "model_name", None) with sentry_sdk.start_span( op=OP.GEN_AI_EMBEDDINGS, name=f"embeddings {model_name}" if model_name else "embeddings", origin=LangchainIntegration.origin, ) as span: span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "embeddings") if model_name: span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model_name) # Capture input if PII is allowed if ( should_send_default_pii() and integration.include_prompts and len(args) > 0 ): input_data = args[0] # Normalize to list format texts = input_data if isinstance(input_data, list) else [input_data] set_data_normalized( span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False ) result = f(self, *args, **kwargs) return result return new_embedding_method def _wrap_async_embedding_method(f): # type: (Callable[..., Any]) -> Callable[..., Any] """Wrap async embedding methods (aembed_documents and aembed_query).""" @wraps(f) async def new_async_embedding_method(self, *args, **kwargs): # type: (Any, Any, Any) -> Any integration = sentry_sdk.get_client().get_integration(LangchainIntegration) if integration is None: return await f(self, *args, **kwargs) model_name = getattr(self, "model", None) or getattr(self, "model_name", None) with sentry_sdk.start_span( op=OP.GEN_AI_EMBEDDINGS, name=f"embeddings {model_name}" if model_name else "embeddings", origin=LangchainIntegration.origin, ) as span: span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "embeddings") if model_name: span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model_name) # Capture input if PII is allowed if ( should_send_default_pii() and integration.include_prompts and len(args) > 0 ): input_data = args[0] # Normalize to list format texts = input_data if isinstance(input_data, list) else [input_data] set_data_normalized( span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False ) result = await f(self, *args, **kwargs) return result return new_async_embedding_method
Upload File
Create Folder