Commit 4564986a authored by Martin Pulec's avatar Martin Pulec
Browse files

Added windows SHM implementation

parent 3e9f14ea
......@@ -43,6 +43,7 @@
#ifdef _WIN32
#include <windows.h>
#include <tchar.h>
#else
#include <unistd.h>
#include <errno.h>
......@@ -56,6 +57,7 @@
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include "platform_ipc.h"
......@@ -78,6 +80,32 @@ static key_t get_key(const char *id, int proj_id) {
}
#endif
#ifdef _WIN32
void PrintError(void)
{
// Retrieve the system error message for the last-error code
LPVOID lpMsgBuf;
DWORD dw = GetLastError();
FormatMessage(
FORMAT_MESSAGE_ALLOCATE_BUFFER |
FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_IGNORE_INSERTS,
NULL,
dw,
MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
(LPTSTR) &lpMsgBuf,
0, NULL );
// Display the error message and exit the process
_tprintf(TEXT("%s"), (LPCTSTR) lpMsgBuf);
LocalFree(lpMsgBuf);
}
#endif
//
// SHM
//
......@@ -109,6 +137,20 @@ static platform_ipc_shm_t platform_ipc_shm_open_common(const char *id, size_t si
platform_ipc_shm_t platform_ipc_shm_create(const char *id, size_t size)
{
#ifdef _WIN32
HANDLE hMapFile = CreateFileMapping(
INVALID_HANDLE_VALUE, // use paging file
NULL, // default security
PAGE_READWRITE, // read/write access
0, // maximum object size (high-order DWORD)
size, // maximum object size (low-order DWORD)
id); // name of mapping object
if (hMapFile == NULL) {
_tprintf(TEXT("Could not create file mapping object (%d).\n"),
GetLastError());
PrintError();
return NULL;
}
return hMapFile;
#else
return platform_ipc_shm_open_common(id, size, IPC_CREAT | IPC_EXCL | 0666);
#endif
......@@ -117,15 +159,44 @@ platform_ipc_shm_t platform_ipc_shm_create(const char *id, size_t size)
platform_ipc_shm_t platform_ipc_shm_open(const char *id, size_t size)
{
#ifdef _WIN32
HANDLE hMapFile = OpenFileMapping(
FILE_MAP_ALL_ACCESS, // read/write access
FALSE, // do not inherit the name
id); // name of mapping object
if (hMapFile == NULL)
{
_tprintf(TEXT("Could not open file mapping object (%d).\n"),
GetLastError());
PrintError();
return NULL;
}
return hMapFile;
#else
return platform_ipc_shm_open_common(id, size, 0);
#endif
}
void *platform_ipc_shm_attach(platform_ipc_shm_t handle)
void *platform_ipc_shm_attach(platform_ipc_shm_t handle, size_t size)
{
#ifdef _WIN32
LPVOID pBuf = (LPTSTR) MapViewOfFile(handle, // handle to map object
FILE_MAP_ALL_ACCESS, // read/write permission
0,
0,
size);
if (pBuf == NULL)
{
_tprintf(TEXT("Could not map view of file (%d).\n"),
GetLastError());
PrintError();
return NULL;
}
return pBuf;
#else
(void) size;
void *ret = shmat(handle, NULL, 0);
if (ret == (void *) -1) {
perror("shmat");
......@@ -137,6 +208,7 @@ void *platform_ipc_shm_attach(platform_ipc_shm_t handle)
void platform_ipc_shm_detach(void *ptr)
{
#ifdef _WIN32
UnmapViewOfFile(ptr);
#else
if (shmdt(ptr) == -1) {
perror("shmdt");
......@@ -144,12 +216,16 @@ void platform_ipc_shm_detach(void *ptr)
#endif
}
void platform_ipc_shm_destroy(platform_ipc_shm_t handle)
void platform_ipc_shm_done(platform_ipc_shm_t handle, bool destroy)
{
#ifdef _WIN32
(void) destroy;
CloseHandle(handle);
#else
if (shmctl(handle, IPC_RMID , 0) == -1) {
perror("shmctl");
if (destroy) {
if (shmctl(handle, IPC_RMID , 0) == -1) {
perror("shmctl");
}
}
#endif
}
......@@ -183,6 +259,22 @@ static platform_ipc_sem_t platform_ipc_sem_open_common(const char *id, int index
platform_ipc_sem_t platform_ipc_sem_create(const char *id, int index)
{
#ifdef _WIN32
char name[strlen(id) + 21 + 1];
sprintf(name, "%s-%d", id, index);
HANDLE ghSemaphore = CreateSemaphore(
NULL, // default security attributes
0, // initial count
(1<<15) - 1, // maximum count
name); // unnamed semaphore
if (ghSemaphore == NULL) {
printf("CreateSemaphore error: %d\n", GetLastError());
PrintError();
return NULL;
}
return ghSemaphore;
#else
return platform_ipc_sem_open_common(id, index, IPC_CREAT | IPC_EXCL | 0666);
#endif
......@@ -191,6 +283,21 @@ platform_ipc_sem_t platform_ipc_sem_create(const char *id, int index)
platform_ipc_sem_t platform_ipc_sem_open(const char *id, int index)
{
#ifdef _WIN32
char name[strlen(id) + 21 + 1];
sprintf(name, "%s-%d", id, index);
HANDLE ghSemaphore = OpenSemaphore(
SEMAPHORE_ALL_ACCESS, // default security attributes
FALSE, // inherit
name); // named semaphore
if (ghSemaphore == NULL) {
printf("CreateSemaphore error: %d\n", GetLastError());
PrintError();
return NULL;
}
return ghSemaphore;
#else
return platform_ipc_sem_open_common(id, index, 0);
#endif
......@@ -199,6 +306,15 @@ platform_ipc_sem_t platform_ipc_sem_open(const char *id, int index)
bool platform_ipc_sem_post(platform_ipc_sem_t handle)
{
#ifdef _WIN32
if (!ReleaseSemaphore(
handle, // handle to semaphore
1, // increase count by one
NULL) ) // not interested in previous count
{
printf("ReleaseSemaphore error: %d\n", GetLastError());
PrintError();
return false;
}
#else
struct sembuf op;
op.sem_num = 0;
......@@ -208,13 +324,32 @@ bool platform_ipc_sem_post(platform_ipc_sem_t handle)
perror("semop");
return false;
}
return true;
#endif
return true;
}
bool platform_ipc_sem_wait(platform_ipc_sem_t handle)
{
#ifdef _WIN32
DWORD dwWaitResult;
dwWaitResult = WaitForSingleObject(
handle, // handle to semaphore
INFINITE); // infinite time-out interval
switch (dwWaitResult)
{
// The semaphore object was signaled.
case WAIT_OBJECT_0:
return true;
// The semaphore was nonsignaled, so a time-out occurred.
case WAIT_TIMEOUT:
printf("Thread %d: wait timed out\n", GetCurrentThreadId());
return false;
default:
abort();
}
#else
struct sembuf op;
op.sem_num = 0;
......@@ -228,13 +363,17 @@ bool platform_ipc_sem_wait(platform_ipc_sem_t handle)
#endif
}
void platform_ipc_sem_destroy(platform_ipc_sem_t handle)
void platform_ipc_sem_done(platform_ipc_sem_t handle, bool destroy)
{
#ifdef _WIN32
(void) destroy;
CloseHandle(handle);
#else
if (semctl(handle, IPC_RMID , 0) == -1) {
perror("semctl");
}
if (destroy) {
if (semctl(handle, IPC_RMID , 0) == -1) {
perror("semctl");
}
}
#endif
}
......@@ -45,6 +45,7 @@
#endif // defined __cplusplus
#ifdef _WIN32
#include <windows.h>
typedef HANDLE platform_ipc_shm_t;
typedef HANDLE platform_ipc_sem_t;
#define PLATFORM_IPC_ERR NULL
......@@ -64,9 +65,9 @@ extern "C" {
*/
platform_ipc_shm_t platform_ipc_shm_create(const char *id, size_t size);
platform_ipc_shm_t platform_ipc_shm_open(const char *id, size_t size);
void *platform_ipc_shm_attach(platform_ipc_shm_t handle);
void *platform_ipc_shm_attach(platform_ipc_shm_t handle, size_t size);
void platform_ipc_shm_detach(void *ptr);
void platform_ipc_shm_destroy(platform_ipc_shm_t handle);
void platform_ipc_shm_done(platform_ipc_shm_t handle, bool destroy);
/**
* @param index index of semaphore for unique ID (see ftok(), param proj_id),
......@@ -76,7 +77,7 @@ platform_ipc_sem_t platform_ipc_sem_create(const char *id, int index);
platform_ipc_sem_t platform_ipc_sem_open(const char *id, int index);
bool platform_ipc_sem_post(platform_ipc_sem_t handle);
bool platform_ipc_sem_wait(platform_ipc_sem_t handle);
void platform_ipc_sem_destroy(platform_ipc_sem_t handle);
void platform_ipc_sem_done(platform_ipc_sem_t handle, bool destroy);
#ifdef __cplusplus
}
......
#include "ultragrid.h"
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
......@@ -10,8 +11,14 @@
#include "platform_ipc.h"
#include "vrgstream.h"
#define REQUIRED_SHM_VERSION 6
#define KEY "UltraGrid-SHM"
#define MAX_BUF_LEN (7680 * 2160 / 3 * 2)
#define REQUIRED_SHM_VERSION 7
#define UG_CUDA_IPC_HANDLE_SIZE 64
#if defined WITH_CLIENT_CUDA
static_assert(UG_CUDA_IPC_HANDLE_SIZE == CUDA_IPC_HANDLE_SIZE, "CUDA IPC handle doesn't match expected size!");
#endif
static void convert_render_pkt_to_view_matrix(struct RenderPacket *pkt,
cyclesphi::cyclesphi_data *cdata);
......@@ -128,7 +135,7 @@ int g_sensor_width = 36;
struct cesnet_shm {
int version;
int width, height;
cudaIpcMemHandle_t d_ptr;
char cuda_ipc_mem_handle[UG_CUDA_IPC_HANDLE_SIZE];
int ug_exited;
int use_gpu;
struct RenderPacket pkt;
......@@ -243,23 +250,31 @@ void cesnet_set_render_buffer_yuv_i420(
if (width == 0 || height == 0) {
return;
}
#ifdef WITH_CUDA_CLIENT
cudaSetDevice(0);
#endif
platform_ipc_shm_t id = platform_ipc_shm_open(KEY, sizeof(cesnet_shm));
if (id == PLATFORM_IPC_ERR) {
return;
platform_ipc_shm_t shm_id = PLATFORM_IPC_ERR;
struct cesnet_shm *cesnet_shm = (struct cesnet_shm *) PLATFORM_IPC_ERR;
platform_ipc_sem_t sem[3] = { PLATFORM_IPC_ERR, PLATFORM_IPC_ERR, PLATFORM_IPC_ERR };
int i;
bool post_should_exit = false;
size_t size = offsetof(struct cesnet_shm, data[MAX_BUF_LEN]);
shm_id = platform_ipc_shm_open(KEY, size);
if (shm_id == PLATFORM_IPC_ERR) {
goto done;
}
struct cesnet_shm *cesnet_shm = (struct cesnet_shm *) platform_ipc_shm_attach(id);
cesnet_shm = (struct cesnet_shm *) platform_ipc_shm_attach(shm_id, size);
if (cesnet_shm == (void *) PLATFORM_IPC_ERR) {
return;
}
/// 3 semaphores: [ready_to_consume_frame, freame_ready, lock]
platform_ipc_sem_t sem[3];
int i = 1;
i = 1;
for (auto &s : sem) {
s = platform_ipc_sem_open(KEY, i++);
if (s == PLATFORM_IPC_ERR) {
return;
goto done;
}
}
......@@ -268,24 +283,28 @@ void cesnet_set_render_buffer_yuv_i420(
"Incompatible UG ABI version %d (requred %d)!\n",
cesnet_shm->version,
REQUIRED_SHM_VERSION);
return;
goto done;
}
if (cesnet_shm->ug_exited) {
fprintf(stderr, "UltraGrid exited!\n");
goto done;
}
if (!platform_ipc_sem_wait(sem[SHOULD_EXIT_LOCK])) { // UltraGrid must not exit inbetween
return;
fprintf(stderr, "Waiting for SHOULD_EXIT_LOCK failed!\n");
goto done;
}
if (!platform_ipc_sem_wait(sem[READY_TO_CONSUME_FRAME])) { // wait until UltraGrid is able to take a frame
if (!platform_ipc_sem_post(sem[SHOULD_EXIT_LOCK])) { // try to release the "lock"
fprintf(stderr, "Cannot return to sane state! This is bad, exitting....\n");
exit(1);
}
return;
if (cesnet_shm->ug_exited) {
fprintf(stderr, "UltraGrid exited!\n");
goto done;
}
post_should_exit = true;
if (cesnet_shm->ug_exited) {
platform_ipc_shm_detach(cesnet_shm);
return;
if (!platform_ipc_sem_wait(sem[READY_TO_CONSUME_FRAME])) { // wait until UltraGrid is able to take a frame
fprintf(stderr, "Waiting for UltraGrid ready failed!\n");
goto done;
}
// hereafter UltraGrid must not exit (buffer must be valid) until first semafore has value 0
cesnet_shm->width = width;
cesnet_shm->height = height;
......@@ -294,7 +313,7 @@ void cesnet_set_render_buffer_yuv_i420(
#ifdef WITH_CLIENT_CUDA
char *d_data;
CUDA_CHECK(
cudaIpcOpenMemHandle((void **)&d_data, cesnet_shm->d_ptr, cudaIpcMemLazyEnablePeerAccess));
cudaIpcOpenMemHandle((void **)&d_data, (cudaIpcMemHandle_t) cesnet_shm->d_ptr, cudaIpcMemLazyEnablePeerAccess));
char *tmp = d_data;
int len = width * height;
......@@ -308,7 +327,7 @@ void cesnet_set_render_buffer_yuv_i420(
CUDA_CHECK(cudaIpcCloseMemHandle(d_data));
#else
fprintf(stderr, "CUDA support not compiled in! Use 'uv -t shm' to use standard SHM.\n");
return;
goto done;
#endif
}
else {
......@@ -327,11 +346,26 @@ void cesnet_set_render_buffer_yuv_i420(
}
platform_ipc_sem_post(sem[FRAME_READY]); // post that we have a new frame
if (!platform_ipc_sem_post(sem[SHOULD_EXIT_LOCK])) { // try to release the "lock"
fprintf(stderr, "Cannot return to sane state! This is bad, exitting....\n");
exit(1);
done:
if (post_should_exit) {
if (!platform_ipc_sem_post(sem[SHOULD_EXIT_LOCK])) { // try to release the "lock"
fprintf(stderr, "Cannot return to sane state! This is bad, exitting....\n");
exit(1);
}
}
i = 1;
for (auto &s : sem) {
if (s != PLATFORM_IPC_ERR) {
platform_ipc_sem_done(s, false);
}
}
if (cesnet_shm != (void *) PLATFORM_IPC_ERR) {
platform_ipc_shm_detach(cesnet_shm);
}
if (shm_id != PLATFORM_IPC_ERR) {
platform_ipc_shm_done(shm_id, false);
}
platform_ipc_shm_detach(cesnet_shm);
}
#endif
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment