update ironwood dependency, fix ansible code, go mod tidy

This commit is contained in:
Arceliar 2021-05-23 17:52:10 -05:00
parent 018f35d9a2
commit 6bc2044ced
13 changed files with 25 additions and 3198 deletions

View File

@ -1,61 +0,0 @@
package main
import (
"fmt"
"sort"
"time"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
)
func doListen(recvNode *simNode) {
// TODO be able to stop the listeners somehow so they don't leak across different tests
for {
c, err := recvNode.listener.Accept()
if err != nil {
panic(err)
}
c.Close()
}
}
func dialTest(sendNode, recvNode *simNode) {
if sendNode.id == recvNode.id {
fmt.Println("Skipping dial to self")
return
}
var mask crypto.NodeID
for idx := range mask {
mask[idx] = 0xff
}
for {
c, err := sendNode.dialer.DialByNodeIDandMask(nil, &recvNode.nodeID, &mask)
if c != nil {
c.Close()
return
}
if err != nil {
fmt.Println("Dial failed:", err)
}
time.Sleep(time.Second)
}
}
func dialStore(store nodeStore) {
var nodeIdxs []int
for idx, n := range store {
nodeIdxs = append(nodeIdxs, idx)
go doListen(n)
}
sort.Slice(nodeIdxs, func(i, j int) bool {
return nodeIdxs[i] < nodeIdxs[j]
})
for _, idx := range nodeIdxs {
sendNode := store[idx]
for _, jdx := range nodeIdxs {
recvNode := store[jdx]
fmt.Printf("Dialing from node %d to node %d / %d...\n", idx, jdx, len(store))
dialTest(sendNode, recvNode)
}
}
}

View File

@ -1,6 +0,0 @@
package main
func main() {
store := makeStoreSquareGrid(4)
dialStore(store)
}

View File

@ -1,28 +0,0 @@
package main
import (
"io/ioutil"
"github.com/gologme/log"
"github.com/yggdrasil-network/yggdrasil-go/src/config"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
)
type simNode struct {
core yggdrasil.Core
id int
nodeID crypto.NodeID
dialer *yggdrasil.Dialer
listener *yggdrasil.Listener
}
func newNode(id int) *simNode {
n := simNode{id: id}
n.core.Start(config.GenerateConfig(), log.New(ioutil.Discard, "", 0))
n.nodeID = *n.core.NodeID()
n.dialer, _ = n.core.ConnDialer()
n.listener, _ = n.core.ConnListen()
return &n
}

View File

@ -1,41 +0,0 @@
package main
type nodeStore map[int]*simNode
func makeStoreSingle() nodeStore {
s := make(nodeStore)
s[0] = newNode(0)
return s
}
func linkNodes(a *simNode, b *simNode) {
la := a.core.NewSimlink()
lb := b.core.NewSimlink()
la.SetDestination(lb)
lb.SetDestination(la)
la.Start()
lb.Start()
}
func makeStoreSquareGrid(sideLength int) nodeStore {
store := make(nodeStore)
nNodes := sideLength * sideLength
idxs := make([]int, 0, nNodes)
// TODO shuffle nodeIDs
for idx := 1; idx <= nNodes; idx++ {
idxs = append(idxs, idx)
}
for _, idx := range idxs {
n := newNode(idx)
store[idx] = n
}
for idx := 0; idx < nNodes; idx++ {
if (idx % sideLength) != 0 {
linkNodes(store[idxs[idx]], store[idxs[idx-1]])
}
if idx >= sideLength {
linkNodes(store[idxs[idx]], store[idxs[idx-sideLength]])
}
}
return store
}

View File

@ -6,6 +6,7 @@ This file generates crypto keys for [ansible-yggdrasil](https://github.com/jcgru
package main
import (
"crypto/ed25519"
"encoding/hex"
"flag"
"fmt"
@ -14,7 +15,6 @@ import (
"github.com/cheggaaa/pb/v3"
"github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
)
var numHosts = flag.Int("hosts", 1, "number of host vars to generate")
@ -23,7 +23,6 @@ var keyTries = flag.Int("tries", 1000, "number of tries before taking the best k
type keySet struct {
priv []byte
pub []byte
id []byte
ip string
}
@ -37,27 +36,15 @@ func main() {
return
}
var encryptionKeys []keySet
var keys []keySet
for i := 0; i < *numHosts+1; i++ {
encryptionKeys = append(encryptionKeys, newBoxKey())
keys = append(keys, newKey())
bar.Increment()
}
encryptionKeys = sortKeySetArray(encryptionKeys)
keys = sortKeySetArray(keys)
for i := 0; i < *keyTries-*numHosts-1; i++ {
encryptionKeys[0] = newBoxKey()
encryptionKeys = bubbleUpTo(encryptionKeys, 0)
bar.Increment()
}
var signatureKeys []keySet
for i := 0; i < *numHosts+1; i++ {
signatureKeys = append(signatureKeys, newSigKey())
bar.Increment()
}
signatureKeys = sortKeySetArray(signatureKeys)
for i := 0; i < *keyTries-*numHosts-1; i++ {
signatureKeys[0] = newSigKey()
signatureKeys = bubbleUpTo(signatureKeys, 0)
keys[0] = newKey()
keys = bubbleUpTo(keys, 0)
bar.Increment()
}
@ -70,43 +57,36 @@ func main() {
return
}
defer file.Close()
file.WriteString(fmt.Sprintf("yggdrasil_encryption_public_key: %v\n", hex.EncodeToString(encryptionKeys[i].pub)))
file.WriteString("yggdrasil_encryption_private_key: \"{{ vault_yggdrasil_encryption_private_key }}\"\n")
file.WriteString(fmt.Sprintf("yggdrasil_signing_public_key: %v\n", hex.EncodeToString(signatureKeys[i].pub)))
file.WriteString("yggdrasil_signing_private_key: \"{{ vault_yggdrasil_signing_private_key }}\"\n")
file.WriteString(fmt.Sprintf("ansible_host: %v\n", encryptionKeys[i].ip))
file.WriteString(fmt.Sprintf("yggdrasil_public_key: %v\n", hex.EncodeToString(keys[i].pub)))
file.WriteString("yggdrasil_private_key: \"{{ vault_yggdrasil_private_key }}\"\n")
file.WriteString(fmt.Sprintf("ansible_host: %v\n", keys[i].ip))
file, err = os.Create(fmt.Sprintf("host_vars/%x/vault", i))
if err != nil {
return
}
defer file.Close()
file.WriteString(fmt.Sprintf("vault_yggdrasil_encryption_private_key: %v\n", hex.EncodeToString(encryptionKeys[i].priv)))
file.WriteString(fmt.Sprintf("vault_yggdrasil_signing_private_key: %v\n", hex.EncodeToString(signatureKeys[i].priv)))
file.WriteString(fmt.Sprintf("vault_yggdrasil_private_key: %v\n", hex.EncodeToString(keys[i].priv)))
bar.Increment()
}
bar.Finish()
}
func newBoxKey() keySet {
pub, priv := crypto.NewBoxKeys()
id := crypto.GetNodeID(pub)
ip := net.IP(address.AddrForNodeID(id)[:]).String()
return keySet{priv[:], pub[:], id[:], ip}
}
func newSigKey() keySet {
pub, priv := crypto.NewSigKeys()
id := crypto.GetTreeID(pub)
return keySet{priv[:], pub[:], id[:], ""}
func newKey() keySet {
pub, priv, err := ed25519.GenerateKey(nil)
if err != nil {
panic(err)
}
ip := net.IP(address.AddrForKey(pub)[:]).String()
return keySet{priv[:], pub[:], ip}
}
func isBetter(oldID, newID []byte) bool {
for idx := range oldID {
if newID[idx] > oldID[idx] {
if newID[idx] < oldID[idx] {
return true
}
if newID[idx] < oldID[idx] {
if newID[idx] > oldID[idx] {
return false
}
}
@ -122,7 +102,7 @@ func sortKeySetArray(sets []keySet) []keySet {
func bubbleUpTo(sets []keySet, num int) []keySet {
for i := 0; i < len(sets)-num-1; i++ {
if isBetter(sets[i+1].id, sets[i].id) {
if isBetter(sets[i+1].pub, sets[i].pub) {
var tmp = sets[i]
sets[i] = sets[i+1]
sets[i+1] = tmp

3
go.mod
View File

@ -3,7 +3,7 @@ module github.com/yggdrasil-network/yggdrasil-go
go 1.16
require (
github.com/Arceliar/ironwood v0.0.0-20210519013150-a5401869b037
github.com/Arceliar/ironwood v0.0.0-20210523223424-d320cf0ed78e
github.com/Arceliar/phony v0.0.0-20210209235338-dde1a8dca979
github.com/cheggaaa/pb/v3 v3.0.6
github.com/fatih/color v1.10.0 // indirect
@ -16,7 +16,6 @@ require (
github.com/rivo/uniseg v0.2.0 // indirect
github.com/vishvananda/netlink v1.1.0
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f // indirect
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110
golang.org/x/sys v0.0.0-20210305230114-8fe3ee5dd75b
golang.org/x/text v0.3.6-0.20210220033129-8f690f22cf1c

4
go.sum
View File

@ -1,5 +1,5 @@
github.com/Arceliar/ironwood v0.0.0-20210519013150-a5401869b037 h1:SQ7opLc8dCNAgyYIeVZUGwvZ5YrfqRLHMwOGWfH/S/k=
github.com/Arceliar/ironwood v0.0.0-20210519013150-a5401869b037/go.mod h1:RP72rucOFm5udrnEzTmIWLRVGQiV/fSUAQXJ0RST/nk=
github.com/Arceliar/ironwood v0.0.0-20210523223424-d320cf0ed78e h1:EoZ4Dfm3xBDFjXRUzZUH+44NVvQ8tLf/VyESuC0BijI=
github.com/Arceliar/ironwood v0.0.0-20210523223424-d320cf0ed78e/go.mod h1:RP72rucOFm5udrnEzTmIWLRVGQiV/fSUAQXJ0RST/nk=
github.com/Arceliar/phony v0.0.0-20210209235338-dde1a8dca979 h1:WndgpSW13S32VLQ3ugUxx2EnnWmgba1kCqPkd4Gk1yQ=
github.com/Arceliar/phony v0.0.0-20210209235338-dde1a8dca979/go.mod h1:6Lkn+/zJilRMsKmbmG1RPoamiArC6HS73xbwRyp3UyI=
github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdcM=

File diff suppressed because it is too large Load Diff

View File

@ -1,62 +0,0 @@
import glob
import sys
inputDirPath = sys.argv[1]
inputFilePaths = glob.glob(inputDirPath+"/*")
inputFilePaths.sort()
merged = dict()
stretches = []
total = 0
for inputFilePath in inputFilePaths:
print "Processing file {}".format(inputFilePath)
with open(inputFilePath, 'r') as f:
inData = f.readlines()
pathsChecked = 0.
avgStretch = 0.
for line in inData:
dat = line.rstrip('\n').split(' ')
eHops = int(dat[0])
nHops = int(dat[1])
count = int(dat[2])
if eHops not in merged: merged[eHops] = dict()
if nHops not in merged[eHops]: merged[eHops][nHops] = 0
merged[eHops][nHops] += count
total += count
pathsChecked += count
stretch = float(nHops)/eHops
avgStretch += stretch*count
finStretch = avgStretch / max(1, pathsChecked)
stretches.append(str(finStretch))
hopsUsed = 0.
hopsNeeded = 0.
avgStretch = 0.
results = []
for eHops in sorted(merged.keys()):
for nHops in sorted(merged[eHops].keys()):
count = merged[eHops][nHops]
result = "{} {} {}".format(eHops, nHops, count)
results.append(result)
hopsUsed += nHops*count
hopsNeeded += eHops*count
stretch = float(nHops)/eHops
avgStretch += stretch*count
print result
bandwidthUsage = hopsUsed/max(1, hopsNeeded)
avgStretch /= max(1, total)
with open("results.txt", "w") as f:
f.write('\n'.join(results))
with open("stretches.txt", "w") as f:
f.write('\n'.join(stretches))
print "Total files processed: {}".format(len(inputFilePaths))
print "Total paths found: {}".format(total)
print "Bandwidth usage: {}".format(bandwidthUsage)
print "Average stretch: {}".format(avgStretch)

View File

@ -1,2 +0,0 @@
#!/bin/bash
go run -tags debug misc/sim/treesim.go "$@"

View File

@ -1,901 +0,0 @@
# Tree routing scheme (named Yggdrasil, after the world tree from Norse mythology)
# Steps:
# 1: Pick any node, here I'm using highest nodeID
# 2: Build spanning tree, each node stores path back to root
# Optionally with weights for each hop
# Ties broken by preferring a parent with higher degree
# 3: Distance metric: self->peer + (via tree) peer->dest
# 4: Perform (modified) greedy lookup via this metric for each direction (A->B and B->A)
# 5: Source-route traffic using the better of those two paths
# Note: This makes no attempt to simulate a dynamic network
# E.g. A node's peers cannot be disconnected
# TODO:
# Make better use of drop?
# In particular, we should be ignoring *all* recently dropped *paths* to the root
# To minimize route flapping
# Not really an issue in the sim, but probably needed for a real network
import array
import gc
import glob
import gzip
import heapq
import os
import random
import time
#############
# Constants #
#############
# Reminder of where link cost comes in
LINK_COST = 1
# Timeout before dropping something, in simulated seconds
TIMEOUT = 60
###########
# Classes #
###########
class PathInfo:
def __init__(self, nodeID):
self.nodeID = nodeID # e.g. IP
self.coords = [] # Position in tree
self.tstamp = 0 # Timestamp from sender, to keep track of old vs new info
self.degree = 0 # Number of peers the sender has, used to break ties
# The above should be signed
self.path = [nodeID] # Path to node (in path-vector route)
self.time = 0 # Time info was updated, to keep track of e.g. timeouts
self.treeID = nodeID # Hack, let tree use different ID than IP, used so we can dijkstra once and test many roots
def clone(self):
# Return a deep-enough copy of the path
clone = PathInfo(None)
clone.nodeID = self.nodeID
clone.coords = self.coords[:]
clone.tstamp = self.tstamp
clone.degree = self.degree
clone.path = self.path[:]
clone.time = self.time
clone.treeID = self.treeID
return clone
# End class PathInfo
class Node:
def __init__(self, nodeID):
self.info = PathInfo(nodeID) # Self NodeInfo
self.root = None # PathInfo to node at root of tree
self.drop = dict() # PathInfo to nodes from clus that have timed out
self.peers = dict() # PathInfo to peers
self.links = dict() # Links to peers (to pass messages)
self.msgs = [] # Said messages
self.table = dict() # Pre-computed lookup table of peer info
def tick(self):
# Do periodic maintenance stuff, including push updates
self.info.time += 1
if self.info.time > self.info.tstamp + TIMEOUT/4:
# Update timestamp at least once every 1/4 timeout period
# This should probably be randomized in a real implementation
self.info.tstamp = self.info.time
self.info.degree = 0# TODO decide if degree should be used, len(self.peers)
changed = False # Used to track when the network has converged
changed |= self.cleanRoot()
self.cleanDropped()
# Should probably send messages infrequently if there's nothing new to report
if self.info.tstamp == self.info.time:
msg = self.createMessage()
self.sendMessage(msg)
return changed
def cleanRoot(self):
changed = False
if self.root and self.info.time - self.root.time > TIMEOUT:
print "DEBUG: clean root,", self.root.path
self.drop[self.root.treeID] = self.root
self.root = None
changed = True
if not self.root or self.root.treeID < self.info.treeID:
# No need to drop someone who'se worse than us
self.info.coords = [self.info.nodeID]
self.root = self.info.clone()
changed = True
elif self.root.treeID == self.info.treeID:
self.root = self.info.clone()
return changed
def cleanDropped(self):
# May actually be a treeID... better to iterate over keys explicitly
nodeIDs = sorted(self.drop.keys())
for nodeID in nodeIDs:
node = self.drop[nodeID]
if self.info.time - node.time > 4*TIMEOUT:
del self.drop[nodeID]
return None
def createMessage(self):
# Message is just a tuple
# First element is the sender
# Second element is the root
# We will .clone() everything during the send operation
msg = (self.info, self.root)
return msg
def sendMessage(self, msg):
for link in self.links.values():
newMsg = (msg[0].clone(), msg[1].clone())
link.msgs.append(newMsg)
return None
def handleMessages(self):
changed = False
while self.msgs:
changed |= self.handleMessage(self.msgs.pop())
return changed
def handleMessage(self, msg):
changed = False
for node in msg:
# Update the path and timestamp for the sender and root info
node.path.append(self.info.nodeID)
node.time = self.info.time
# Update the sender's info in our list of peers
sender = msg[0]
self.peers[sender.nodeID] = sender
# Decide if we want to update the root
root = msg[1]
updateRoot = False
isSameParent = False
isBetterParent = False
if len(self.root.path) > 1 and len(root.path) > 1:
parent = self.peers[self.root.path[-2]]
if parent.nodeID == sender.nodeID: isSameParent = True
if sender.degree > parent.degree:
# This would also be where you check path uptime/reliability/whatever
# All else being equal, we prefer parents with high degree
# We are trusting peers to report degree correctly in this case
# So expect some performance reduction if your peers aren't trustworthy
# (Lies can increase average stretch by a few %)
isBetterParent = True
if self.info.nodeID in root.path[:-1]: pass # No loopy routes allowed
elif root.treeID in self.drop and self.drop[root.treeID].tstamp >= root.tstamp: pass
elif not self.root: updateRoot = True
elif self.root.treeID < root.treeID: updateRoot = True
elif self.root.treeID != root.treeID: pass
elif self.root.tstamp > root.tstamp: pass
elif len(root.path) < len(self.root.path): updateRoot = True
elif isBetterParent and len(root.path) == len(self.root.path): updateRoot = True
elif isSameParent and self.root.tstamp < root.tstamp: updateRoot = True
if updateRoot:
if not self.root or self.root.path != root.path: changed = True
self.root = root
self.info.coords = self.root.path
return changed
def lookup(self, dest):
# Note: Can loop in an unconverged network
# The person looking up the route is responsible for checking for loops
best = None
bestDist = 0
for node in self.peers.itervalues():
# dist = distance to node + dist (on tree) from node to dest
dist = len(node.path)-1 + treeDist(node.coords, dest.coords)
if not best or dist < bestDist:
best = node
bestDist = dist
if best:
next = best.path[-2]
assert next in self.peers
return next
else:
# We failed to look something up
# TODO some way to signal this which doesn't crash
assert False
def initTable(self):
# Pre-computes a lookup table for destination coords
# Insert parent first so you prefer them as a next-hop
self.table.clear()
parent = self.info.nodeID
if len(self.info.coords) >= 2: parent = self.info.coords[-2]
for peer in self.peers.itervalues():
current = self.table
for coord in peer.coords:
if coord not in current: current[coord] = (peer.nodeID, dict())
old = current[coord]
next = old[1]
oldPeer = self.peers[old[0]]
oldDist = len(oldPeer.coords)
oldDeg = oldPeer.degree
newDist = len(peer.coords)
newDeg = peer.degree
# Prefer parent
# Else prefer short distance from root
# If equal distance, prefer high degree
if peer.nodeID == parent: current[coord] = (peer.nodeID, next)
elif newDist < oldDist: current[coord] = (peer.nodeID, next)
elif newDist == oldDist and newDeg > oldDeg: current[coord] = (peer.nodeID, next)
current = next
return None
def lookup_new(self, dest):
# Use pre-computed lookup table to look up next hop for dest coords
assert self.table
if len(self.info.coords) >= 2: parent = self.info.coords[-2]
else: parent = None
current = (parent, self.table)
c = None
for coord in dest.coords:
c = coord
if coord not in current[1]: break
current = current[1][coord]
next = current[0]
if c in self.peers: next = c
if next not in self.peers:
assert next == None
# You're the root of a different connected component
# You'd drop the packet in this case
# To make the path cache not die, need to return a valid next hop...
# Returning self for that reason
next = self.info.nodeID
return next
# End class Node
####################
# Helper Functions #
####################
def getIndexOfLCA(source, dest):
# Return index of last common ancestor in source/dest coords
# -1 if no common ancestor (e.g. different roots)
lcaIdx = -1
minLen = min(len(source), len(dest))
for idx in xrange(minLen):
if source[idx] == dest[idx]: lcaIdx = idx
else: break
return lcaIdx
def treePath(source, dest):
# Return path with source at head and dest at tail
lastMatch = getIndexOfLCA(source, dest)
path = dest[-1:lastMatch:-1] + source[lastMatch:]
assert path[0] == dest[-1]
assert path[-1] == source[-1]
return path
def treeDist(source, dest):
dist = len(source) + len(dest)
lcaIdx = getIndexOfLCA(source, dest)
dist -= 2*(lcaIdx+1)
return dist
def dijkstra(nodestore, startingNodeID):
# Idea to use heapq and basic implementation taken from stackexchange post
# http://codereview.stackexchange.com/questions/79025/dijkstras-algorithm-in-python
nodeIDs = sorted(nodestore.keys())
nNodes = len(nodeIDs)
idxs = dict()
for nodeIdx in xrange(nNodes):
nodeID = nodeIDs[nodeIdx]
idxs[nodeID] = nodeIdx
dists = array.array("H", [0]*nNodes)
queue = [(0, startingNodeID)]
while queue:
dist, nodeID = heapq.heappop(queue)
idx = idxs[nodeID]
if not dists[idx]: # Unvisited, otherwise we skip it
dists[idx] = dist
for peer in nodestore[nodeID].links:
if not dists[idxs[peer]]:
# Peer is also unvisited, so add to queue
heapq.heappush(queue, (dist+LINK_COST, peer))
return dists
def dijkstrall(nodestore):
# Idea to use heapq and basic implementation taken from stackexchange post
# http://codereview.stackexchange.com/questions/79025/dijkstras-algorithm-in-python
nodeIDs = sorted(nodestore.keys())
nNodes = len(nodeIDs)
idxs = dict()
for nodeIdx in xrange(nNodes):
nodeID = nodeIDs[nodeIdx]
idxs[nodeID] = nodeIdx
dists = array.array("H", [0]*nNodes*nNodes) # use GetCacheIndex(nNodes, start, end)
for sourceIdx in xrange(nNodes):
print "Finding shortest paths for node {} / {} ({})".format(sourceIdx+1, nNodes, nodeIDs[sourceIdx])
queue = [(0, sourceIdx)]
while queue:
dist, nodeIdx = heapq.heappop(queue)
distIdx = getCacheIndex(nNodes, sourceIdx, nodeIdx)
if not dists[distIdx]: # Unvisited, otherwise we skip it
dists[distIdx] = dist
for peer in nodestore[nodeIDs[nodeIdx]].links:
pIdx = idxs[peer]
pdIdx = getCacheIndex(nNodes, sourceIdx, pIdx)
if not dists[pdIdx]:
# Peer is also unvisited, so add to queue
heapq.heappush(queue, (dist+LINK_COST, pIdx))
return dists
def linkNodes(node1, node2):
node1.links[node2.info.nodeID] = node2
node2.links[node1.info.nodeID] = node1
############################
# Store topology functions #
############################
def makeStoreSquareGrid(sideLength, randomize=True):
# Simple grid in a sideLength*sideLength square
# Just used to validate that the code runs
store = dict()
nodeIDs = list(range(sideLength*sideLength))
if randomize: random.shuffle(nodeIDs)
for nodeID in nodeIDs:
store[nodeID] = Node(nodeID)
for index in xrange(len(nodeIDs)):
if (index % sideLength != 0): linkNodes(store[nodeIDs[index]], store[nodeIDs[index-1]])
if (index >= sideLength): linkNodes(store[nodeIDs[index]], store[nodeIDs[index-sideLength]])
print "Grid store created, size {}".format(len(store))
return store
def makeStoreASRelGraph(pathToGraph):
#Existing network graphs, in caida.org's asrel format (ASx|ASy|z per line, z denotes relationship type)
with open(pathToGraph, "r") as f:
inData = f.readlines()
store = dict()
for line in inData:
if line.strip()[0] == "#": continue # Skip comment lines
line = line.replace('|'," ")
nodes = map(int, line.split()[0:2])
if nodes[0] not in store: store[nodes[0]] = Node(nodes[0])
if nodes[1] not in store: store[nodes[1]] = Node(nodes[1])
linkNodes(store[nodes[0]], store[nodes[1]])
print "CAIDA AS-relation graph successfully imported, size {}".format(len(store))
return store
def makeStoreASRelGraphMaxDeg(pathToGraph, degIdx=0):
with open(pathToGraph, "r") as f:
inData = f.readlines()
store = dict()
nodeDeg = dict()
for line in inData:
if line.strip()[0] == "#": continue # Skip comment lines
line = line.replace('|'," ")
nodes = map(int, line.split()[0:2])
if nodes[0] not in nodeDeg: nodeDeg[nodes[0]] = 0
if nodes[1] not in nodeDeg: nodeDeg[nodes[1]] = 0
nodeDeg[nodes[0]] += 1
nodeDeg[nodes[1]] += 1
sortedNodes = sorted(nodeDeg.keys(), \
key=lambda x: (nodeDeg[x], x), \
reverse=True)
maxDegNodeID = sortedNodes[degIdx]
return makeStoreASRelGraphFixedRoot(pathToGraph, maxDegNodeID)
def makeStoreASRelGraphFixedRoot(pathToGraph, rootNodeID):
with open(pathToGraph, "r") as f:
inData = f.readlines()
store = dict()
for line in inData:
if line.strip()[0] == "#": continue # Skip comment lines
line = line.replace('|'," ")
nodes = map(int, line.split()[0:2])
if nodes[0] not in store:
store[nodes[0]] = Node(nodes[0])
if nodes[0] == rootNodeID: store[nodes[0]].info.treeID += 1000000000
if nodes[1] not in store:
store[nodes[1]] = Node(nodes[1])
if nodes[1] == rootNodeID: store[nodes[1]].info.treeID += 1000000000
linkNodes(store[nodes[0]], store[nodes[1]])
print "CAIDA AS-relation graph successfully imported, size {}".format(len(store))
return store
def makeStoreDimesEdges(pathToGraph, rootNodeID=None):
# Read from a DIMES csv-formatted graph from a gzip file
store = dict()
with gzip.open(pathToGraph, "r") as f:
inData = f.readlines()
size = len(inData)
index = 0
for edge in inData:
if not index % 1000:
pct = 100.0*index/size
print "Processing edge {}, {:.2f}%".format(index, pct)
index += 1
dat = edge.rstrip().split(',')
node1 = "N" + str(dat[0].strip())
node2 = "N" + str(dat[1].strip())
if '?' in node1 or '?' in node2: continue #Unknown node
if node1 == rootNodeID: node1 = "R" + str(dat[0].strip())
if node2 == rootNodeID: node2 = "R" + str(dat[1].strip())
if node1 not in store: store[node1] = Node(node1)
if node2 not in store: store[node2] = Node(node2)
if node1 != node2: linkNodes(store[node1], store[node2])
print "DIMES graph successfully imported, size {}".format(len(store))
return store
def makeStoreGeneratedGraph(pathToGraph, root=None):
with open(pathToGraph, "r") as f:
inData = f.readlines()
store = dict()
for line in inData:
if line.strip()[0] == "#": continue # Skip comment lines
nodes = map(int, line.strip().split(' ')[0:2])
node1 = nodes[0]
node2 = nodes[1]
if node1 == root: node1 += 1000000
if node2 == root: node2 += 1000000
if node1 not in store: store[node1] = Node(node1)
if node2 not in store: store[node2] = Node(node2)
linkNodes(store[node1], store[node2])
print "Generated graph successfully imported, size {}".format(len(store))
return store
############################################
# Functions used as parts of network tests #
############################################
def idleUntilConverged(store):
nodeIDs = sorted(store.keys())
timeOfLastChange = 0
step = 0
# Idle until the network has converged
while step - timeOfLastChange < 4*TIMEOUT:
step += 1
print "Step: {}, last change: {}".format(step, timeOfLastChange)
changed = False
for nodeID in nodeIDs:
# Update node status, send messages
changed |= store[nodeID].tick()
for nodeID in nodeIDs:
# Process messages
changed |= store[nodeID].handleMessages()
if changed: timeOfLastChange = step
initTables(store)
return store
def getCacheIndex(nodes, sourceIndex, destIndex):
return sourceIndex*nodes + destIndex
def initTables(store):
nodeIDs = sorted(store.keys())
nNodes = len(nodeIDs)
print "Initializing routing tables for {} nodes".format(nNodes)
for idx in xrange(nNodes):
nodeID = nodeIDs[idx]
store[nodeID].initTable()
print "Routing tables initialized"
return None
def getCache(store):
nodeIDs = sorted(store.keys())
nNodes = len(nodeIDs)
nodeIdxs = dict()
for nodeIdx in xrange(nNodes):
nodeIdxs[nodeIDs[nodeIdx]] = nodeIdx
cache = array.array("H", [0]*nNodes*nNodes)
for sourceIdx in xrange(nNodes):
sourceID = nodeIDs[sourceIdx]
print "Building fast lookup table for node {} / {} ({})".format(sourceIdx+1, nNodes, sourceID)
for destIdx in xrange(nNodes):
destID = nodeIDs[destIdx]
if sourceID == destID: nextHop = destID # lookup would fail
else: nextHop = store[sourceID].lookup(store[destID].info)
nextHopIdx = nodeIdxs[nextHop]
cache[getCacheIndex(nNodes, sourceIdx, destIdx)] = nextHopIdx
return cache
def testPaths(store, dists):
cache = getCache(store)
nodeIDs = sorted(store.keys())
nNodes = len(nodeIDs)
idxs = dict()
for nodeIdx in xrange(nNodes):
nodeID = nodeIDs[nodeIdx]
idxs[nodeID] = nodeIdx
results = dict()
for sourceIdx in xrange(nNodes):
sourceID = nodeIDs[sourceIdx]
print "Testing paths from node {} / {} ({})".format(sourceIdx+1, len(nodeIDs), sourceID)
#dists = dijkstra(store, sourceID)
for destIdx in xrange(nNodes):
destID = nodeIDs[destIdx]
if destID == sourceID: continue # Skip self
distIdx = getCacheIndex(nNodes, sourceIdx, destIdx)
eHops = dists[distIdx]
if not eHops: continue # The network is split, no path exists
hops = 0
for pair in ((sourceIdx, destIdx),):
nHops = 0
locIdx = pair[0]
dIdx = pair[1]
while locIdx != dIdx:
locIdx = cache[getCacheIndex(nNodes, locIdx, dIdx)]
nHops += 1
if not hops or nHops < hops: hops = nHops
if eHops not in results: results[eHops] = dict()
if hops not in results[eHops]: results[eHops][hops] = 0
results[eHops][hops] += 1
return results
def getAvgStretch(pathMatrix):
avgStretch = 0.
checked = 0.
for eHops in sorted(pathMatrix.keys()):
for nHops in sorted(pathMatrix[eHops].keys()):
count = pathMatrix[eHops][nHops]
stretch = float(nHops)/float(max(1, eHops))
avgStretch += stretch*count
checked += count
avgStretch /= max(1, checked)
return avgStretch
def getMaxStretch(pathMatrix):
maxStretch = 0.
for eHops in sorted(pathMatrix.keys()):
for nHops in sorted(pathMatrix[eHops].keys()):
stretch = float(nHops)/float(max(1, eHops))
maxStretch = max(maxStretch, stretch)
return maxStretch
def getCertSizes(store):
# Returns nCerts frequency distribution
# De-duplicates common certs (for shared prefixes in the path)
sizes = dict()
for node in store.values():
certs = set()
for peer in node.peers.values():
pCerts = set()
assert len(peer.path) == 2
assert peer.coords[-1] == peer.path[0]
hops = peer.coords + peer.path[1:]
for hopIdx in xrange(len(hops)-1):
send = hops[hopIdx]
if send == node.info.nodeID: continue # We created it, already have it
path = hops[0:hopIdx+2]
# Each cert is signed by the sender
# Includes information about the path from the sender to the next hop
# Next hop is at hopIdx+1, so the path to next hop is hops[0:hopIdx+2]
cert = "{}:{}".format(send, path)
certs.add(cert)
size = len(certs)
if size not in sizes: sizes[size] = 0
sizes[size] += 1
return sizes
def getMinLinkCertSizes(store):
# Returns nCerts frequency distribution
# De-duplicates common certs (for shared prefixes in the path)
# Based on the minimum number of certs that must be traded through a particular link
# Handled per link
sizes = dict()
for node in store.values():
peerCerts = dict()
for peer in node.peers.values():
pCerts = set()
assert len(peer.path) == 2
assert peer.coords[-1] == peer.path[0]
hops = peer.coords + peer.path[1:]
for hopIdx in xrange(len(hops)-1):
send = hops[hopIdx]
if send == node.info.nodeID: continue # We created it, already have it
path = hops[0:hopIdx+2]
# Each cert is signed by the sender
# Includes information about the path from the sender to the next hop
# Next hop is at hopIdx+1, so the path to next hop is hops[0:hopIdx+2]
cert = "{}:{}".format(send, path)
pCerts.add(cert)
peerCerts[peer.nodeID] = pCerts
for peer in peerCerts:
size = 0
pCerts = peerCerts[peer]
for cert in pCerts:
required = True
for p2 in peerCerts:
if p2 == peer: continue
p2Certs = peerCerts[p2]
if cert in p2Certs: required = False
if required: size += 1
if size not in sizes: sizes[size] = 0
sizes[size] += 1
return sizes
def getPathSizes(store):
# Returns frequency distribution of the total number of hops the routing table
# I.e. a node with 3 peers, each with 5 hop coord+path, would count as 3x5=15
sizes = dict()
for node in store.values():
size = 0
for peer in node.peers.values():
assert len(peer.path) == 2
assert peer.coords[-1] == peer.path[0]
peerSize = len(peer.coords) + len(peer.path) - 1 # double-counts peer, -1
size += peerSize
if size not in sizes: sizes[size] = 0
sizes[size] += 1
return sizes
def getPeerSizes(store):
# Returns frequency distribution of the number of peers each node has
sizes = dict()
for node in store.values():
nPeers = len(node.peers)
if nPeers not in sizes: sizes[nPeers] = 0
sizes[nPeers] += 1
return sizes
def getAvgSize(sizes):
sumSizes = 0
nNodes = 0
for size in sizes:
count = sizes[size]
sumSizes += size*count
nNodes += count
avgSize = float(sumSizes)/max(1, nNodes)
return avgSize
def getMaxSize(sizes):
return max(sizes.keys())
def getMinSize(sizes):
return min(sizes.keys())
def getResults(pathMatrix):
results = []
for eHops in sorted(pathMatrix.keys()):
for nHops in sorted(pathMatrix[eHops].keys()):
count = pathMatrix[eHops][nHops]
results.append("{} {} {}".format(eHops, nHops, count))
return '\n'.join(results)
####################################
# Functions to run different tests #
####################################
def runTest(store):
# Runs the usual set of tests on the store
# Does not save results, so only meant for quick tests
# To e.g. check the code works, maybe warm up the pypy jit
for node in store.values():
node.info.time = random.randint(0, TIMEOUT)
node.info.tstamp = TIMEOUT
print "Begin testing network"
dists = None
if not dists: dists = dijkstrall(store)
idleUntilConverged(store)
pathMatrix = testPaths(store, dists)
avgStretch = getAvgStretch(pathMatrix)
maxStretch = getMaxStretch(pathMatrix)
peers = getPeerSizes(store)
certs = getCertSizes(store)
paths = getPathSizes(store)
linkCerts = getMinLinkCertSizes(store)
avgPeerSize = getAvgSize(peers)
maxPeerSize = getMaxSize(peers)
avgCertSize = getAvgSize(certs)
maxCertSize = getMaxSize(certs)
avgPathSize = getAvgSize(paths)
maxPathSize = getMaxSize(paths)
avgLinkCert = getAvgSize(linkCerts)
maxLinkCert = getMaxSize(linkCerts)
totalCerts = sum(map(lambda x: x*certs[x], certs.keys()))
totalLinks = sum(map(lambda x: x*peers[x], peers.keys())) # one-way links
avgCertsPerLink = float(totalCerts)/max(1, totalLinks)
print "Finished testing network"
print "Avg / Max stretch: {} / {}".format(avgStretch, maxStretch)
print "Avg / Max nPeers size: {} / {}".format(avgPeerSize, maxPeerSize)
print "Avg / Max nCerts size: {} / {}".format(avgCertSize, maxCertSize)
print "Avg / Max total hops in any node's routing table: {} / {}".format(avgPathSize, maxPathSize)
print "Avg / Max lower bound cert requests per link (one-way): {} / {}".format(avgLinkCert, maxLinkCert)
print "Avg certs per link (one-way): {}".format(avgCertsPerLink)
return # End of function
def rootNodeASTest(path, outDir="output-treesim-AS", dists=None, proc = 1):
# Checks performance for every possible choice of root node
# Saves output for each root node to a separate file on disk
# path = input path to some caida.org formatted AS-relationship graph
if not os.path.exists(outDir): os.makedirs(outDir)
assert os.path.exists(outDir)
store = makeStoreASRelGraph(path)
nodes = sorted(store.keys())
for nodeIdx in xrange(len(nodes)):
if nodeIdx % proc != 0: continue # Work belongs to someone else
rootNodeID = nodes[nodeIdx]
outpath = outDir+"/{}".format(rootNodeID)
if os.path.exists(outpath):
print "Skipping {}, already processed".format(rootNodeID)
continue
store = makeStoreASRelGraphFixedRoot(path, rootNodeID)
for node in store.values():
node.info.time = random.randint(0, TIMEOUT)
node.info.tstamp = TIMEOUT
print "Beginning {}, size {}".format(nodeIdx, len(store))
if not dists: dists = dijkstrall(store)
idleUntilConverged(store)
pathMatrix = testPaths(store, dists)
avgStretch = getAvgStretch(pathMatrix)
maxStretch = getMaxStretch(pathMatrix)
results = getResults(pathMatrix)
with open(outpath, "w") as f:
f.write(results)
print "Finished test for root AS {} ({} / {})".format(rootNodeID, nodeIdx+1, len(store))
print "Avg / Max stretch: {} / {}".format(avgStretch, maxStretch)
#break # Stop after 1, because they can take forever
return # End of function
def timelineASTest():
# Meant to study the performance of the network as a function of network size
# Loops over a set of AS-relationship graphs
# Runs a test on each graph, selecting highest-degree node as the root
# Saves results for each graph to a separate file on disk
outDir = "output-treesim-timeline-AS"
if not os.path.exists(outDir): os.makedirs(outDir)
assert os.path.exists(outDir)
paths = sorted(glob.glob("asrel/datasets/*"))
for path in paths:
date = os.path.basename(path).split(".")[0]
outpath = outDir+"/{}".format(date)
if os.path.exists(outpath):
print "Skipping {}, already processed".format(date)
continue
store = makeStoreASRelGraphMaxDeg(path)
dists = None
for node in store.values():
node.info.time = random.randint(0, TIMEOUT)
node.info.tstamp = TIMEOUT
print "Beginning {}, size {}".format(date, len(store))
if not dists: dists = dijkstrall(store)
idleUntilConverged(store)
pathMatrix = testPaths(store, dists)
avgStretch = getAvgStretch(pathMatrix)
maxStretch = getMaxStretch(pathMatrix)
results = getResults(pathMatrix)
with open(outpath, "w") as f:
f.write(results)
print "Finished {} with {} nodes".format(date, len(store))
print "Avg / Max stretch: {} / {}".format(avgStretch, maxStretch)
#break # Stop after 1, because they can take forever
return # End of function
def timelineDimesTest():
# Meant to study the performance of the network as a function of network size
# Loops over a set of AS-relationship graphs
# Runs a test on each graph, selecting highest-degree node as the root
# Saves results for each graph to a separate file on disk
outDir = "output-treesim-timeline-dimes"
if not os.path.exists(outDir): os.makedirs(outDir)
assert os.path.exists(outDir)
# Input files are named ASEdgesX_Y where X = month (no leading 0), Y = year
paths = sorted(glob.glob("DIMES/ASEdges/*.gz"))
exists = set(glob.glob(outDir+"/*"))
for path in paths:
date = os.path.basename(path).split(".")[0]
outpath = outDir+"/{}".format(date)
if outpath in exists:
print "Skipping {}, already processed".format(date)
continue
store = makeStoreDimesEdges(path)
# Get the highest degree node and make it root
# Sorted by nodeID just to make it stable in the event of a tie
nodeIDs = sorted(store.keys())
bestRoot = ""
bestDeg = 0
for nodeID in nodeIDs:
node = store[nodeID]
if len(node.links) > bestDeg:
bestRoot = nodeID
bestDeg = len(node.links)
assert bestRoot
store = makeStoreDimesEdges(path, bestRoot)
rootID = "R" + bestRoot[1:]
assert rootID in store
# Don't forget to set random seed before setting times
# To make results reproducible
nodeIDs = sorted(store.keys())
random.seed(12345)
for nodeID in nodeIDs:
node = store[nodeID]
node.info.time = random.randint(0, TIMEOUT)
node.info.tstamp = TIMEOUT
print "Beginning {}, size {}".format(date, len(store))
if not dists: dists = dijkstrall(store)
idleUntilConverged(store)
pathMatrix = testPaths(store, dists)
avgStretch = getAvgStretch(pathMatrix)
maxStretch = getMaxStretch(pathMatrix)
results = getResults(pathMatrix)
with open(outpath, "w") as f:
f.write(results)
print "Finished {} with {} nodes".format(date, len(store))
print "Avg / Max stretch: {} / {}".format(avgStretch, maxStretch)
break # Stop after 1, because they can take forever
return # End of function
def scalingTest(maxTests=None, inputDir="graphs"):
# Meant to study the performance of the network as a function of network size
# Loops over a set of nodes in a previously generated graph
# Runs a test on each graph, testing each node as the root
# if maxTests is set, tests only that number of roots (highest degree first)
# Saves results for each graph to a separate file on disk
outDir = "output-treesim-{}".format(inputDir)
if not os.path.exists(outDir): os.makedirs(outDir)
assert os.path.exists(outDir)
paths = sorted(glob.glob("{}/*".format(inputDir)))
exists = set(glob.glob(outDir+"/*"))
for path in paths:
gc.collect() # pypy waits for gc to close files
graph = os.path.basename(path).split(".")[0]
store = makeStoreGeneratedGraph(path)
# Get the highest degree node and make it root
# Sorted by nodeID just to make it stable in the event of a tie
nodeIDs = sorted(store.keys(), key=lambda x: len(store[x].links), reverse=True)
dists = None
if maxTests: nodeIDs = nodeIDs[:maxTests]
for nodeID in nodeIDs:
nodeIDStr = str(nodeID).zfill(len(str(len(store)-1)))
outpath = outDir+"/{}-{}".format(graph, nodeIDStr)
if outpath in exists:
print "Skipping {}-{}, already processed".format(graph, nodeIDStr)
continue
store = makeStoreGeneratedGraph(path, nodeID)
# Don't forget to set random seed before setting times
random.seed(12345) # To make results reproducible
nIDs = sorted(store.keys())
for nID in nIDs:
node = store[nID]
node.info.time = random.randint(0, TIMEOUT)
node.info.tstamp = TIMEOUT
print "Beginning {}, size {}".format(graph, len(store))
if not dists: dists = dijkstrall(store)
idleUntilConverged(store)
pathMatrix = testPaths(store, dists)
avgStretch = getAvgStretch(pathMatrix)
maxStretch = getMaxStretch(pathMatrix)
results = getResults(pathMatrix)
with open(outpath, "w") as f:
f.write(results)
print "Finished {} with {} nodes for root {}".format(graph, len(store), nodeID)
print "Avg / Max stretch: {} / {}".format(avgStretch, maxStretch)
return # End of function
##################
# Main Execution #
##################
if __name__ == "__main__":
if True: # Run a quick test
random.seed(12345) # DEBUG
store = makeStoreSquareGrid(4)
runTest(store) # Quick test
store = None
# Do some real work
#runTest(makeStoreDimesEdges("DIMES/ASEdges/ASEdges1_2007.csv.gz"))
#timelineDimesTest()
#rootNodeASTest("asrel/datasets/19980101.as-rel.txt")
#timelineASTest()
#rootNodeASTest("hype-2016-09-19.list", "output-treesim-hype")
#scalingTest(None, "graphs-20") # First argument 1 to only test 1 root per graph
#store = makeStoreGeneratedGraph("bgp_tables")
#store = makeStoreGeneratedGraph("skitter")
#store = makeStoreASRelGraphMaxDeg("hype-2016-09-19.list") #http://hia.cjdns.ca/watchlist/c/walk.peers.20160919
#store = makeStoreGeneratedGraph("fc00-2017-08-12.txt")
if store: runTest(store)
#rootNodeASTest("skitter", "output-treesim-skitter", None, 0, 1)
#scalingTest(1, "graphs-20") # First argument 1 to only test 1 root per graph
#scalingTest(1, "graphs-21") # First argument 1 to only test 1 root per graph
#scalingTest(1, "graphs-22") # First argument 1 to only test 1 root per graph
#scalingTest(1, "graphs-23") # First argument 1 to only test 1 root per graph
if not store:
import sys
args = sys.argv
if len(args) == 2:
job_number = int(sys.argv[1])
rootNodeASTest("fc00-2017-08-12.txt", "fc00", None, job_number)
else:
print "Usage: {} job_number".format(args[0])
print "job_number = which job set to run on this node (1-indexed)"

View File

@ -1,459 +0,0 @@
// +build !lint
package main
import (
"bufio"
"flag"
"fmt"
"os"
"runtime"
"runtime/pprof"
"strconv"
"strings"
"time"
"github.com/gologme/log"
. "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
. "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
)
////////////////////////////////////////////////////////////////////////////////
type Node struct {
index int
core Core
send chan<- []byte
recv <-chan []byte
}
func (n *Node) init(index int) {
n.index = index
n.core.Init()
n.send = n.core.DEBUG_getSend()
n.recv = n.core.DEBUG_getRecv()
n.core.DEBUG_simFixMTU()
}
func (n *Node) printTraffic() {
for {
packet := <-n.recv
fmt.Println(n.index, packet)
//panic("Got a packet")
}
}
func (n *Node) startPeers() {
//for _, p := range n.core.Peers.Ports {
// go p.MainLoop()
//}
//go n.printTraffic()
//n.core.Peers.DEBUG_startPeers()
}
func linkNodes(m, n *Node) {
// Don't allow duplicates
if m.core.DEBUG_getPeers().DEBUG_hasPeer(n.core.DEBUG_getSigningPublicKey()) {
return
}
// Create peers
// Buffering reduces packet loss in the sim
// This slightly speeds up testing (fewer delays before retrying a ping)
pLinkPub, pLinkPriv := m.core.DEBUG_newBoxKeys()
qLinkPub, qLinkPriv := m.core.DEBUG_newBoxKeys()
p := m.core.DEBUG_getPeers().DEBUG_newPeer(n.core.DEBUG_getEncryptionPublicKey(),
n.core.DEBUG_getSigningPublicKey(), *m.core.DEBUG_getSharedKey(pLinkPriv, qLinkPub))
q := n.core.DEBUG_getPeers().DEBUG_newPeer(m.core.DEBUG_getEncryptionPublicKey(),
m.core.DEBUG_getSigningPublicKey(), *n.core.DEBUG_getSharedKey(qLinkPriv, pLinkPub))
DEBUG_simLinkPeers(p, q)
return
}
func makeStoreSquareGrid(sideLength int) map[int]*Node {
store := make(map[int]*Node)
nNodes := sideLength * sideLength
idxs := make([]int, 0, nNodes)
// TODO shuffle nodeIDs
for idx := 1; idx <= nNodes; idx++ {
idxs = append(idxs, idx)
}
for _, idx := range idxs {
node := &Node{}
node.init(idx)
store[idx] = node
}
for idx := 0; idx < nNodes; idx++ {
if (idx % sideLength) != 0 {
linkNodes(store[idxs[idx]], store[idxs[idx-1]])
}
if idx >= sideLength {
linkNodes(store[idxs[idx]], store[idxs[idx-sideLength]])
}
}
//for _, node := range store { node.initPorts() }
return store
}
func makeStoreStar(nNodes int) map[int]*Node {
store := make(map[int]*Node)
center := &Node{}
center.init(0)
store[0] = center
for idx := 1; idx < nNodes; idx++ {
node := &Node{}
node.init(idx)
store[idx] = node
linkNodes(center, node)
}
return store
}
func loadGraph(path string) map[int]*Node {
f, err := os.Open(path)
if err != nil {
panic(err)
}
defer f.Close()
store := make(map[int]*Node)
s := bufio.NewScanner(f)
for s.Scan() {
line := s.Text()
nodeIdxstrs := strings.Split(line, " ")
nodeIdx0, _ := strconv.Atoi(nodeIdxstrs[0])
nodeIdx1, _ := strconv.Atoi(nodeIdxstrs[1])
if store[nodeIdx0] == nil {
node := &Node{}
node.init(nodeIdx0)
store[nodeIdx0] = node
}
if store[nodeIdx1] == nil {
node := &Node{}
node.init(nodeIdx1)
store[nodeIdx1] = node
}
linkNodes(store[nodeIdx0], store[nodeIdx1])
}
//for _, node := range store { node.initPorts() }
return store
}
////////////////////////////////////////////////////////////////////////////////
func startNetwork(store map[[32]byte]*Node) {
for _, node := range store {
node.startPeers()
}
}
func getKeyedStore(store map[int]*Node) map[[32]byte]*Node {
newStore := make(map[[32]byte]*Node)
for _, node := range store {
newStore[node.core.DEBUG_getSigningPublicKey()] = node
}
return newStore
}
func testPaths(store map[[32]byte]*Node) bool {
nNodes := len(store)
count := 0
for _, source := range store {
count++
fmt.Printf("Testing paths from node %d / %d (%d)\n", count, nNodes, source.index)
for _, dest := range store {
//if source == dest { continue }
destLoc := dest.core.DEBUG_getLocator()
coords := destLoc.DEBUG_getCoords()
temp := 0
ttl := ^uint64(0)
oldTTL := ttl
for here := source; here != dest; {
temp++
if temp > 4096 {
fmt.Println("Loop?")
time.Sleep(time.Second)
return false
}
nextPort := here.core.DEBUG_switchLookup(coords)
// First check if "here" is accepting packets from the previous node
// TODO explain how this works
ports := here.core.DEBUG_getPeers().DEBUG_getPorts()
nextPeer := ports[nextPort]
if nextPeer == nil {
fmt.Println("Peer associated with next port is nil")
return false
}
next := store[nextPeer.DEBUG_getSigKey()]
/*
if next == here {
//for idx, link := range here.links {
// fmt.Println("DUMP:", idx, link.nodeID)
//}
if nextPort != 0 { panic("This should not be") }
fmt.Println("Failed to route:", source.index, here.index, dest.index, oldTTL, ttl)
//here.table.DEBUG_dumpTable()
//fmt.Println("Ports:", here.nodeID, here.ports)
return false
panic(fmt.Sprintln("Routing Loop:",
source.index,
here.index,
dest.index))
}
*/
if temp > 4090 {
fmt.Println("DEBUG:",
source.index, source.core.DEBUG_getLocator(),
here.index, here.core.DEBUG_getLocator(),
dest.index, dest.core.DEBUG_getLocator())
//here.core.DEBUG_getSwitchTable().DEBUG_dumpTable()
}
if here != source {
// This is sufficient to check for routing loops or blackholes
//break
}
if here == next {
fmt.Println("Drop:", source.index, here.index, dest.index, oldTTL)
return false
}
here = next
}
}
}
return true
}
func stressTest(store map[[32]byte]*Node) {
fmt.Println("Stress testing network...")
nNodes := len(store)
dests := make([][]byte, 0, nNodes)
for _, dest := range store {
loc := dest.core.DEBUG_getLocator()
coords := loc.DEBUG_getCoords()
dests = append(dests, coords)
}
lookups := 0
start := time.Now()
for _, source := range store {
for _, coords := range dests {
source.core.DEBUG_switchLookup(coords)
lookups++
}
}
timed := time.Since(start)
fmt.Printf("%d lookups in %s (%f lookups per second)\n",
lookups,
timed,
float64(lookups)/timed.Seconds())
}
func pingNodes(store map[[32]byte]*Node) {
fmt.Println("Sending pings...")
nNodes := len(store)
count := 0
equiv := func(a []byte, b []byte) bool {
if len(a) != len(b) {
return false
}
for idx := 0; idx < len(a); idx++ {
if a[idx] != b[idx] {
return false
}
}
return true
}
for _, source := range store {
count++
//if count > 16 { break }
fmt.Printf("Sending packets from node %d/%d (%d)\n", count, nNodes, source.index)
sourceKey := source.core.DEBUG_getEncryptionPublicKey()
payload := sourceKey[:]
sourceAddr := source.core.DEBUG_getAddr()[:]
sendTo := func(bs []byte, destAddr []byte) {
packet := make([]byte, 40+len(bs))
copy(packet[8:24], sourceAddr)
copy(packet[24:40], destAddr)
copy(packet[40:], bs)
packet[0] = 6 << 4
source.send <- packet
}
destCount := 0
for _, dest := range store {
destCount += 1
fmt.Printf("%d Nodes, %d Send, %d Recv\n", nNodes, count, destCount)
if dest == source {
fmt.Println("Skipping self")
continue
}
destAddr := dest.core.DEBUG_getAddr()[:]
ticker := time.NewTicker(150 * time.Millisecond)
sendTo(payload, destAddr)
for loop := true; loop; {
select {
case packet := <-dest.recv:
{
if equiv(payload, packet[len(packet)-len(payload):]) {
loop = false
}
}
case <-ticker.C:
sendTo(payload, destAddr)
//dumpDHTSize(store) // note that this uses racey functions to read things...
}
}
ticker.Stop()
}
//break // Only try sending pings from 1 node
// This is because, for some reason, stopTun() doesn't always close it
// And if two tuns are up, bad things happen (sends via wrong interface)
}
fmt.Println("Finished pinging nodes")
}
func pingBench(store map[[32]byte]*Node) {
fmt.Println("Benchmarking pings...")
nPings := 0
payload := make([]byte, 1280+40) // MTU + ipv6 header
var timed time.Duration
//nNodes := len(store)
count := 0
for _, source := range store {
count++
//fmt.Printf("Sending packets from node %d/%d (%d)\n", count, nNodes, source.index)
getPing := func(key [32]byte, decodedCoords []byte) []byte {
// TODO write some function to do this the right way, put... somewhere...
coords := DEBUG_wire_encode_coords(decodedCoords)
packet := make([]byte, 0, len(key)+len(coords)+len(payload))
packet = append(packet, key[:]...)
packet = append(packet, coords...)
packet = append(packet, payload[:]...)
return packet
}
for _, dest := range store {
key := dest.core.DEBUG_getEncryptionPublicKey()
loc := dest.core.DEBUG_getLocator()
coords := loc.DEBUG_getCoords()
ping := getPing(key, coords)
// TODO make sure the session is open first
start := time.Now()
for i := 0; i < 1000000; i++ {
source.send <- ping
nPings++
}
timed += time.Since(start)
break
}
break
}
fmt.Printf("Sent %d pings in %s (%f per second)\n",
nPings,
timed,
float64(nPings)/timed.Seconds())
}
func dumpStore(store map[NodeID]*Node) {
for _, node := range store {
fmt.Println("DUMPSTORE:", node.index, node.core.DEBUG_getLocator())
node.core.DEBUG_getSwitchTable().DEBUG_dumpTable()
}
}
func dumpDHTSize(store map[[32]byte]*Node) {
var min, max, sum int
for _, node := range store {
num := node.core.DEBUG_getDHTSize()
min = num
max = num
break
}
for _, node := range store {
num := node.core.DEBUG_getDHTSize()
if num < min {
min = num
}
if num > max {
max = num
}
sum += num
}
avg := float64(sum) / float64(len(store))
fmt.Printf("DHT min %d / avg %f / max %d\n", min, avg, max)
}
func (n *Node) startTCP(listen string) {
n.core.DEBUG_setupAndStartGlobalTCPInterface(listen)
}
func (n *Node) connectTCP(remoteAddr string) {
n.core.AddPeer(remoteAddr, remoteAddr)
}
////////////////////////////////////////////////////////////////////////////////
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile `file`")
var memprofile = flag.String("memprofile", "", "write memory profile to this file")
func main() {
flag.Parse()
if *cpuprofile != "" {
f, err := os.Create(*cpuprofile)
if err != nil {
panic(fmt.Sprintf("could not create CPU profile: ", err))
}
if err := pprof.StartCPUProfile(f); err != nil {
panic(fmt.Sprintf("could not start CPU profile: ", err))
}
defer pprof.StopCPUProfile()
}
if *memprofile != "" {
f, err := os.Create(*memprofile)
if err != nil {
panic(fmt.Sprintf("could not create memory profile: ", err))
}
defer func() { pprof.WriteHeapProfile(f); f.Close() }()
}
fmt.Println("Test")
Util_testAddrIDMask()
idxstore := makeStoreSquareGrid(4)
//idxstore := makeStoreStar(256)
//idxstore := loadGraph("misc/sim/hype-2016-09-19.list")
//idxstore := loadGraph("misc/sim/fc00-2017-08-12.txt")
//idxstore := loadGraph("skitter")
kstore := getKeyedStore(idxstore)
//*
logger := log.New(os.Stderr, "", log.Flags())
for _, n := range kstore {
n.core.DEBUG_setLogger(logger)
}
//*/
startNetwork(kstore)
//time.Sleep(10*time.Second)
// Note that testPaths only works if pressure is turned off
// Otherwise congestion can lead to routing loops?
for finished := false; !finished; {
finished = testPaths(kstore)
}
pingNodes(kstore)
//pingBench(kstore) // Only after disabling debug output
//stressTest(kstore)
//time.Sleep(120 * time.Second)
dumpDHTSize(kstore) // note that this uses racey functions to read things...
if false {
// This connects the sim to the local network
for _, node := range kstore {
node.startTCP("localhost:0")
node.connectTCP("localhost:12345")
break // just 1
}
for _, node := range kstore {
go func() {
// Just dump any packets sent to this node
for range node.recv {
}
}()
}
var block chan struct{}
<-block
}
runtime.GC()
}

View File

@ -250,7 +250,8 @@ func (intf *link) handler() (chan struct{}, error) {
intf.links.core.log.Infof("Connected %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
// Run the handler
err = intf.links.core.PacketConn.HandleConn(ed25519.PublicKey(intf.info.key[:]), intf.conn)
var metric uint64 // TODO exchange metric in matadata, use max value
err = intf.links.core.PacketConn.HandleConn(ed25519.PublicKey(intf.info.key[:]), intf.conn, metric)
// TODO don't report an error if it's just a 'use of closed network connection'
if err != nil {
intf.links.core.log.Infof("Disconnected %s: %s, source %s; error: %s",