import hashlib
import hmac
import json
import logging
import os
import random
import typing

import aiohttp
from discord import Message

from bot import WatcherBot
from .token import *
# Module logger, registered under the explicit name "bot.discord_watcher".
LOGGER = logging.getLogger("bot.discord_watcher")

# Names of the Redis hashes used by this module.
# NOTE(review): the quoting of these values was lost in this copy of the file;
# they are restored as string literals here. Confirm they are not meant to be
# names re-exported by ``.token``.
HOURLYREWARDS = "HOURREWARDS"
DISCORDSTREAMERS = "DISCORDSTREAMERS"
# Registry filled by the ``command`` decorator: chat keyword -> handler.
command_to_function: typing.Dict[str, typing.Callable] = {}


def command(*args: str):
    """Return a decorator that registers a handler under every name in *args*.

    The decorated callable is stored in ``command_to_function`` once per
    name and is returned unchanged, so it stays usable as a normal
    function or method. Registering a name twice overwrites the earlier
    handler.
    """

    def decorator(f):
        for name in args:
            command_to_function[name] = f
        return f

    return decorator
class DiscordWatcher:
    """Chat-command handlers and the hourly-reward job built around a WatcherBot.

    Methods decorated with ``@command`` are registered in the module-level
    ``command_to_function`` table under their chat keyword.

    NOTE(review): this copy of the file had lost its indentation and string
    quoting. Block structure and literals were reconstructed from context;
    the exact inner whitespace of some messages is a best guess.
    """

    def __init__(self, bot: WatcherBot, *, discord_server_id=DISCORD_SERVER_ID):
        """Store the bot and the guild id used when a caller supplies none."""
        self.bot = bot
        self.discord_server_id = discord_server_id
        # Hex HMAC digests of the order commands built by run_hourly_reward.
        # The list was previously never created, so the first append raised
        # AttributeError.
        self.already_received: typing.List[str] = []

    def set_bot(self, bot):
        """Replace the bot this watcher talks to."""
        self.bot = bot

    def get_webhook_id(self, guild_id: int):
        """Return the webhook id stored in the HOURLYREWARDS hash for a guild.

        Args:
            guild_id: Guild to look up; a falsy value falls back to
                ``self.discord_server_id``.

        Raises:
            LookupError: Nothing is stored for the guild (previously this
                surfaced as an opaque ``int(None)`` TypeError).
        """
        if not guild_id:
            guild_id = self.discord_server_id
        webhook_id = self.bot.redis_client.hget(HOURLYREWARDS, str(guild_id))
        if webhook_id is None:
            raise LookupError(
                f"no hourly reward webhook stored for guild {guild_id}"
            )
        return int(webhook_id)

    @command("setup")
    async def setup(self, message: Message) -> str:
        """Handle the ``setup`` chat command.

        With the sub-command ``hourly`` a webhook is created and its id is
        stored in the HOURLYREWARDS hash under the message's guild id. Any
        other (or missing) sub-command yields ``"No command found!"``.

        Returns:
            The reply text for the chat.
        """
        guild_id = message.guild.id
        words = message.clean_content.split()
        # A bare "setup" with no sub-command used to raise IndexError.
        action = words[1] if len(words) > 1 else None
        if action != "hourly":
            return "No command found!"

        # NOTE(review): get_channel is called with a guild id, as in the
        # original; confirm WatcherBot.get_channel really accepts guild_id.
        channel = self.bot.get_channel(guild_id=int(guild_id))
        # create_webhook is a coroutine in discord.py; without the await the
        # ``webhook.id`` read below would hit a coroutine object.
        webhook = await channel.create_webhook(
            name="hourly reward webhook",
            avatar=None,
            reason="hourly reward webhook",
        )
        LOGGER.debug("created webhook %s", webhook)
        self.bot.redis_client.hset(HOURLYREWARDS, guild_id, webhook.id)
        LOGGER.info("set wwebhook %s for %s", webhook, guild_id)
        return "webhook is set successfully!"

    async def schedule(self, _) -> typing.List[object]:
        """Reload credentials, fetch the latest cycle and run the hourly reward.

        Credentials are read from ``autofight_pass_secret_id.json`` next to
        this module and published as module globals, because
        ``run_hourly_reward`` reads them from there.

        Args:
            _: Ignored.

        Returns:
            A list of ``{"title": ..., "payload": {"content": ...}}`` entries.
            When the credentials file is missing, the list holds a single
            entry saying so and nothing else is done.
        """
        global AUTO_FIGHT_SECRET_ID, AUTO_FIGHT_ACCT_ID, AUTO_FIGHT_PASS_CODE

        file_name = "autofight_pass_secret_id.json"
        file_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), file_name
        )
        try:
            with open(file_path, encoding="utf-8") as data_file:
                credentials = json.load(data_file)
        except FileNotFoundError:
            return [
                {
                    "title": f"No {file_name} found",
                    "payload": {
                        "content": "No autofight_pass_secret_id.json file found!",
                    },
                }
            ]

        # Previously only the secret id was declared global; the account id
        # stayed local and was lost although run_hourly_reward reads it.
        AUTO_FIGHT_SECRET_ID = credentials["SECRET_ID"]
        AUTO_FIGHT_ACCT_ID = credentials["ACCT_ID"]
        AUTO_FIGHT_PASS_CODE = credentials["PASS_CODE"]
        # The secret id itself is deliberately not logged or printed.
        LOGGER.info("reloaded autofight credentials from %s", file_path)

        # Fail fast when no webhook is configured, before anything is posted.
        self.get_webhook_id(self.bot.guild_id)

        async with aiohttp.ClientSession() as session:
            async with session.get(
                "https://www.stakingrewards.com/api/sleep/energy/staking-market-cycle"
            ) as response:
                cycles = await response.json()
        last_cycle = int(cycles[0]["cycle"])
        LOGGER.info("last cycle %s", last_cycle)

        result = await self.run_hourly_reward(last_cycle)

        embed: typing.List[object] = [
            {
                "title": "Begin Calculating",
                "payload": {
                    "content": "Begin Calculating \n New Last Cycle: "
                    + str(last_cycle),
                },
            }
        ]
        for tx in result:
            if "payload" in tx:
                # run_hourly_reward already returns finished embed entries;
                # indexing them with "cycle"/"result" raised KeyError before.
                embed.append(tx)
                continue
            title_string = (
                f"{tx['title']} for cycle {tx.get('cycle', last_cycle)}: "
                f"{tx.get('result')} token = {tx.get('tokenListLen')} tokens \n"
            )
            LOGGER.info("with %s", title_string)
            embed.append(
                {
                    "title": tx["title"],
                    "payload": {
                        "content": title_string,
                    },
                }
            )
        return embed

    @command("del")
    async def del_webhook(self, message: Message) -> str:
        """Handle the ``del`` chat command.

        Refreshes the module-global ``airdrop_list`` from the streamer API
        and reports which of the names given after the second word of the
        message match a known ``Username``.

        NOTE(review): despite the name and the reply text, nothing visible
        here removes a webhook or a streamer; only the match list is built.

        Returns:
            ``"Deleted: "`` followed by the matching names.
        """
        global airdrop_list

        requested = message.clean_content.split()[2:]
        async with aiohttp.ClientSession() as session:
            async with session.get(
                "https://www.stakingrewards.com/api/get_discord_streamers"
            ) as response:
                data = await response.json()
        airdrop_list = data["twitch"]

        # A set gives one O(1) membership test per requested name.
        known = {entry["Username"] for entry in airdrop_list}
        matched = [name for name in requested if name in known]
        return "Deleted: " + " ".join(matched)

    async def fetch_Elem(self, session, title, streamer):
        """Fetch one streamer's record and summarise it as a dict.

        Args:
            session: An open aiohttp client session.
            title: Title copied into a successful result.
            streamer: Streamer name sent as the ``streamer`` query parameter.

        Returns:
            On an HTTP or payload status other than 200, a dict with
            ``result == "Error"``, a ``title``, ``tokenListLen == 1`` and a
            ``message`` describing the status. Otherwise a dict with
            ``result``, ``title`` and ``tokenListLen`` (no ``message``).
        """
        error_title = "ERROR: https://www.stakingrewards.com/api/get_streamer"
        # params= lets aiohttp encode the name; it used to be concatenated
        # into the URL unescaped. The context manager releases the response.
        async with session.get(
            "https://www.stakingrewards.com/api/get_streamer",
            params={"streamer": streamer},
        ) as response:
            if response.status != 200:
                LOGGER.info(
                    " https://www.stakingrewards.com/api/get_streamer Error"
                    " response.status == %s \n of userr %s",
                    response.status,
                    streamer,
                )
                return {
                    "result": "Error",
                    "title": error_title,
                    "tokenListLen": 1,
                    "message": "RESPONSE STATUS ERROR= " + str(response.status),
                }
            body = await response.json()

        status_code = body["statusCode"]
        if status_code != 200:
            LOGGER.info(
                "https://www.stakingrewards.com/api/get_streamer Error"
                " statusCode = %s \n of userr %s",
                status_code,
                streamer,
            )
            return {
                "result": "Error",
                "title": error_title,
                "tokenListLen": 1,
                "message": "STATUS CODE ERROR= " + str(status_code),
            }

        LOGGER.info("of status %s%s", status_code, body)
        coins_list = body["coinsList"]
        coins = body["coins"]
        LOGGER.info(
            "coins = %s AND \n coinsListLen = %s", coins, len(coins_list)
        )
        # NOTE(review): kept as written. If coinsList is a JSON array this
        # subtraction raises TypeError; confirm whether len(coinsList) or a
        # set difference was intended.
        return {
            "result": coins - coins_list,
            "title": title,
            "tokenListLen": len(coins_list),
        }

    async def run_hourly_reward(self, last_cycle):
        """Post the hourly reward action and announce it through the webhook.

        Reads the module globals ``airdrop_list``, ``AUTO_FIGHT_ACCT_ID`` and
        ``AUTO_FIGHT_SECRET_ID``. One streamer is picked at random as the
        recipient, the action is posted, every streamer is looked up via
        ``fetch_Elem``, and the resulting command is sent to the guild's
        webhook only when no lookup reported an error.

        Args:
            last_cycle: Latest cycle number; the previous cycle
                (``last_cycle - 1``) is the one reported.

        Returns:
            A one-element list holding a ``{"title", "payload"}`` entry.
        """
        last_cycle_str = str(last_cycle - 1)

        # airdrop_list only exists once del_webhook has run (or if another
        # module supplies it), so read it without risking a NameError.
        streamers = list(globals().get("airdrop_list") or [])
        LOGGER.debug("airdrop_list_len: %s", len(streamers))
        if not streamers:
            # randint(0, -1) used to raise ValueError on an empty list.
            LOGGER.warning("hourly reward skipped: no streamers loaded")
            return [
                {
                    "title": "Houly rewards " + last_cycle_str,
                    "payload": {
                        "content": "skipped: no streamers loaded",
                    },
                }
            ]

        # random.sample was called with a generator and no sample size, which
        # raises TypeError; a full-length sample is a shuffled copy.
        selected_list = random.sample(streamers, len(streamers))
        recipient = random.choice(selected_list)

        headers = {"Content-Type": "application/json"}
        data = {
            "txParams": {
                "accountId": AUTO_FIGHT_ACCT_ID,
                "secretId": AUTO_FIGHT_SECRET_ID,
                "data": {"toAddress": recipient["Ethereum"]},
            },
            "meta": {
                "confirm": True,
                "saveSecret": True,
                "paidAsset": None,
                "clock": None,
            },
        }

        actions_url = "https://api.autofarm.network/analytics/actions"
        async with aiohttp.ClientSession() as session:
            LOGGER.info("session path = %s", actions_url)
            async with session.post(
                actions_url, json=data, headers=headers
            ) as response:
                body = await response.json()
            order_id = body["data"]["firstAction"]["orderId"]
            # %s formatting: the old "text" + body concatenation raised
            # TypeError for any non-string body.
            LOGGER.info("session path OK %s", body)
            # NOTE(review): the same response is indexed as a mapping above
            # and as a sequence here; one of the two cannot work. Kept as
            # written until the API schema is confirmed.
            LOGGER.debug("data2: %s", body[0])
            nonce = body[0]["nonce"]

            # json.dumps copes with non-string ids and quotes them correctly.
            post_format = json.dumps(
                {
                    "refs": {"order": order_id},
                    "txParams": {
                        "accountId": AUTO_FIGHT_ACCT_ID,
                        "secretId": AUTO_FIGHT_SECRET_ID,
                        "nonce": nonce,
                    },
                }
            )
            webhook_id: int = self.get_webhook_id(self.bot.guild_id)
            # Named order_command so it no longer shadows the module-level
            # ``command`` decorator.
            order_command = "/orders " + post_format

            failures = []
            for airdrop in selected_list:
                username = airdrop["Username"]
                LOGGER.debug("streamer = %s", username)
                try:
                    result = await self.fetch_Elem(
                        session,
                        "/hourly " + last_cycle_str + " " + username,
                        username,
                    )
                except Exception:
                    # Still best-effort, but no longer silent.
                    LOGGER.exception("streamer lookup failed for %s", username)
                    continue
                message_text = result.get("message")
                if message_text is None:
                    # Only error results carry "message". Successful ones
                    # were skipped before too, via a swallowed KeyError.
                    continue
                order_command += " " + message_text
                failures.append({"streamer": username, "result": result})

            webhook = await self.bot.fetch_webhook(id_=webhook_id)
            # NOTE(security): the HMAC key is hard-coded in the source; it
            # should be loaded from configuration instead.
            secret = b"\xd8\x8c\xc5?\xadg\xc0\xa6\xc5\xec\xc5\xf4y\xdaT\x80"
            digest = hmac.new(
                secret, order_command.encode(), hashlib.sha256
            ).hexdigest()
            self.already_received.append(digest)
            # NOTE(review): as before, the command is sent only when no
            # lookup produced an error entry; confirm this is intended.
            if not failures:
                await webhook.send(
                    order_command,
                    username="Watcher",
                    avatar_url="https://graph.facebook.com/105975741638149/picture?type=large",
                )

        embed_payload: typing.List[object] = [
            {
                "title": "Houly rewards " + last_cycle_str,
                "payload": {
                    "content": "completed!",
                },
            }
        ]
        return embed_payload

    async def which_lineup_tracker_campers(self, last_cycle) -> typing.List[object]:
        """Await and return the module-level ``which_lineup_tracker_campers``.

        Inside a method the bare name resolves to the module global, not to
        this method. NOTE(review): that global is not defined in the visible
        part of this file; presumably it arrives via ``from .token import *``.
        """
        return await which_lineup_tracker_campers(last_cycle)
# Stray page text (Dutch: "Full description"); not part of the module.