Verificando se a internet está ativa ou não, com um semáforo!

Sinalizando se tem ou não internet com um semáforo


Encontrei na internet um projeto muito bacana utilizando uma Raspberry Pi. Então resolvi compartilhar.

O Projeto trata de um medidor de internet feito a partir de um semáforo de trânsito. Ou seja, se estiver vermelho a internet está inativa, se estiver verde a internet está ativa. Interessante né!!

A iniciativa desse projeto se deu a partir de um interruptor defeituoso que gerou um buggy na rede por um tempo. A equipe levou algumas horas para tornar o terminal acessível e iniciar um ping para checar se a internet tinha voltado. Eles tinham uma raspberry pi e um semáforo de trânsito, então começou a "brincadeira". Eles fizeram uma pequena placa de relé para permitir que os raspi controlassem o semáforo e escrevesse um script shell para verificar se havia uma conexão com a internet e definir o semáforo para verde se houvesse e para vermelho se não houvesse.

Abaixo coloquei um link para você ver o vídeo que fizeram mostrando esse sistema funcionando. Bem interessante!


http://www.youtube.com/watch?v=uxdJv-h0Fks


Atualização do dia 28/07/12

Fizemos uma nova placa de relé para que possamos controlar as duas luzes independentemente, quando o raspi está inicializando ambas as luzes estão agora desligadas.

Atualização do dia 28/07/12 nr2

Nós portamos o código para Python. Quando há apenas uma quantidade limitada de perda de pacote, as luzes agora piscam.

 

Script

 

#!/usr/bin/env python
 
"""
    Uma implementação pura de python ping usando socket raw ..
 
 
  Note que as mensagens ICMP só podem ser enviadas a partir de processos executados como root.

     Derivado do ping.c distribuído no netkit do Linux. Esse código é
     Copyright (c) 1989 pelos Regents da Universidade da Califórnia.
     Esse código é, por sua vez, derivado do código escrito por Mike Muuss do
     Laboratório de Pesquisas Balísticas do Exército dos EUA em dezembro de 1983 e
     Colocado no domínio público. Eles têm meus agradecimentos.

 
    Copyright (c) Matthew Dixon Cowles, <http://www.visi.com/~mdc/>.

     Distribuível sob os termos da GNU General Public License
     Versão 2. Fornecido sem garantias de qualquer tipo.

 
    Versão Original de Matthew Dixon Cowles:
      -> ftp://ftp.visi.com/users/mdc/ping.py
 
    Reescrito por Jens Diemer:
      -> http://www.python-forum.de/post-69122.html#69122
 
    Reescrito por George Notaras:
      -> http://www.g-loaded.eu/2009/10/30/python-ping/
 
    Histórico de Revisão
    22:16, 28 July 2012 (CEST)22:16, 28 July 2012 (CEST)22:16, 28 July 2012 (CEST)~
 
    November 8, 2009
    ----------------
    Compatibilidade aprimorada com sistemas GNU / Linux.
 
    Correções por :
     * George Notaras -- http://www.g-loaded.eu
    Relatado por:
     * Chris Hallman -- http://cdhallman.blogspot.com
 
    Alterações nesta versão:
     - Re-use time.time() instead of time.clock(). A implementação de 2007 funcionou apenas com o Microsoft Windows. 
Não funcionou com GNU/Linux.
       time.clock() behaves differently under the two OSes[1].
 
    [1] http://docs.python.org/library/time.html#time.clockMay 30, 2007
    ------------
    Pequena alteração por Jens Diemer:
     -  Alterar a importação do soquete Asterisk para uma importação normal
     -  Substituir time.time() with time.clock()
     -  Apagar "return None" (ou aterar para  "return" )
     -  Em checksum() renomear "str" para "source_string"
 
    November 22, 1997
    -----------------

     Corte inicial. Não faz muito, mas em vez de tentar adivinhar quais recursos eu (ou outros) vou querer no futuro, eu só coloquei o que eu preciso agora.

December 16, 1997
    -----------------

Por algum motivo, os bytes de checksum de verificação estão na ordem errada quando
     Este é executado em Solaris 2.X para SPARC, mas funciona direito sob
     Linux x86. Até eu saber  exatamente o que está errado, vou trocar o
     Bytes sempre e, em seguida, fazer um htons ().

 
    December 4, 2000
    ----------------
   Altere o struct.pack() calls para pack the checksum e ID como
    sem assinatura. Meus agradecimentos para Jerome Poincheval pela correção.
 
 
    Últimas Informações:
    22:16, 28 July 2012 (CEST)22:16, 28 July 2012 (CEST)22:16, 28 July 2012 (CEST)~~
    $LastChangedDate: $
    $Rev: $
    $Author: $
"""

import os, sys, socket, struct, select, time
 
# From /usr/include/linux/icmp.h; your milage may vary.
ICMP_ECHO_REQUEST = 8 # Seems to be the same on Solaris.

IP="8.8.8.8"
TIMEOUT=0.4
GREENPIN=24
REDPIN=23
HISTORY=10
 
def checksum(source_string):
    """
    I'm not too confident that this is right but testing seems
    to suggest that it gives the same answers as in_cksum in ping.c
    """
    sum = 0
    countTo = (len(source_string)/2)*2
    count = 0
    while count<countTo:
        thisVal = ord(source_string[count + 1])*256 + ord(source_string[count])
        sum = sum + thisVal
        sum = sum & 0xffffffff # Necessary?
        count = count + 2
 
    if countTo<len(source_string):
        sum = sum + ord(source_string[len(source_string) - 1])
        sum = sum & 0xffffffff # Necessary?
 
    sum = (sum >> 16)  +  (sum & 0xffff)
    sum = sum + (sum >> 16)
    answer = ~sum
    answer = answer & 0xffff
 
    # Swap bytes. Bugger me if I know why.
    answer = answer >> 8 | (answer << 8 & 0xff00)
 
    return answer
 
 
def receive_one_ping(my_socket, ID, timeout):
    """
    receive the ping from the socket.
    """
    timeLeft = timeout
    while True:
        startedSelect = time.time()
        whatReady = select.select([my_socket], [], [], timeLeft)
        howLongInSelect = (time.time() - startedSelect)
        if whatReady[0] == []: # Timeout
            return
 
        timeReceived = time.time()
        recPacket, addr = my_socket.recvfrom(1024)
        icmpHeader = recPacket[20:28]
        type, code, checksum, packetID, sequence = struct.unpack(
            "bbHHh", icmpHeader
        )
        if packetID == ID:
            bytesInDouble = struct.calcsize("d")
            timeSent = struct.unpack("d", recPacket[28:28 + bytesInDouble])[0]
            return timeReceived - timeSent
 
        timeLeft = timeLeft - howLongInSelect
        if timeLeft <= 0:
            return
 
 
def send_one_ping(my_socket, dest_addr, ID):
    """
    Send one ping to the given >dest_addr<.
    """
    dest_addr  =  socket.gethostbyname(dest_addr)
 
    # Header is type (8), code (8), checksum (16), id (16), sequence (16)
    my_checksum = 0
 
    # Make a dummy heder with a 0 checksum.
    header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1)
    bytesInDouble = struct.calcsize("d")
    data = (192 - bytesInDouble) * "Q"
    data = struct.pack("d", time.time()) + data
 
    # Calculate the checksum on the data and the dummy header.
    my_checksum = checksum(header + data)
 
    # Now that we have the right checksum, we put that in. It's just easier
    # to make up a new header than to stuff it into the dummy.
    header = struct.pack(
        "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1
    )
    packet = header + data
    my_socket.sendto(packet, (dest_addr, 1)) # Don't know about the 1
 
 
def do_one(dest_addr, timeout):
    """
    Returns either the delay (in seconds) or none on timeout.
    """
    icmp = socket.getprotobyname("icmp")
    try:
        my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except socket.error, (errno, msg):
        if errno == 1:
            # Operation not permitted
            msg = msg + (
                " - Note that ICMP messages can only be sent from processes"
                " running as root."
            )
            raise socket.error(msg)
        raise # raise the original error
 
    my_ID = os.getpid() & 0xFFFF
 
    send_one_ping(my_socket, dest_addr, my_ID)
    delay = receive_one_ping(my_socket, my_ID, timeout)
 
    my_socket.close()
    return delay
 
 
def verbose_ping(dest_addr, timeout = 2, count = 4):
    """
    Send >count< ping to >dest_addr< with the given >timeout< and display
    the result.
    """
    for i in xrange(count):
        print "ping %s..." % dest_addr,
        try:
            delay  =  do_one(dest_addr, timeout)
        except socket.gaierror, e:
            print "failed. (socket error: '%s')" % e[1]
            break
 
        if delay  ==  None:
            print "failed. (timeout within %ssec.)" % timeout
        else:
            delay  =  delay * 1000
            print "get ping in %0.4fms" % delay
    print

def set_pin(pin,value):
    file('/sys/class/gpio/gpio'+str(pin)+'/value','w').write(str(value))

def blink_pin(pin,f):
    t=time.time()
    t=t-int(t)
    if t < 0.5:
      set_pin(pin,1)
    else:
      set_pin(pin,0)

def init_pin(pin):
  file('/sys/class/gpio/export','w').write(str(pin))
  file('/sys/class/gpio/gpio'+str(pin)+'/direction','w').write('out')

if __name__ == '__main__':
    count=0
    loss=0

    init_pin(REDPIN);
    init_pin(GREENPIN);
  
    while 1 == 1:
        lasttime=time.time() 
        print time.time()
        if count>(HISTORY-1):
            count=count-1
            if loss>(HISTORY-1):
              loss=loss-1
        ping = None
        try:
          ping = do_one(IP,TIMEOUT)
        except:
          pass
        count=count+1
        if ping == None:
            loss=loss+1
        elif loss>0:
            loss=loss-1
        
        print "ping "+str(count)+", loss "+str(loss)
        if loss==0:
            set_pin(REDPIN,0)
            set_pin(GREENPIN,1)
        elif loss==count:
            set_pin(GREENPIN,0)
            set_pin(REDPIN,1)
        else:
            set_pin(GREENPIN,1)
            blink_pin(REDPIN,0.2+((1.0-(float(loss)/float(count)))/2.0))
        wait = lasttime+1 - time.time()
        print wait
        if loss==0 and wait:
           time.sleep(wait)



ENTRE EM CONTATO COM A LOJAMUNDI.

Assine nossa Newsletter! É gratuito!

Cadastre seu nome e email para receber novidades e materiais gratuitos da Lojamundi.