Agora que já conhecemos o Scapy, podemos criar ferramentas de rede com ele. Se você não sabe do que eu estou falando, clique aqui!

Todo o código exposto neste artigo se encontra em meu github, você pode acessá-lo através da URL abaixo:

Github - GTPv1-Echo-Monitoring

Visão Geral

A proposta neste artigo é criarmos juntos um monitor GTP. Pra quem não sabe, o GTP (GPRS Tunneling Protocol) é o protocolo de rede mais importante nas redes móveis, ele é responsável por todo o controle de sessão do usuário, entre suas tarefas está manter o usuário conectado mesmo quando ele está em deslocamento.

Portanto, é muito importante, numa rede móvel (Packet Core), monitorarmos as interfaces que utilizam o protocolo GTP.

O próprio protocolo, em sua especificação, define um procedimento de monitoramento, realizado através de duas mensagens:

  • GTP Echo Request
  • GTP Echo Response

O documento também define que a mensagem GTP Echo Request só deve ser enviada no máximo de 1 em 1 minuto.

Por sorte, o Scapy já possui o protocolo GTP definido, portanto vamos utilizá-lo para implementar nossa ferramenta.

Importando o módulo GTP

from scapy.all import *
from scapy.contrib import gtp

Na primeira linha do código, estamos importando todas as funções e classes padrões do scapy.

Na segunda linha do código, estamos importando um protocolo que não é padrão do scapy, porém foi feito por algum membro da comunidade e adicionado à biblioteca.

Neste caso, estamos importando especificamente o protocolo GTPv1 que corresponde ao módulo denominado gtp.

Além desses dois blocos de importação, iremos utilizar as seguintes bibliotecas:

import time
import argparse
import logging
import sys

Definindo os argumentos

A biblioteca argparse nos ajuda a utilizar argumentos passados na chamada do script python como variáveis deste script.

Vamos pensar em que tipos de opções queremos que o usuário possua ao iniciar nosso monitor GTP.

Lista de IPs

Para inciar, precisamos que o usuário forneça uma lista de IPs para nosso script, a fim de sabermos quais interfaces iremos monitorar. Portanto, vamos iniciar nosso parser.

parser = argparse.ArgumentParser()
parser.add_argument("-i", "--IP", help="Provide a list of IPs to monitor separated by comma ','", type=str)

Pronto, criamos o argumento -i que irá receber uma string como parâmetro contendo uma lista de IPs separados por vírgula, por exemplo:

wizard@thepacketwizards:~$ python gtp_monitoring.py -i 192.168.1.1,192.168.1.2

Adicionando os demais argumentos

Além da lista de IPs, vamos utilizar outros argumentos. Queremos também que seja escolha do usuário se ele precisa criar arquivos de log, enviar a saída do log para o console, se irá monitorar a lista de IPs passada infinitamente e também se o nível do log será DEBUG ou não.

Portanto, vamos adicionar os demais argumentos ao nosso código.

parser = argparse.ArgumentParser()
parser.add_argument("-l", "--loop", help="Choose option to loop", action='store_true')
parser.add_argument("-i", "--IP", help="Provide a list of IPs to monitor separated by comma ','", type=str)
parser.add_argument("-log", "--LOG", help="Choose option to log to file", action='store_true')
parser.add_argument("-d", "--DEBUG", help="Choose option to enable debug mode", action='store_true')
parser.add_argument("-nv", "--NOVERBOSE", help="Choose option to output log INFO to stderr", action='store_true')
args = parser.parse_args()

Pronto, agora que já terminamos de definir quais argumentos o usuário poderá adicionar à chamada do nosso script, vamos iniciar as chamadas para a biblioteca logging.

Iniciando nosso "Logger"

Em resumo, o "logger" é o nosso gerador de logs, é sempre interessante reservamos um bloco no ínicio do nosso código para iniciarmos as chamados aos geradores de logs para que isso fique bem claro e não passe desapercebido.

O python já nos fornece uma biblioteca padrão para geração de logs, portanto vamos utilizá-la em nosso código. Chamaremos nosso gerador de log de "gtp_echo_monitoring" e iremos atribuir formato padrão e nível padrão à ele.

logger = logging.getLogger('gtp_echo_monitoring')
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

Uma vez realizada essa configuração, podemos começar a verificar os argumentos utilizados pelo nosso usuário, adicionando comportamentos derivados dessas entradas.

# Init Console Handler for Error Messages
if not args.NOVERBOSE:
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)
    ch.setFormatter(formatter)
    logger.addHandler(ch)

if args.LOG:
    # Create File Handler which Logs up to ERROR Messages
    fh = logging.FileHandler('info.log')
    fh.setLevel(logging.INFO)
    fh.setFormatter(formatter)
    logger.addHandler(fh)

# Create File Handler which Logs Debug Messages
if args.DEBUG:
    dh = logging.FileHandler('debug.log')
    dh.setLevel(logging.DEBUG)
    dh.setFormatter(formatter)
    logger.addHandler(dh)

Calma, eu sei que introduzimos um monte de novos conceitos com o código acima, vamos explicá-los para que fique clara a função de cada bloco acima.

Primeramente, vamos verificar se o usuário utilizou o argumento -nv, indicando que ele não quer que o log seja enviado ao console, em caso negativo, ou seja, quando este argumento não for especificado, iremos criar um "handler", ou seja, uma função que está atrelada ao "logger", estamos utilizando o nível de log como INFO e o formato padrão que definimos anteriormente.

Depois dessa primeira verificação, vamos verificar se o usuário indicou, através do argumento -log, se ele precisa que os logs sejam gravados em um arquivo. Em caso positivo, salvaremos todos os logs cujo nível for INFO em um arquivo chamado info.log.

Enfim, avaliaremos se o nosso usuário deseja criar um arquivo de DEBUG, ou seja, um arquivo mais detalhado para qualquer problema que possa estar ocorrendo com nosso script. Em caso afirmativo, iremos salvar todas as mensagens cujo nível é DEBUG em um arquivo chamado debug.log.

Criando a Função main()

Agora que já definimos os argumentos e os loggers , podemos utiliza-los em nossa função main().

No primeiro bloco de código desta função, definiremos a criação dos pacotes GTP Echo Request, utlizando como entrada, os IPs passados via argumento --IP.

def main():
    if args.IP:
        ip_lst = args.IP.split(',')
        logger.info('GTP Interfaces IP list: {}'.format(ip_lst))
        echo_request = IP(dst=ip_lst) / UDP(sport=2123, dport=2123) / gtp.GTPHeader() / gtp.GTPEchoRequest()

        if args.DEBUG:
            logger.debug('List of Echo Request Packets Generated: {}'.format([p for p in echo_request]))

    else:
        logger.error('Must Specify an IP')
        if args.DEBUG:
            logger.debug("Could not find any IP on user's input")
sys.exit()

Primeiramente, testamos se o argumento IP não é nulo, logo após isso utilizamos o método split para que possamos transformar a string de IPs em uma lista.

Enviamos ao logger em nível INFO, os IPs passados como argumento, enfim criamos os pacotes GTP Echo Requests, percebam como é fácil criar esta lista de pacotes. A ferramenta scapy ao receber um argumento do tipo lista, já cria uma lista de pacotes com os diferentes IPs contidos naquela lista.

Caso o nível DEBUG tenha sido ativado pelo usuário, nós enviamos ao logger com nível DEBUG a lista de pacotes gerados pelo scapy.

Por fim, caso o usuário não tenha especificado qualquer valor no argumento --IP, iremos enviar um erro ao logger e terminar nosso script.

Enviando e Recebendo pacotes GTP Echo Request/Response

Quando desenhamos os argumentos que poderiam ser utilizados pelos nossos usuários, definimos que o script poderia enviar e receber pacotes em loop ou não, portanto para que possamos enviar e receber os pacotes, a primeira varíavel que precisamos validar é exatamente essa.

Verificando o argumento --loop

    if args.loop:
        logger.info('######## LOOP Initiated ########')
        while True:
                ...
    else:
        logger.info('Sending Packets...')
        ...

O bloco que se segue à esta validação, será exatamente igual, a única diferença é que quando o argumento --loop é avaliado como verdadeiro, inserimos um loop que roda para sempre, ou até que um comando de parada seja executado, tal como um CTRL+C.

O que ainda não paramos para fazer é uma função que valide a resposta da interface GTP ao nosso pacote GTP Echo Request. Portanto, antes de continuarmos desenvolvendo o bloco de envio e recebimento, vamos criar a função validate_response()

Criando a função validate_response()

O protocolo GTP é formado por elementos de informação, que denominamos de IE (Information Elements), o elemento de informação da mensagem GTP Echo Response que nos interessa é chamado de Restart Counter, ou seja, toda vez que ocorre uma reinicialização desta interface, este número é incrementado.

Portanto, para que possamos verificar a disponibilidade de uma certa interface, podemos apenas verificar se este elemento de informação foi modificado ou não.

Nossa função precisa receber apenas o pacote GTP Echo Response que a interface GTP monitorada nos enviou.

restart_counter_dict = dict()
def validate_response(pkt):
    try:
        if args.DEBUG:
            logger.debug('Packet: {}'.format(pkt.summary))
            logger.debug('IE_LIST: {}'.format(pkt.IE_list))
        for ie in pkt.IE_list:
            if ie.ietype == 14:
                restart_counter = ie.restart_counter
            else:
                restart_counter = None
    except Exception as exc:
        restart_counter = None
        logger.error(exc)
        pass

    logger.debug('Getting Previous value of Restart Counter from GSN IP: {}'.format(pkt.getlayer(IP).src))
    previous_restart_counter = restart_counter_dict.get(pkt.getlayer(IP).src)

    if restart_counter:
        if previous_restart_counter:
            result = previous_restart_counter == restart_counter
            if args.DEBUG:
                logger.debug('Updated Restart Counter Value: {}'.format(restart_counter))
                logger.debug('Previous Restart Counter Value: {}'.format(previous_restart_counter))
                logger.debug('Result of GSN IP: {} is {}'.format(pkt.getlayer(IP).src, result))

        else:
            restart_counter_dict.update({pkt.getlayer(IP).src: restart_counter})
            logger.debug('No Previous value of Restart Counter from GSN IP: {}'.format(pkt.getlayer(IP).src))
            logger.debug('First Restart Counter Value: {}'.format(restart_counter))
            result = True
            return result
    else:
        logger.error('No Restart Counter Found on Packet: {}'.format(pkt.summary()))
        logger.debug('No Restart Counter Found on Packet: {}'.format(pkt.summary()))
        result = False

    logger.debug('GTP Interface has not been restarted' if result else 'GTP Interface has been restarted')
    return result

Como podem ver pelo código da nossa função, recebemos o pacote GTP Echo Response como parâmetro e tentamos extrair o elemento de informação Restart Counter, em caso de sucesso, verificamos se já existia um valor para esse elemento previamente armazenado no nosso objeto restart_counter_dict, caso esse valor seja igual ao valor atual, retornamos o valor True da função, ou seja, a interface GTP continua disponível desde o último monitoramento, caso o valor seja diferente, retornamos o valor False, ou seja, em algum momento desde o último monitoramento aquela interface ficou fora de serviço.

Aqui, gostaria de encorajá-los a estudarem esta função e entenderem a utilização dos blocos de tratamento de exceção, try , except e também os tipos de mensagem que estamos enviando ao nosso logger.

É legal perceber que nosso objeto de armazenamento de elementos de informação é um dicionário cuja chave é o IP da interface monitorada.

>>> restart_counter_dict
>>> {'192.168.1.1': 2, 
     '192.168.1.2': 0}

Enviando e Recebendo... Finalmente

def main():
...
        try:
            ans, uans = sr(echo_request, verbose=False, timeout=5)
            if len(uans):
                for i in range(len(uans)):
                    gtp_ip = uans[i].getlayer(IP).dst
                    logger.warning('GTP Interface: {} || Status: {} || Cause: {}'.format(gtp_ip, 'NOK', 'TIMEOUT'))

            if len(ans):
                for i in range(len(ans)):
                    gtp_ip = ans[i][0].getlayer(IP).dst
                    status = "OK" if validate_response(ans[i][1]) else "NOK"
                    if status == 'OK':
                        logger.info('GTP Interface: {} || Status: {}'.format(gtp_ip, status))
                    else:
                        logger.warning('GTP Interface: {} || Status: {} || Cause: {}'.format(gtp_ip, 'NOK',
                                                                                             'RestartCounter Changed'))
            logger.debug('TIMEOUT: 60s')
            time.sleep(60)
        except Exception as exc:
            logger.error(exc)
            if args.DEBUG:
                logger.warning('While trying to send packet got an exception.')
                logger.debug(exc)

Para enviar e receber pacotes, como vimos em nosso tutorial, utilizaremos a função sr(). Estabelecemos que o tempo máximo de espera de resposta de um pacote é de 5s, após este tempo esgotado, em caso de não haver resposta iremos enviar ao nosso logger uma mensagem com nível de WARNING indicando que aquela interface GTP está fora de serviço.

Para as respostas que recebermos, iremos validar utilizando a função que criamos anteriormente e iremos enviar ao nosso logger as mensagens de acordo com a saída da função validate_response().

Enfim, respeitando a especificação do protocolo, iremos esperar 60s para tomar qualquer outra ação, como por exemplo, reenviar os pacotes de monitoramento caso o argumento --loop tenha sido avaliado como verdadeiro.

Conclusão

Como vimos no tutorial scapy, podemos expandir o scapy de diversas maneiras e utilizá-lo para nos ajudar em nosso trabalho ou apenas em nossos estudos.

O Scapy é uma ferramenta extremamente poderosa para prototipagem de protocolos e ferramentas de rede. Como podemos ver, uma ferramenta de monitoramento de protocolo GTP foi escrita em meras 157 linhas!