Désencyclopédie:Jabber/Bot

Un article de la désencyclopédie.
Aller à la navigation Aller à la recherche

Réalisé en python avec la bibliothèque xmpppy.

Requiert python [1] et xmpppy [2].

  • cauchy_bot.py: fichier principal
  • recentchanges.py: changements récents
  • pendu.py: jeu du pendu
  • artough: artough way

cauchy_bot.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import xmpp
import time,os,sys

class JabParameter():
   def __init__(self):
       self.yid = ""
       self.pas = ""
       self.nam = ""
       self.cha = ""
       self.adm = ""
       self.InitParam()
       self.run = True
       self.roster = []
       self.pendu = ""
       self.antiflood = 10000
       self.deaf = 0
       self.logdir = "./"+repr(time.localtime()[:6]).replace(", ","_")[1:-1]
       if not os.path.exists(self.logdir):
           os.mkdir(self.logdir)

   def InitParam(self):
       self.yid = raw_input("JID: ")
       self.pas = raw_input("Password for this JID: ")
       self.nam = raw_input("Alias of the bot: ")
       self.cha = raw_input("Channel adress: ")
       while self.adm=="": self.adm = raw_input("Admin password: ")

   def Log(self,text,fi):
       f = open(self.logdir+"/"+fi+".log","a")
       f.write(text+"\n")
       f.close()

JP = JabParameter()

if JP.cha=="": JP.cha = "desencyclopedie@chat.jabberfr.org"

print "#######################"
print "# STARTING CAUCHY_BOT #"
print "#######################"
print " admin pwd:",JP.adm
print " log dir:",JP.logdir
print " init RecentChanges"
import recentchanges
recentchanges.rc.logdir = JP.logdir
print "  rc.webpage:","".join([recentchanges.rc.webpage,recentchanges.rc.webpage2])
print "  rc.num:",recentchanges.rc.num
print "  rc.time:",recentchanges.rc.time
print " init Pendu"
import pendu
JP.pendu = pendu.Pendu()
JP.pendu.logdir = JP.logdir
print " init Artough"
import artough

def RemoveB(t):
   u = t[:]
   while "<" in u:
       i1 = u.index("<")
       i2 = u.index(">")+1
       u = u[:i1]+u[i2:]
   return u

def Sender(text):
   text_plain = RemoveB(text)
   p = xmpp.protocol.Message(to=JP.cha, body=text_plain, frm=JP.cha+"/"+JP.nam, typ="groupchat")
   if text_plain != text:
       p_html = xmpp.Node("html",{"xmlns":'http://jabber.org/protocol/xhtml-im'})
       t1 = "<body xmlns='http://www.w3.org/1999/xhtml'>"
       t2 = "</body>"
       try:
           p_a = xmpp.simplexml.XML2Node(t1+text+t2)
           p_html.addChild(node=p_a)
           p.addChild(node=p_html)
       except :
           p = xmpp.protocol.Message(to=JP.cha, body=text_plain, frm=JP.cha+"/"+JP.nam, typ="groupchat")
   JP.cl.send(p)

def Talk(text,nick):
   if JP.pendu.play:
       if len(text.split())==2 and len(text.split()[1])==1:
           #proposition d'une lettre
           t = JP.pendu.proposition(text.split()[1],nick)
           for l in t: Sender(l)
       if len(text.split())>1:
           if JP.pendu.normalise(" ".join(text.split()[1:])) == JP.pendu.mot:
               t = JP.pendu.bonmot(nick)
               for l in t: Sender(l)
   intext = ""
   if len(text.split())>1: intext=" ".join(text.split()[1:])
   JP.Log(intext,"command")
   if "deaf" in intext:
       if JP.adm in intext:
          JP.deaf = intext.count("1")
          intext = ""
   if JP.deaf:  intext = ""
   if "help" in intext:
       Sender("liste des commandes:")
       Sender(" help: ce que vous venez de taper.")
       Sender(' hello  world: dit "Hello World !". Totalement inutile.')
       Sender(' echo: repete ce qui suit "echo".')
       Sender(" recentchanges N: affiche les N derniers changements de la dEsencyclopedie.")
       Sender(" rc 0/1: stoppe/demarre la surveillance des changements.")
       Sender(" pendu new: jeu du pendu.")
       Sender(" pendu sc: score du jeu du pendu.")
       Sender(" admin_pwd deaf 0/1: active/desactive la detection des ordres.")
       Sender(" antiflood N: ejecte quand un texte de plus de N lettres est envoye (-1=jamais).")
       Sender(" artough way 0/1: Artough Way !!.")
       Sender(" admin_pwd stop: arret du bot.")
       Sender(" admin_pwd master_command: permet a l'utilisateur ayant lance le bot de rentrer une commande en console")
   elif "echo" in intext:
       t = intext[intext.index("echo")+5:]
       t = t.replace("/","\\")
       t = t.replace("<\\","</")
       t = t.replace("\\>","/>")
       Sender(t)
   elif "hello world" in intext: Sender("Hello world !")
   elif "stop" in intext:
       if JP.adm in intext: JP.run=False
   #elif "execute" in intext:
   #    if self.adm in intext:
   #        t = intext[intext.index("execute")+8:]
   #        exec(compile(t,'<string>','exec'))
   elif "recentchanges" in intext:
       t = int(intext[intext.index("recentchanges")+14:])
       list = recentchanges.Status(t)
       for l in list:
           Sender(l)
   elif "rc" in intext:
       if "1" in intext and not recentchanges.rc.run: recentchanges.StartThread()
       elif "0" in intext: recentchanges.rc.run = False
   elif "master_command" in intext:
       if JP.adm in intext:
           print "To escape: type an empty string"
           while True:
               t = raw_input("Enter a command: ")
               if t!="":
                   exec(compile(t,'<string>','exec'))
               else: break
   elif "pendu" in intext:
       if "new" in intext:
           JP.pendu.init()
           Sender("Jeu du pendu")
           Sender(JP.pendu.mot_visible)
       elif "sc" in intext:
           t = JP.pendu.recapitulatif()
           for l in t:Sender(l)
   elif "antiflood" in intext:
       t = intext[intext.index("antiflood")+10:]
       if t.isdigit():
           JP.antiflood = int(t)
           Sender("antiflood: "+t)
       if t=="-1":
           JP.antiflood = -1
           Sender("antiflood: "+t)
   elif "artough way" in intext:
       if "1" in intext and not artough.aw.run:
           ro = JP.roster[:]
           ro.remove(JP.nam)
           artough.aw.nicks = ro
           artough.aw.run = True
           artough.StartThread()
       elif "0" in intext: artough.aw.run = False

def LOG(stanza,nick,text):
   global JP
   ts=stanza.getTimestamp()
   if not ts:
       ts=stanza.setTimestamp()
       ts=stanza.getTimestamp()
   fold=stanza.getFrom().getStripped().replace('@','%')
   JP.Log(repr([ts,nick,fold,text]),"histo")
   if text!= None:
       if text.startswith(JP.nam):
           Talk(text.encode('utf-8'),nick)
       if JP.antiflood>100:
           #to change: /msg cauchy_bot cauchy_bot echo ... can kick the bot :/
           if len(text)>JP.antiflood: Sender("FLOOD! FIXME TO ALLOW ME TO KICK "+nick)

def presenceCB(sess,pres):
   nick=pres.getFrom().getResource()
   text=''
   if pres.getType()=='unavailable':
       if nick in JP.roster:
           JP.roster.remove(nick)
   else:
       if nick not in JP.roster:
           JP.roster.append(nick)
   if text: LOG(pres,nick,text)

def messageCB(sess,mess):
   nick=mess.getFrom().getResource()
   text=mess.getBody()
   LOG(mess,nick,text)

def main():
   JP.cl=xmpp.Client(xmpp.JID(JP.yid).getDomain(),debug=[])
   con=JP.cl.connect()
   if not con:
       print 'could not connect!'
       sys.exit()
   print 'connected with',con
   JP.cl.RegisterHandler('message',messageCB)
   JP.cl.RegisterHandler('presence',presenceCB)
   auth=JP.cl.auth(xmpp.JID(JP.yid).getNode(),JP.pas)
   if not auth:
       print 'could not authenticate!'
       sys.exit()
   print 'authenticated using',auth
   p=xmpp.Presence(to=JP.cha+"/"+JP.nam)
   p.setTag('x',namespace=xmpp.NS_MUC).setTagData('password','')
   p.getTag('x').addChild('history',{'maxchars':'0','maxstanzas':'0'})
   JP.cl.send(p)
   while JP.run:
       JP.cl.Process(1)
       if len(recentchanges.rc.toprint) != 0:
           for l in recentchanges.rc.toprint:
               Sender(l)
           recentchanges.rc.toprint = []
       if len(artough.aw.toprint) != 0:
           for l in artough.aw.toprint:
               Sender(l)
           artough.aw.toprint = []
           ro = JP.roster[:]
           ro.remove(JP.nam)
   JP.cl.disconnect()

main()

recentchanges.py

#!/usr/bin/env python
# -*- coding: latin-1 -*-

import urllib
import time
import thread

class RecentChanges():
   def __init__(self):
       self.webpage = "http://desencyclopedie.wikia.com/index.php?title=Special:Modifications_récentes"
       self.webpage2 = "&hideminor=1&hideenhanced=1"
       self.num = 5
       self.list_old = self.getN()
       self.dico = {" ":" ","×":"x"," ":"",'href="/':'href="http://desencyclopedie.wikia.com/'}
       self.time = 15
       self.run = False
       self.toprint = []
       self.logdir = ""

   def remove_b(self,t,sauf=[]):
       u = t[:]
       loi = [0]
       for i1 in range(len(u)):
           if u[i1] == "<":
               su = u[i1:]
               i2 = i1+su.index(">")+1
               su = u[i1:i2]
               boo = True
               for l in sauf:
                   if "<"+l in su: boo = False
                   if "</"+l in su: boo = False
               if boo:
                   loi.append(i1)
                   loi.append(i2)
       u = ""
       loi.append(len(u)-1)
       for l in range(len(loi)/2):
           i1 = loi[l*2]
           i2 = loi[(l*2)+1]
           u += t[i1:i2]
       return u

   def prettyprint(self,t):
       for l,v in self.dico.items():
           t = t.replace(l,v)
       rt = ''
       for l in range(len(t)):
           if t[l] == "&" and t[l:l+5] != "&": rt = "".join([rt,"&"])
           else: rt = "".join([rt,t[l]])
       return rt

   def getN(self,num=-1):
       if num==-1: num = self.num
       fi = urllib.urlopen(self.webpage+self.webpage2)
       while True:
           t = fi.readline()
           if "h4" in t: break
       list = []
       for l in range(num):
           list.append(self.remove_b(fi.readline()[:-1],["a"]))
       return list[:]

   def Log(self,text):
       f = open(self.logdir+"/recentchanges.log","a")
       f.write(text+"\n")
       f.close()

def Status(n):
   list = rc.getN(n)
   tp = []
   for l in list:
       tp.append(rc.prettyprint(l))
   return tp

def StartThread():
   rc.run = True
   thread.start_new_thread(StartAutomatic,('',''))

rc = RecentChanges()

def StartAutomatic(a,b):
   while rc.run:
       time.sleep(rc.time)
       list_new,list_new2 = rc.getN(),[]
       for l in list_new:
           if rc.remove_b(l) not in rc.list_old: rc.toprint.append(rc.prettyprint(l))
           list_new2.append(rc.remove_b(l))
       rc.Log(time.ctime()+"\nOld:"+repr(rc.list_old)+"\nNew:"+repr(list_new)+"\nTP:"+repr(rc.toprint)+"\n")
       rc.list_old = list_new2[:]

pendu.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import urllib, string

class Pendu():
   def __init__(self):
       self.logdir = ""
       self.webpage = "http://desencyclopedie.wikia.com/wiki/Special:Page_au_hasard"
       self.point = {}
       self.dico = {"é":"e","è":"e","ê":"e","ë":"e","à":"a","â":"a","û":"u","î":"i","ô":"o","ç":"c","ù":"u","É":"e","È":"e","À":"a","ï":"i","ü":"u"}
       self.mot = ""
       self.mot_visible = ""
       self.lettres = string.ascii_lowercase
       self.play = False
       self.link = ""
 
   def init(self):
       self.play = True
       fi = urllib.urlopen(self.webpage)
       while True:
           t = fi.readline()
           if '<link rel="alternate" type="application/x-wiki" ' in t: self.link = t.split('href="')[1].split("&")[0]
           if "<title>" in t: break
       t = t[9:-44]
       t = self.normalise(t)
       self.mot = t[:]
       for l in self.lettres:
           t = t.replace(l,"?")
       self.mot_visible = t[:]
       if self.mot_visible.count("?") < 8: self.init()

   def normalise(self,t):
       t = t.lower()
       for l,v in self.dico.items():
           while l in t:
               t = t.replace(l,v)
       return t

   def recapitulatif(self,numb=-1):
       tdico = {}
       tsc = []
       for l,v in self.point.items():
           if v in tdico: tdico[v].append(l)
           else:
               tdico[v] = [l]
               tsc.append(v)
       tsc.sort(reverse=True)
       text = []
       if numb==-1: numb=len(tsc)
       for l,x in enumerate(tsc[:numb]):
           text += [str(l+1)+") "+str(x)+" points: "+", ".join(tdico[x])]
       return text

   def fin(self):
       self.play = False
       text = ["Jeu fini ! Recaputilatif:"]
       text += self.recapitulatif(5)
       return text

   def bonmot(self,joueur):
       gain = self.mot_visible.count("?")*2+2
       if joueur not in self.point: self.point[joueur] = 0+gain
       else: self.point[joueur] += gain
       text = [joueur+", Felicitation ! Le mot est bien: <a href='"+self.link+"'>"+self.mot+"</a>",joueur+", "+str(gain)]
       self.fin()
       return text

   def proposition(self,le,joueur):
       lettre = le.lower()
       gain,rep = -1,"<span style='color:red'>non</span>"
       if lettre in self.mot_visible: gain,rep = 0,"fait"
       elif lettre in self.mot:
           gain,rep = self.mot.count(lettre)*2,"<span style='color:blue'>oui</span>"
       for l in range(len(self.mot)):
           if lettre == self.mot[l]: self.mot_visible = self.mot_visible[:l]+lettre+self.mot_visible[l+1:]
       if self.mot == self.mot_visible: gain+=1
       if joueur not in self.point: self.point[joueur] = 0+gain
       else: self.point[joueur] += gain
       text = [joueur+", "+rep+" ! "+str(gain),self.mot_visible]
       if self.mot == self.mot_visible:
          text[-1] = "<a href='"+self.link+"'>"+self.mot+"</a>"
          text+=self.fin()
       return text

artough.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
 
import time
import thread
import random

class ArtoughWay():
   def __init__(self):
       self.file = "insultes.txt"
       self.time_min = 10
       self.time_max = 120
       self.nicks = []
       self.toprint = []
       self.run = False

   def insulte(self):
       fi = open(self.file,"r")
       da = fi.readlines()
       random.shuffle(da)
       t = da[0]
       random.shuffle(self.nicks)
       t = t.decode('utf-8')
       t = t.replace("<user>",self.nicks[0])
       for l in range(6):
           t = t.replace("<user"+str(l+2)+">",self.nicks[(l+1)%len(self.nicks)])
       fi.close()
       return t[:-1]

def StartThread():
   thread.start_new_thread(StartAutomatic,('',''))

aw = ArtoughWay()

def StartAutomatic(a,b):
   while aw.run:
       aw.toprint = [aw.insulte()]
       time.sleep(aw.time_min+random.randint(0,aw.time_max))