/*
 *  send_message.c - Part of AFD, an automatic file distribution program.
 *  Copyright (c) 1998 - 2020 Holger Kiehl
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include "afddefs.h"

DESCR__S_M3
/*
 ** NAME
 **   send_message - sends a message via MSG_FIFO to the FD
 **
 ** SYNOPSIS
 **   void send_message(char         *outgoing_file_dir,
 **                     dev_t        dev,
 **                     char         *p_unique_name,
 **                     unsigned int split_job_counter,
 **                     unsigned int unique_number,
 **                     time_t       creation_time,
 **                     int          position,
 **                     int          files_moved,
 **                     int          files_to_send,
 **                     off_t        file_size_to_send,
 **                     int          do_handle_options)
 **
 ** DESCRIPTION
 **   The function send_message() sends a message of the following
 **   format to the FD if SIZEOF_TIME_T is 4:
 **
 **   <creation time><FID><JID><SJC><FTS><FSTS><unique number><dir no><priority><originator>
 **
 **      Field           Type
 **      --------------  --------------
 **      creation time   time_t
 **      FID             dev_t
 **      JID             unsigned int
 **      SJC             unsigned int
 **      FTS             unsigned int
 **      FSTS            off_t
 **      unique number   unsigned int
 **      dir no          unsigned short
 **      priority        char
 **      originator      char
 **
 **   If SIZEOF_TIME_T is not 4 then the order is as follows:
 **
 **   <creation time><FSTS><FID><JID><SJC><FTS><unique number><dir no><priority><originator>
 **
 **      Field           Type
 **      --------------  --------------
 **      creation time   time_t
 **      FSTS            off_t
 **      FID             dev_t
 **      JID             unsigned int
 **      SJC             unsigned int
 **      FTS             unsigned int
 **      unique number   unsigned int
 **      dir no          unsigned short
 **      priority        char
 **      originator      char
 **
 **       FID    - Filesystem Device ID (only part of the message when
 **                MULTI_FS_SUPPORT is defined).
 **       JID    - Job ID
 **       SJC    - Split Job Counter
 **       FTS    - Files To Send
 **       FSTS   - File Size To Send
 **       dir no - this is the extra directory number so we do not create
 **                too many directories in one dir
 **
 **   But only when the FD is currently active. If not these messages
 **   get stored in the buffer 'mb'.
 **
 ** RETURN VALUES
 **   None.
 **
 ** AUTHOR
 **   H.Kiehl
 **
 ** HISTORY
 **   21.01.1998 H.Kiehl Created
 **   24.09.2004 H.Kiehl Added split job counter.
 **
 */
DESCR__E_M3

#include <string.h>           /* strcpy(), strcat(), strerror(), memcpy()*/
#include <stdlib.h>           /* exit(), strtoul()                       */
#include <unistd.h>           /* write()                                 */
#include <sys/types.h>
#include <sys/stat.h>
#ifdef _WITH_PTHREAD
# include <pthread.h>
#endif
#include <errno.h>
#include "amgdefs.h"

/* External global variables.
*/ extern int fsa_fd, mb_fd, msg_fifo_fd, *no_msg_buffered; extern char *p_work_dir; #ifndef _WITH_PTHREAD extern char *file_name_buffer; #endif extern struct filetransfer_status *fsa; extern struct afd_status *p_afd_status; extern struct instant_db *db; extern struct message_buf *mb; #ifdef _WITH_PTHREAD extern pthread_mutex_t fsa_mutex; #else # if defined (_DELETE_LOG) || defined (_PRODUCTION_LOG) extern off_t *file_size_pool; extern char **file_name_pool; extern unsigned char *file_length_pool; # endif #endif #ifdef _DELETE_LOG extern struct delete_log dl; #endif /* Local function prototypes. */ static void store_msg(char *); /*############################ send_message() ###########################*/ void send_message(char *outgoing_file_dir, #ifdef MULTI_FS_SUPPORT dev_t dev, #endif char *p_unique_name, unsigned int split_job_counter, unsigned int unique_number, time_t creation_time, int position, #ifdef _WITH_PTHREAD # if defined (_DELETE_LOG) || defined (_PRODUCTION_LOG) off_t *file_size_pool, char **file_name_pool, unsigned char *file_length_pool, # endif #endif int files_moved, int files_to_send, off_t file_size_to_send, int do_handle_options) { char file_path[MAX_PATH_LENGTH]; (void)strcpy(file_path, outgoing_file_dir); (void)strcat(file_path, p_unique_name); if (do_handle_options == YES) { if (db[position].lfs & DELETE_ALL_FILES) { #if defined (_DELETE_LOG) || defined (_PRODUCTION_LOG) # ifdef _DELETE_LOG int gotcha, j; size_t dl_real_size; # endif int i; char *p_file_name; p_file_name = file_name_buffer; for (i = 0; i < files_to_send; i++) { # ifdef _PRODUCTION_LOG production_log(creation_time, 1, 0, unique_number, split_job_counter, db[position].job_id, db[position].dir_id, 0.0, 0L, 0L, # if SIZEOF_OFF_T == 4 "%s%c%lx%c%c%c0%c%s", # else "%s%c%llx%c%c%c0%c%s", # endif p_file_name, SEPARATOR_CHAR, (pri_off_t)file_size_pool[i], SEPARATOR_CHAR, SEPARATOR_CHAR, SEPARATOR_CHAR, SEPARATOR_CHAR, DELETE_ID); # endif # ifdef _DELETE_LOG 
(void)strcpy(dl.file_name, p_file_name); (void)snprintf(dl.host_name, MAX_HOSTNAME_LENGTH + 4 + 1, "%-*s %03x", MAX_HOSTNAME_LENGTH, db[position].host_alias, DELETE_OPTION); gotcha = NO; for (j = 0; j < files_moved; j++) { if (CHECK_STRCMP(p_file_name, file_name_pool[j]) == 0) { *dl.file_size = file_size_pool[j]; *dl.file_name_length = file_length_pool[j]; gotcha = YES; break; } } if (gotcha == NO) { *dl.file_size = 0; *dl.file_name_length = strlen(p_file_name); } *dl.dir_id = db[position].dir_id; *dl.job_id = db[position].job_id; *dl.input_time = creation_time; *dl.split_job_counter = split_job_counter; *dl.unique_number = unique_number; dl_real_size = *dl.file_name_length + dl.size + snprintf((dl.file_name + *dl.file_name_length + 1), MAX_FILENAME_LENGTH + 1, "%s%c(%s %d)", DIR_CHECK, SEPARATOR_CHAR, __FILE__, __LINE__); if (write(dl.fd, dl.data, dl_real_size) != dl_real_size) { system_log(ERROR_SIGN, __FILE__, __LINE__, "write() error : %s", strerror(errno)); } # endif p_file_name += MAX_FILENAME_LENGTH; } #endif files_to_send = 0; } else { if (db[position].no_of_loptions > 0) { if (handle_options(position, creation_time, unique_number, split_job_counter, file_path, &files_to_send, &file_size_to_send) != 0) { /* The function reported the error, so */ /* no need to do it here again. 
*/ return; } } } } if (files_to_send > 0) { off_t lock_offset; char fifo_buffer[MAX_BIN_MSG_LENGTH], *ptr; unsigned short dir_no; ptr = p_unique_name + 1; while ((*ptr != '/') && (*ptr != '\0')) { ptr++; } if (*ptr != '/') { system_log(ERROR_SIGN, __FILE__, __LINE__, "Unable to find directory number in `%s'", p_unique_name); return; } dir_no = (unsigned short)strtoul(ptr + 1, NULL, 16); if (do_handle_options == YES) { #ifdef _WITH_PTHREAD int rtn; if ((rtn = pthread_mutex_lock(&fsa_mutex)) != 0) { system_log(ERROR_SIGN, __FILE__, __LINE__, "pthread_mutex_lock() error : %s", strerror(rtn)); } #endif lock_offset = AFD_WORD_OFFSET + (db[position].position * sizeof(struct filetransfer_status)); /* Total file counter. */ #ifdef LOCK_DEBUG lock_region_w(fsa_fd, LOCK_CHECK_FSA_ENTRIES, __FILE__, __LINE__); lock_region_w(fsa_fd, lock_offset + LOCK_TFC, __FILE__, __LINE__); #else lock_region_w(fsa_fd, LOCK_CHECK_FSA_ENTRIES); lock_region_w(fsa_fd, lock_offset + LOCK_TFC); #endif fsa[db[position].position].total_file_counter += files_to_send; /* Total file size. 
*/ fsa[db[position].position].total_file_size += file_size_to_send; #ifdef LOCK_DEBUG unlock_region(fsa_fd, lock_offset + LOCK_TFC, __FILE__, __LINE__); unlock_region(fsa_fd, LOCK_CHECK_FSA_ENTRIES, __FILE__, __LINE__); #else unlock_region(fsa_fd, lock_offset + LOCK_TFC); unlock_region(fsa_fd, LOCK_CHECK_FSA_ENTRIES); #endif #ifdef _WITH_PTHREAD if ((rtn = pthread_mutex_unlock(&fsa_mutex)) != 0) { system_log(ERROR_SIGN, __FILE__, __LINE__, "pthread_mutex_unlock() error : %s", strerror(rtn)); } #endif } *(time_t *)(fifo_buffer) = creation_time; #ifdef MULTI_FS_SUPPORT # if SIZEOF_TIME_T == 4 *(dev_t *)(fifo_buffer + sizeof(time_t)) = dev; *(unsigned int *)(fifo_buffer + sizeof(time_t) + sizeof(dev_t)) = db[position].job_id; *(unsigned int *)(fifo_buffer + sizeof(time_t) + sizeof(dev_t) + sizeof(unsigned int)) = split_job_counter; *(unsigned int *)(fifo_buffer + sizeof(time_t) + sizeof(dev_t) + sizeof(unsigned int) + sizeof(unsigned int)) = files_to_send; *(off_t *)(fifo_buffer + sizeof(time_t) + sizeof(dev_t) + sizeof(unsigned int) + sizeof(unsigned int) + sizeof(unsigned int)) = file_size_to_send; # else *(off_t *)(fifo_buffer + sizeof(time_t)) = file_size_to_send; *(dev_t *)(fifo_buffer + sizeof(time_t) + sizeof(off_t)) = dev; *(unsigned int *)(fifo_buffer + sizeof(time_t) + sizeof(off_t) + sizeof(dev_t)) = db[position].job_id; *(unsigned int *)(fifo_buffer + sizeof(time_t) + sizeof(off_t) + sizeof(dev_t) + sizeof(unsigned int)) = split_job_counter; *(unsigned int *)(fifo_buffer + sizeof(time_t) + sizeof(off_t) + sizeof(dev_t) + sizeof(unsigned int) + sizeof(unsigned int)) = files_to_send; # endif *(unsigned int *)(fifo_buffer + sizeof(time_t) + sizeof(dev_t) + sizeof(unsigned int) + sizeof(unsigned int) + sizeof(unsigned int) + sizeof(off_t)) = unique_number; *(unsigned short *)(fifo_buffer + sizeof(time_t) + sizeof(dev_t) + sizeof(unsigned int) + sizeof(unsigned int) + sizeof(unsigned int) + sizeof(off_t) + sizeof(unsigned int)) = dir_no; *(char *)(fifo_buffer + 
sizeof(time_t) + sizeof(dev_t) + sizeof(unsigned int) + sizeof(unsigned int) + sizeof(unsigned int) + sizeof(off_t) + sizeof(unsigned int) + sizeof(unsigned short)) = db[position].priority; *(char *)(fifo_buffer + sizeof(time_t) + sizeof(dev_t) + sizeof(unsigned int) + sizeof(unsigned int) + sizeof(unsigned int) + sizeof(off_t) + sizeof(unsigned int) + sizeof(unsigned short) + sizeof(char)) = AMG_NO; #else # if SIZEOF_TIME_T == 4 *(unsigned int *)(fifo_buffer + sizeof(time_t)) = db[position].job_id; *(unsigned int *)(fifo_buffer + sizeof(time_t) + sizeof(unsigned int)) = split_job_counter; *(unsigned int *)(fifo_buffer + sizeof(time_t) + sizeof(unsigned int) + sizeof(unsigned int)) = files_to_send; *(off_t *)(fifo_buffer + sizeof(time_t) + sizeof(unsigned int) + sizeof(unsigned int) + sizeof(unsigned int)) = file_size_to_send; # else *(off_t *)(fifo_buffer + sizeof(time_t)) = file_size_to_send; *(unsigned int *)(fifo_buffer + sizeof(time_t) + sizeof(off_t)) = db[position].job_id; *(unsigned int *)(fifo_buffer + sizeof(time_t) + sizeof(off_t) + sizeof(unsigned int)) = split_job_counter; *(unsigned int *)(fifo_buffer + sizeof(time_t) + sizeof(off_t) + sizeof(unsigned int) + sizeof(unsigned int)) = files_to_send; # endif *(unsigned int *)(fifo_buffer + sizeof(time_t) + sizeof(unsigned int) + sizeof(unsigned int) + sizeof(unsigned int) + sizeof(off_t)) = unique_number; *(unsigned short *)(fifo_buffer + sizeof(time_t) + sizeof(unsigned int) + sizeof(unsigned int) + sizeof(unsigned int) + sizeof(off_t) + sizeof(unsigned int)) = dir_no; *(char *)(fifo_buffer + sizeof(time_t) + sizeof(unsigned int) + sizeof(unsigned int) + sizeof(unsigned int) + sizeof(off_t) + sizeof(unsigned int) + sizeof(unsigned short)) = db[position].priority; *(char *)(fifo_buffer + sizeof(time_t) + sizeof(unsigned int) + sizeof(unsigned int) + sizeof(unsigned int) + sizeof(off_t) + sizeof(unsigned int) + sizeof(unsigned short) + sizeof(char)) = AMG_NO; #endif /* MULTI_FS_SUPPORT */ /* * Send the 
message name via fifo to the FD. If the * FD is not active queue it in a special buffer. * When sending a message always make sure that this * buffer is empty. */ if (p_afd_status->fd == ON) { /* * NOTE: If we do NOT do the next check we will have a deadlock * if check_file_dir() generates a message, since it waits * for send_message() to return. */ if (do_handle_options == YES) { /* * If check_file_dir() is active, no forked job may send any * message. Otherwise check_file_dir() will add this job to * the queue. */ while ((p_afd_status->amg_jobs & CHECK_FILE_DIR_ACTIVE) && (p_afd_status->fd == ON)) { (void)my_usleep(10000L); } } if (p_afd_status->fd == ON) { /* Check if there are any messages in the queue. */ if (*no_msg_buffered > 0) { clear_msg_buffer(); } if (write(msg_fifo_fd, fifo_buffer, MAX_BIN_MSG_LENGTH) != MAX_BIN_MSG_LENGTH) { store_msg(fifo_buffer); system_log(FATAL_SIGN, __FILE__, __LINE__, "Failed to write() to message FIFO : %s", strerror(errno)); exit(INCORRECT); } } else { store_msg(fifo_buffer); } } else /* Queue the message! */ { store_msg(fifo_buffer); } } /* if (files_to_send > 0) */ else { if ((do_handle_options == NO) || (db[position].loptions != NULL)) { /* A directory has already been created. Lets remove it. 
*/ if (rec_rmdir(file_path) < 0) { system_log(WARN_SIGN, __FILE__, __LINE__, "Failed to remove directory %s", file_path); } } } return; } /*+++++++++++++++++++++++++++++ store_msg() +++++++++++++++++++++++++++++*/ static void store_msg(char *msg) { #ifdef LOCK_DEBUG lock_region_w(mb_fd, (off_t)0, __FILE__, __LINE__); #else lock_region_w(mb_fd, (off_t)0); #endif if ((*no_msg_buffered != 0) && ((*no_msg_buffered % MESSAGE_BUF_STEP_SIZE) == 0)) { char *ptr; size_t new_size = (((*no_msg_buffered / MESSAGE_BUF_STEP_SIZE) + 1) * MESSAGE_BUF_STEP_SIZE * sizeof(struct message_buf)) + AFD_WORD_OFFSET; ptr = (char *)mb - AFD_WORD_OFFSET; if ((ptr = mmap_resize(mb_fd, ptr, new_size)) == (caddr_t) -1) { system_log(FATAL_SIGN, __FILE__, __LINE__, "mmap_resize() error : %s", strerror(errno)); exit(INCORRECT); } no_msg_buffered = (int *)ptr; ptr += AFD_WORD_OFFSET; mb = (struct message_buf *)ptr; } (void)memcpy(mb[*no_msg_buffered].bin_msg_name, msg, MAX_BIN_MSG_LENGTH); (*no_msg_buffered)++; #ifdef LOCK_DEBUG unlock_region(mb_fd, 0, __FILE__, __LINE__); #else unlock_region(mb_fd, 0); #endif return; }