Seven Languages In Seven Weeks: Scala - Day 3
I just finished day 3 of Scala from Seven Languages in Seven Weeks. Today was a tough day for me because it focused mostly on the concept of concurrency. ColdFusion, my language of choice, has threading in the form of CFThread; but, I am getting the feeling that "threading" and "concurrency" are two very different beasts. Threading seems to center around the idea of asynchronous "code" whereas concurrency seems to center around the idea of asynchronous "messaging." Of course, this is my first taste of concurrency, so please forgive me if that comparison is completely off-base.
NOTE: I am very confused by concurrency. Please take everything after this note with a grain of salt. I might be downright misleading at times (for all I know)!
HW1: Take the sizer application and add a message to count the number of links on the page.
// Take the sizer application and add a message to count the
// number of links on the page.
import scala.io._
import scala.actors._
import scala.actors.Actor._
// I am a singleton object that contains access methods for URLs.
object PageLoader {
// I get the content at the given URL.
def getContent( url: String ) = Source.fromURL( url ).mkString
// I get the size of the given URL (the number of characters).
def getPageSize( url: String ) = getContent( url ).length
// I get the number of anchor tags on the given page.
def getLinkCount( url: String ) = {
// Get the content of the given URL.
val content = getContent( url )
// Get the count of link tags within the content. We can do
// this by finding all the pattern matches for the anchor
// tag and then returning the number of matches found.
//
// NOTE: This regular expression pattern isn't perfect; but,
// it will do for this demo.
"(?i)<a[^>]+?href".r.findAllIn( content ).size
}
}
// ---------------------------------------------------------- //
// ---------------------------------------------------------- //
// I am the list of URLs that we will be experimenting with.
val urls = List(
"http://www.amazon.com",
"http://www.twitter.com",
"http://www.google.com",
"http://www.cnn.com"
)
// ---------------------------------------------------------- //
// ---------------------------------------------------------- //
// I get the page data (size and link count) in a sequential order.
// That is, I can't get the data from one URL until the previous
// URL has finished being calculated.
def getPageDataSequentially() = {
// Loop over each url.
for (url <- urls){
// Print the size of the URL.
println(
"Size for " + url + ": " +
PageLoader.getPageSize( url )
)
// Print the number of links at the given url.
println(
"Link count for " + url + ": " +
PageLoader.getLinkCount( url )
)
}
}
// I get the page data (size and link count) in a concurrent order.
// Not only can I get each url size concurrently, I can get the
// link count concurrently also.
def getPageDataConcurrently() = {
// Get a reference to the current Actor (I think that each
// program runs in its own Actor or Thread). The "self" reference
// is a reference to the current actor.
val caller = self
// Loop over each url to get its size and link count.
for (url <- urls){
// NOTE: The actor() method is a factory method of the current
// thread that creates an actor that executes the given
// closure.... I think.
// Create an actor that gets the page size. Pass the actor a
// tuple in which the first item, "Size", will help uniquely
// identify this actor's response.
actor {
caller ! ("Size", url, PageLoader.getPageSize( url ))
}
// Create an actor that gets the link count. Pass the actor
// a tuple in which the first item, "LinkCount", will help
// uniquely identify this actor's response.
actor {
caller ! ("LinkCount", url, PageLoader.getLinkCount( url ))
}
}
// Now that we have launched a bunch of threads, we need to wait
// for them to respond. To do this, we are going to use the
// receive method. This is a *blocking* method. That is, it is
// going to halt processing of the primary thread until one of
// the actors returns.
//
// NOTE: Since we created two asynch threads per URL, we need
// to execute N*2 receive() methods.
1.to( urls.size * 2 ).foreach{ i =>
// The receive() method takes a partial function. Both sets
// of actors will trigger this handler; the partial function
// will allow us to check the type of message to determine
// how to react. The "Size" and "LinkCount" parameters will
// help us match the unique patterns of the two different
// actors and the tuples they respond with.
receive{
// Handle "size" thread-joins.
case ("Size", url, size) => {
println( "Size for " + url + ": " + size )
}
// Handle "count" thread-joins.
case ("LinkCount", url, count) => {
println( "Link count for " + url + ": " + count )
}
}
}
}
// ---------------------------------------------------------- //
// ---------------------------------------------------------- //
// Execute sequential access.
println( "Sequential run:" )
getPageDataSequentially()
println( "---------------" )
// Execute concurrent access.
println( "Concurrent run:" )
getPageDataConcurrently()
In this solution, every request for page size and link count triggers its own HTTP request. This is not very efficient - I could have performed a single HTTP request per URL and derived both values from it; but, for the sake of exploring concurrency, I figured that I would leave the calls separate, allowing each one to happen asynchronously.
I am quite lost on the topic of concurrency, so I don't want to talk too much about what this code is actually doing. From what I can gather, though, each request for page size and link count is executed inside of an actor that runs the request asynchronously. Then, for each actor that I generated, I have to execute a receive() block to wait for that asynchronous request to come back.
receive() is a blocking action; that is, it will prevent any further execution of the current thread until an asynchronous thread (matching one of the CASE patterns) returns a response. While I don't fully understand how the case matching actually works, I am looking at receive() much like a CFThread/Join action.
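From what I can gather, the case matching inside receive() is just ordinary Scala pattern matching applied to each incoming message. A small, actor-free sketch (using describe(), a hypothetical helper of my own) shows the mechanics - the first tuple element acts as the discriminator:

```scala
// Hypothetical helper: dispatch on the shape of a message value,
// just like the partial function passed to receive() does.
def describe( message: Any ) = message match {
    // Matches any 3-tuple whose first element is the string "Size".
    case ("Size", url, size) =>
        "Size for " + url + ": " + size
    // Matches any 3-tuple whose first element is "LinkCount".
    case ("LinkCount", url, count) =>
        "Link count for " + url + ": " + count
    // Anything else falls through to a default case.
    case _ =>
        "Unknown message"
}

println( describe( ("Size", "http://www.google.com", 8734) ) )
println( describe( ("LinkCount", "http://www.google.com", 17) ) )
```

The only difference with receive() is that the matched value comes out of the actor's mailbox instead of being passed in as an argument.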
When we run the full script above, we get the following console output:
Sequential run:
Size for http://www.amazon.com: 94505
Link count for http://www.amazon.com: 188
Size for http://www.twitter.com: 42906
Link count for http://www.twitter.com: 113
Size for http://www.google.com: 8734
Link count for http://www.google.com: 17
Size for http://www.cnn.com: 101558
Link count for http://www.cnn.com: 358
---------------
Concurrent run:
Link count for http://www.google.com: 17
Size for http://www.google.com: 8734
Link count for http://www.cnn.com: 358
Size for http://www.cnn.com: 101558
Link count for http://www.amazon.com: 187
Size for http://www.amazon.com: 94656
Link count for http://www.twitter.com: 113
Size for http://www.twitter.com: 44170
You'll notice that the concurrent values come back in a different order than the sequential values. This is because there is no guarantee as to the order in which the threads in a thread pool will finish executing. What you can't see in the output, however, is that the concurrent requests finished in about half the time that the sequential requests took.
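To put a rough number on that claim, you could wrap each run in a small timing helper. Here, time() is my own hypothetical helper, with Thread.sleep() standing in for the slow HTTP requests:

```scala
// Hypothetical helper: run a block, return its result and elapsed ms.
def time[T]( block: => T ): (T, Long) = {
    val start = System.currentTimeMillis
    val result = block
    (result, System.currentTimeMillis - start)
}

// Two simulated 100ms "requests", run back-to-back...
val (_, sequentialMs) = time {
    Thread.sleep( 100 )
    Thread.sleep( 100 )
}

// ... versus the same two "requests" run on parallel threads.
val (_, concurrentMs) = time {
    val threads = List.fill( 2 )( new Thread( () => Thread.sleep( 100 ) ) )
    threads.foreach( _.start() )
    threads.foreach( _.join() )
}

println( "Sequential: " + sequentialMs + "ms, concurrent: " + concurrentMs + "ms" )
```

With two equally slow requests, the concurrent run takes roughly as long as the single slowest request, which lines up with the near-halving seen above.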
HW2: Bonus Problem: Make the sizer follow the links on a given page, and load them as well. For example, a sizer for "google.com" would compute the size for Google and all of the pages it links to.
Since this one is getting a bit more complex, I dropped the link-count aspect and focused solely on the page size - that is, the sizer for this solution only worries about content length and link collection (for determining linked-page content length).
For a first pass, I tried to keep it very simple. Since the primary thread was already calling the PageLoader concurrently, I didn't want to mess with that; we're still calling the getPageSize() method concurrently, only this time, the getPageSize() method is doing a lot more work.
// Bonus Problem: Make the sizer follow the links on a given page,
// and load them as well. For example, a sizer for google.com
// would compute the size for Google and all of the pages it
// links to.
import scala.io._
import scala.actors._
import scala.actors.Actor._
// I am a singleton object that contains access methods for URLs.
object PageLoader {
// I get the content at the given URL.
def getContent( url: String ) = {
// Try to get the content. If it fails, just return an empty
// string.
try {
Source.fromURL( url ).mkString
} catch {
case e: Exception => ""
}
}
// I get the size of the given URL (the number of characters).
def getPageSize( url: String ) = {
// Get the page content of the target page.
val content = getContent( url )
// Start with the initial page size and then add the size of
// each linked page to the total.
getLinks( content ).foldLeft( content.size ){
// Define arguments.
(pageSize, link) =>
// Add the size of the linked page content.
(pageSize + getContent( link ).size)
}
}
// I gather the links out of the given content.
def getLinks( content: String ) = {
// Create a pattern for the link tags. This pattern uses the
// verbose flag, (?x), to allow an easier-to-read regular
// expression for matching.
val pattern = """(?xi)
< a
(
\s+
\w+
\s*=\s*
(
"[^"]*"
|
'[^']*'
)
)+
\s*
>
"""
// Gather all occurrences of the pattern in the content.
pattern.r.findAllIn( content ).foldLeft( List[String]() ){
// Define arguments.
(links, linkTag) =>
// Gather the HTTP value. NOTE: links are not the only tags
// that will have this; but, for our purposes, they are the
// only ones that we are going to care about.
"http:[^'\"]+".r.findFirstIn( linkTag ) match {
// If a match was found, add it to the list.
case Some( link ) => links :+ link
// If no match was found, return existing list.
case None => links
}
}
}
}
// ---------------------------------------------------------- //
// ---------------------------------------------------------- //
// I am the list of URLs that we will be experimenting with.
val urls = List(
"http://www.amazon.com",
"http://www.twitter.com",
"http://www.google.com",
"http://www.cnn.com"
)
// ---------------------------------------------------------- //
// ---------------------------------------------------------- //
// I get the page size in a sequential order. That is, I can't
// get the data from one URL until the previous URL has finished
// being calculated.
def getPageSizeSequentially() = {
// Loop over each url.
for (url <- urls){
// Print the size of the URL.
println(
"Size for " + url + ": " +
PageLoader.getPageSize( url )
)
}
}
// I get the page size in a concurrent order. That is, I can gather
// page sizes for each URL in parallel.
def getPageSizeConcurrently() = {
// Get a reference to the current Actor (I think that each
// program runs in its own Actor or Thread). The "self" reference
// is a reference to the current actor.
val caller = self
// Loop over each url to get its size.
for (url <- urls){
// NOTE: The actor() method is a factory method of the current
// thread that creates an actor that executes the given
// closure.... I think.
// Create an actor that gets the page size.
actor {
caller ! (url, PageLoader.getPageSize( url ))
}
}
// Now that we have launched a bunch of threads, we need to wait
// for them to respond. To do this, we are going to use the
// receive method. This is a *blocking* method. That is, it is
// going to halt processing of the primary thread until one of
// the actors returns.
1.to( urls.size ).foreach{ i =>
// The receive() method takes a partial function.
receive{
case (url, size) => {
println( "Size for " + url + ": " + size )
}
}
}
}
// ---------------------------------------------------------- //
// ---------------------------------------------------------- //
// Execute sequential access.
println( "Sequential run:" )
getPageSizeSequentially()
println( "---------------" )
// Execute concurrent access.
println( "Concurrent run:" )
getPageSizeConcurrently()
As you can see, whenever we call the getPageSize() method, not only does the PageLoader get the content of the given page, it also follows all of the links that begin with "http" (for ease of demo). The link-following is only one level deep; but, it adds the size of every linked page to the size of the target page.
When we run the above code, we get the following output:
Sequential run:
Size for http://www.amazon.com: 1555729
Size for http://www.twitter.com: 675137
Size for http://www.google.com: 8734
Size for http://www.cnn.com: 8532107
---------------
Concurrent run:
Size for http://www.google.com: 8734
Size for http://www.amazon.com: 1537047
Size for http://www.twitter.com: 674575
Size for http://www.cnn.com: 8639817
In the above code, we're making the top four requests concurrently; the problem with this is that those four requests represent only the smallest amount of work. For each of the top four requests, we are actually executing dozens of HTTP requests to load all of the linked content. As such, I wanted to try coming at this problem in a slightly different way - rather than making the top-level requests concurrently, I'm going to make the linked-page HTTP requests concurrently.
This is where I really start to go off the rails. I am sure the following code is absurdly junky and misguided. And I say this not just because I have no idea what I'm doing, but also because it performs poorly. The following approach does not seem to demonstrate any increase in performance.
The idea here was that all external calls to the PageLoader would be synchronous; however, once inside the PageLoader, the PageLoader would start to make concurrent calls to itself. In our previous example, we had to create actors on the fly. This time, however, we're going to define the PageLoader as an Actor (it extends the Actor trait) and use its own "self" reference to spawn and join threads.
// Bonus Problem: Make the sizer follow the links on a given page,
// and load them as well. For example, a sizer for google.com
// would compute the size for Google and all of the pages it
// links to.
import scala.io._
import scala.actors._
import scala.actors.Actor._
// I am a singleton object that contains access methods for URLs.
// This time, the PageLoader object extends Actor so that it can
// have some concurrent behavior.
object PageLoader extends Actor {
// I get the content at the given URL.
def getContent( url: String ) = {
// Try to get the content. If it fails, just return an empty
// string.
try {
Source.fromURL( url ).mkString
} catch {
case e: Exception => ""
}
}
// I get the size of the given URL (the number of characters).
def getPageSize( url: String ) = {
// Get the page content of the target page.
val content = getContent( url )
// Get the initial size of this page. We are going to
// aggregate the size of this page and its linked pages; but
// to start with, we want just the size of this page.
val pageSize = content.size
// Get the links on the given page.
val links = getLinks( content )
// Now, for each link, send an asynchronous message to the
// PageLoader to get the content of that link. Since there
// are a lot of links and the HTTP requests are expensive,
// we want these to happen in parallel as much as possible.
links.foreach{ link =>
// NOTE: getContent() is evaluated right here, on the calling
// thread, before the message is even sent. As such, these
// requests still run serially - which is likely why this
// version shows no real speedup.
self ! ("linkedPageContent", link, getContent( link ))
}
// Now that we have launched a bunch of threads - one per
// link to get its content - we need to join all of those
// threads back into the current thread.
links.foldLeft( pageSize ){
// Define arguments.
(pageSize, link) =>
// Wait for the page content to come back.
self.receive{
case ("linkedPageContent", url, content:String) => {
// Add the size of the linked content to the
// running total for the master page.
(pageSize + content.size)
}
}
}
}
// I gather the links out of the given content.
def getLinks( content: String ) = {
// Create a pattern for the link tags. This pattern uses the
// verbose flag, (?x), to allow an easier-to-read regular
// expression for matching.
val pattern = """(?xi)
< a
(
\s+
\w+
\s*=\s*
(
"[^"]*"
|
'[^']*'
)
)+
\s*
>
"""
// Gather all occurrences of the pattern in the content.
pattern.r.findAllIn( content ).foldLeft( List[String]() ){
// Define arguments.
(links, linkTag) =>
// Gather the HTTP value. NOTE: links are not the only tags
// that will have this; but, for our purposes, they are the
// only ones that we are going to care about.
"http:[^'\"]+".r.findFirstIn( linkTag ) match {
// If a match was found, add it to the list.
case Some( link ) => links :+ link
// If no match was found, return existing list.
case None => links
}
}
}
// The Actor trait requires that we implement act().
def act() = {
// -- Not sure how this works :( :(
}
}
// Start the thread of the PageLoader???
PageLoader.start
// ---------------------------------------------------------- //
// ---------------------------------------------------------- //
// I am the list of URLs that we will be experimenting with.
val urls = List(
"http://www.amazon.com",
"http://www.twitter.com",
"http://www.google.com",
"http://www.cnn.com"
)
// ---------------------------------------------------------- //
// ---------------------------------------------------------- //
// Loop over each url to get its size. This time, we're not going
// to call getPageSize() concurrently because the method itself
// handles concurrency internally.
for (url <- urls){
println(
"Size for " + url + ": " +
PageLoader.getPageSize( url )
)
}
As you can see, this approach is fairly similar to the previous one. The difference is that my concurrent actions take place inside the PageLoader Actor, not inside the primary thread... I think. Clearly, I have very little idea what things like the act() method of an Actor are supposed to do. I couldn't do anything to get code to execute within the act() function.
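For the record, my later reading suggests that act() is meant to be the actor's message loop: once start() is called, the actor runs act() on its own thread, and act() typically wraps a react (or receive) inside loop so the actor keeps pulling messages from its mailbox. A minimal sketch of that shape, using the old scala.actors library (since deprecated and removed from the standard library) - EchoActor is my own made-up example, not part of the sizer:

```scala
import scala.actors._
import scala.actors.Actor._

// A minimal actor whose act() method is its message-handling loop.
object EchoActor extends Actor {
    def act() = {
        // loop{ react{ ... } } keeps the actor alive, handling one
        // mailbox message at a time.
        loop {
            react {
                case ("echo", text: String) =>
                    println( "Echo: " + text )
                case "stop" =>
                    exit()
            }
        }
    }
}

EchoActor.start
EchoActor ! ("echo", "hello")
EchoActor ! "stop"
```

If that's right, then my empty act() above means the PageLoader actor's own thread simply does nothing - all of the receive() calls in getPageSize() are happening on the calling thread instead.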
Anyway, when we run the full script above, we get the following output:
Size for http://www.amazon.com: 1535326
Size for http://www.twitter.com: 675428
Size for http://www.google.com: 8734
Size for http://www.cnn.com: 8624444
I think that CFThread in ColdFusion is awesome. It's awesome because it yields a lot of power with relatively little effort. As I was working with concurrency in Scala, I kept hoping it would be as simple; but, it doesn't appear to be. I am sure much of that is my ignorance of the language; however, I think a good part of it is the fact that, as Marc Esher alluded to, they are very different concepts:
And I don't mean to diss CFThread. It's just that they solve vastly different problems, particularly of scale / distribution.
As I said before, it seems like CFThread deals more in terms of "code" whereas concurrency deals more in terms of "messaging." In any case, the topic is definitely far too complex to be covered thoroughly within a 3-day overview of the language.
Scala was fun - up next: Erlang!
Want to use code from this post? Check out the license.
Reader Comments
Nice to see fewer semicolons :) Must be killing you, eh?
You could omit a lot of the dots and some of the parentheses too to make it more readable / idiomatic:
Source fromURL url mkString
1 to urls.size foreach { _ => ... }
(You can use _ for the variable name when you're ignoring it as in your loops)
On the subject of actors, you'll find Erlang is very similar (and Groovy is adding a similar feature thru the GPars work). Imagine each actor is an independent person doing whatever work they've been assigned. They communicate by sending messages to each other (think: email or instant message, for example). When an actor is receiving, they're just a sittin' there waiting for someone to send them an IM.
In the main Scala app I have running at World Singles, it has a series of actors that i) look for changes in a DB table and send a message to ii) that reads info from the DB and converts it to XML and posts it to search engine and then tells iii) the batch of records that got posted so they can be updated as 'processed'. There's only one of each i) and iii) actors because their processes are pretty quick but there are several of the ii) actors because the XML transform and post operation is slow.
So the team of actors has a point person watching for changes who distributes work to the transformers who then tell the auditor they're done.
Hope that helps make sense of it?