Blog

  • Downloading Torrents

    I extended the work from last week in order to actually get data from a swarm. The main change is that new sockets are now allocated for each connection, state is remembered, and the client can actually get so far as to download data from the other peers.

    Three things still need to happen: 1. I need a better way of stopping; right now it stops upon a lull in network activity, but that doesn’t really mean anything. 2. I need to actually save the downloaded data somewhere, or alternatively keep track of the rate at which it’s coming in (or the time delay between packets). 3. I need to factor out the different message types into separate modules to keep the code readable.

    I’m going to set up a code page for this project soon, so that I don’t have to post stuff just in this blog. I should get to that in the coming week, so that there’s a nicer way of interacting with this code base. This model isn’t super memory efficient, but it is a very simple system to work with, and I think it’s become a good jumping-off point for trying to interact with swarms. In particular, it’s really easy to add in hooks to get methods to run when messages are received, and to store arbitrary state about connections.

    [python]
    # Standalone Torrent Auditor
    #
    import binascii
    import getopt
    import hashlib
    import random
    import select
    import socket
    import struct
    import sys
    import time
    import urllib

    import benc
    import server

    #a bit of global state about who we are
    # Client tag: the letters "AZ", a 0x05 byte, then "31".
    client = "AZ" + chr(0x05) + "31"
    # 20 random characters (code points 0-255) used as this client's peer ID;
    # register() sends it to the tracker and handshake() sends it to peers.
    # `range` is used instead of the Python 2-only `xrange`; for 20 items the
    # result is identical and the line runs on both Python 2 and 3.
    myID = "".join(chr(random.randrange(0, 256)) for _ in range(20))
    # Peers returned by the tracker that have not been contacted yet;
    # main() fills it and onTimeout() pops from it.
    peerCache = []

    #load in a .torrent file
    def readFile(filePath):
        """Read the file at filePath and return its bdecoded contents.

        filePath -- path of the .torrent file to load.

        Returns whatever benc.bdecode produces for the file's bytes.
        Raises IOError/OSError if the file cannot be opened or read.
        """
        # Bencoded data is binary: open in 'rb' so no newline translation can
        # alter it, and use `with` so the handle is closed even if read() fails.
        with open(filePath, 'rb') as f:
            data = f.read()
        return benc.bdecode(data)

    #register with the tracker to generate potential peers
    def register(torrent, myPort):
        """Announce to the torrent's tracker and return the peers it reports.

        torrent -- decoded .torrent contents; 'announce' and 'info' are read.
        myPort  -- port number reported to the tracker.

        Returns a list of peer dicts (state 0, empty buffer, handleData as the
        callback), or an empty list when the tracker response carries no
        'peers' entry. Uses the Python 2 urllib API (urlencode / urlopen).
        """
        url = torrent['announce']
        ihash = hashlib.sha1(benc.bencode(torrent['info'])).digest()
        query = urllib.urlencode({
            'info_hash': ihash,
            'peer_id': myID,
            'port': myPort,
            'uploaded': 0,
            'downloaded': 0,
            'left': 0,
            'compact': 1,
            'event': 'started',
        })
        # Some announce URLs already carry a query string (e.g. a passkey);
        # appending a second '?' would corrupt them.
        url += ('&' if '?' in url else '?') + query

        trackerhandle = urllib.urlopen(url)
        try:
            trackerdata = trackerhandle.read()
        finally:
            # Close the connection even when the read fails.
            trackerhandle.close()

        parseddata = benc.bdecode(trackerdata)
        if 'peers' not in parseddata:
            # Trackers report errors under 'failure reason' instead of 'peers'.
            print("tracker returned no peers: %s"
                  % parseddata.get('failure reason', 'unknown error'))
            return []
        return _parsePeers(parseddata['peers'], ihash)

    #split a compact peer string into per-peer state dicts
    def _parsePeers(blob, ihash):
        """Decode a compact peer list: 6 bytes per peer (4 IP + 2 port).

        Only the compact (single string) form is handled; a trailing fragment
        shorter than 6 bytes is ignored.
        """
        peers = []
        for off in range(0, len(blob) - 5, 6):
            peers.append({
                'state': 0,
                'ip': socket.inet_ntoa(blob[off:off + 4]),
                'ihash': ihash,
                # port is a big-endian unsigned 16-bit integer
                'port': struct.unpack('!H', blob[off + 4:off + 6])[0],
                'buffer': b'',
                'callback': handleData,
            })
        return peers

    #register a new incoming connection
    def onNew(socket, addr):
        """Create the state record for an accepted connection and track it.

        socket -- the accepted connection object (note: inside this function
                  the name shadows the `socket` module).
        addr   -- (ip, port) pair of the remote peer.

        The record starts with state 0: handleData only sends our handshake
        when bit 1 ("our handshake sent") is clear, so an incoming peer's
        handshake gets answered.
        """
        # Unpack in the body rather than in the signature; tuple parameters
        # are Python 2-only syntax. Positional callers are unaffected.
        ip, port = addr
        obj = {
            'socket': socket,
            'ip': ip,
            'port': port,
            'state': 0,
            'buffer': b'',
            'callback': handleData,
        }
        # NOTE(review): handleData runs before the socket is tracked; if it
        # sees a disconnect it calls server.closed_socket on an untracked
        # socket -- confirm the server module tolerates that.
        handleData(obj)
        server.track_socket(obj)

    #reply to a socket with a torrent message
    def reply(socket, mtype, payload):
        """Frame a peer-wire message, send it on socket, and return the bytes.

        socket  -- object with a sendall() method.
        mtype   -- message type, 0-255, written as a single byte.
        payload -- sequence of fields in wire order; ints are packed as
                   4-byte big-endian values, anything else is appended as-is.

        The frame is a 4-byte big-endian length (type byte + payload)
        followed by the type byte and the payload. The payload sequence is
        read front to back and is not modified.
        """
        #first flatten the payload
        parts = [struct.pack('!B', mtype)]
        for field in payload:
            if isinstance(field, int):
                parts.append(struct.pack('!i', field))
            else:
                parts.append(field)
        body = b''.join(parts)
        packet = struct.pack('!i', len(body)) + body
        socket.sendall(packet)
        return packet

    #parse torrent msg
    def handleMsg(obj, msg):
        """Apply one peer-wire message (type byte + payload) to a connection.

        obj -- connection state dict; 'state' is updated (bit 4 = peer has
               unchoked us, bit 8 = peer is interested) and 'have' holds the
               peer's piece bitfield as a bytearray.
        msg -- non-empty message body: one type byte followed by the payload.

        Returns None. Unknown message types are ignored.
        """
        # ord() on a one-element slice yields the type as an int for both
        # Python 2 str and Python 3 bytes.
        mtype = ord(msg[0:1])
        if mtype == 0: #choke
            obj['state'] &= ~4
        elif mtype == 1: #unchoke
            obj['state'] |= 4
            #and we'd like to request a piece from them.
            # Fields are index 0, begin 0, length 2 << 15 (65536 bytes).
            # NOTE(review): many clients reject requests above 16 KiB --
            # confirm this length is intended.
            reply(obj['socket'], 6, [0, 0, 2 << 15])
        elif mtype == 2: #interested
            obj['state'] |= 8
        elif mtype == 3: #uninterested
            obj['state'] &= ~8
        elif mtype == 4: #have
            if len(msg) < 5:
                # truncated payload; nothing usable
                return
            idx = struct.unpack('!I', msg[1:5])[0]
            have = bytearray(obj.get('have', b''))
            byteIdx = idx // 8
            if byteIdx >= len(have):
                # A peer may send 'have' without (or beyond) a bitfield.
                # NOTE(review): idx is not bounded by the torrent's piece
                # count here, so a hostile peer can force a large allocation.
                have.extend(b'\x00' * (byteIdx + 1 - len(have)))
            # The high bit of each byte is the lowest piece index.
            have[byteIdx] |= 0x80 >> (idx % 8)
            obj['have'] = have
        elif mtype == 5: #bitfield
            # Stored as a bytearray so later 'have' messages can set bits.
            obj['have'] = bytearray(msg[1:])
        elif mtype == 6: #request
            #ignored
            return
        elif mtype == 7: #piece
            #got our data
            print("succeeded in downloading!")
        elif mtype == 8: #cancel
            #we aren't in that business
            return
    #parse incoming data
    def handleData(obj):
        """Read pending bytes from a connection and process what is complete.

        obj -- connection state dict; reads 'socket', 'ip', 'state' and
               'buffer', and updates 'state', 'buffer', 'ihash', 'peerid'.

        On a receive error or an empty read the peer is reported as
        disconnected and handed to server.closed_socket. Otherwise the
        68-byte handshake is consumed first (state bit 2 marks it done), then
        every complete length-prefixed message is passed to handleMsg. Bytes
        belonging to an incomplete handshake or message stay in obj['buffer']
        for the next call.
        """
        try:
            nbuf = obj['socket'].recv(4096)
        except socket.error:
            nbuf = b''
        if not nbuf:
            print("disconnected %s" % obj['ip'])
            server.closed_socket(obj['socket'])
            return
        data = obj['buffer'] + nbuf
        #Handshake
        if obj['state'] & 2 == 0:
            data = _consumeHandshake(obj, data)
            if data is None:
                return
        #all other messages are prefixed by their length
        #save the rest of the data
        obj['buffer'] = _consumeMessages(obj, data)

    #take the peer's handshake off the front of the stream
    def _consumeHandshake(obj, data):
        """Return the bytes following a complete handshake, or None if the
        handshake is not complete yet (or the stream is not a handshake)."""
        if data[0:1] != b'\x13':
            # Does not start with the protocol-name length byte (19):
            # not a handshake, so discard it.
            obj['buffer'] = b''
            return None
        if len(data) < 68:
            # Keep the partial handshake until the rest arrives.
            obj['buffer'] = data
            return None
        obj['ihash'] = data[28:48]
        obj['peerid'] = data[48:68]
        obj['state'] |= 2
        if obj['state'] & 1 == 0:
            #we need to respond to the handshake
            obj['socket'].sendall(handshake(obj['ihash']))
            obj['state'] |= 1
        print("shook hands with %s" % obj['ip'])
        return data[68:]

    #dispatch every complete message and return the unconsumed tail
    def _consumeMessages(obj, data):
        """Each message is a 4-byte big-endian length followed by that many
        bytes; a zero length is a keep-alive and carries no message."""
        while len(data) >= 4:
            mlen = struct.unpack('!I', data[0:4])[0]
            if len(data) < 4 + mlen:
                # message not fully received yet
                break
            msg = data[4:4 + mlen]
            data = data[4 + mlen:]
            if msg:
                #Actual message received
                handleMsg(obj, msg)
        return data

    #define the handshake
    def handshake(ihash):
        """Build the handshake message for the torrent identified by ihash.

        ihash -- the torrent's info hash (handleData reads it back as 20
                 bytes at offset 28).

        Layout: a length byte (19), the protocol name, 8 zero bytes, the
        info hash, then this client's peer ID (the module-level myID).
        """
        announce = b'\x13' + b'BitTorrent protocol'
        announce += b'\x00' * 8
        announce += ihash
        announce += myID
        return announce

    #talk with cached peers
    def onTimeout():
        """Start talking to up to five peers taken from peerCache.

        Each peer is handed to server.track_socket (presumably this opens the
        connection -- confirm against the server module); when that returns a
        usable record, our handshake is sent and state bit 1 ("our handshake
        sent") is set so handleData does not send it a second time.

        Returns True if peerCache was already empty on entry, else False.
        """
        if not peerCache:
            return True
        for _ in range(5):
            if not peerCache:
                break
            obj = server.track_socket(peerCache.pop())
            if not obj:
                continue
            try:
                obj['socket'].sendall(handshake(obj['ihash']))
            except socket.error:
                # One dead peer should not abort the rest of the batch.
                print("disconnected %s" % obj['ip'])
                server.closed_socket(obj['socket'])
                continue
            obj['state'] |= 1
        return False

    def usage():
        """Print command-line usage to standard output."""
        print("Usage:")
        # The long option is spelled with two ASCII hyphens, matching the
        # "file=" long option that main() registers with getopt.
        print("client --file=loc.torrent")
        print("Will report on statistics for the desired torrent")

    def main():
        """Command-line entry point: load a torrent, announce, run the loop.

        Options: -h/--help prints usage and exits; -f/--file=PATH selects the
        .torrent file (default "default.torrent"). Exits with status 2 on an
        unrecognised option.
        """
        filePath = "default.torrent"
        try:
            opts, args = getopt.getopt(sys.argv[1:], "hf:", ["help", "file="])
        except getopt.GetoptError as err:
            # print help information and exit:
            print(str(err)) # will print something like "option -a not recognized"
            usage()
            sys.exit(2)
        for o, a in opts:
            # getopt reports long options with two ASCII hyphens.
            if o in ("-h", "--help"):
                usage()
                sys.exit()
            elif o in ("-f", "--file"):
                filePath = a
            else:
                assert False, "unhandled option"
        # Progress text is written without a newline and flushed so it is
        # visible while the following step runs.
        sys.stdout.write("Loading Info... ")
        sys.stdout.flush()
        info = readFile(filePath)
        print("okay")
        port = 6886
        sys.stdout.write("Detecting Swarm... ")
        sys.stdout.flush()
        seeds = register(info, port)
        print("%d  peers returned" % len(seeds))
        peerCache.extend(seeds)
        print("Entering Main Loop")
        onTimeout()
        server.main_loop(onTimeout, onNew, port)
        print("Finished Snapshot")

    # Run main() only when this file is executed as a script, not on import.
    if __name__ == "__main__":
        main()
    [/python]