X7ROOT File Manager
Current Path: /opt/hc_python/lib/python3.12/site-packages/sentry_sdk/integrations
__init__.py (12.44 KB)
__pycache__/
_asgi_common.py (3.11 KB)
_wsgi_common.py (7.38 KB)
aiohttp.py (12.7 KB)
anthropic.py (14.17 KB)
argv.py (911 B)
ariadne.py (5.7 KB)
arq.py (7.7 KB)
asgi.py (12.5 KB)
asyncio.py (4.5 KB)
asyncpg.py (6.37 KB)
atexit.py (1.61 KB)
aws_lambda.py (17.65 KB)
beam.py (5.06 KB)
boto3.py (4.31 KB)
bottle.py (6.46 KB)
celery/
chalice.py (4.59 KB)
clickhouse_driver.py (5.96 KB)
cloud_resource_context.py (7.6 KB)
cohere.py (9.18 KB)
dedupe.py (1.93 KB)
django/
dramatiq.py (7.28 KB)
excepthook.py (2.35 KB)
executing.py (1.95 KB)
falcon.py (9.28 KB)
fastapi.py (4.48 KB)
flask.py (8.54 KB)
gcp.py (8.26 KB)
gnu_backtrace.py (2.75 KB)
google_genai/
gql.py (4.69 KB)
graphene.py (4.92 KB)
grpc/
httpx.py (5.94 KB)
huey.py (5.32 KB)
huggingface_hub.py (14.6 KB)
langchain.py (39.02 KB)
langgraph.py (11.56 KB)
launchdarkly.py (1.89 KB)
litellm.py (9.99 KB)
litestar.py (11.55 KB)
logging.py (13.57 KB)
loguru.py (6.38 KB)
mcp.py (19.58 KB)
modules.py (820 B)
openai.py (24.54 KB)
openai_agents/
openfeature.py (1.1 KB)
opentelemetry/
otlp.py (3.06 KB)
pure_eval.py (4.5 KB)
pydantic_ai/
pymongo.py (6.23 KB)
pyramid.py (7.19 KB)
quart.py (7.24 KB)
ray.py (5.2 KB)
redis/
rq.py (5.18 KB)
rust_tracing.py (8.87 KB)
sanic.py (12.66 KB)
serverless.py (1.76 KB)
socket.py (3.09 KB)
spark/
sqlalchemy.py (4.24 KB)
starlette.py (25.62 KB)
starlite.py (10.31 KB)
statsig.py (1.2 KB)
stdlib.py (8.76 KB)
strawberry.py (13.82 KB)
sys_exit.py (2.43 KB)
threading.py (6.94 KB)
tornado.py (7.04 KB)
trytond.py (1.61 KB)
typer.py (1.77 KB)
unleash.py (1.03 KB)
unraisablehook.py (1.71 KB)
wsgi.py (10.56 KB)
Editing: anthropic.py
from functools import wraps
from typing import TYPE_CHECKING

import sentry_sdk
from sentry_sdk.ai.monitoring import record_token_usage
from sentry_sdk.ai.utils import (
    set_data_normalized,
    normalize_message_roles,
    truncate_and_annotate_messages,
    get_start_span_function,
)
from sentry_sdk.consts import OP, SPANDATA, SPANSTATUS
from sentry_sdk.integrations import _check_minimum_version, DidNotEnable, Integration
from sentry_sdk.scope import should_send_default_pii
from sentry_sdk.tracing_utils import set_span_errored
from sentry_sdk.utils import (
    capture_internal_exceptions,
    event_from_exception,
    package_version,
    safe_serialize,
)

try:
    try:
        from anthropic import NotGiven
    except ImportError:
        NotGiven = None

    try:
        from anthropic import Omit
    except ImportError:
        Omit = None

    from anthropic.resources import AsyncMessages, Messages

    if TYPE_CHECKING:
        from anthropic.types import MessageStreamEvent
except ImportError:
    raise DidNotEnable("Anthropic not installed")

if TYPE_CHECKING:
    from typing import Any, AsyncIterator, Iterator

    from sentry_sdk.tracing import Span


class AnthropicIntegration(Integration):
    identifier = "anthropic"
    origin = f"auto.ai.{identifier}"

    def __init__(self, include_prompts=True):
        # type: (AnthropicIntegration, bool) -> None
        self.include_prompts = include_prompts

    @staticmethod
    def setup_once():
        # type: () -> None
        version = package_version("anthropic")
        _check_minimum_version(AnthropicIntegration, version)

        Messages.create = _wrap_message_create(Messages.create)
        AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create)


def _capture_exception(exc):
    # type: (Any) -> None
    set_span_errored()

    event, hint = event_from_exception(
        exc,
        client_options=sentry_sdk.get_client().options,
        mechanism={"type": "anthropic", "handled": False},
    )
    sentry_sdk.capture_event(event, hint=hint)


def _get_token_usage(result):
    # type: (Messages) -> tuple[int, int]
    """
    Get token usage from the Anthropic response.
    """
    input_tokens = 0
    output_tokens = 0
    if hasattr(result, "usage"):
        usage = result.usage
        if hasattr(usage, "input_tokens") and isinstance(usage.input_tokens, int):
            input_tokens = usage.input_tokens
        if hasattr(usage, "output_tokens") and isinstance(usage.output_tokens, int):
            output_tokens = usage.output_tokens

    return input_tokens, output_tokens


def _collect_ai_data(event, model, input_tokens, output_tokens, content_blocks):
    # type: (MessageStreamEvent, str | None, int, int, list[str]) -> tuple[str | None, int, int, list[str]]
    """
    Collect model information, token usage, and content blocks from the AI streaming response.
    """
    with capture_internal_exceptions():
        if hasattr(event, "type"):
            if event.type == "message_start":
                usage = event.message.usage
                input_tokens += usage.input_tokens
                output_tokens += usage.output_tokens
                model = event.message.model or model
            elif event.type == "content_block_start":
                pass
            elif event.type == "content_block_delta":
                if hasattr(event.delta, "text"):
                    content_blocks.append(event.delta.text)
                elif hasattr(event.delta, "partial_json"):
                    content_blocks.append(event.delta.partial_json)
            elif event.type == "content_block_stop":
                pass
            elif event.type == "message_delta":
                output_tokens += event.usage.output_tokens

    return model, input_tokens, output_tokens, content_blocks


def _set_input_data(span, kwargs, integration):
    # type: (Span, dict[str, Any], AnthropicIntegration) -> None
    """
    Set input data for the span based on the provided keyword arguments for the anthropic message creation.
    """
    messages = kwargs.get("messages")
    if (
        messages is not None
        and len(messages) > 0
        and should_send_default_pii()
        and integration.include_prompts
    ):
        normalized_messages = []
        for message in messages:
            if (
                message.get("role") == "user"
                and "content" in message
                and isinstance(message["content"], (list, tuple))
            ):
                for item in message["content"]:
                    if item.get("type") == "tool_result":
                        normalized_messages.append(
                            {
                                "role": "tool",
                                "content": {
                                    "tool_use_id": item.get("tool_use_id"),
                                    "output": item.get("content"),
                                },
                            }
                        )
            else:
                normalized_messages.append(message)

        role_normalized_messages = normalize_message_roles(normalized_messages)
        scope = sentry_sdk.get_current_scope()
        messages_data = truncate_and_annotate_messages(
            role_normalized_messages, span, scope
        )
        if messages_data is not None:
            set_data_normalized(
                span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False
            )

    set_data_normalized(
        span, SPANDATA.GEN_AI_RESPONSE_STREAMING, kwargs.get("stream", False)
    )

    kwargs_keys_to_attributes = {
        "max_tokens": SPANDATA.GEN_AI_REQUEST_MAX_TOKENS,
        "model": SPANDATA.GEN_AI_REQUEST_MODEL,
        "temperature": SPANDATA.GEN_AI_REQUEST_TEMPERATURE,
        "top_k": SPANDATA.GEN_AI_REQUEST_TOP_K,
        "top_p": SPANDATA.GEN_AI_REQUEST_TOP_P,
    }
    for key, attribute in kwargs_keys_to_attributes.items():
        value = kwargs.get(key)
        if value is not None and _is_given(value):
            set_data_normalized(span, attribute, value)

    # Input attributes: Tools
    tools = kwargs.get("tools")
    if tools is not None and _is_given(tools) and len(tools) > 0:
        set_data_normalized(
            span, SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, safe_serialize(tools)
        )


def _set_output_data(
    span,
    integration,
    model,
    input_tokens,
    output_tokens,
    content_blocks,
    finish_span=False,
):
    # type: (Span, AnthropicIntegration, str | None, int | None, int | None, list[Any], bool) -> None
    """
    Set output data for the span based on the AI response.
    """
    span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, model)
    if should_send_default_pii() and integration.include_prompts:
        output_messages = {
            "response": [],
            "tool": [],
        }  # type: (dict[str, list[Any]])

        for output in content_blocks:
            if output["type"] == "text":
                output_messages["response"].append(output["text"])
            elif output["type"] == "tool_use":
                output_messages["tool"].append(output)

        if len(output_messages["tool"]) > 0:
            set_data_normalized(
                span,
                SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS,
                output_messages["tool"],
                unpack=False,
            )

        if len(output_messages["response"]) > 0:
            set_data_normalized(
                span, SPANDATA.GEN_AI_RESPONSE_TEXT, output_messages["response"]
            )

    record_token_usage(
        span,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
    )

    if finish_span:
        span.__exit__(None, None, None)


def _sentry_patched_create_common(f, *args, **kwargs):
    # type: (Any, *Any, **Any) -> Any
    integration = kwargs.pop("integration")
    if integration is None:
        return f(*args, **kwargs)

    if "messages" not in kwargs:
        return f(*args, **kwargs)

    try:
        iter(kwargs["messages"])
    except TypeError:
        return f(*args, **kwargs)

    model = kwargs.get("model", "")

    span = get_start_span_function()(
        op=OP.GEN_AI_CHAT,
        name=f"chat {model}".strip(),
        origin=AnthropicIntegration.origin,
    )
    span.__enter__()

    _set_input_data(span, kwargs, integration)

    result = yield f, args, kwargs

    with capture_internal_exceptions():
        if hasattr(result, "content"):
            input_tokens, output_tokens = _get_token_usage(result)

            content_blocks = []
            for content_block in result.content:
                if hasattr(content_block, "to_dict"):
                    content_blocks.append(content_block.to_dict())
                elif hasattr(content_block, "model_dump"):
                    content_blocks.append(content_block.model_dump())
                elif hasattr(content_block, "text"):
                    content_blocks.append({"type": "text", "text": content_block.text})

            _set_output_data(
                span=span,
                integration=integration,
                model=getattr(result, "model", None),
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                content_blocks=content_blocks,
                finish_span=True,
            )

        # Streaming response
        elif hasattr(result, "_iterator"):
            old_iterator = result._iterator

            def new_iterator():
                # type: () -> Iterator[MessageStreamEvent]
                model = None
                input_tokens = 0
                output_tokens = 0
                content_blocks = []  # type: list[str]

                for event in old_iterator:
                    model, input_tokens, output_tokens, content_blocks = (
                        _collect_ai_data(
                            event, model, input_tokens, output_tokens, content_blocks
                        )
                    )
                    yield event

                _set_output_data(
                    span=span,
                    integration=integration,
                    model=model,
                    input_tokens=input_tokens,
                    output_tokens=output_tokens,
                    content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
                    finish_span=True,
                )

            async def new_iterator_async():
                # type: () -> AsyncIterator[MessageStreamEvent]
                model = None
                input_tokens = 0
                output_tokens = 0
                content_blocks = []  # type: list[str]

                async for event in old_iterator:
                    model, input_tokens, output_tokens, content_blocks = (
                        _collect_ai_data(
                            event, model, input_tokens, output_tokens, content_blocks
                        )
                    )
                    yield event

                _set_output_data(
                    span=span,
                    integration=integration,
                    model=model,
                    input_tokens=input_tokens,
                    output_tokens=output_tokens,
                    content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
                    finish_span=True,
                )

            if str(type(result._iterator)) == "<class 'async_generator'>":
                result._iterator = new_iterator_async()
            else:
                result._iterator = new_iterator()

        else:
            span.set_data("unknown_response", True)
            span.__exit__(None, None, None)

    return result


def _wrap_message_create(f):
    # type: (Any) -> Any
    def _execute_sync(f, *args, **kwargs):
        # type: (Any, *Any, **Any) -> Any
        gen = _sentry_patched_create_common(f, *args, **kwargs)

        try:
            f, args, kwargs = next(gen)
        except StopIteration as e:
            return e.value

        try:
            try:
                result = f(*args, **kwargs)
            except Exception as exc:
                _capture_exception(exc)
                raise exc from None

            return gen.send(result)
        except StopIteration as e:
            return e.value

    @wraps(f)
    def _sentry_patched_create_sync(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)
        kwargs["integration"] = integration

        try:
            return _execute_sync(f, *args, **kwargs)
        finally:
            span = sentry_sdk.get_current_span()
            if span is not None and span.status == SPANSTATUS.ERROR:
                with capture_internal_exceptions():
                    span.__exit__(None, None, None)

    return _sentry_patched_create_sync


def _wrap_message_create_async(f):
    # type: (Any) -> Any
    async def _execute_async(f, *args, **kwargs):
        # type: (Any, *Any, **Any) -> Any
        gen = _sentry_patched_create_common(f, *args, **kwargs)

        try:
            f, args, kwargs = next(gen)
        except StopIteration as e:
            return await e.value

        try:
            try:
                result = await f(*args, **kwargs)
            except Exception as exc:
                _capture_exception(exc)
                raise exc from None

            return gen.send(result)
        except StopIteration as e:
            return e.value

    @wraps(f)
    async def _sentry_patched_create_async(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)
        kwargs["integration"] = integration

        try:
            return await _execute_async(f, *args, **kwargs)
        finally:
            span = sentry_sdk.get_current_span()
            if span is not None and span.status == SPANSTATUS.ERROR:
                with capture_internal_exceptions():
                    span.__exit__(None, None, None)

    return _sentry_patched_create_async


def _is_given(obj):
    # type: (Any) -> bool
    """
    Check for givenness safely across different anthropic versions.
    """
    if NotGiven is not None and isinstance(obj, NotGiven):
        return False
    if Omit is not None and isinstance(obj, Omit):
        return False
    return True
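For context, the module above is activated by passing the integration to sentry_sdk.init(). The sketch below is a minimal, hypothetical setup, not part of anthropic.py; the DSN is a placeholder. Prompt and response capture requires both send_default_pii=True and include_prompts=True, matching the should_send_default_pii() / integration.include_prompts checks in _set_input_data and _set_output_data.

import sentry_sdk
from sentry_sdk.integrations.anthropic import AnthropicIntegration

sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
    traces_sample_rate=1.0,  # record spans for all transactions
    send_default_pii=True,  # required for prompt/response capture
    integrations=[AnthropicIntegration(include_prompts=True)],
)

# After init, setup_once() monkeypatches Messages.create and
# AsyncMessages.create, so each messages.create() call runs inside an
# OP.GEN_AI_CHAT span with token usage attached.

Design note: _sentry_patched_create_common avoids duplicating the sync and async paths by being a generator. It yields the (f, args, kwargs) triple for the driver to execute, and gen.send(result) resumes the shared post-processing, so only the actual call differs between _execute_sync and _execute_async.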