Ben Nadel at cf.Objective() 2017 (Washington, D.C.) with: Valerie Poreaux

Clustering Plupload Instances For Parallel File Uploads


If you follow this blog, you know that I'm a super-fan of Plupload for client-side file uploads. But, one thing that I've always wanted to experiment with is parallel uploads. By default, Plupload uploads one file at a time. But what if we clustered several instances of Plupload? Then we could distribute files to each instance and have them work in parallel. This post is a proof-of-concept of what that might look like.

View this project on my GitHub account.

For this experiment, I am using N+1 instances of Plupload. One of the instances acts as the "master"; the rest act as the worker instances that are performing parallel uploads. The master instance is the only instance that is visible to the user - it's the instance that renders the file-selection interface; but, the master doesn't actually perform any uploads - it just takes the files and distributes them to the workers.
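To make the master/worker split concrete, here is a minimal sketch of round-robin distribution on its own, with no Plupload involved. The `createWorker()` and `createDistributor()` helpers are hypothetical stand-ins that I'm using for illustration; they only record which file names each worker receives.

```javascript
// Hypothetical stand-in for a worker uploader: it only records the files
// that it has been handed.
function createWorker( name ) {

	return({
		name: name,
		files: [],
		addFile: function( file ) {
			this.files.push( file );
		}
	});

}

// I return a function that hands each file to the next worker in turn. The
// index lives in the closure so that it carries over between selections.
function createDistributor( workers ) {

	var roundRobinIndex = 0;

	return function distribute( files ) {

		for ( var i = 0 ; i < files.length ; i++ ) {

			workers[ roundRobinIndex++ % workers.length ].addFile( files[ i ] );

		}

	};

}

var workers = [ createWorker( "A" ), createWorker( "B" ), createWorker( "C" ) ];
var distribute = createDistributor( workers );

distribute( [ "1.jpg", "2.jpg", "3.jpg", "4.jpg" ] );
distribute( [ "5.jpg" ] );
```

After both calls, worker A holds 1.jpg and 4.jpg, worker B holds 2.jpg and 5.jpg, and worker C holds 3.jpg. The second selection picks up where the first one left off rather than starting again at worker A.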

When you instantiate a Plupload instance, you have to give it a DOM (Document Object Model) element ID to bind to. But, our worker instances are not intended to be triggered directly by the user, only by the master instance. As such, I have chosen to render the worker instances off-screen. To do this, my PluploadCluster() class does have to manipulate the DOM. This is poor form; but, it's a proof-of-concept.

The calling context instantiates the PluploadCluster() class and then binds to events. Most of these events are distributed to the worker instances. As the events are triggered, the calling context is passed the instance of the Plupload uploader in question, not the cluster. In this way, the calling context can perform per-file setting augmentation the way that we've been doing in the past (which is super powerful).
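As a rough sketch of that event routing, the snippet below sends a few event types to the master only and fans everything else out to the workers. The `createFakeUploader()` and `bindToCluster()` helpers are hypothetical; the fake uploader implements just enough `bind()` and `trigger()` to show that each callback receives the individual uploader, not the cluster.

```javascript
// Events that are only bound to the master (the same three names that the
// cluster's bind() method checks for).
var MASTER_ONLY_EVENTS = [ "Init", "PostInit", "FilesSelected" ];

// Hypothetical stand-in for plupload.Uploader.
function createFakeUploader( name ) {

	var handlers = {};

	var uploader = {
		name: name,
		bind: function( eventType, callback ) {
			( handlers[ eventType ] = ( handlers[ eventType ] || [] ) ).push( callback );
		},
		trigger: function( eventType, payload ) {
			var callbacks = ( handlers[ eventType ] || [] );
			for ( var i = 0 ; i < callbacks.length ; i++ ) {
				// The uploader itself is always the first argument.
				callbacks[ i ]( uploader, payload );
			}
		}
	};

	return( uploader );

}

// I bind master-only events to the master; all others go to every worker.
function bindToCluster( master, workers, eventType, callback ) {

	if ( MASTER_ONLY_EVENTS.indexOf( eventType ) !== -1 ) {

		master.bind( eventType, callback );
		return;

	}

	for ( var i = 0 ; i < workers.length ; i++ ) {

		workers[ i ].bind( eventType, callback );

	}

}

var master = createFakeUploader( "master" );
var workers = [ createFakeUploader( "worker-1" ), createFakeUploader( "worker-2" ) ];
var seen = [];

bindToCluster( master, workers, "PostInit", function( uploader ) {
	seen.push( "PostInit:" + uploader.name );
});
bindToCluster( master, workers, "BeforeUpload", function( uploader, file ) {
	seen.push( "BeforeUpload:" + uploader.name + ":" + file );
});

master.trigger( "PostInit" );
workers[ 1 ].trigger( "BeforeUpload", "cat.jpg" );
// No PostInit handler was bound to the workers, so this records nothing.
workers[ 0 ].trigger( "PostInit" );
```

The `seen` array ends up with two entries: one for the master's PostInit and one for the BeforeUpload raised by the second worker.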

When you instantiate the cluster, you tell it how many instances you want and provide the settings that you would normally provide for a single Plupload instance. Keep in mind that you are still limited by the browser's cap on concurrent HTTP requests. So, if you tell it to create 15 instances, you'll still only see (something like) 5-6 files being uploaded in parallel. Bandwidth is also a consideration.

Anyway, here's the calling code - the code that creates the Plupload cluster:

app.directive(
	"bnImageUploader",
	function( $window, $rootScope, PluploadCluster, naturalSort ) {

		// I bind the JavaScript events to the scope.
		function link( $scope, element, attributes ) {

			// The uploader has to reference the various elements using IDs. Rather than
			// crudding up the HTML, just insert the values dynamically here.
			element
				.attr( "id", "primaryUploaderContainer" )
				.find( "div.dropzone" )
					.attr( "id", "primaryUploaderDropzone" )
			;


			// Instantiate the Plupload cluster. The cluster works by creating a master
			// runtime and N-uploader instance runtimes. The Master never actually
			// uploads - it just creates the UI that the user interacts with; it then
			// distributes the files across the cluster. As it does this, events are
			// triggered on each uploader, where you can interact with each uploader the
			// way that you would have normally with a single-uploader approach.
			var cluster = new PluploadCluster(
				// Number of parallel instances. This is going to be limited by the
				// number of concurrent HTTP requests that the browser can make.
				5,
				// Individual uploader settings.
				{
					// For this demo, we're only going to use the html5 runtime. I
					// don't want to have to deal with people who require flash - not
					// this time, I'm tired of it; plus, much of the point of this demo
					// is to work with the drag-n-drop, which isn't available in Flash.
					runtimes: "html5",

					// Upload the image to the API.
					url: "api/index.cfm?action=upload",

					// Set the name of file field (that contains the upload).
					file_data_name: "file",

					// The container, into which to inject the Input shim.
					container: "primaryUploaderContainer",

					// The ID of the drop-zone element.
					drop_element: "primaryUploaderDropzone",

					// To enable click-to-select-files, you can provide a browse button.
					// We can use the same one as the drop zone.
					browse_button: "primaryUploaderDropzone"
				}
			);


			// Initialize the plupload runtime.
			cluster.bind( "Error", handleError );
			cluster.bind( "PostInit", handleInit );
			cluster.bind( "FilesSelected", handleFilesSelected );
			cluster.bind( "QueueChanged", handleQueueChanged );
			cluster.bind( "BeforeUpload", handleBeforeUpload );
			cluster.bind( "UploadProgress", handleUploadProgress );
			cluster.bind( "FileUploaded", handleFileUploaded );
			cluster.bind( "StateChanged", handleStateChanged );
			cluster.init();

			// I provide access to the aggregate file list, across the cluster, for use
			// inside of the directive. This can be used to render the items being
			// uploaded.
			$scope.queue = cluster.queue;

			// Wrap the window instance so we can get easy event binding.
			var win = $( $window );

			// When the window is resized, we'll have to update the dimensions of the
			// input shim.
			win.on( "resize", handleWindowResize );

			// When the scope is destroyed, clean up bindings.
			$scope.$on(
				"$destroy",
				function() {

					win.off( "resize", handleWindowResize );

					cluster.destroy();

				}
			);


			// ---
			// PRIVATE METHODS.
			// ---


			// I handle the before upload event where the meta data can be edited right
			// before the upload of a specific file, allowing for per-file settings.
			function handleBeforeUpload( uploader, file ) {

				var params = uploader.settings.multipart_params;
				var source = file.getSource();

				// Delete any previous reference to sort.
				delete( params.sort );

				// If the dropped/selected file has a sort option, then send it through.
				if ( "sort" in source ) {

					params.sort = source.sort;

				}

			}


			// I handle errors that occur during initialization or general operation of
			// the Plupload instance.
			function handleError( uploader, error ) {

				console.warn( "Plupload error" );
				console.error( error );

			}


			// I handle the files-selected event. This is when the files have been
			// selected for the cluster, but have not yet been added to any of the
			// uploaders in the cluster. At this point, we have the ability to alter the
			// collection of files before they are distributed.
			function handleFilesSelected( master, files ) {

				naturalSort( files, "name" );

				// For this demo, we want to make sure that file properties added in the
				// FilesSelected event can be accessed later on in the BeforeUpload
				// event; this will be after the master has distributed the file to each
				// uploader in the cluster.
				for ( var i = 0 ; i < files.length ; i++ ) {

					files[ i ].sort = i;

				}

				// After the files have been selected, they will be distributed and the
				// cluster queue will be updated. Trigger a digest asynchronously so we
				// can render the queue.
				$scope.$evalAsync();

			}


			// I handle the file-uploaded event. At this point, the image has been
			// uploaded and thumbnailed - we can now load that image in our uploads list.
			function handleFileUploaded( uploader, file, response ) {

				$scope.$apply(
					function() {

						// Broadcast the response from the server.
						$rootScope.$broadcast(
							"imageUploaded",
							angular.fromJson( response.response )
						);

						// Remove the file from the uploader queue.
						uploader.removeFile( file );

					}
				);

			}


			// I handle the init event. At this point, we will know which runtime has
			// loaded, and whether or not drag-drop functionality is supported. This
			// event only gets bound to the Master since it seems that anything that
			// fails / succeeds for the master will do the same for the entire cluster.
			function handleInit( master, params ) {

				console.log( "Initialization complete." );
				console.log( "Drag-drop supported:", !! master.features.dragdrop );

			}


			// I handle the queue changed event - this is the queue of the given uploader,
			// NOT on the cluster. However, when this changes, the master queue will be
			// changed already. When the queue changes, it gives us an opportunity to
			// programmatically start the upload process.
			function handleQueueChanged( uploader ) {

				if ( uploader.files.length ){

					uploader.start();

				}

				// So we can re-render the queue.
				$scope.$evalAsync();

			}


			// I handle the change in state of the uploader.
			function handleStateChanged( uploader ) {

				// If the cluster, as a whole, is uploading, indicate the activity.
				if ( cluster.isUploading() ) {

					element.addClass( "uploading" );

				} else {

					element.removeClass( "uploading" );

				}

			}


			// I get called when upload progress is made on the given file.
			// --
			// CAUTION: This may get called one more time after the file has actually
			// been fully uploaded AND the uploaded event has already been called.
			function handleUploadProgress( uploader, file ) {

				$scope.$digest();

			}


			// I handle the resizing of the browser window, which causes a resizing of
			// the input-shim used by the master uploader.
			function handleWindowResize( event ) {

				cluster.refresh();

			}

		}


		// Return the directive configuration. We need to create a scope for this
		// directive so that it can expose the file queue without altering the parent
		// scope.
		return({
			link: link,
			restrict: "A",
			scope: true
		});

	}
);

As you can see, there are still a number of events being bound to the individual Plupload instances. The intent of the cluster is not to completely hide the Plupload implementation; rather, it's to facilitate communication between several instances. It's important that we still have insight into each instance since we want to be able to set per-file POST parameters. If the cluster abstracted this away from us, it would be much harder to achieve that granular control.
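Here's a small sketch of why that per-instance access matters. Each worker carries its own `multipart_params` object, so a value set for one file on one worker never leaks into an upload running on another. The fake uploaders and the plain-object "files" are assumptions made for the sake of the example; the real handler reads the value via `file.getSource()`.

```javascript
// Hypothetical stand-in for a worker: only the settings object matters here.
function createFakeUploader() {

	return({
		settings: {
			multipart_params: {}
		}
	});

}

// I mirror the BeforeUpload handler, but read "sort" straight off a plain
// object instead of calling file.getSource().
function handleBeforeUpload( uploader, file ) {

	var params = uploader.settings.multipart_params;

	// Delete any value left over from the previous file on this uploader.
	delete params.sort;

	if ( "sort" in file ) {

		params.sort = file.sort;

	}

}

var workerA = createFakeUploader();
var workerB = createFakeUploader();

handleBeforeUpload( workerA, { name: "a.jpg", sort: 0 } );
handleBeforeUpload( workerB, { name: "b.jpg", sort: 1 } );

// A later file without a sort value clears the stale parameter on worker A
// only; worker B's in-flight parameters are untouched.
handleBeforeUpload( workerA, { name: "c.jpg" } );
```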

And, here's the Plupload cluster:

app.factory(
	"PluploadCluster",
	function( plupload, mOxie ) {

		// I contain the auto-incrementer for each uploader instance.
		var clusterUploaderInstanceID = 0;

		// I contain the DOM element into which each uploader instance will be injected.
		var clusterContainer = getClusterContainer();


		// In order to be instantiated, each plupload instance needs to reference an
		// actual element. As such, we need to create a throw-away, hidden element for
		// each uploader.
		function buildUploaderElement() {

			var id = ( "pluploadClusterUploaderInstance-" + ++clusterUploaderInstanceID );

			var element = angular.element( "<div></div>" )
				.attr( "id", id )
				.addClass( "pluploadClusterUploaderInstance" )
				.css({
					height: "1px",
					left: "0px",
					position: "absolute",
					top: "0px",
					width: "1px"
				})
				.appendTo( clusterContainer )
			;

			return( id );

		}


		// I remove the throw-away uploader element with the given ID.
		function destroyUploaderElement( id ) {

			clusterContainer.find( "#" + id )
				.remove()
			;

		}


		// The Plupload instances need to be on the page in some sort of DOM. This is
		// really a dirty move, touching the DOM inside this service; but, it is a
		// service that interactions with the
		function getClusterContainer() {

			var container = angular.element( "<div></div>" )
				.addClass( "pluploadClusterContainer" )
				.css({
					height: "1px",
					left: "-100px",
					overflow: "hidden",
					position: "fixed",
					top: "-100px",
					width: "1px"
				})
				.appendTo( "body" )
			;

			return( container );

		}


		// -------------------------------------------------- //
		// -------------------------------------------------- //


		// I cluster a number of Plupload instances so that files can be uploaded in
		// parallel. There is a master instance that acts as the user interface for
		// the uploader; but, this does nothing but hand off files to the various
		// instances in the cluster.
		function PluploadCluster( clusterSize, settings ) {

			// As files are added to the cluster, they will be distributed to the
			// clustered uploaders using a simplistic round-robin approach. For now,
			// we're not going to worry about whether or not the given upload is
			// available - perhaps in another demo.
			var roundRobinIndex = 0;

			// The master uploader is the point-of-contact with the user. When the user
			// selects files or drops files, they will be dropped into the master uploader.
			// The master uploader will then distribute the selected files to the cluster
			// of upload instances.
			var master = new plupload.Uploader( settings );

			// When the files are added to the master, we are going to pipe them into a
			// "FilesSelected" event that can provide a pre-uploader hook for the calling
			// context.
			master.bind( "FilesAdded", handleMasterFilesAdded );

			// I hold the collection of uploaders in the cluster.
			var uploaders = [];

			// Create each instance.
			for ( var i = 0 ; i < clusterSize ; i++ ) {

				var instanceID = buildUploaderElement();

				// Ensure there are multipart params - makes life easier when performing
				// just-in-time updates to the settings during queue processing. Also, we
				// need to override the button element since we don't want a single click
				// on the button to trigger file-selection in each of the instances.
				var uploader = new plupload.Uploader(
					mOxie.extend(
						{},
						{
							multipart_params: {}
						},
						settings,
						{
							pluploadClusterElementID: instanceID,
							browse_button: instanceID,
							drop_element: null
						}
					)
				);

				// We need to bind to the individual uploader events in order to keep the
				// aggregate queue up to date.
				uploader.bind( "FilesAdded", handleUploaderFilesAdded );
				uploader.bind( "UploadProgress", handleUploaderUploadProgress );
				uploader.bind( "FileUploaded", handleUploaderFileUploaded );
				uploader.bind( "FilesRemoved", handleUploaderFilesRemoved );

				uploaders.push( uploader );

			}

			// I contain the aggregated list of files being uploaded.
			// --
			// NOTE: This is being made public; so, we can't overwrite the reference to
			// it - we can only splice into it.
			var queue = new PluploadClusterQueue();


			// Return the public API.
			return({
				addFile: addFile,
				bind: bind,
				destroy: destroy,
				init: init,
				isNotUploading: isNotUploading,
				isUploading: isUploading,
				queue: queue,
				refresh: refresh,
				removeFile: removeFile,
				start: start
			});


			// ---
			// PUBLIC METHODS.
			// ---


			// I add a new file to the cluster. This can be consumed by external
			// instances of FileDrop or FileInput.
			function addFile( file ) {

				// When we add it to the master, the master will take care of
				// distributing it to the next targeted uploader.
				master.addFile( file );

			}


			// I bind to events on the individual uploaders in the cluster.
			function bind( eventType, callback ) {

				// Some events will only be bound to the master of the cluster.
				if (
					( eventType === "Init" ) ||
					( eventType === "PostInit" ) ||
					( eventType === "FilesSelected" )
					) {

					return( master.bind( eventType, callback ) );

				}

				// If we made it this far, we want to bind the given event handler to
				// all uploader instances in the cluster.
				applyToUploaders( "bind", arguments );

			}


			// I destroy the cluster of uploaders.
			function destroy() {

				master.destroy();

				// As we loop over the uploader instances, we have to remove each of the
				// throw-away elements that was used to instantiate the uploader.
				for ( var i = 0 ; i < clusterSize ; i++ ) {

					var uploader = uploaders[ i ];
					var elementID = uploader.settings.pluploadClusterElementID;

					uploader.destroy();
					destroyUploaderElement( elementID );

				}

			}


			// I initialize the cluster.
			function init() {

				// Initialize the master uploader.
				master.init();

				// Initialize each of the uploaders in the cluster.
				for ( var i = 0 ; i < clusterSize ; i++ ) {

					var uploader = uploaders[ i ];

					uploader.init();

					// This step isn't really necessary; but, since these uploaders
					// aren't actually "exposed" on the browser, we can disable the file
					// input shims.
					uploader.disableBrowse();

				}

			}


			// I determine if the cluster (or provided uploader) is currently inactive.
			// If no uploader is provided, checks to see if ALL uploaders are currently
			// stopped.
			function isNotUploading( uploader ) {

				// If an uploader was provided, check only the given uploader.
				if ( uploader ) {

					return( uploader.state === plupload.STOPPED );

				}

				// If no uploader was provided, then the cluster is considered stopped if
				// ALL of the uploaders have stopped.
				return( ! isUploading() );

			}


			// I determine if the cluster (or provided uploader) is currently uploading
			// a file. If no uploader is provided, checks to see if ANY uploader is
			// actively uploading a file.
			function isUploading( uploader ) {

				// If an uploader was provided, check only the given uploader.
				if ( uploader ) {

					return( uploader.state === plupload.STARTED );

				}

				// If no uploader was provided, then check to see if ANY uploaders are
				// currently uploading.
				for ( var i = 0 ; i < clusterSize ; i++ ) {

					if ( uploaders[ i ].state === plupload.STARTED ) {

						return( true );

					}

				}

				// If we made it this far, none of the uploaders are uploading.
				return( false );

			}


			// I refresh the shim used by the master uploader.
			function refresh() {

				master.refresh();

				// NOTE: Since the master is the only instance in the entire cluster that
				// the user has access to (visually), we don't have to refresh any of the
				// other worker instances.

			}


			// I remove the given file from the cluster.
			function removeFile( file ) {

				// Try to remove from each uploader - there are no negative consequences
				// from calling removeFile() if there is no matching file.
				applyToUploaders( "removeFile", arguments );

			}


			// I start the uploading process for all uploaders in the cluster.
			function start() {

				applyToUploaders( "start" );

			}


			// ---
			// PRIVATE METHODS.
			// ---


			// I invoke the given method with the given arguments on all uploaders.
			function applyToUploaders( methodName, methodArguments ) {

				for ( var i = 0 ; i < clusterSize ; i++ ) {

					var uploader = uploaders[ i ];

					uploader[ methodName ].apply( uploader, ( methodArguments || [] ) );

				}

			}


			// I handle the selection of files in the master instance. This raises the
			// "FilesSelected" event which allows the calling context to change the
			// collection before the master starts to distribute them.
			function handleMasterFilesAdded( master, files ) {

				// The files that have been passed to this event are already bound to the
				// master uploader. As such, we want to recreate the collection with
				// unbound mOxie file instances.
				var selectedFiles = [];

				for ( var i = 0 ; i < files.length ; i++ ) {

					// Create a new mOxie file - it won't have a UUID since it's not
					// bound to any uploader yet.
					selectedFiles.push(
						new mOxie.File( null, files[ i ].getSource().getSource() )
					);

				}

				// Now that we've rebuilt the file collection, remove them all from the
				// master uploader.
				master.splice();

				// Announce the selected-files event. This gives the calling context the
				// chance to alter the selected files.
				master.trigger( "FilesSelected", selectedFiles );

				// Distribute the selected files to the cluster.
				for ( var i = 0 ; i < selectedFiles.length ; i++ ) {

					uploaders[ roundRobinIndex++ % clusterSize ].addFile( selectedFiles[ i ] );

				}

			}


			// When files are removed from the given uploader, I remove them from the
			// cluster queue.
			function handleUploaderFilesRemoved( uploader, files ) {

				for ( var i = 0 ; i < files.length ; i++ ) {

					queue.removeFile( files[ i ] );

				}

			}


			// When files are added to the given uploader, I add them to the cluster
			// queue.
			function handleUploaderFilesAdded( uploader, files ) {

				for ( var i = 0 ; i < files.length ; i++ ) {

					queue.addFile( files[ i ] );

				}

			}


			// When a file has been uploaded, I update the file in the cluster queue.
			function handleUploaderFileUploaded( uploader, file ) {

				queue.updateFile( file );

			}


			// When a file has made progress, I update the file in the cluster queue.
			function handleUploaderUploadProgress( uploader, file ) {

				queue.updateFile( file );

			}

		}


		// -------------------------------------------------- //
		// -------------------------------------------------- //


		// I maintain an aggregate queue of all the files in the cluster, across the
		// individual queues of each uploader.
		function PluploadClusterQueue() {

			var queue = [];

			// Set public methods on queue.
			queue.addFile = addFile;
			queue.removeFile = removeFile;
			queue.updateFile = updateFile;

			// Return the queue reference.
			return( queue );


			// ---
			// PUBLIC METHODS.
			// ---


			// I add the given file to the queue.
			function addFile( file ) {

				var item = {
					id: file.id,
					name: file.name,
					size: file.size,
					loaded: file.loaded,
					percent: file.percent.toFixed( 0 ),
					status: file.status,
					isUploading: ( file.status === plupload.UPLOADING )
				};

				queue.push( item );

			}


			// I remove the given file from the queue.
			function removeFile( file ) {

				for ( var i = 0 ; i < queue.length ; i++ ) {

					if ( queue[ i ].id === file.id ) {

						return( queue.splice( i, 1 ) );

					}

				}

			}


			// I update the given file in the queue.
			function updateFile( file ) {

				for ( var i = 0 ; i < queue.length ; i++ ) {

					var item = queue[ i ];

					if ( item.id === file.id ) {

						item.loaded = file.loaded;
						item.percent = file.percent.toFixed( 0 );
						item.status = file.status;
						item.isUploading = ( file.status === plupload.UPLOADING );

						return;

					}

				}

			}

		}


		// -------------------------------------------------- //
		// -------------------------------------------------- //


		// Return factory value.
		return( PluploadCluster );

	}
);

There's a lot that I'd like to do to clean up this implementation before I would consider it done, like turning the cluster into a truly evented system. But, for a proof-of-concept, I think it really gets the idea across. It would be awesome to be able to execute parallel uploads using Plupload.
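One possible cleanup, as hinted at in the code comments, is to stop ignoring whether a given uploader is actually available. As a rough, untested-against-Plupload sketch, the distribution step could pick the worker with the shortest pending queue instead of using strict round-robin. The `pickLeastLoaded()` helper is hypothetical, and the workers here are plain objects; I'm assuming only that each one exposes a `files` array, the way the demo reads `uploader.files.length`.

```javascript
// I return the worker with the fewest pending files. Ties go to the worker
// that appears first in the collection.
function pickLeastLoaded( workers ) {

	var best = workers[ 0 ];

	for ( var i = 1 ; i < workers.length ; i++ ) {

		if ( workers[ i ].files.length < best.files.length ) {

			best = workers[ i ];

		}

	}

	return( best );

}

// Hypothetical starting state: worker A is already busy with two files.
var workers = [
	{ name: "A", files: [ "big.psd", "x.jpg" ] },
	{ name: "B", files: [] },
	{ name: "C", files: [ "y.jpg" ] }
];

var incoming = [ "1.jpg", "2.jpg", "3.jpg" ];

for ( var i = 0 ; i < incoming.length ; i++ ) {

	pickLeastLoaded( workers ).files.push( incoming[ i ] );

}
```

With this starting state, the first two incoming files go to worker B and the third goes to worker C, leaving every worker with two files. Queue length is only a crude proxy for load, since one large file can take longer than several small ones; weighting by remaining bytes would be a natural next step.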

Want to use code from this post? Check out the license.
