Issue
I use the following script to listen for new messages from public telegram channels and groups.
import configparser
from telethon.errors import SessionPasswordNeededError
from telethon import TelegramClient, events, sync
from telethon.tl.functions.messages import (GetHistoryRequest)
from telethon.tl.types import (
PeerChannel
)
api_id = 'xxxxxx'
api_hash = 'xxxxxxxxxxxxxxxxxxxxxxx'
#target channels that you want to listen to:
input_channels = ('https://t.me/xxxxxx','https://t.me/xxxx','https://t.me/xxxxxxx')
#create a client
client = TelegramClient('anon', api_id, api_hash)
# Listen to messages from target channels
@client.on(events.NewMessage(chats=input_channels))
async def newMessageListener(event):
# Get message text
newMessage = event.message.message
print(newMessage)
with client:
client.run_until_disconnected()
When a channel is closed I get the following error: ValueError: No user has "closed_channel_name" as username and I stop receiving any data.
Is there a way to identify invalid channels?
So far I have found the following which could identify a valid channel but there might be a better way:
client.start()
result = client.get_entity('https://t.me/xxxxxx')
Solution
The following works for catching errors (inside a coroutine) on telethon
import asyncio
from telethon import TelegramClient, events
session_name = 'anon'
api_id = 'xxxxxx'
api_hash = 'xxxxxxxxxxxxxxxxxxxxx'
chat_list = ('https://t.me/xxxxxxxx', 'https://t.me/xxxxxxxxxx')
async def main():
async with TelegramClient(session_name, api_id, api_hash) as client:
@client.on(events.NewMessage(chats=chat_list))
async def handler(event):
print(event.message.message)
await client.run_until_disconnected()
def custom_exception_handler(loop, context):
# first, handle with default handler
loop.default_exception_handler(context)
exception = context.get('exception')
if isinstance(exception, ValueError):
print(context['exception'])
loop.stop()
loop = asyncio.get_event_loop()
# Set custom handler
loop.set_exception_handler(custom_exception_handler)
loop.create_task(main())
loop.run_forever()
Answered By - Stamatis Tiniakos
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.