This page is generated from the following source files:
The pyTelegramBotAPI library implements a modular architecture designed to provide both synchronous and asynchronous interfaces for the Telegram Bot API. This architecture separates concerns across distinct layers: bot instance management, API communication, type serialization, state management, and handler processing.
The library follows a layered architecture pattern with clear separation between the bot logic, API communication, and data handling layers. The design supports both synchronous (TeleBot) and asynchronous (AsyncTeleBot) implementations sharing common type definitions and storage abstractions.
正在加载图表渲染器...
Architecture Key Points:
Dual Bot Implementation: The library provides both synchronous TeleBot and asynchronous AsyncTeleBot classes that share similar APIs but differ in execution model. The async version uses asyncio.create_task for concurrent handler execution (telebot/async_telebot.py:509-510).
Unified Type System: Both bot implementations use the same types.py module for deserializing Telegram API responses, ensuring consistency across sync and async code paths (telebot/types.py:118-212).
Pluggable State Storage: State management is abstracted through StateStorageBase, allowing different backend implementations (memory, file, database) without modifying bot logic (telebot/storage/base_storage.py:1-100).
Filter-Based Routing: Handler registration uses a filter system where messages are routed based on criteria like text content, chat ID, or conversation state (telebot/asyncio_filters.py:190-212).
The TeleBot class in telebot/__init__.py serves as the primary interface for synchronous bot operations. It manages the complete lifecycle of a Telegram bot including update retrieval, handler execution, and state persistence.
Responsibility Boundary:
Key Entry Points:
| Method | Purpose | Location |
|---|---|---|
get_updates() | Retrieves updates via long polling | telebot/init.py:630-663 |
enable_saving_states() | Configures state persistence | telebot/init.py:328-340 |
process_new_updates() | Processes incoming update objects | Internal handler dispatch |
State Management Integration:
The TeleBot class integrates state management through the enable_saving_states method, which configures a StatePickleStorage instance for persisting conversation states:
python1def enable_saving_states(self, filename: Optional[str]="./.state-save/states.pkl"): 2 """ 3 Enable saving states (by default saving disabled) 4 """ 5 self.current_states = StatePickleStorage(file_path=filename) 6 self.current_states.create_dir()
This implementation at telebot/init.py:328-340 demonstrates the library's approach to optional state persistence—states are stored in memory by default but can be persisted to disk.
Update Retrieval Mechanism:
The get_updates method implements long polling with configurable timeout parameters:
python1def get_updates(self, offset: Optional[int]=None, limit: Optional[int]=None, 2 timeout: Optional[int]=20, allowed_updates: Optional[List[str]]=None, 3 long_polling_timeout: int=20) -> List[types.Update]: 4 json_updates = apihelper.get_updates( 5 self.token, offset=offset, limit=limit, timeout=timeout, 6 allowed_updates=allowed_updates, 7 long_polling_timeout=long_polling_timeout) 8 return [types.Update.de_json(ju) for ju in json_updates]
This method at telebot/init.py:630-663 bridges the low-level API helper with the type system, converting raw JSON responses into typed Update objects.
The AsyncTeleBot class provides an asynchronous interface using Python's asyncio framework. It maintains API compatibility with TeleBot while enabling concurrent handler execution.
Responsibility Boundary:
Async Task Creation:
The async implementation uses a helper method for creating concurrent tasks:
python1def _loop_create_task(coro): 2 return asyncio.create_task(coro)
This pattern at telebot/async_telebot.py:509-510 allows handlers to execute concurrently rather than sequentially, improving throughput for I/O-bound operations.
Update Listener Pattern:
AsyncTeleBot supports an update listener mechanism for monitoring all incoming updates:
python1def set_update_listener(self, func: Awaitable): 2 """ 3 Update listener is a function that gets any update. 4 """ 5 self.update_listener.append(func)
This implementation at telebot/async_telebot.py:933-951 enables logging, analytics, or preprocessing of all updates before handler dispatch.
The Update class in types.py is the central data structure for all incoming Telegram events. It encapsulates all possible update types defined by the Telegram Bot API.
Responsibility Boundary:
Class Definition:
The Update class at telebot/types.py:118-212 defines the complete structure:
python1class Update(JsonDeserializable): 2 """ 3 This object represents an incoming update. 4 At most one of the optional parameters can be present in any given update. 5 """ 6 @classmethod 7 def de_json(cls, json_string): 8 if json_string is None: return None 9 obj = cls.check_json(json_string, dict_copy=False) 10 update_id = obj['update_id'] 11 message = Message.de_json(obj.get('message')) 12 edited_message = Message.de_json(obj.get('edited_message')) 13 callback_query = CallbackQuery.de_json(obj.get('callback_query')) 14 # ... additional fields 15 return cls(update_id, message, edited_message, ...)
Supported Update Types:
| Field | Type | Description |
|---|---|---|
message | Message | New incoming message |
edited_message | Message | Edited message |
callback_query | CallbackQuery | Callback button press |
inline_query | InlineQuery | Inline mode query |
poll | Poll | Poll state update |
my_chat_member | ChatMemberUpdated | Bot's chat member status |
business_message | Message | Business account message |
ChatMemberUpdated Type:
The ChatMemberUpdated class handles member status changes with a useful difference property:
python1@property 2def difference(self) -> Dict[str, List]: 3 """ 4 Get the difference between old_chat_member and new_chat_member 5 as a dict in format {'parameter': [old_value, new_value]} 6 """ 7 old: Dict = self.old_chat_member.__dict__ 8 new: Dict = self.new_chat_member.__dict__ 9 dif = {} 10 for key in new: 11 if key == 'user': continue 12 if new[key] != old[key]: 13 dif[key] = [old[key], new[key]] 14 return dif
This implementation at telebot/types.py:279-335 provides a convenient way to detect what changed in a member's status without manually comparing all fields.
The apihelper.py module provides low-level HTTP communication with the Telegram Bot API using synchronous requests.
Responsibility Boundary:
get_updates Implementation:
python1def get_updates(token, offset=None, limit=None, timeout=None, 2 allowed_updates=None, long_polling_timeout=None): 3 method_url = r'getUpdates' 4 payload = {} 5 if offset: 6 payload['offset'] = offset 7 if limit: 8 payload['limit'] = limit 9 if timeout: 10 payload['timeout'] = timeout 11 payload['long_polling_timeout'] = long_polling_timeout if long_polling_timeout else LONG_POLLING_TIMEOUT 12 if allowed_updates is not None: 13 payload['allowed_updates'] = json.dumps(allowed_updates) 14 return _make_request(token, method_url, params=payload)
This implementation at telebot/apihelper.py:327-339 demonstrates the pattern used throughout the module: build a payload dictionary, serialize complex types to JSON, and delegate to _make_request.
API Method Examples:
The module follows a consistent pattern for all API methods:
python1def get_chat(token, chat_id): 2 method_url = r'getChat' 3 payload = {'chat_id': chat_id} 4 return _make_request(token, method_url, params=payload)
This simple implementation at telebot/apihelper.py:362-385 shows how most API methods are thin wrappers around _make_request.
The asyncio_helper.py module provides async equivalents using aiohttp for non-blocking HTTP requests.
Responsibility Boundary:
Data Preparation for Multipart Requests:
python1def _prepare_data(params=None, files=None): 2 """ 3 Adds the parameters and files to the request. 4 """ 5 data = aiohttp.formdata.FormData(quote_fields=False) 6 7 if params: 8 for key, value in params.items(): 9 data.add_field(key, str(value)) 10 if files: 11 for key, f in files.items(): 12 if isinstance(f, tuple): 13 if len(f) == 2: 14 file_name, file = f 15 if isinstance(file, types.InputFile): 16 file = file.file 17 elif isinstance(f, types.InputFile): 18 file_name = f.file_name 19 file = f.file 20 data.add_field(key, file, filename=file_name) 21 return data
This implementation at telebot/asyncio_helper.py:115-145 handles the complexity of preparing multipart form data for file uploads, supporting both tuple format and InputFile objects.
The handler backend system processes incoming updates through a pipeline of registered handlers and optional middleware.
CancelUpdate Mechanism:
The CancelUpdate class provides a way for middleware to halt update processing:
python1class CancelUpdate: 2 """ 3 Class for canceling updates. 4 Just return instance of this class 5 in middleware to skip update. 6 Update will skip handler and execution 7 of post_process in middlewares. 8 """ 9 def __init__(self) -> None: 10 pass
This implementation at telebot/handler_backends.py:226-235 allows middleware to short-circuit the handler pipeline by returning a CancelUpdate instance.
Filters determine which handler should process a given update based on message content or state.
TextMatchFilter:
python1class TextMatchFilter(AdvancedCustomFilter): 2 """ 3 Filter to check Text message. 4 """ 5 key = 'text' 6 7 async def check(self, message, text): 8 if isinstance(text, TextFilter): 9 return await text.check(message) 10 elif type(text) is list: 11 return message.text in text 12 else: 13 return text == message.text
This implementation at telebot/asyncio_filters.py:190-212 supports exact matching, list membership, and complex TextFilter patterns.
StateFilter for FSM:
The StateFilter enables conversation state-based routing:
python1class StateFilter(AdvancedCustomFilter): 2 """ 3 Filter to check state. 4 """ 5 def __init__(self, bot): 6 self.bot = bot 7 8 key = 'state' 9 10 async def check(self, message, text): 11 chat_id, user_id, business_connection_id, bot_id, message_thread_id = \ 12 resolve_context(message, self.bot.bot_id) 13 14 if isinstance(text, list): 15 new_text = [] 16 for i in text: 17 if isinstance(i, State): i = i.name 18 new_text.append(i) 19 text = new_text 20 elif isinstance(text, State): 21 text = text.name 22 23 user_state = await self.bot.current_states.get_state( 24 chat_id=chat_id, user_id=user_id, 25 business_connection_id=business_connection_id, 26 bot_id=bot_id, message_thread_id=message_thread_id 27 ) 28 29 if text == "*" and user_state is not None: 30 return True 31 if user_state == text: 32 return True 33 elif type(text) is list and user_state in text: 34 return True 35 return False
This implementation at telebot/asyncio_filters.py:380-431 demonstrates the integration between filters and the state storage system, supporting wildcard matching ("*") and list-based state matching.
The StateStorageBase class defines the contract for state storage implementations.
Responsibility Boundary:
Core Methods:
python1class StateStorageBase: 2 def set_data(self, chat_id, user_id, key, value, 3 business_connection_id=None, message_thread_id=None, bot_id=None): 4 """ 5 Set data for a user in a particular chat. 6 """ 7 raise NotImplementedError 8 9 def get_data(self, chat_id, user_id): 10 """ 11 Get data for a user in a particular chat. 12 """ 13 raise NotImplementedError 14 15 def set_state(self, chat_id, user_id, state, 16 business_connection_id=None, message_thread_id=None, bot_id=None): 17 """ 18 Set state for a particular user. 19 """ 20 raise NotImplementedError
This interface at telebot/storage/base_storage.py:1-100 supports multi-tenancy through bot_id, business connection isolation, and thread-specific states.
Key Generation:
The _get_key method generates unique storage keys:
python1def _get_key(self, chat_id: int, user_id: int, prefix: str, separator: str, 2 business_connection_id: str = None, message_thread_id: int = None, 3 bot_id: int = None) -> str: 4 """ 5 Convert parameters to a key. 6 """ 7 params = [prefix] 8 if bot_id: 9 params.append(str(bot_id)) 10 if business_connection_id: 11 params.append(business_connection_id) 12 if message_thread_id: 13 params.append(str(message_thread_id)) 14 params.append(str(chat_id)) 15 params.append(str(user_id)) 16 return separator.join(params)
This implementation ensures unique keys for different contexts while maintaining a predictable structure.
The StateDataContext class provides a context manager for atomic state updates:
python1class StateDataContext: 2 """ 3 Class for data. 4 """ 5 def __init__(self, obj, chat_id, user_id, business_connection_id=None, 6 message_thread_id=None, bot_id=None): 7 self.obj = obj 8 res = obj.get_data(chat_id=chat_id, user_id=user_id, 9 business_connection_id=business_connection_id, 10 message_thread_id=message_thread_id, bot_id=bot_id) 11 self.data = copy.deepcopy(res) 12 # ... store context parameters 13 14 def __enter__(self): 15 return self.data 16 17 def __exit__(self, exc_type, exc_val, exc_tb): 18 return self.obj.save(self.chat_id, self.user_id, self.data, 19 self.business_connection_id, self.message_thread_id, self.bot_id)
This implementation at telebot/storage/base_storage.py:102-142 ensures that state modifications are automatically persisted when exiting the context, even if exceptions occur.
States are defined using the StatesGroup pattern:
python1class MyStates(StatesGroup): 2 name = State() 3 age = State() 4 color = State() 5 hobby = State()
This example at examples/asynchronous_telebot/custom_states.py:13-17 shows how conversation flows are structured into logical states.
The CallbackData class simplifies handling inline keyboard callbacks by providing structured data encoding.
Responsibility Boundary:
CallbackDataFilter:
python1class CallbackDataFilter: 2 """ 3 Filter for CallbackData. 4 """ 5 def __init__(self, factory, config: typing.Dict[str, str]): 6 self.config = config 7 self.factory = factory 8 9 def check(self, query) -> bool: 10 """ 11 Checks if query.data appropriates to specified config 12 """ 13 try: 14 data = self.factory.parse(query.data) 15 except ValueError: 16 return False 17 18 for key, value in self.config.items(): 19 if isinstance(value, (list, tuple, set, frozenset)): 20 if data.get(key) not in value: 21 return False 22 elif data.get(key) != value: 23 return False 24 return True
This implementation at telebot/callback_data.py:35-66 supports matching against single values or collections.
CallbackData Factory:
python1class CallbackData: 2 """ 3 Callback data factory 4 This class will help you to work with CallbackQuery 5 """ 6 def __init__(self, *parts, prefix: str, sep=':'): 7 if not isinstance(prefix, str): 8 raise TypeError(f'Prefix must be instance of str not {type(prefix).__name__}') 9 if not prefix: 10 raise ValueError("Prefix can't be empty") 11 if sep in prefix: 12 raise ValueError(f"Separator {sep!r} can't be used in prefix") 13 self.prefix = prefix 14 self.sep = sep 15 self._part_names = parts 16 17 def new(self, *args, **kwargs) -> str: 18 """Generate callback data""" 19 data = [self.prefix] 20 for part in self._part_names: 21 value = kwargs.pop(part, None) 22 # ... value resolution logic 23 data.append(value) 24 callback_data = self.sep.join(data) 25 if len(callback_data.encode()) > 64: 26 raise ValueError('Resulted callback data is too long!') 27 return callback_data
This implementation at telebot/callback_data.py:69-124 enforces Telegram's 64-byte limit on callback data.
The library supports running multiple bots from a single application through webhook-based routing.
Flask Multi-Bot Example:
python1from flask import Flask, request, abort 2from telebot import TeleBot, types, util 3 4main_bot = TeleBot(config.MAIN_BOT_TOKEN) 5app = Flask(__name__) 6tokens = {config.MAIN_BOT_TOKEN: True} 7 8@app.route(f"/{config.WEBHOOK_PATH}/<token>", methods=['POST']) 9def webhook(token: str): 10 if not tokens.get(token): 11 return abort(404) 12 13 json_string = request.get_data().decode('utf-8') 14 update = types.Update.de_json(json_string) 15 if token == main_bot.token: 16 main_bot.process_new_updates([update]) 17 return '' 18 19 from_update_bot = TeleBot(token) 20 register_handlers(from_update_bot) 21 from_update_bot.process_new_updates([update]) 22 return ''
This implementation at examples/multibot/main.py:1-48 demonstrates dynamic bot instantiation based on webhook URL tokens.
The following sequence diagram illustrates the end-to-end flow of update processing in the pyTelegramBotAPI library:
正在加载图表渲染器...
Data Flow Key Points:
Long Polling Initiation: The bot initiates long polling via apihelper.get_updates() which maintains a persistent connection to Telegram's servers (telebot/apihelper.py:327-339).
JSON Deserialization: Raw JSON responses are converted to typed Update objects through the de_json class method pattern (telebot/types.py:118-212).
Filter Evaluation: Each registered handler's filters are evaluated against the update, with state filters querying the storage backend (telebot/asyncio_filters.py:380-431).
Handler Execution: Matching handlers are executed with the update object, and may modify state through the storage interface (telebot/storage/base_storage.py:1-100).
The following diagram shows the dependency relationships between core modules:
正在加载图表渲染器...
Dependency Key Points:
Shared Type System: Both sync and async implementations depend on types.py for type definitions, ensuring consistent behavior.
Storage Abstraction: State storage is a dependency of both bot classes and the filter system, enabling stateful conversations.
API Helper Separation: Sync and async API helpers are independent, allowing the library to be used with or without asyncio support.
| Decision | Rationale | Trade-offs |
|---|---|---|
| Dual Bot Classes | Maintains clear separation between sync and async code paths; allows each implementation to be optimized independently | Code duplication between TeleBot and AsyncTeleBot; requires maintaining two similar APIs |
| JsonDeserializable Pattern | Provides consistent interface for converting API responses to Python objects; enables type hints and IDE support | Manual implementation of de_json for each type; potential for deserialization bugs |
| Abstract State Storage | Enables pluggable storage backends (memory, file, Redis, etc.); allows applications to choose appropriate persistence strategy | Additional abstraction layer; storage implementations must handle concurrency |
| Filter-Based Routing | Declarative handler registration; composable filtering logic; supports custom filter types | Filter evaluation overhead; complex filter chains can be hard to debug |
| CallbackData Factory | Type-safe callback data generation; automatic length validation; structured parsing | 64-byte limit constrains data complexity; separator conflicts require careful design |
| Middleware Pipeline | Enables cross-cutting concerns (logging, auth, i18n); non-invasive request preprocessing | Execution order matters; middleware bugs can affect all handlers |
| Context-Based State | Atomic state updates via context managers; automatic persistence on exit | Deep copy overhead for large state objects; exception handling complexity |
| Multi-Bot Support | Single application can manage multiple bots; efficient resource utilization | Webhook-based only; requires external routing layer |
| Technology | Purpose | Selection Rationale | Alternatives |
|---|---|---|---|
| requests | Sync HTTP client | Mature, well-documented, extensive feature set | httpx, urllib3 |
| aiohttp | Async HTTP client | Native asyncio support, streaming uploads | httpx async, htpx |
| dataclasses | Type definitions | Type hints, immutability options, IDE support | NamedTuple, custom classes |
| asyncio | Async runtime | Standard library, wide adoption, task groups | trio, curio |
| pickle | State serialization | Built-in, handles arbitrary Python objects | JSON, msgpack, protobuf |
| Flask | Webhook server (examples) | Lightweight, extensive documentation, easy deployment | FastAPI, aiohttp server |
| aiohttp server | Async webhook server | Same library as HTTP client, native async | Starlette, Sanic |
| contextvars | Request context | Thread-safe context in async code | Thread-local storage |
State storage can be configured through the enable_saving_states method or by passing a storage instance directly:
python1# File-based state persistence 2bot = TeleBot(token) 3bot.enable_saving_states("./.state-save/states.pkl") 4 5# Custom storage backend 6from telebot.storage import StateMemoryStorage 7bot = TeleBot(token, state_storage=StateMemoryStorage())
The get_updates method accepts several configuration parameters:
| Parameter | Default | Description |
|---|---|---|
offset | None | Identifier of first update to return |
limit | None | Max updates to retrieve (1-100) |
timeout | 20 | Request connection timeout |
long_polling_timeout | 20 | Long polling timeout in seconds |
allowed_updates | None | List of update types to receive |
For webhook-based operation, the bot must be configured with a webhook URL:
python1bot.delete_webhook() 2bot.set_webhook(f"{WEBHOOK_HOST}/{WEBHOOK_PATH}/{token}")
The webhook approach is demonstrated in the multi-bot examples at examples/multibot/main.py:1-48 and examples/asynchronous_telebot/multibot/main.py:1-56.