```
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.util.MurmurHash
val df_1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("../Downloads/2008.csv")
```

Run `df_1.take(5)` and you'll see exactly what we're working with.
```
val flightsFromTo = df_1.select($"Origin",$"Dest")
val airportCodes = df_1.select($"Origin", $"Dest").flatMap(x => Iterable(x(0).toString, x(1).toString))
```

Those prep our edges and our vertices. The next step is to convert our airport codes to primitive data types, deduplicate them, and turn them into a list of vertices. That involves two conceptual maps: take the distinct values of what we have now, map each one to a unique integer id, then turn them into a list of vertices. You'll see below that we have to specify the types for Scala to perform this calculation correctly.
```
val airportVertices: RDD[(VertexId, String)] = airportCodes.distinct().map(x => (MurmurHash.stringHash(x), x))
val defaultAirport = "Missing" // attribute for any vertex referenced by an edge but absent from airportVertices
```
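Outside of Spark, that distinct-then-hash step can be sketched with plain Scala collections. The sketch below uses `scala.util.hashing.MurmurHash3`, the non-deprecated successor to `scala.util.MurmurHash`, and a made-up handful of airport codes purely for illustration:

```scala
import scala.util.hashing.MurmurHash3

// A toy sample of airport codes, standing in for the flattened Origin/Dest column.
val codes = List("SFO", "LAX", "SFO", "ATL", "LAX")

// Step 1: collapse to distinct codes.
// Step 2: map each code to a stable integer id via MurmurHash.
val vertices: List[(Int, String)] =
  codes.distinct.map(code => (MurmurHash3.stringHash(code), code))

println(vertices.size)      // 3 distinct airports
println(vertices.map(_._2)) // List(SFO, LAX, ATL)
```

Hashing the string is what lets us use an `Int`-compatible `VertexId` while still being able to recompute the id for any code on demand.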

Now, in order to give these edges some information, we're going to count the number of flights to and from each airport. Naturally there are a ton of ways to do this, but I'm going to stick to the simple MapReduce style for now. What we need to do is convert the rows into edges from airport id to airport id (of the primitive type), then validate and convert them with the `Edge` case class.
```
val flightEdges = flightsFromTo.map(x =>
    ((MurmurHash.stringHash(x(0).toString), MurmurHash.stringHash(x(1).toString)), 1))
  .reduceByKey(_ + _)
  .map(x => Edge(x._1._1, x._1._2, x._2))
```
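The `map`-then-`reduceByKey` pattern is just a distributed word count. In plain Scala over a local collection (toy data, not the real flight records), the same counting looks like this:

```scala
// Toy (Origin, Dest) pairs standing in for flightsFromTo.
val flights = List(("SFO", "LAX"), ("SFO", "LAX"), ("LAX", "SFO"), ("OGG", "HNL"))

// Emit ((src, dst), 1) for each flight, then sum the 1s per key --
// exactly what map + reduceByKey does on the RDD, minus the distribution.
val counts: Map[(String, String), Int] =
  flights.map(pair => (pair, 1))
    .groupBy(_._1)
    .map { case (key, ones) => (key, ones.map(_._2).sum) }

println(counts(("SFO", "LAX"))) // 2
```

Each resulting `((src, dst), count)` entry is what gets wrapped in an `Edge(src, dst, count)` in the Spark version.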

Now we can create our graph!
```
val graph = Graph(airportVertices, flightEdges, defaultAirport)
graph.persist() // we're going to be using it a lot
```

Note that GraphX builds a directed graph: each edge points from an origin airport to a destination airport. That will be important in a moment; for now, though, let's just answer some basic questions.
- How many airports are there?
- How many unique flights from airport A to airport B are there?

```
graph.numVertices // 305
graph.numEdges // 5366
```

```
graph.triplets.sortBy(_.attr, ascending=false).map(triplet =>
"There were " + triplet.attr.toString + " flights from " + triplet.srcAttr + " to " + triplet.dstAttr + ".").take(10)
```

`res60: Array[String] = Array(There were 13788 flights from SFO to LAX., There were 13390 flights from LAX to SFO., There were 12383 flights from OGG to HNL., There were 12035 flights from LGA to BOS., There were 12029 flights from BOS to LGA., There were 12014 flights from HNL to OGG., There were 11773 flights from LAX to LAS., There were 11729 flights from LAS to LAX., There were 11257 flights from LAX to SAN., There were 11224 flights from SAN to LAX.)`

```
graph.triplets.sortBy(_.attr).map(triplet =>
"There were " + triplet.attr.toString + " flights from " + triplet.srcAttr + " to " + triplet.dstAttr + ".").take(10)
```

`res62: Array[String] = Array(There were 1 flights from RNO to PIH., There were 1 flights from PHL to ICT., There were 1 flights from FSD to PIA., There were 1 flights from RIC to JAX., There were 1 flights from MOD to BFL., There were 1 flights from ASE to MSN., There were 1 flights from JFK to HPN., There were 1 flights from MCO to LIT., There were 1 flights from ROA to BWI., There were 1 flights from OMA to ABQ.)`

So awesome. Graphs are great because they align more closely with the semantics of what we're trying to do here: explaining the relationships between airports. We can see in this last example that there are a lot of airports that just get these weird one-off flights. These would probably be interesting to explore later on: why are there so few flights? Why do they exist at all? Was there an in-flight problem?

Now let's think about some other statistics. Because this is a directed graph, we can get data on which airports are flown into the most and flown out of the most. This is known as degree information.

For example, what airport has the most in-degrees, or unique flights into it?
`graph.inDegrees.join(airportVertices).sortBy(_._2._1, ascending=false).take(1)`

`Array[(org.apache.spark.graphx.VertexId, (Int, String))] = Array((2042033420,(173,ATL)))`

And out of it?
`graph.outDegrees.join(airportVertices).sortBy(_._2._1, ascending=false).take(1)`

`Array[(org.apache.spark.graphx.VertexId, (Int, String))] = Array((2042033420,(173,ATL)))`
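Under the hood, `inDegrees` and `outDegrees` are just per-vertex edge counts. A minimal plain-Scala sketch over a hypothetical edge list (not the 2008 data) makes the distinction concrete:

```scala
// Toy directed edges: (src, dst).
val edges = List(("SFO", "LAX"), ("ATL", "LAX"), ("LAX", "SFO"))

// In-degree: how many edges point *at* each vertex.
val inDegrees  = edges.groupBy(_._2).map { case (v, es) => (v, es.size) }
// Out-degree: how many edges leave each vertex.
val outDegrees = edges.groupBy(_._1).map { case (v, es) => (v, es.size) }

println(inDegrees("LAX"))  // 2
println(outDegrees("LAX")) // 1
```

Note that because we collapsed repeated flights into a single weighted edge earlier, these degrees count unique routes rather than total flights.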

Next, let's run PageRank over the network, which will iterate until the ranks move by less than the tolerance we pass in:

`val ranks = graph.pageRank(0.0001).vertices`
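To give a rough idea of what `pageRank` computes, here's a tiny, simplified power-iteration sketch in plain Scala. It's a toy: a fixed iteration count instead of a convergence tolerance, a three-airport graph invented for the example, and GraphX's default reset probability of 0.15.

```scala
// Toy directed graph as an adjacency list: vertex -> outgoing neighbors.
val links = Map(
  "ATL" -> List("SFO", "LAX"),
  "SFO" -> List("ATL"),
  "LAX" -> List("ATL")
)
val resetProb = 0.15 // GraphX's default reset ("teleport") probability

// Start every vertex at rank 1.0 and iterate a fixed number of times.
var ranks: Map[String, Double] = links.keys.map(v => (v, 1.0)).toMap
for (_ <- 1 to 20) {
  // Each vertex splits its rank evenly among its outgoing neighbors...
  val contribs = links.toList
    .flatMap { case (v, outs) => outs.map(dst => (dst, ranks(v) / outs.size)) }
    .groupBy(_._1)
    .map { case (v, cs) => (v, cs.map(_._2).sum) }
  // ...and each new rank = resetProb + (1 - resetProb) * incoming contributions.
  ranks = ranks.map { case (v, _) =>
    (v, resetProb + (1 - resetProb) * contribs.getOrElse(v, 0.0))
  }
}
println(ranks.maxBy(_._2)._1) // ATL: it receives rank from both other airports
```

The intuition carries over directly: an airport is important if important airports fly into it, not merely if it has many flights.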

So what are our most important airports!?
```
val ranksAndAirports = ranks.join(airportVertices).sortBy(_._2._1, ascending=false).map(_._2._2)
ranksAndAirports.take(10)
```

`res81: Array[String] = Array(ATL, DFW, ORD, MSP, SLC, DEN, DTW, IAH, CVG, LAX)`

And there you have it! The most important airports according to PageRank for flights in 2008. We saw before that Atlanta had the most inbound and outbound links, so this isn't a complete surprise. However, it's interesting that even though SFO and LAX had the most flights between them, that doesn't completely determine their importance in the greater network.