Skip to content
Snippets Groups Projects
MainEtchServiceTierClient.cs 14.8 KiB
Newer Older
  • Learn to ignore specific revisions
  • Vaclav Svaton's avatar
    Vaclav Svaton committed
    // This file was automatically generated by:
    //   Apache Etch 1.4.0 (LOCAL-0) / csharp 1.4.0 (LOCAL-0)
    //   Mon Jan 19 15:34:12 CET 2015
    // This file is automatically created for your convenience and will not be
    // overwritten once it exists! Please edit this file as necessary to implement
    // your service logic.
    
    using System;
    
    
    using org.apache.etch.EtchServiceTier.types.EtchServiceTier;
    using System.Threading;
    using System.Security.Cryptography;
    using System.Text;
    using Renci.SshNet;
    using Renci.SshNet.Sftp;
    using System.IO;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    
    namespace org.apache.etch.EtchServiceTier {
    	///<summary>Main implementation for EtchServiceTierClient.</summary>
    	public class MainEtchServiceTierClient : EtchServiceTierHelper.EtchServiceTierClientFactory {
    
    		// Remote proxy for the Etch service; assigned on the first Connect call and reused by every command.
    		private static RemoteEtchServiceTierServer server;
    		// Session code returned by the most recent AuthenticateUser call; passed along with session-bound server requests.
    		private static string sessionCode;
    
    		// User name placed into the credentials that AuthenticateUser signs.
    		private const string TEST_USER = "test";
    		// RSA key in XML form, loaded through FromXmlString when the credentials are signed.
    		// NOTE(review): the value looks like a placeholder; a real private key should not live in source — confirm.
    		private const string TEST_USER_PRIVATE_KEY = "<RSAKeyValue>somersakey</RSAKeyValue>";
    
    		///<summary>
    		///Entry point of the console client: connects to the service, authenticates the
    		///test user and then executes commands read line by line from standard input
    		///until "quit" is entered or the input stream ends.
    		///</summary>
    		///<param name="args">Command Line Arguments (not used)</param>
    		public static void Main(String[] args) {

    			string uri = "tcp://localhost:4001?TlsConnection.authReqd=false&Packetizer.maxPktSize=0";

    			Connect(uri);
    			AuthenticateUser();

    			char[] seps = { ' ', '\t', '\r', '\n' };

    			bool run = true;
    			while ( run ) {
    				String s = Console.ReadLine();

    				// null means the input stream was closed
    				if ( s == null )
    					break;

    				if ( s.Length == 0 )
    					continue;

    				// Drop empty tokens so that repeated or leading whitespace cannot
    				// shift the position of the command name or of its argument.
    				String[] cmds = s.Split(seps, StringSplitOptions.RemoveEmptyEntries);

    				// Whitespace-only line: nothing to run, just show the prompt again.
    				if ( cmds.Length == 0 ) {
    					Console.Write("\n>");
    					continue;
    				}

    				try {
    					switch ( cmds[0] ) {
    						case "conn":
    							Connect(uri);
    							break;
    						case "clusters":
    							ListAvailableClusters();
    							break;
    						case "auth":
    							AuthenticateUser();
    							break;
    						case "clusterinfo":
    							AuthenticateUser();
    							GetClusterInfo();
    							break;
    						case "templates":
    							AuthenticateUser();
    							ListJobTemplates();
    							break;
    						case "createjob":
    							AuthenticateUser();
    							CreateJob(ParseIdArgument(cmds));
    							break;
    						case "submitjob":
    							AuthenticateUser();
    							SubmitJob(ParseIdArgument(cmds));
    							break;
    						case "canceljob":
    							AuthenticateUser();
    							CancelJob(ParseIdArgument(cmds));
    							break;
    						case "jobinfo":
    							AuthenticateUser();
    							GetJobInfo(ParseIdArgument(cmds));
    							break;
    						case "getfilepart":
    							AuthenticateUser();
    							DownloadFileParts(ParseIdArgument(cmds));
    							break;
    						case "listusergroups":
    							AuthenticateUser();
    							ListAdaptorUserGroups();
    							break;
    						case "getuserreport":
    							AuthenticateUser();
    							GetUserResourceUsageReport(ParseIdArgument(cmds));
    							break;
    						case "getgroupreport":
    							AuthenticateUser();
    							GetUserGroupResourceUsageReport(ParseIdArgument(cmds));
    							break;
    						case "quit":
    							run = false;
    							break;
    						default:
    							// Unknown command: show the prompt and read the next line.
    							Console.Write("\n>");
    							continue;
    					}
    				}
    				catch ( Exception e ) {
    					// Keep the session alive; report the failure and wait for the next command.
    					Console.WriteLine("\nERROR: {0}\n\n{1}", e.Message, (e.InnerException == null) ? "---" : e.InnerException.Source);
    				}

    				Console.Write("\n>");
    			}

    			// Disconnect from the service
    			server._StopAndWaitDown(4000);
    			Console.WriteLine("Disconnected ...");
    			Console.ReadKey();
    		}

    		///<summary>Reads the numeric id argument that follows the command name.</summary>
    		///<param name="cmds">Tokens of the entered command line; cmds[0] is the command name</param>
    		///<returns>The parsed id</returns>
    		///<exception cref="ArgumentException">The command was entered without an id argument</exception>
    		private static long ParseIdArgument(String[] cmds) {
    			if ( cmds.Length < 2 )
    				throw new ArgumentException(String.Format("Command '{0}' requires a numeric id argument.", cmds[0]));

    			// All id-taking commands accept long, so parse the full 64-bit range.
    			return Convert.ToInt64(cmds[1]);
    		}
    
    		///<summary>Creates the client-side implementation for the given remote server.</summary>
    		///<param name="server">Reference to remote service</param>
    		///<returns>A new ImplEtchServiceTierClient constructed with that server</returns>
    		public EtchServiceTierClient NewEtchServiceTierClient(RemoteEtchServiceTierServer server) {
    			EtchServiceTierClient client = new ImplEtchServiceTierClient(server);
    			return client;
    		}
    
    		///<summary>
    		///Starts the connection to the service. The first call creates the remote server
    		///proxy for the given URI and prints the list of available commands; later calls
    		///start the already created proxy again.
    		///</summary>
    		///<param name="uri">URI of the service; only used when the proxy is created</param>
    		private static void Connect(string uri) {
    			if ( server != null ) {
    				Console.Write("\nReconnecting...");
    				server._StartAndWaitUp(4000);
    				Console.WriteLine("OK");
    			}
    			else {
    				server = EtchServiceTierHelper.NewServer(uri, null, new MainEtchServiceTierClient());
    				// Connect to the service
    				server._StartAndWaitUp(4000);
    				// The list mirrors the commands handled in Main, including getfilepart.
    				Console.Write("Connected ({0})\n Available commands:\n\t conn\t clusters\t templates\t clusterinfo\n\t auth\t createjob (template id)\n\t submitjob (id)\t jobinfo (id)\t canceljob (id)\t getfilepart (id)\t listusergroups\t getuserreport (userId)\t getgroupreport (groupId)\t quit\n>", uri);
    			}
    		}
    
    		///<summary>
    		///Signs the given credentials with an RSA private key and stores the signature in them.
    		///The signed text is the noise with the username inserted at an index derived from the
    		///first UTF-8 byte of the noise; the SHA-256 hash of that text is what gets signed.
    		///</summary>
    		///<param name="creds">Credentials with username and noise already set; modified in place</param>
    		///<param name="privateKey">RSA private key in the XML form accepted by FromXmlString</param>
    		///<returns>The same credentials instance, now carrying the digital signature</returns>
    		private static DigitalSignatureCredentialsExt SignCredentials(DigitalSignatureCredentialsExt creds, string privateKey) {
    			using ( RSACryptoServiceProvider rsa = new RSACryptoServiceProvider(2048) ) {
    				rsa.PersistKeyInCsp = false;
    				rsa.FromXmlString(privateKey);

    				// Insertion index: first UTF-8 byte of the noise, reduced modulo the noise length.
    				string noise = creds.GetNoise();
    				byte firstNoiseByte = Encoding.UTF8.GetBytes(noise)[0];
    				int insertAt = firstNoiseByte % noise.Length;

    				StringBuilder signedText = new StringBuilder(noise);
    				signedText.Insert(insertAt, creds.GetUsername());
    				byte[] payload = Encoding.UTF8.GetBytes(signedText.ToString());

    				byte[] digest;
    				using ( SHA256 sha = SHA256.Create() ) {
    					digest = sha.ComputeHash(payload);
    				}

    				RSAPKCS1SignatureFormatter formatter = new RSAPKCS1SignatureFormatter(rsa);
    				formatter.SetHashAlgorithm("SHA256");
    				byte[] signature = formatter.CreateSignature(digest);

    				// The setter takes signed bytes, so each byte is reinterpreted bit-for-bit. TODO: Verify
    				sbyte[] signatureBytes = new sbyte[signature.Length];
    				for ( int i = 0; i < signature.Length; i++ ) {
    					signatureBytes[i] = unchecked((sbyte) signature[i]);
    				}
    				creds.SetDigitalSignature(signatureBytes);
    			}
    			return creds;
    		}
    
    		///<summary>Generates a random string; AuthenticateUser uses it as the credentials noise.</summary>
    		///<returns>Base64 encoding of 16 cryptographically random non-zero bytes (24 characters)</returns>
    		private static string GetRandomString() {
    			var random = new byte[16];
    			// The provider is IDisposable; release it as soon as the bytes are drawn.
    			using ( var rng = new RNGCryptoServiceProvider() ) {
    				rng.GetNonZeroBytes(random);
    			}
    			return Convert.ToBase64String(random);
    		}
    
    		///<summary>Prints the name of every cluster returned by the server, one per line.</summary>
    		private static void ListAvailableClusters() {
    			Console.WriteLine("ListAvailableClusters ...");
    			ClusterInfoExt[] clusters = server.ListAvailableClusters();
    			for ( int i = 0; i < clusters.Length; i++ ) {
    				Console.WriteLine(clusters[i].GetName());
    			}
    		}
    
    		///<summary>
    		///Authenticates the test user: builds credentials with fresh random noise, signs them
    		///with the test private key and stores the session code returned by the server.
    		///</summary>
    		private static void AuthenticateUser() {
    			Console.WriteLine("Authenticating user [{0}]...", TEST_USER);

    			DigitalSignatureCredentialsExt request = new DigitalSignatureCredentialsExt();
    			request.SetUsername(TEST_USER);
    			string noise = GetRandomString();
    			request.SetNoise(noise);

    			// Sign credentials with known private key
    			DigitalSignatureCredentialsExt signedRequest = SignCredentials(request, TEST_USER_PRIVATE_KEY);

    			sessionCode = server.AuthenticateUserCertificate(signedRequest);
    			Console.WriteLine("\tAuth OK (Session GUID: {0})", sessionCode);
    		}
    
    		///<summary>
    		///Prints the current node usage for the hard-coded id 6, followed by the current
    		///resource usage and limits of the authenticated user.
    		///</summary>
    		private static void GetClusterInfo() {
    			// Part 1: current cluster node usage
    			Console.WriteLine("Current node usage (EXP)");
    			ClusterNodeUsageExt nodeUsage = server.GetCurrentClusterNodeUsage(6, sessionCode);
    			Console.WriteLine("\tNode type: {0}\tNodes used: {1}\t\n", nodeUsage.GetNodeType().GetName(), nodeUsage.GetNodesUsed());

    			// Part 2: usage and limitations of the current user
    			Console.WriteLine("Current usage and limitations for user {0}", TEST_USER);
    			ResourceUsageExt[] limits = server.GetCurrentUsageAndLimitationsForCurrentUser(sessionCode);
    			for ( int i = 0; i < limits.Length; i++ ) {
    				ResourceUsageExt entry = limits[i];
    				var nodeTypeName = entry.GetNodeType().GetName();
    				var coresUsed = entry.GetCoresUsed();
    				var coresMax = entry.GetLimitation().GetTotalMaxCores();
    				var coresPerJob = entry.GetLimitation().GetMaxCoresPerJob();
    				Console.WriteLine("\t Node type: {0}\tCores used: {1}\tCores max: {2}\tMax cores per job: {3}", nodeTypeName, coresUsed, coresMax, coresPerJob);
    			}

    		}
    
    		///<summary>
    		///Prints, for every available cluster, the command templates (id and name) offered
    		///by each of its node types. Node types without commands are skipped.
    		///</summary>
    		private static void ListJobTemplates() {
    			ClusterInfoExt[] list = server.ListAvailableClusters();
    			foreach ( var cluster in list ) {
    				Console.WriteLine("Available job templates for {0}", cluster.GetName());
    				foreach ( var nodeType in cluster.GetNodeTypes() ) {
    					// Fetch the commands once; an unset array is treated like an empty one.
    					var commands = nodeType.GetPossibleCommands();
    					if ( commands == null || commands.Length < 1 )
    						continue;
    					Console.Write("\n\tNode type: {0}\n\tID\tName\t\t\n\n", nodeType.GetName());
    					foreach ( var command in commands ) {
    						// Only id and name are rendered; the format string has two placeholders.
    						Console.WriteLine("\t{0}\t{1}\t\t", command.GetId(), command.GetName());
    					}
    				}
    			}
    		}
    
    		///<summary>
    		///Creates a test job for the given command template and prints the id of the
    		///created job. Only templates 0 and 1 are implemented in this client; for any
    		///other id a notice is printed and no job is created.
    		///</summary>
    		///<param name="templateId">Id of the command template to create the job from</param>
    		private static void CreateJob(long templateId) {
    			long createdJobId;

    			switch ( templateId ) {
    				case 0:
    					createdJobId = CreateTestJob(templateId);
    					break;
    				case 1:
    					createdJobId = CreateTestJob2(templateId);
    					break;
    				default:
    					Console.WriteLine("Unknown template ID {0}, missing implementation in client.", templateId);
    					// Nothing was created, so do not report a job ID.
    					return;
    			}
    			Console.WriteLine("Created job ID {0}.", createdJobId);
    		}
    
            /// <summary>
            /// Creates the "TestOutRedirect" test job on the server and uploads every file from
            /// C:\UserDataStorage to the job's shared base path over SCP, using the credentials
            /// returned by GetFileTransferMethod.
            /// </summary>
            /// <param name="templateId">Id of the command template used by the job's single task</param>
            /// <returns>Id of the created job, or 0 when the server returned no id</returns>
            private static long CreateTestJob(long templateId)
            {
                TaskSpecificationExt testTask = new TaskSpecificationExt(
                        "TestOutRedirect", 1, 1, 200, null, false, false, null, "EtchConsole_OUT", "EtchConsole_ERR", "EtchConsole_PROG", "EtchConsole_LOG", null,
                        templateId, new EnvironmentVariableExt[0], null, null);

                JobSpecificationExt jobSpec = new JobSpecificationExt("TestOutRedirect", 1, 1, JobPriorityExt.Average, "ExpTests", null, 200,
                        "some.name@mail.cz", "999111000", false, false, false, 7, new EnvironmentVariableExt[0],
                        new TaskSpecificationExt[] { testTask });

                SubmittedJobInfoExt submittedTestJob = server.CreateJob(jobSpec, sessionCode);

                // Transfer files
                Console.WriteLine("Transfering files ...");
                FileTransferMethodExt ft = server.GetFileTransferMethod(submittedTestJob.GetId(), sessionCode);
                try
                {
                    using (MemoryStream pKeyStream = new MemoryStream(Encoding.UTF8.GetBytes(ft.GetCredentials().GetPrivateKey())))
                    {
                        Console.WriteLine("ServerHostname: {0} UserName: {1}", ft.GetServerHostname(), ft.GetCredentials().GetUsername());
                        using (ScpClient scpClient = new ScpClient(ft.GetServerHostname(), ft.GetCredentials().GetUsername(), new PrivateKeyFile(pKeyStream)))
                        {
                            scpClient.Connect();
                            foreach (string fileName in Directory.GetFiles(@"C:\UserDataStorage"))
                            {
                                Console.WriteLine("Uploading file: " + fileName);
                                FileInfo file = new FileInfo(fileName);
                                string remotePath = ft.GetSharedBasepath() + "/" + file.Name;
                                Console.WriteLine("Path: " + remotePath);
                                scpClient.Upload(file, remotePath);
                                Console.WriteLine("File uploaded.");
                            }
                        }
                    }
                }
                finally
                {
                    // Always tell the server the transfer is over, even when the upload failed,
                    // so the transfer obtained from GetFileTransferMethod is not left open.
                    server.EndFileTransfer(submittedTestJob.GetId(), ft, sessionCode);
                }

                return submittedTestJob.GetId().GetValueOrDefault();
            }
    
            /// <summary>
            /// Creates the "TestOutRedirect" test job whose single task passes the template
            /// parameter InputData = "UserFolder". No files are uploaded.
            /// </summary>
            /// <param name="templateId">Id of the command template used by the job's single task</param>
            /// <returns>Id of the created job, or 0 when the server returned no id</returns>
            private static long CreateTestJob2(long templateId)
            {
                // The only template parameter of the task.
                CommandTemplateParameterValueExt inputData = new CommandTemplateParameterValueExt();
                inputData.commandParameterIdentifier = "InputData";
                inputData.parameterValue = "UserFolder";
                CommandTemplateParameterValueExt[] templateParameters = new CommandTemplateParameterValueExt[] { inputData };

                TaskSpecificationExt task = new TaskSpecificationExt(
                        "TestOutRedirect", 1, 1, 200, null, false, false, null, "EtchConsole_OUT", "EtchConsole_ERR", "EtchConsole_PROG", "EtchConsole_LOG", null,
                        templateId, new EnvironmentVariableExt[0], null,
                        templateParameters);
                TaskSpecificationExt[] tasks = new TaskSpecificationExt[] { task };

                JobSpecificationExt specification = new JobSpecificationExt("TestOutRedirect", 1, 1, JobPriorityExt.Average, "ExpTests", null, 200,
                        "some.name@mail.cz", "999111000", false, false, false, 7, new EnvironmentVariableExt[0],
                        tasks);

                SubmittedJobInfoExt created = server.CreateJob(specification, sessionCode);
                return created.GetId().GetValueOrDefault();
            }
    
    		///<summary>Asks the server to submit the given job and echoes the job id on the console.</summary>
    		///<param name="jobId">Id of the job to submit</param>
    		private static void SubmitJob(long jobId) {
    			// The returned job info is not used; only the id passed in is printed.
    			server.SubmitJob(jobId, sessionCode);
    			Console.WriteLine("Submitted job id: {0}", jobId);
    		}
    
    		///<summary>Fetches the current info of a job and prints its id, name, state and parameters.</summary>
    		///<param name="jobId">Id of the job to query</param>
    		private static void GetJobInfo(long jobId) {
    			SubmittedJobInfoExt info = server.GetCurrentInfoForJob(jobId, sessionCode);
    			Console.WriteLine("Current info for submitted job {0}", jobId);

    			var id = info.GetId();
    			var name = info.GetName();
    			var state = info.GetState();
    			var parameters = info.GetAllParameters();
    			Console.WriteLine("\tJob Id: {0}\tName: {1}\tState: {2}\tParams: {3}", id, name, state, parameters);
    		}
    
    		///<summary>Asks the server to cancel the given job and echoes the job id on the console.</summary>
    		///<param name="jobId">Id of the job to cancel</param>
    		private static void CancelJob(long jobId) {
    			// The returned job info is not used; only the id passed in is printed.
    			server.CancelJob(jobId, sessionCode);
    			Console.WriteLine("Cancelled job id: {0}", jobId);
    		}
    
    		///<summary>
    		///Requests, from offset 0, the log, progress, standard error and standard output
    		///files of every task of the job and prints what the server returns for each.
    		///</summary>
    		///<param name="jobId">Id of the job whose files are requested</param>
    		public static void DownloadFileParts(long jobId) {
    			SubmittedJobInfoExt job = server.GetCurrentInfoForJob(jobId, sessionCode);

    			// One request entry per task and file kind, always starting at offset 0.
    			SynchronizableFilesExt[] fileKinds = {
    				SynchronizableFilesExt.LogFile,
    				SynchronizableFilesExt.ProgressFile,
    				SynchronizableFilesExt.StandardErrorFile,
    				SynchronizableFilesExt.StandardOutputFile
    			};
    			List<TaskFileOffsetExt> requested = new List<TaskFileOffsetExt>();
    			foreach ( SubmittedTaskInfoExt task in job.GetTasks() ) {
    				foreach ( SynchronizableFilesExt kind in fileKinds ) {
    					requested.Add(new TaskFileOffsetExt(task.GetId(), kind, 0));
    				}
    			}

    			JobFileContentExt[] parts = server.DownloadPartsOfJobFilesFromCluster(jobId, requested.ToArray(), sessionCode);
    			for ( int i = 0; i < parts.Length; i++ ) {
    				JobFileContentExt part = parts[i];
    				Console.WriteLine("File: " + part.GetFileType() + ", " + part.GetRelativePath());
    				Console.WriteLine("TaskInfoId: " + part.GetSubmittedTaskInfoId());
    				Console.WriteLine("Offset: " + part.GetOffset());
    				Console.WriteLine("Content: " + part.GetContent());
    			}
    		}
    
            /// <summary>
            /// Prints one line per user of every adaptor user group returned by the server,
            /// showing the group id and name together with the user id and name.
            /// </summary>
            public static void ListAdaptorUserGroups()
            {
                Console.WriteLine("ListAdaptorUserGroups ...");
                AdaptorUserGroupExt[] groups = server.ListAdaptorUserGroups(sessionCode);
                for (int g = 0; g < groups.Length; g++)
                {
                    AdaptorUserGroupExt userGroup = groups[g];
                    foreach (AdaptorUserExt member in userGroup.users)
                    {
                        Console.WriteLine("groupId = {0}, groupName = {1}, userId = {2}, userName = {3}", userGroup.id, userGroup.name, member.id, member.username);
                    }
                }
            }
    
            /// <summary>
            /// Requests the resource usage report of a user for the period from 1 January 2017
            /// until now and prints a one-line summary of it.
            /// </summary>
            /// <param name="userId">Id of the user the report is requested for</param>
            private static void GetUserResourceUsageReport(long userId)
            {
                DateTime periodStart = new DateTime(2017, 1, 1);
                DateTime periodEnd = DateTime.Now;
                UserResourceUsageReportExt usage = server.GetUserResourceUsageReport(userId, periodStart, periodEnd, sessionCode);

                Console.WriteLine("userId = {0}, nodeTypeReports = {1}, totalCorehoursUsage = {2} ... details included",
                    usage.user.id, usage.nodeTypeReports.Length, usage.totalCorehoursUsage);
            }
    
            /// <summary>
            /// Requests the resource usage report of a user group for the period from
            /// 1 January 2017 until now and prints one summary line per user in the report.
            /// </summary>
            /// <param name="groupId">Id of the user group the report is requested for</param>
            private static void GetUserGroupResourceUsageReport(long groupId)
            {
                DateTime periodStart = new DateTime(2017, 1, 1);
                DateTime periodEnd = DateTime.Now;
                UserGroupResourceUsageReportExt groupUsage = server.GetUserGroupResourceUsageReport(groupId, periodStart, periodEnd, sessionCode);

                foreach (UserAggregatedUsageExt perUser in groupUsage.userReports)
                {
                    Console.WriteLine("groupId = {0}, userId = {1}, nodeTypeReports = {2}, totalCorehoursUsage = {3} ... details included",
                        groupId, perUser.user.id, perUser.nodeTypeReports.Length, perUser.totalCorehoursUsage);
                }
            }
    	}
    }