/* * This file is part of the Sofia-SIP package * * Copyright (C) 2005 Nokia Corporation. * * Contact: Pekka Pessi * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License * as published by the Free Software Foundation; either version 2.1 of * the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA * 02110-1301 USA * */ /**@CFILE nth_client.c * @brief HTTP Client implementhtion * * Copyright (c) 2002 Nokia Research Center. All rights reserved. * * This source file has been divided into following sections: * 1) engine * 2) tport handling * 3) client transactions * * @author Pekka Pessi * * @date Created: Tue Jun 13 02:57:51 2000 ppessi */ #include "config.h" #include #include #include #include #include #include #include /** @internal SU message argument structure type */ #define SU_MSG_ARG_T union sm_arg_u /** @internal SU timer argument pointer type */ #define SU_TIMER_ARG_T struct nth_engine_s #define MSG_HDR_T union http_header_u #define MSG_PUB_T struct http_s #include "sofia-sip/nth.h" #include #include #include #include #include #include #include /* We are customer of tport_t */ #define TP_STACK_T nth_engine_t #define TP_MAGIC_T void #define TP_CLIENT_T nth_client_t #ifndef TPORT_H #include #endif #include #define HE_TIMER HE_TIMER enum { HE_TIMER = 1000 }; /** @c http_flag telling that this message is internally generated. */ #define NTH_INTERNAL_MSG (1<<16) HTABLE_DECLARE_WITH(hc_htable, hct, nth_client_t, uintptr_t, size_t); struct nth_engine_s { su_home_t he_home[1]; su_root_t *he_root; su_timer_t *he_timer; int he_mflags; /**< Message flags */ msg_mclass_t const *he_mclass; tport_t *he_tports; url_t *he_default_proxy; unsigned he_now; unsigned he_expires; /* Attributes */ unsigned he_streaming:1; /**< Enable streaming */ unsigned he_error_msg:1; unsigned:0; /* Statistics */ struct { uint32_t st_requests; /**< Sent requests */ uint32_t st_1xxresponses; /**< Received 1XX responses */ uint32_t st_responses; /**< Received responses */ uint32_t st_tp_errors; /**< Transport errors */ uint32_t st_timeouts; /**< Timeouts */ uint32_t st_bad; /**< Bad responses*/ } he_stats[1]; /** Table for client transactions */ hc_htable_t he_clients[1]; }; struct nth_client_s { nth_engine_t *hc_engine; nth_response_f *hc_callback; nth_client_magic_t *hc_magic; http_method_t hc_method; char const *hc_method_name; url_t const *hc_url; /**< Original RequestURI */ unsigned hc_timeout; /**< Client timeout */ unsigned hc_expires; /**< Client expires */ /* Request state */ unsigned short hc_status; unsigned hc_destroyed:1; unsigned hc_completed:1; unsigned hc_inserted:1; unsigned hc_is_streaming:1; /**< Currently streaming response */ /* Attributes */ unsigned hc_streaming:1; /**< Enable streaming */ unsigned hc_error_msg:1; unsigned /* pad */:0; url_string_t const *hc_route_url; tp_name_t hc_tpn[1]; /**< Where to send requests */ tport_t *hc_tport; int hc_pending; /**< Request is pending in tport */ tagi_t *hc_tags; /**< Transport tags */ auth_client_t **hc_auc; /**< List of authenticators */ msg_t *hc_request; msg_t *hc_response; }; /* ====================================================================== */ /* Debug log settings */ #define SU_LOG nth_client_log #ifdef SU_DEBUG_H #error included directly. #endif #include /**@var NTH_DEBUG * * Environment variable determining the debug log level for @b nth * module. * * The NTH_DEBUG environment variable is used to determine the debug * logging level for @b nth module. The default level is 1. * * @sa , nth_client_log, #SOFIA_DEBUG */ extern char const NTH_DEBUG[]; #ifndef SU_DEBUG #define SU_DEBUG 1 #endif /**Debug log for @b nth module. * * The nth_client_log is the log object used by @b nth client. The level of * #nth_client_log is set using #NTH_DEBUG environment variable. */ su_log_t nth_client_log[] = { SU_LOG_INIT("nth", "NTH_DEBUG", SU_DEBUG) }; #if HAVE_FUNC #elif HAVE_FUNCTION #define __func__ __FUNCTION__ #else static char const __func__[] = "nth"; #endif /* ====================================================================== */ /* Internal message passing */ union sm_arg_u { struct hc_recv_s { nth_client_t *hc; msg_t *msg; http_t *http; } hc_recv[1]; }; /* ====================================================================== */ /* Internal prototypes */ tagi_t nth_client_tags[] = { {nthtag_mclass, 0}, {nthtag_message, 0}, {nthtag_mflags, 0}, {nthtag_proxy, 0}, {nthtag_error_msg, 0}, {nthtag_template, 0}, {nthtag_authentication, 0}, {TAG_NEXT(tport_tags)} }; /* ====================================================================== */ /* Internal prototypes */ static int he_create_tports(nth_engine_t * he, tagi_t *tags); static int he_timer_init(nth_engine_t * he); static void he_timer(su_root_magic_t *, su_timer_t *, nth_engine_t * he); static void hc_timer(nth_engine_t * he, nth_client_t * hc, uint32_t now); static uint32_t he_now(nth_engine_t const *he); static void he_recv_message(nth_engine_t * he, tport_t * tport, msg_t *msg, void *arg, su_time_t now); static msg_t *he_msg_create(nth_engine_t * he, int flags, char const data[], usize_t dlen, tport_t const *tport, nth_client_t * hc); static void he_tp_error(nth_engine_t * he, tport_t * tport, int errcode, char const *remote); static int hc_recv(nth_client_t * hc, msg_t *msg, http_t * http); HTABLE_PROTOS_WITH(hc_htable, hct, nth_client_t, uintptr_t, size_t); #define HTABLE_HASH_CLIENT(hc) ((uintptr_t)(hc)->hc_tport) HTABLE_BODIES_WITH(hc_htable, hct, nth_client_t, HTABLE_HASH_CLIENT, uintptr_t, size_t); static url_string_t const *hc_request_complete(nth_client_t * hc, msg_t *msg, http_t * http, http_method_t method, char const *name, url_string_t const *url, char const *version, url_t const *parent); static int hc_request_authenticate(nth_client_t * hc, msg_t *msg, http_t * http, url_string_t const *uri, auth_client_t **auc); static nth_client_t *hc_create(nth_engine_t * he, nth_response_f * callback, nth_client_magic_t * magic, msg_t *msg, tag_type_t tag, tag_value_t value, ...); static int hc_resolve_and_send(nth_client_t * hc); static nth_client_t *hc_send(nth_client_t * hc); static void hc_insert(nth_engine_t * he, nth_client_t * hc); static void hc_free(nth_client_t * hc); static void hc_tport_error(nth_engine_t *, nth_client_t * hc, tport_t * tp, msg_t *msg, int error); static int hc_reply(nth_client_t * hc, int status, char const *phrase); static int hc_default_cb(nth_client_magic_t * magic, nth_client_t * request, http_t const *http); /* ---------------------------------------------------------------------- */ char const *nth_engine_version(void) { return "sofia-http-client/" NTH_CLIENT_VERSION; } /* ---------------------------------------------------------------------- */ nth_engine_t *nth_engine_create(su_root_t *root, tag_type_t tag, tag_value_t value, ...) { nth_engine_t *he; ta_list ta; if ((he = su_home_new(sizeof(*he)))) { he->he_root = root; he->he_mflags = MSG_DO_CANONIC; he->he_mclass = http_default_mclass(); he->he_expires = 32000; ta_start(ta, tag, value); if (hc_htable_resize(he->he_home, he->he_clients, 0) < 0 || he_create_tports(he, ta_args(ta)) < 0 || he_timer_init(he) < 0 || nth_engine_set_params(he, ta_tags(ta)) < 0) { nth_engine_destroy(he), he = NULL; } ta_end(ta); } return he; } void nth_engine_destroy(nth_engine_t * he) { if (he) { size_t i; hc_htable_t *hct = he->he_clients; for (i = 0; i < hct->hct_size; i++) hc_free(hct->hct_table[i]); tport_destroy(he->he_tports); su_timer_destroy(he->he_timer), he->he_timer = NULL; su_home_unref(he->he_home); } } int nth_engine_set_params(nth_engine_t * he, tag_type_t tag, tag_value_t value, ...) { int n; ta_list ta; unsigned expires; int error_msg; msg_mclass_t const *mclass; int mflags; int streaming; url_string_t const *proxy; if (he == NULL) return (errno = EINVAL), -1; ta_start(ta, tag, value); expires = he->he_expires; error_msg = he->he_error_msg; mclass = he->he_mclass; mflags = he->he_mflags; streaming = he->he_streaming; proxy = (void *) he->he_default_proxy; n = tl_gets(ta_args(ta), NTHTAG_EXPIRES_REF(expires), NTHTAG_ERROR_MSG_REF(error_msg), NTHTAG_MCLASS_REF(mclass), NTHTAG_MFLAGS_REF(mflags), NTHTAG_STREAMING_REF(streaming), NTHTAG_PROXY_REF(proxy), TAG_END()); if (n > 0) { if (proxy->us_url != he->he_default_proxy) { url_t *copy = url_hdup(he->he_home, proxy->us_url); if (proxy && !copy) { n = -1; } else { su_free(he->he_home, (void *) he->he_default_proxy); he->he_default_proxy = copy; } } } if (n > 0) { he->he_expires = expires; he->he_error_msg = error_msg != 0; if (mclass) he->he_mclass = mclass; else he->he_mclass = http_default_mclass(); he->he_mflags = mflags; he->he_streaming = streaming != 0; } ta_end(ta); return n; } int nth_engine_get_params(nth_engine_t const *he, tag_type_t tag, tag_value_t value, ...) { int n; ta_list ta; msg_mclass_t const *mclass; if (he == NULL) return (errno = EINVAL), -1; if (he->he_mclass != http_default_mclass()) mclass = he->he_mclass; else mclass = NULL; ta_start(ta, tag, value); n = tl_tgets(ta_args(ta), NTHTAG_ERROR_MSG(he->he_error_msg), NTHTAG_MCLASS(mclass), NTHTAG_MFLAGS(he->he_mflags), NTHTAG_EXPIRES(he->he_expires), NTHTAG_STREAMING(he->he_streaming), NTHTAG_PROXY((url_string_t *) he->he_default_proxy), TAG_END()); ta_end(ta); return n; } int nth_engine_get_stats(nth_engine_t const *he, tag_type_t tag, tag_value_t value, ...) { int n; ta_list ta; if (he == NULL) return (errno = EINVAL), -1; ta_start(ta, tag, value); n = tl_tgets(ta_args(ta), TAG_END()); ta_end(ta); return n; } static tp_name_t he_name[1] = { {"*", "*", "*", "*"} }; static char const *const he_tports[] = { "tcp", "tls", NULL }; static tp_stack_class_t http_client_class[1] = { { sizeof(http_client_class), he_recv_message, he_tp_error, he_msg_create} }; /** Create transports for client engine */ static int he_create_tports(nth_engine_t * he, tagi_t *tags) { he->he_tports = tport_tcreate(he, http_client_class, he->he_root, TAG_END()); if (!he->he_tports) return -1; return tport_tbind(he->he_tports, he_name, he_tports, TPTAG_SERVER(0), TAG_NEXT(tags)); } /** Initialize engine timer. */ static int he_timer_init(nth_engine_t * he) { he->he_timer = su_timer_create(su_root_task(he->he_root), HE_TIMER); return su_timer_set(he->he_timer, he_timer, he); } /** * Engine timer routine. */ static void he_timer(su_root_magic_t *rm, su_timer_t *timer, nth_engine_t * he) { unsigned i; uint32_t now; hc_htable_t *hct = he->he_clients; now = su_time_ms(su_now()); now += now == 0; he->he_now = now; if (hct) for (i = hct->hct_size; i > 0;) if (hct->hct_table[--i]) hc_timer(he, hct->hct_table[i], now); su_timer_set(timer, he_timer, he); he->he_now = 0; } /** Get current timestamp in milliseconds */ static uint32_t he_now(nth_engine_t const *he) { if (he->he_now) return he->he_now; else return su_time_ms(su_now()); } static void he_recv_message(nth_engine_t * he, tport_t * tport, msg_t *msg, void *arg, su_time_t now) { nth_client_t *hc, **hcp; tp_name_t const *tpn; for (hcp = hc_htable_hash(he->he_clients, (hash_value_t)(uintptr_t) tport); (hc = *hcp); hcp = hc_htable_next(he->he_clients, hcp)) { if (hc->hc_tport == tport) { if (hc_recv(hc, msg, http_object(msg)) < 0) msg_destroy(msg); return; } } /* Extra response? Framing error? */ tpn = tport_name(tport); if (msg_size(msg)) SU_DEBUG_3(("nth client: received extra data ("MOD_ZU" bytes) " "from %s/%s:%s\n", (size_t)msg_size(msg), tpn->tpn_proto, tpn->tpn_host, tpn->tpn_port)); else SU_DEBUG_3(("nth client: received extra data from %s/%s:%s\n", tpn->tpn_proto, tpn->tpn_host, tpn->tpn_port)); msg_destroy(msg); tport_shutdown(tport, 2); } /** Report error from transport */ static void he_tp_error(nth_engine_t * he, tport_t * tport, int errcode, char const *remote) { su_log("\nth: tport: %s%s%s\n", remote ? remote : "", remote ? ": " : "", su_strerror(errcode)); } /** Create a new message. */ msg_t *nth_engine_msg_create(nth_engine_t * he, int flags) { if (he == NULL) { errno = EINVAL; return NULL; } flags |= he->he_mflags; if (he->he_streaming) flags |= MSG_FLG_STREAMING; else flags &= ~MSG_FLG_STREAMING; return msg_create(he->he_mclass, flags); } /** Create a new message for transport */ static msg_t *he_msg_create(nth_engine_t * he, int flags, char const data[], usize_t dlen, tport_t const *tport, nth_client_t * hc) { flags |= he->he_mflags; if (he->he_streaming) flags |= MSG_FLG_STREAMING; else flags &= ~MSG_FLG_STREAMING; if (hc == NULL) { nth_client_t **slot; for (slot = hc_htable_hash(he->he_clients, (hash_value_t)(uintptr_t) tport); (hc = *slot); slot = hc_htable_next(he->he_clients, slot)) if (hc->hc_tport == tport) break; } if (hc) { if (hc->hc_method == http_method_head) { flags &= ~MSG_FLG_STREAMING; flags |= HTTP_FLG_NO_BODY; } } return msg_create(he->he_mclass, flags); } /** Get destination name from Host header and request URI. */ static int tpn_by_host(tp_name_t * tpn, http_host_t const *h, url_t const *url) { if (!h || !url) return -1; tpn->tpn_proto = url_tport_default(url->url_type); tpn->tpn_canon = h->h_host; tpn->tpn_host = h->h_host; if (h->h_port) tpn->tpn_port = h->h_port; else tpn->tpn_port = url_port_default(url->url_type); return 0; } /* ---------------------------------------------------------------------- */ nth_client_t *nth_client_tcreate(nth_engine_t * engine, nth_response_f * callback, nth_client_magic_t * magic, http_method_t method, char const *name, url_string_t const *uri, tag_type_t tag, tag_value_t value, ...) { nth_client_t *hc = NULL; ta_list ta; if (engine) { void *none = &none; msg_t *msg = none; http_t *http; char const *version = http_version_1_1; nth_client_t const *template = NULL; auth_client_t **auc = none; unsigned expires = engine->he_expires; int ok = 0; ta_start(ta, tag, value); tl_gets(ta_args(ta), NTHTAG_TEMPLATE_REF(template), NTHTAG_AUTHENTICATION_REF(auc), NTHTAG_MESSAGE_REF(msg), NTHTAG_EXPIRES_REF(expires), HTTPTAG_VERSION_REF(version), TAG_END()); if (msg == none) { if (template && template->hc_request) msg = msg_copy(template->hc_request); else msg = msg_create(engine->he_mclass, engine->he_mflags); } http = http_object(msg); if (template) { if (callback == NULL) callback = template->hc_callback; if (magic == NULL) magic = template->hc_magic; if (name == NULL) method = template->hc_method, name = template->hc_method_name; if (uri == NULL) uri = (url_string_t *) template->hc_url; if (auc == none) auc = template->hc_auc; } else if (auc == none) { auc = NULL; } hc = hc_create(engine, callback, magic, msg, ta_tags(ta)); if (hc) hc->hc_expires = expires; if (hc == NULL) ; else if (http_add_tl(msg, http, ta_tags(ta)) < 0) ; else if (!(uri = hc_request_complete(hc, msg, http, method, name, uri, version, nth_client_url(template)))) ; else if (auc && hc_request_authenticate(hc, msg, http, uri, auc) <= 0) ; else if (hc_resolve_and_send(hc) < 0) ; else ok = 1; if (!ok) { if (hc) hc_free(hc); else msg_destroy(msg); hc = NULL; } ta_end(ta); } return hc; } static url_string_t const *hc_request_complete(nth_client_t * hc, msg_t *msg, http_t * http, http_method_t method, char const *name, url_string_t const *uri, char const *version, url_t const *parent) { su_home_t *home = msg_home(msg); http_host_t *host = http->http_host; void *tbf = NULL; url_t const *url; url_t u[1]; if (uri == NULL && http->http_request) uri = (url_string_t *) http->http_request->rq_url; if (uri == NULL) uri = (url_string_t *) parent; url = url_string_p(uri) ? (tbf = url_hdup(NULL, uri->us_url)) : uri->us_url; if (!url) return NULL; *u = *url; if (u->url_type == url_unknown && u->url_path && !u->url_host) { if (parent) { *u = *parent; u->url_path = url->url_path; /* XXX - relative URLs! */ u->url_params = url->url_params; u->url_headers = url->url_headers; /* Query */ } } if (!hc->hc_route_url && u->url_type != url_http && u->url_type != url_https) hc->hc_route_url = (url_string_t *) u; if (host && (host_cmp(host->h_host, u->url_host) || str0cmp(host->h_port, u->url_port))) host = NULL; if (host == NULL && u->url_host) { host = http_host_create(home, u->url_host, u->url_port); msg_header_insert(msg, http, (http_header_t *) host); } if (u->url_host || hc->hc_route_url || host) hc->hc_url = url_hdup(home, u); if (hc->hc_route_url == (url_string_t *) u) hc->hc_route_url = (url_string_t *) hc->hc_url; if (hc->hc_url) { http_request_t *rq = http->http_request; if (rq && !method && !name) method = rq->rq_method, name = rq->rq_method_name; else if (rq && method && method != rq->rq_method) rq = NULL; else if (rq && name && strcmp(name, rq->rq_method_name)) rq = NULL; if (rq && version && strcasecmp(version, rq->rq_version)) rq = NULL; if (!hc->hc_route_url) { u->url_type = url_unknown, u->url_scheme = NULL; u->url_user = NULL, u->url_password = NULL; u->url_host = NULL, u->url_port = NULL; u->url_root = '/'; if (!u->url_path) u->url_path = ""; u->url_fragment = NULL; } if (rq && http_url_cmp(u, rq->rq_url)) rq = NULL; if (!rq) { if (http->http_request) msg_header_remove(msg, http, (msg_header_t *) http->http_request); http->http_request = http_request_create(home, method, name, (url_string_t *) u, version); if (!http->http_request) uri = NULL; } } else { uri = NULL; } if (http_message_complete(msg, http) < 0) uri = NULL; if (tbf) su_free(NULL, tbf); if (uri) { hc->hc_method = http->http_request->rq_method; hc->hc_method_name = http->http_request->rq_method_name; } return uri; } static int hc_request_authenticate(nth_client_t * hc, msg_t *msg, http_t * http, url_string_t const *uri, auth_client_t **auc) { return auc_authorization(auc, msg, http, http->http_request->rq_method_name, uri->us_url, http->http_payload); } static nth_client_t *hc_create(nth_engine_t * he, nth_response_f * callback, nth_client_magic_t * magic, msg_t *msg, tag_type_t tag, tag_value_t value, ...) { nth_client_t *hc; su_home_t *home = msg_home(msg); if (!(hc = su_zalloc(he->he_home, sizeof(*hc)))) return NULL; if (!callback) callback = hc_default_cb; { int error_msg = he->he_error_msg; int streaming = he->he_streaming; url_string_t const *route_url = NULL; ta_list ta; ta_start(ta, tag, value); route_url = (url_string_t *) he->he_default_proxy; tl_gets(ta_args(ta), NTHTAG_PROXY_REF(route_url), NTHTAG_ERROR_MSG_REF(error_msg), NTHTAG_STREAMING_REF(streaming), TAG_END()); hc->hc_engine = he; hc->hc_callback = callback; hc->hc_magic = magic; hc->hc_tags = tl_afilter(home, tport_tags, ta_args(ta)); hc->hc_error_msg = error_msg; hc->hc_streaming = streaming; hc->hc_route_url = route_url; ta_end(ta); } hc->hc_request = msg; return hc; } static int hc_resolve_and_send(nth_client_t * hc) { msg_t *msg = hc->hc_request; http_t *http = http_object(msg); su_home_t *home = msg_home(msg); int resolved = -1; if (hc->hc_route_url) { resolved = tport_name_by_url(home, hc->hc_tpn, hc->hc_route_url); } else { resolved = tpn_by_host(hc->hc_tpn, http->http_host, hc->hc_url); } if (resolved < 0) { SU_DEBUG_3(("nth client resolve: %s\n", "cannot resolve URL")); return -1; } hc->hc_route_url = NULL; hc->hc_tport = tport_by_name(hc->hc_engine->he_tports, hc->hc_tpn); if (!hc->hc_tport) { assert(hc->hc_tport); SU_DEBUG_3(("nth client create: %s\n", !hc->hc_tport ? "no transport" : "invalid message")); return -1; } if (msg_serialize(msg, http) < 0) { assert(hc->hc_tport); SU_DEBUG_3(("nth client create: invalid message")); return -1; } hc_send(hc); hc_insert(hc->hc_engine, hc); return 0; } /**@internal * Insert client request to the hash table */ static void hc_insert(nth_engine_t * he, nth_client_t * hc) { if (hc_htable_is_full(he->he_clients)) hc_htable_resize(he->he_home, he->he_clients, 0); hc_htable_insert(he->he_clients, hc); hc->hc_inserted = 1; } /**@internal * Remove client request from the hash table */ static void hc_remove(nth_engine_t * he, nth_client_t * hc) { if (hc->hc_inserted) hc_htable_remove(he->he_clients, hc); hc->hc_inserted = 0; } /** Destroy client request. */ void nth_client_destroy(nth_client_t * hc) { if (hc == NULL) ; else if (hc->hc_completed) hc_free(hc); else hc->hc_callback = hc_default_cb; } /**@internal Free client request. */ void hc_free(nth_client_t * hc) { if (hc) { if (hc->hc_pending) tport_release(hc->hc_tport, hc->hc_pending, hc->hc_request, NULL, hc, 0); tport_decref(&hc->hc_tport); msg_destroy(hc->hc_request); msg_destroy(hc->hc_response); su_free(hc->hc_engine->he_home, hc); } } /** * Gets client status. * * @param hc pointer to a nth client object * * @return * Returns the status code from the response message if it has been * received. A status code below 100 indicates that no response has been * received. If request timeouts, the connection is closed and the status * code is set to 408. If @a hc is NULL, returns 400 (Bad Request). */ int nth_client_status(nth_client_t const *hc) { return hc ? hc->hc_status : 400; } /** * Gets client method. * * @param hc pointer to a nth client object * * @return * Returns the HTTP method from the request. * If @a hc is NULL, returns #http_method_invalid. */ http_method_t nth_client_method(nth_client_t const *hc) { return hc ? hc->hc_method : http_method_invalid; } /** Get original Request-URI */ url_t const *nth_client_url(nth_client_t const *hc) { return hc ? hc->hc_url : NULL; } /** Get request message. */ msg_t *nth_client_request(nth_client_t * hc) { msg_t *request = NULL; if (hc) request = hc->hc_request, hc->hc_request = NULL; return request; } /** Get response message. */ msg_t *nth_client_response(nth_client_t const *hc) { if (hc) return msg_ref_create(hc->hc_response); else return NULL; } /** Is client streaming response? */ int nth_client_is_streaming(nth_client_t const *hc) { return hc && hc->hc_is_streaming; } /** Send request. */ static nth_client_t *hc_send(nth_client_t * hc) { nth_engine_t *he = hc->hc_engine; tport_t *tp; he->he_stats->st_requests++; tp = tport_tsend(hc->hc_tport, hc->hc_request, hc->hc_tpn, TAG_NEXT(hc->hc_tags)); if (tp == NULL) { he->he_stats->st_tp_errors++; hc_reply(hc, HTTP_503_NO_SERVICE); return hc; } hc->hc_tport = tport_incref(tp); hc->hc_pending = tport_pend(tp, hc->hc_request, hc_tport_error, hc); if (hc->hc_pending == -1) hc->hc_pending = 0; if (hc->hc_expires) { hc->hc_timeout = he_now(he) + hc->hc_expires; /* XXX */ if (hc->hc_timeout == 0) hc->hc_timeout++; } return hc; } /** @internal Report transport errors. */ void hc_tport_error(nth_engine_t * he, nth_client_t * hc, tport_t * tp, msg_t *msg, int error) { su_sockaddr_t const *su = msg_addr(msg); tp_name_t const *tpn = tp ? tport_name(tp) : hc->hc_tpn; char addr[SU_ADDRSIZE]; char const *errmsg; if (error) errmsg = su_strerror(error); else errmsg = "Remote end closed connection"; su_log("nth: %s: %s (%u) with %s@%s:%u\n", hc->hc_method_name, errmsg, error, tpn->tpn_proto, inet_ntop(su->su_family, SU_ADDR(su), addr, sizeof(addr)), htons(su->su_port)); he->he_stats->st_tp_errors++; hc_reply(hc, HTTP_503_NO_SERVICE); } static void hc_delayed_recv(su_root_magic_t *rm, su_msg_r msg, union sm_arg_u *u); /** Respond internally to a transaction. */ int hc_reply(nth_client_t * hc, int status, char const *phrase) { nth_engine_t *he = hc->hc_engine; msg_t *msg = NULL; http_t *http = NULL; assert(status >= 400); SU_DEBUG_5(("nth: hc_reply(%p, %u, %s)\n", hc, status, phrase)); if (hc->hc_pending) { tport_release(hc->hc_tport, hc->hc_pending, hc->hc_request, NULL, hc, status < 200); if (status >= 200) hc->hc_pending = 0; } tport_shutdown(hc->hc_tport, 2); hc->hc_completed = 1; hc->hc_timeout = 0; if (hc->hc_callback == hc_default_cb) { hc_free(hc); return 0; } /* Create response message, if needed */ if (hc->hc_error_msg) { msg = he_msg_create(he, NTH_INTERNAL_MSG, NULL, 0, NULL, hc); http = http_object(msg); http_complete_response(msg, status, phrase, http_object(hc->hc_request)); } else hc->hc_status = status; if (hc->hc_inserted) { hc_recv(hc, msg, http); return 0; } else { /* * The thread creating outgoing transaction must return to application * before transaction callback can be invoked. Processing an internally * generated response message must be delayed until transaction creation * is completed. * * The internally generated message is transmitted using su_msg_send() * and it is delivered back to NTA when the application next time * executes the su_root_t event loop. */ su_root_t *root = he->he_root; su_msg_r su_msg = SU_MSG_R_INIT; if (su_msg_create(su_msg, su_root_task(root), su_root_task(root), hc_delayed_recv, sizeof(struct hc_recv_s)) == SU_SUCCESS) { struct hc_recv_s *a = su_msg_data(su_msg)->hc_recv; a->hc = hc; a->msg = msg; a->http = http; if (su_msg_send(su_msg) == SU_SUCCESS) return 0; } } if (msg) msg_destroy(msg); return -1; } static void hc_delayed_recv(su_root_magic_t *rm, su_msg_r msg, union sm_arg_u *u) { struct hc_recv_s *a = u->hc_recv; if (hc_recv(a->hc, a->msg, a->http) < 0 && a->msg) msg_destroy(a->msg); } /** Receive response to transaction. */ int hc_recv(nth_client_t * hc, msg_t *msg, http_t * http) { short status; int streaming = msg_is_streaming(msg); int shutdown = 0; if (http && http->http_status) { status = http->http_status->st_status; if (status < 100) status = 100; if (streaming && !hc->hc_streaming) { /* Disable streaming for this msg */ msg_set_streaming(msg, 0); return 0; /* Wait for complete message */ } hc->hc_status = status; } else if (http) status = hc->hc_status = 500, streaming = 0, http = NULL; else status = hc->hc_status, streaming = 0; if (status == 400 || (http && (http->http_flags & MSG_FLG_ERROR))) shutdown = 2; if (!streaming || shutdown) msg_set_streaming(msg, 0); if (hc->hc_pending) { tport_release(hc->hc_tport, hc->hc_pending, hc->hc_request, msg, hc, streaming || status < 200); if (!streaming && status >= 200) hc->hc_pending = 0; } if (!streaming && status >= 200) { /* Completed. */ hc->hc_completed = 1; hc_remove(hc->hc_engine, hc); if (shutdown || !http || (http->http_status->st_version == http_version_1_1 && http->http_connection && msg_params_find(http->http_connection->k_items, "close")) || (http->http_status->st_version == http_version_1_0)) shutdown = 2; } if (shutdown) { if (status < 200) status = 400; tport_shutdown(hc->hc_tport, shutdown); } if (msg_is_complete(msg)) { if (status < 200) hc->hc_engine->he_stats->st_1xxresponses++; else hc->hc_engine->he_stats->st_responses++; } if (hc->hc_response) msg_destroy(hc->hc_response); hc->hc_response = msg; hc->hc_is_streaming = streaming; /* Call callback */ hc->hc_callback(hc->hc_magic, hc, http); return 0; } /** @internal Default callback for request */ int hc_default_cb(nth_client_magic_t * magic, nth_client_t * hc, http_t const *http) { if (http == NULL || http->http_status->st_status >= 200) hc_free(hc); return 0; } /** @internal Client transaction timer routine. */ static void hc_timer(nth_engine_t * he, nth_client_t * hc, uint32_t now) { if (hc->hc_timeout == 0) return; if ((int)hc->hc_timeout - (int)now > 0) return; hc_reply(hc, HTTP_408_TIMEOUT); }