Something went wrong on our end
-
Vaclav Svaton authoredVaclav Svaton authored
DataTransferLogic.cs 13.54 KiB
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using HaaSMiddleware.BusinessLogicTier.Factory;
using HaaSMiddleware.DataAccessTier.UnitOfWork;
using HaaSMiddleware.DomainObjects.DataTransfer;
using HaaSMiddleware.DomainObjects.UserAndLimitationManagement;
using HaaSMiddleware.DomainObjects.JobManagement.JobInformation;
using HaaSMiddleware.HpcConnectionFramework;
using log4net;
using System.Threading;
using System.Net;
using System.Net.Sockets;
using System.Text;
using HaaSMiddleware.BusinessLogicTier.Logic.DataTransfer.Exceptions;
namespace HaaSMiddleware.BusinessLogicTier.Logic.DataTransfer {
internal class DataTransferLogic : IDataTransferLogic {
private static readonly ILog log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
private readonly IUnitOfWork unitOfWork;
//local sockets for jobs and ips
private static Dictionary<long, Dictionary<string, Socket>> jobIpSockets = new Dictionary<long, Dictionary<string, Socket>>();
private static Dictionary<string, int> ipLocalPorts = new Dictionary<string, int>();
internal DataTransferLogic(IUnitOfWork unitOfWork)
{
this.unitOfWork = unitOfWork;
}
public DataTransferMethod GetDataTransferMethod(string ipAddress, int port, SubmittedJobInfo jobInfo, AdaptorUser loggedUser)
{
if (jobInfo.State > JobState.Running)
throw new UnableToCreateConnectionException("JobId " + jobInfo.Id + " is not a currently running job.");
log.Info("Getting data transfer method for submitted job info ID " + jobInfo.Id + " with user " + loggedUser.GetLogIdentification());
try
{
//create tunnel
int allocatedPort = CreateSshTunnel(jobInfo, "127.0.0.1", 4000, "salomon.it4i.cz", ipAddress, port);
ipLocalPorts.Add(ipAddress, allocatedPort);
//create socket
CreateLocalSocketConnection(jobInfo.Id, ipAddress, allocatedPort);
DataTransferMethod dtMethod = new DataTransferMethod();
dtMethod.submittedJobId = jobInfo.Id;
dtMethod.IpAddress = ipAddress;
dtMethod.Port = port;
return dtMethod;
}
catch(Exception e)
{
RemoveSshTunnel(jobInfo, ipAddress);
CloseLocalSocketConnection(jobInfo.Id, ipAddress);
log.Error("GetDataTransferMethod error: {0}", e);
return null;
}
}
public void EndDataTransfer(DataTransferMethod transferMethod, SubmittedJobInfo jobInfo, AdaptorUser loggedUser)
{
log.Info("Removing data transfer method for submitted job info ID " + transferMethod.submittedJobId + " with user " + loggedUser.GetLogIdentification());
if (jobIpSockets.Keys.Contains(transferMethod.submittedJobId))
{
try
{
//remove tunnel
ipLocalPorts.Remove(transferMethod.IpAddress);
RemoveSshTunnel(jobInfo, transferMethod.IpAddress);
//remove socket
CloseLocalSocketConnection(transferMethod.submittedJobId, transferMethod.IpAddress);
}
catch (Exception e)
{
log.Error("Error in removing data transfer method for submitted job info ID " + transferMethod.submittedJobId + ": {0}", e);
}
}
else log.Error("Job " + transferMethod.submittedJobId + " with IP " + transferMethod.IpAddress + " does not have an active socket.");
}
public int WriteDataToJobNode(byte[] data, long submittedJobInfoId, string ipAddress, string sessionCode, bool closeConnection)
{
SubmittedJobInfo jobInfo = unitOfWork.SubmittedJobInfoRepository.GetById(submittedJobInfoId);
int bytesSent = 0;
if (SshTunnelExist(jobInfo, ipAddress))
{
if (!jobIpSockets.Keys.Contains(submittedJobInfoId) || !jobIpSockets[submittedJobInfoId].Keys.Contains(ipAddress))
{
int allocatedPort = ipLocalPorts[ipAddress];
//create new socket
CreateLocalSocketConnection(submittedJobInfoId, ipAddress, allocatedPort);
}
//else
//{
if (data != null)
{
Socket sock = jobIpSockets[submittedJobInfoId][ipAddress];
try
{
bytesSent = sock.Send(data);
log.Info("Socket from job " + submittedJobInfoId + " with remote IP " + ipAddress + " sent " + bytesSent + " bytes.");
}
catch (Exception e)
{
log.Error("Socket write error: {0}", e);
}
}
else
{
log.Info("Socket from job " + submittedJobInfoId + " with remote IP " + ipAddress + " : nothing to send, NULL data value.");
}
if (closeConnection)
{
Socket sock = jobIpSockets[submittedJobInfoId][ipAddress];
sock.Shutdown(SocketShutdown.Send);
log.Info("Closed SEND direction in socket for submitted job info Id: " + submittedJobInfoId + " and remote IP address: " + ipAddress);
}
//}
}
else log.Error("Job " + submittedJobInfoId + " with IP remote " + ipAddress + " does not have an active connection.");
/**
if (jobIpSockets.Keys.Contains(submittedJobInfoId) && jobIpSockets[submittedJobInfoId].Keys.Contains(ipAddress))
{
Socket sock = jobIpSockets[submittedJobInfoId][ipAddress];
try
{
bytesSent = sock.Send(data);
log.Info("Socket from job " + submittedJobInfoId + " with remote IP " + ipAddress + " sent " + bytesSent + " bytes.");
}
catch (Exception e)
{
log.Error("Socket write error: {0}", e);
}
if (closeConnection)
{
sock.Shutdown(SocketShutdown.Send);
log.Info("Closed SEND direction in socket for submitted job info Id: " + submittedJobInfoId + " and remote IP address: " + ipAddress);
}
}
else if (SshTunnelExist(jobInfo, ipAddress))
{
int allocatedPort = ipLocalPorts[ipAddress];
//create new socket
CreateLocalSocketConnection(submittedJobInfoId, ipAddress, allocatedPort);
}
else log.Error("Job " + submittedJobInfoId + " with IP remote " + ipAddress + " does not have an active connection.");
*/
return bytesSent;
}
public byte[] ReadDataFromJobNode(long submittedJobInfoId, string ipAddress, string sessionCode)
{
if (jobIpSockets.Keys.Contains(submittedJobInfoId) && jobIpSockets[submittedJobInfoId].Keys.Contains(ipAddress))
{
Socket sock = jobIpSockets[submittedJobInfoId][ipAddress];
byte[] buffer = new byte[1048576];
byte[] retData = new byte[0];
try
{
int bytesRec = sock.Receive(buffer);
if (bytesRec > 0)
{
retData = new byte[bytesRec];
Array.Copy(buffer, retData, bytesRec);
} else {
log.Info("Socket from job " + submittedJobInfoId + " with remote IP " + ipAddress + " received 0 bytes - returning null and closing socket.");
CloseLocalSocketConnection(submittedJobInfoId, ipAddress);
return null;
}
log.Info("Socket from job " + submittedJobInfoId + " with remote IP " + ipAddress + " received " + bytesRec + " bytes.");
}
catch (Exception e)
{
log.Error("Socket read error: {0}", e);
}
return retData;
}
else
{
log.Error("Job " + submittedJobInfoId + " with remote IP " + ipAddress + " does not have an active socket.");
return null;
}
}
public List<long> GetJobIdsForOpenSockets()
{
List<long> keyList = new List<long>(jobIpSockets.Keys);
return keyList;
}
public void CloseAllConnectionsForJob(SubmittedJobInfo jobInfo)
{
foreach (string ipAddress in jobIpSockets[jobInfo.Id].Keys.ToList())
{
//close tunnel
RemoveSshTunnel(jobInfo, ipAddress);
//close socket
CloseLocalSocketConnection(jobInfo.Id, ipAddress);
}
jobIpSockets.Remove(jobInfo.Id);
log.InfoFormat("Closed all connections for jobId {0}", jobInfo.Id);
}
#region Protected methods
protected int CreateSshTunnel(SubmittedJobInfo jobInfo, string localHost, int localPort, string loginHost, string nodeHost, int nodePort)
{
while (!IsPortFree(localHost, localPort))
++localPort;
SchedulerFactory.GetInstance(jobInfo.NodeType.Cluster.SchedulerType).CreateScheduler(jobInfo.NodeType.Cluster).CreateSshTunnel(
jobInfo.Specification.Id, localHost, localPort, loginHost, nodeHost, nodePort, jobInfo.Specification.ClusterUser
);
return localPort;
}
protected void RemoveSshTunnel(SubmittedJobInfo jobInfo, string nodeHost)
{
SchedulerFactory.GetInstance(jobInfo.NodeType.Cluster.SchedulerType).CreateScheduler(jobInfo.NodeType.Cluster).RemoveSshTunnel(
jobInfo.Specification.Id, nodeHost
);
}
protected bool SshTunnelExist(SubmittedJobInfo jobInfo, string nodeHost)
{
return SchedulerFactory.GetInstance(jobInfo.NodeType.Cluster.SchedulerType).CreateScheduler(jobInfo.NodeType.Cluster).SshTunnelExist(
jobInfo.Specification.Id, nodeHost
);
}
protected bool IsPortFree(string ipAddress, int port)
{
try
{
IPAddress address = IPAddress.Parse(ipAddress);
TcpListener tcpListener = new TcpListener(address, port);
tcpListener.Start();
tcpListener.Stop();
return true;
}
catch (SocketException ex)
{
return false;
}
}
protected void CreateLocalSocketConnection(long jobId, string remoteIp, int localPort)
{
if (jobIpSockets.Keys.Contains(jobId) && jobIpSockets[jobId].Keys.Contains(remoteIp))
log.Error("Job " + jobId + " with remote IP " + remoteIp + " already has socket a connection.");
else
{
Socket socket = null;
try
{
//create socket
IPAddress address = IPAddress.Parse("127.0.0.1");
IPEndPoint remoteEP = new IPEndPoint(address, localPort);
socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
socket.NoDelay = true;
//connect to remote host
socket.Connect(remoteEP);
//add socket to dictionary
if (jobIpSockets.Keys.Contains(jobId))
jobIpSockets[jobId].Add(remoteIp, socket);
else jobIpSockets.Add(jobId, new Dictionary<string, Socket> { { remoteIp, socket } });
log.Info("New socket connection created for job " + jobId + " with remote IP " + remoteIp + " and local port " + localPort);
}
catch (Exception e)
{
log.Error("Socket creation error: {0}", e);
}
}
}
protected bool SocketConnected(Socket socket)
{
try
{
return !(socket.Poll(1, SelectMode.SelectRead) && socket.Available == 0);
}
catch (SocketException) { return false; }
}
protected void CloseLocalSocketConnection(long jobId, string remoteIp)
{
if (jobIpSockets.Keys.Contains(jobId) && jobIpSockets[jobId].Keys.Contains(remoteIp))
{
Socket sock = jobIpSockets[jobId][remoteIp];
try
{
sock.Shutdown(SocketShutdown.Both);
sock.Close();
}
catch (Exception e)
{
log.Error("Error in closing socket connection for submitted job info ID " + jobId + ": {0}", e);
}
jobIpSockets[jobId].Remove(remoteIp);
log.Info("Closed socket for submitted job info Id: " + jobId + " and remote IP address: " + remoteIp);
}
else log.Error("Job " + jobId + " with IP " + remoteIp + " does not have an active socket.");
}
#endregion
}
}