Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using HaaSMiddleware.BusinessLogicTier.Factory;
using HaaSMiddleware.DataAccessTier.UnitOfWork;
using HaaSMiddleware.DomainObjects.DataTransfer;
using HaaSMiddleware.DomainObjects.UserAndLimitationManagement;
using HaaSMiddleware.DomainObjects.JobManagement.JobInformation;
using HaaSMiddleware.HpcConnectionFramework;
using log4net;
using System.Threading;
using System.Net;
using System.Net.Sockets;
using System.Text;
using HaaSMiddleware.BusinessLogicTier.Logic.DataTransfer.Exceptions;
namespace HaaSMiddleware.BusinessLogicTier.Logic.DataTransfer {
internal class DataTransferLogic : IDataTransferLogic {
private static readonly ILog log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
private readonly IUnitOfWork unitOfWork;
//local sockets for jobs and ips
private static Dictionary<long, Dictionary<string, Socket>> jobIpSockets = new Dictionary<long, Dictionary<string, Socket>>();
private static Dictionary<string, int> ipLocalPorts = new Dictionary<string, int>();
internal DataTransferLogic(IUnitOfWork unitOfWork)
{
this.unitOfWork = unitOfWork;
}
public DataTransferMethod GetDataTransferMethod(string ipAddress, int port, SubmittedJobInfo jobInfo, AdaptorUser loggedUser)
{
if (jobInfo.State > JobState.Running)
throw new UnableToCreateConnectionException("JobId " + jobInfo.Id + " is not a currently running job.");
log.Info("Getting data transfer method for submitted job info ID " + jobInfo.Id + " with user " + loggedUser.GetLogIdentification());
try
{
//create tunnel
int allocatedPort = CreateSshTunnel(jobInfo, "127.0.0.1", 4000, "salomon.it4i.cz", ipAddress, port);
ipLocalPorts.Add(ipAddress, allocatedPort);
//create socket
CreateLocalSocketConnection(jobInfo.Id, ipAddress, allocatedPort);
DataTransferMethod dtMethod = new DataTransferMethod();
dtMethod.submittedJobId = jobInfo.Id;
dtMethod.IpAddress = ipAddress;
dtMethod.Port = port;
return dtMethod;
}
catch(Exception e)
{
RemoveSshTunnel(jobInfo, ipAddress);
CloseLocalSocketConnection(jobInfo.Id, ipAddress);
log.Error("GetDataTransferMethod error: {0}", e);
return null;
}
}
public void EndDataTransfer(DataTransferMethod transferMethod, SubmittedJobInfo jobInfo, AdaptorUser loggedUser)
{
log.Info("Removing data transfer method for submitted job info ID " + transferMethod.submittedJobId + " with user " + loggedUser.GetLogIdentification());
if (jobIpSockets.Keys.Contains(transferMethod.submittedJobId))
{
try
{
//remove tunnel
ipLocalPorts.Remove(transferMethod.IpAddress);
RemoveSshTunnel(jobInfo, transferMethod.IpAddress);
//remove socket
CloseLocalSocketConnection(transferMethod.submittedJobId, transferMethod.IpAddress);
}
catch (Exception e)
{
log.Error("Error in removing data transfer method for submitted job info ID " + transferMethod.submittedJobId + ": {0}", e);
}
}
else log.Error("Job " + transferMethod.submittedJobId + " with IP " + transferMethod.IpAddress + " does not have an active socket.");
}
public int WriteDataToJobNode(byte[] data, long submittedJobInfoId, string ipAddress, string sessionCode, bool closeConnection)
{
SubmittedJobInfo jobInfo = unitOfWork.SubmittedJobInfoRepository.GetById(submittedJobInfoId);
int bytesSent = 0;
if (SshTunnelExist(jobInfo, ipAddress))
{
if (!jobIpSockets.Keys.Contains(submittedJobInfoId) || !jobIpSockets[submittedJobInfoId].Keys.Contains(ipAddress))
{
int allocatedPort = ipLocalPorts[ipAddress];
//create new socket
CreateLocalSocketConnection(submittedJobInfoId, ipAddress, allocatedPort);
}
//else
//{
if (data != null)
{
Socket sock = jobIpSockets[submittedJobInfoId][ipAddress];
try
{
bytesSent = sock.Send(data);
log.Info("Socket from job " + submittedJobInfoId + " with remote IP " + ipAddress + " sent " + bytesSent + " bytes.");
}
catch (Exception e)
{
log.Error("Socket write error: {0}", e);
}
}
else
{
log.Info("Socket from job " + submittedJobInfoId + " with remote IP " + ipAddress + " : nothing to send, NULL data value.");
}
if (closeConnection)
{
Socket sock = jobIpSockets[submittedJobInfoId][ipAddress];
sock.Shutdown(SocketShutdown.Send);
log.Info("Closed SEND direction in socket for submitted job info Id: " + submittedJobInfoId + " and remote IP address: " + ipAddress);
}
//}
}
else log.Error("Job " + submittedJobInfoId + " with IP remote " + ipAddress + " does not have an active connection.");
/**
if (jobIpSockets.Keys.Contains(submittedJobInfoId) && jobIpSockets[submittedJobInfoId].Keys.Contains(ipAddress))
{
Socket sock = jobIpSockets[submittedJobInfoId][ipAddress];
try
{
bytesSent = sock.Send(data);
log.Info("Socket from job " + submittedJobInfoId + " with remote IP " + ipAddress + " sent " + bytesSent + " bytes.");
}
catch (Exception e)
{
log.Error("Socket write error: {0}", e);
}
if (closeConnection)
{
sock.Shutdown(SocketShutdown.Send);
log.Info("Closed SEND direction in socket for submitted job info Id: " + submittedJobInfoId + " and remote IP address: " + ipAddress);
}
}
else if (SshTunnelExist(jobInfo, ipAddress))
{
int allocatedPort = ipLocalPorts[ipAddress];
//create new socket
CreateLocalSocketConnection(submittedJobInfoId, ipAddress, allocatedPort);
}
else log.Error("Job " + submittedJobInfoId + " with IP remote " + ipAddress + " does not have an active connection.");
*/
return bytesSent;
}
public byte[] ReadDataFromJobNode(long submittedJobInfoId, string ipAddress, string sessionCode)
{
if (jobIpSockets.Keys.Contains(submittedJobInfoId) && jobIpSockets[submittedJobInfoId].Keys.Contains(ipAddress))
{
Socket sock = jobIpSockets[submittedJobInfoId][ipAddress];
byte[] buffer = new byte[1048576];
byte[] retData = new byte[0];
try
{
int bytesRec = sock.Receive(buffer);
if (bytesRec > 0)
{
retData = new byte[bytesRec];
Array.Copy(buffer, retData, bytesRec);
} else {
log.Info("Socket from job " + submittedJobInfoId + " with remote IP " + ipAddress + " received 0 bytes - returning null and closing socket.");
CloseLocalSocketConnection(submittedJobInfoId, ipAddress);
return null;
}
log.Info("Socket from job " + submittedJobInfoId + " with remote IP " + ipAddress + " received " + bytesRec + " bytes.");
}
catch (Exception e)
{
log.Error("Socket read error: {0}", e);
}
return retData;
}
else
{
log.Error("Job " + submittedJobInfoId + " with remote IP " + ipAddress + " does not have an active socket.");
return null;
}
}
public List<long> GetJobIdsForOpenSockets()
{
List<long> keyList = new List<long>(jobIpSockets.Keys);
return keyList;
}
public void CloseAllConnectionsForJob(SubmittedJobInfo jobInfo)
{
foreach (string ipAddress in jobIpSockets[jobInfo.Id].Keys.ToList())
{
//close tunnel
RemoveSshTunnel(jobInfo, ipAddress);
//close socket
CloseLocalSocketConnection(jobInfo.Id, ipAddress);
}
jobIpSockets.Remove(jobInfo.Id);
log.InfoFormat("Closed all connections for jobId {0}", jobInfo.Id);
}
#region Protected methods
protected int CreateSshTunnel(SubmittedJobInfo jobInfo, string localHost, int localPort, string loginHost, string nodeHost, int nodePort)
{
while (!IsPortFree(localHost, localPort))
++localPort;
SchedulerFactory.GetInstance(jobInfo.NodeType.Cluster.SchedulerType).CreateScheduler(jobInfo.NodeType.Cluster).CreateSshTunnel(
jobInfo.Specification.Id, localHost, localPort, loginHost, nodeHost, nodePort, jobInfo.Specification.ClusterUser
);
return localPort;
}
protected void RemoveSshTunnel(SubmittedJobInfo jobInfo, string nodeHost)
{
SchedulerFactory.GetInstance(jobInfo.NodeType.Cluster.SchedulerType).CreateScheduler(jobInfo.NodeType.Cluster).RemoveSshTunnel(
jobInfo.Specification.Id, nodeHost
);
}
protected bool SshTunnelExist(SubmittedJobInfo jobInfo, string nodeHost)
{
return SchedulerFactory.GetInstance(jobInfo.NodeType.Cluster.SchedulerType).CreateScheduler(jobInfo.NodeType.Cluster).SshTunnelExist(
jobInfo.Specification.Id, nodeHost
);
}
protected bool IsPortFree(string ipAddress, int port)
{
try
{
IPAddress address = IPAddress.Parse(ipAddress);
TcpListener tcpListener = new TcpListener(address, port);
tcpListener.Start();
tcpListener.Stop();
return true;
}
catch (SocketException ex)
{
return false;
}
}
protected void CreateLocalSocketConnection(long jobId, string remoteIp, int localPort)
{
if (jobIpSockets.Keys.Contains(jobId) && jobIpSockets[jobId].Keys.Contains(remoteIp))
log.Error("Job " + jobId + " with remote IP " + remoteIp + " already has socket a connection.");
else
{
Socket socket = null;
try
{
//create socket
IPAddress address = IPAddress.Parse("127.0.0.1");
IPEndPoint remoteEP = new IPEndPoint(address, localPort);
socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
socket.NoDelay = true;
//connect to remote host
socket.Connect(remoteEP);
//add socket to dictionary
if (jobIpSockets.Keys.Contains(jobId))
jobIpSockets[jobId].Add(remoteIp, socket);
else jobIpSockets.Add(jobId, new Dictionary<string, Socket> { { remoteIp, socket } });
log.Info("New socket connection created for job " + jobId + " with remote IP " + remoteIp + " and local port " + localPort);
}
catch (Exception e)
{
log.Error("Socket creation error: {0}", e);
}
}
}
protected bool SocketConnected(Socket socket)
{
try
{
return !(socket.Poll(1, SelectMode.SelectRead) && socket.Available == 0);
}
catch (SocketException) { return false; }
}
protected void CloseLocalSocketConnection(long jobId, string remoteIp)
{
if (jobIpSockets.Keys.Contains(jobId) && jobIpSockets[jobId].Keys.Contains(remoteIp))
{
Socket sock = jobIpSockets[jobId][remoteIp];
try
{
sock.Shutdown(SocketShutdown.Both);
sock.Close();
}
catch (Exception e)
{
log.Error("Error in closing socket connection for submitted job info ID " + jobId + ": {0}", e);
}
jobIpSockets[jobId].Remove(remoteIp);
log.Info("Closed socket for submitted job info Id: " + jobId + " and remote IP address: " + remoteIp);
}
else log.Error("Job " + jobId + " with IP " + remoteIp + " does not have an active socket.");
}
#endregion
}
}