11 - LTTng over RelayFS transport
patch-2.6.17-lttng-core-0.5.108-transport.diff
OpenPGP public key: http://krystal.dyndns.org:8080/key/compudj.gpg
Key fingerprint: 8CD5 52C3 8E3C 4140 715F BA06 3F25 A8FE 3BAE 9A68
--- /dev/null
+++ b/ltt/ltt-relay.c
@@ -0,0 +1,1189 @@
+/*
+ * ltt-relay.c
+ *
+ * (C) Copyright 2005-2006 - Mathieu Desnoyers ([email protected])
+ *
+ * Contains the kernel code for the Linux Trace Toolkit.
+ *
+ * Author:
+ * Mathieu Desnoyers ([email protected])
+ *
+ * Inspired from LTT :
+ * Karim Yaghmour ([email protected])
+ * Tom Zanussi ([email protected])
+ * Bob Wisniewski ([email protected])
+ * And from K42 :
+ * Bob Wisniewski ([email protected])
+ *
+ * Changelog:
+ * 19/10/05, Complete lockless mechanism. (Mathieu Desnoyers)
+ * 27/05/05, Modular redesign and rewrite. (Mathieu Desnoyers)
+ *
+ * Comments :
+ * num_active_traces protects the functors. Changing a functor pointer is
+ * atomic, but the functions can only be called while tracing is active, so a
+ * module containing a functor can safely be unloaded when no tracing is active.
+ *
+ * filter_control functor is protected by incrementing its module refcount.
+ *
+ *
+ */
+
+#include <linux/config.h>
+#include <linux/time.h>
+#include <linux/ltt-core.h>
+#include <linux/relay.h>
+#include <linux/module.h>
+#include <linux/string.h>
+#include <linux/slab.h>
+#include <linux/init.h>
+#include <linux/ltt-facilities.h>
+#include <linux/rcupdate.h>
+#include <linux/sched.h>
+#include <linux/bitops.h>
+#include <linux/fs.h>
+#include <linux/smp_lock.h>
+#include <asm/atomic.h>
+
+static struct dentry *ltt_root_dentry;
+
+/* How must a force_switch be done ?
+ *
+ * Either during tracing (FORCE_ACTIVE) or as a final flush after tracing
+ * (FORCE_FLUSH, which won't write in the new sub-buffer).
+ */
+enum force_switch_mode { FORCE_ACTIVE, FORCE_FLUSH };
+
+
+/* Trace callbacks */
+
+static void ltt_buffer_begin_callback(struct rchan_buf *buf,
+ u64 tsc, unsigned int subbuf_idx)
+{
+ struct ltt_channel_struct *channel =
+ (struct ltt_channel_struct*)buf->chan->client_data;
+ struct ltt_block_start_header *header =
+ (struct ltt_block_start_header*)
+ (buf->start + (subbuf_idx*buf->chan->subbuf_size));
+
+ header->begin.cycle_count = tsc;
+ header->begin.freq = ltt_frequency();
+ header->lost_size = 0xFFFFFFFF; // for debugging...
+ header->buf_size = buf->chan->subbuf_size;
+ ltt_write_trace_header(channel->trace, &header->trace);
+}
+
+static void ltt_buffer_end_callback(struct rchan_buf *buf,
+ u64 tsc, unsigned int offset, unsigned int subbuf_idx)
+{
+ struct ltt_block_start_header *header =
+ (struct ltt_block_start_header*)
+ (buf->start + (subbuf_idx*buf->chan->subbuf_size));
+
+ /* offset is assumed to never be 0 here : never deliver a completely
+ * empty subbuffer.
+ * The lost size is between 0 and subbuf_size-1 */
+ header->lost_size = SUBBUF_OFFSET((buf->chan->subbuf_size - offset),
+ buf->chan);
+ header->end.cycle_count = tsc;
+ header->end.freq = ltt_frequency();
+}
+
+static void ltt_deliver_callback(struct rchan_buf *buf,
+ unsigned subbuf_idx,
+ void *subbuf)
+{
+ struct ltt_channel_struct *channel =
+ (struct ltt_channel_struct*)buf->chan->client_data;
+ struct ltt_channel_buf_struct *ltt_buf = &channel->buf[buf->cpu];
+
+ atomic_set(&ltt_buf->wakeup_readers, 1);
+}
+
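+/* Mapping, unmapping and buf_full notifications need no action from this
+ * transport : the three callbacks below are intentionally left empty. */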
+static void ltt_buf_mapped_callback(struct rchan_buf *buf,
+ struct file *filp)
+{
+}
+
+static void ltt_buf_unmapped_callback(struct rchan_buf *buf,
+ struct file *filp)
+{
+}
+
+static void ltt_buf_full_callback(struct rchan_buf *buf,
+ unsigned subbuf_idx,
+ void *subbuf)
+{
+}
+
+/* This callback should not be called from NMI interrupt context */
+static void ltt_buf_unfull_callback(struct rchan_buf *buf,
+ unsigned subbuf_idx,
+ void *subbuf)
+{
+ struct ltt_channel_struct *ltt_channel =
+ (struct ltt_channel_struct*)buf->chan->client_data;
+ struct ltt_channel_buf_struct *ltt_buf = &ltt_channel->buf[buf->cpu];
+ if (waitqueue_active(&ltt_buf->write_wait))
+ schedule_work(&ltt_buf->wake_writers);
+}
+
+
+/**
+ * ltt_poll - poll file op for ltt files
+ * @filp: the file
+ * @wait: poll table
+ *
+ * Poll implementation.
+ */
+static unsigned int ltt_poll(struct file *filp, poll_table *wait)
+{
+ unsigned int mask = 0;
+ struct inode *inode = filp->f_dentry->d_inode;
+ struct rchan_buf *buf = inode->u.generic_ip;
+ struct ltt_channel_struct *ltt_channel =
+ (struct ltt_channel_struct*)buf->chan->client_data;
+ struct ltt_channel_buf_struct *ltt_buf = &ltt_channel->buf[buf->cpu];
+
+ //printk(KERN_DEBUG "DEBUG : in LTT poll %p\n", filp);
+ if (filp->f_mode & FMODE_READ) {
+ poll_wait(filp, &buf->read_wait, wait);
+
+ if (atomic_read(&ltt_buf->active_readers) != 0) {
+ return 0;
+ } else {
+ if (SUBBUF_TRUNC(
+ atomic_read(&ltt_buf->offset), buf->chan)
+ - SUBBUF_TRUNC(
+ atomic_read(&ltt_buf->consumed), buf->chan)
+ == 0) {
+ if (buf->finalized) return POLLHUP;
+ else return 0;
+ } else {
+ struct rchan *rchan = ltt_channel->trans_channel_data;
+ if (SUBBUF_TRUNC(
+ atomic_read(&ltt_buf->offset), buf->chan)
+ - SUBBUF_TRUNC(
+ atomic_read(&ltt_buf->consumed),
+ buf->chan)
+ >= rchan->alloc_size)
+ return POLLPRI | POLLRDBAND;
+ else
+ return POLLIN | POLLRDNORM;
+ }
+ }
+ }
+ return mask;
+}
+
+
+/**
+ * ltt_ioctl - ioctl control on the relayfs file
+ *
+ * @inode: the inode
+ * @filp: the file
+ * @cmd: the command
+ * @arg: command arg
+ *
+ * This ioctl implements four commands necessary for a minimal
+ * producer/consumer implementation :
+ * RELAYFS_GET_SUBBUF
+ * Get the next sub-buffer that can be read. It never blocks.
+ * RELAYFS_PUT_SUBBUF
+ * Release the currently read sub-buffer. The parameter is the
+ * consumed count returned by GET_SUBBUF.
+ * RELAYFS_GET_N_SUBBUFS
+ * Returns the number of sub-buffers in the per-cpu channel.
+ * RELAYFS_GET_SUBBUF_SIZE
+ * Returns the size of the sub-buffers.
+ *
+ */
+static int ltt_ioctl(struct inode *inode, struct file *filp,
+ unsigned int cmd, unsigned long arg)
+{
+ struct rchan_buf *buf = inode->u.generic_ip;
+ struct ltt_channel_struct *ltt_channel =
+ (struct ltt_channel_struct*)buf->chan->client_data;
+ struct ltt_channel_buf_struct *ltt_buf = &ltt_channel->buf[buf->cpu];
+ u32 __user *argp = (u32 __user *)arg;
+
+ switch(cmd) {
+ case RELAYFS_GET_SUBBUF:
+ {
+ unsigned int consumed_old, consumed_idx;
+ atomic_inc(&ltt_buf->active_readers);
+ consumed_old = atomic_read(&ltt_buf->consumed);
+ consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);
+ if (atomic_read(&ltt_buf->commit_count[consumed_idx])
+ != atomic_read(&ltt_buf->reserve_count[consumed_idx])) {
+ atomic_dec(&ltt_buf->active_readers);
+ return -EAGAIN;
+ }
+ if ((SUBBUF_TRUNC(
+ atomic_read(&ltt_buf->offset), buf->chan)
+ - SUBBUF_TRUNC(consumed_old, buf->chan))
+ == 0) {
+ atomic_dec(&ltt_buf->active_readers);
+ return -EAGAIN;
+ }
+ //printk(KERN_DEBUG "LTT ioctl get subbuf %d\n",
+ // consumed_old);
+ return put_user((u32)consumed_old, argp);
+ break;
+ }
+ case RELAYFS_PUT_SUBBUF:
+ {
+ u32 consumed_old;
+ int ret;
+ unsigned int consumed_new;
+
+ ret = get_user(consumed_old, argp);
+ if (ret) return ret; /* will return -EFAULT */
+
+ //printk(KERN_DEBUG "LTT ioctl put subbuf %d\n",
+ // consumed_old);
+ consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);
+ spin_lock(&ltt_buf->full_lock);
+ if (atomic_cmpxchg(
+ &ltt_buf->consumed, consumed_old, consumed_new)
+ != consumed_old) {
+ /* We have been pushed by the writer : the last
+ * buffer read _is_ corrupted! It can also
+ * happen if this is a buffer we never got. */
+ atomic_dec(&ltt_buf->active_readers);
+ spin_unlock(&ltt_buf->full_lock);
+ return -EIO;
+ } else {
+ /* tell the client that buffer is now unfull */
+ int index;
+ void *data;
+ index = SUBBUF_INDEX(consumed_old, buf->chan);
+ data = buf->start +
+ BUFFER_OFFSET(consumed_old, buf->chan);
+ buf->chan->cb->buf_unfull(buf, index, data);
+ atomic_dec(&ltt_buf->active_readers);
+ spin_unlock(&ltt_buf->full_lock);
+ }
+ break;
+ }
+ case RELAYFS_GET_N_SUBBUFS:
+ //printk(KERN_DEBUG "LTT ioctl get n subbufs\n");
+ return put_user((u32)buf->chan->n_subbufs, argp);
+ break;
+ case RELAYFS_GET_SUBBUF_SIZE:
+ //printk(KERN_DEBUG "LTT ioctl get subbuf size\n");
+ return put_user((u32)buf->chan->subbuf_size, argp);
+ break;
+ default:
+ return -ENOIOCTLCMD;
+ }
+ return 0;
+}
+
+static struct file_operations ltt_file_operations;
+
+#ifdef CONFIG_COMPAT
+
+static long ltt_compat_ioctl(struct file *file, unsigned cmd, unsigned long arg)
+{
+ long ret = -ENOIOCTLCMD;
+
+ lock_kernel();
+ ret = ltt_ioctl(file->f_dentry->d_inode, file, cmd, arg);
+ unlock_kernel();
+
+ return ret;
+}
+
+#endif //CONFIG_COMPAT
+
+/* Create channel.
+ */
+static int ltt_relay_create_channel(char *trace_name,
+ struct ltt_trace_struct *trace,
+ struct dentry *dir,
+ char *channel_name,
+ struct ltt_channel_struct **ltt_chan,
+ unsigned int subbuf_size, unsigned int n_subbufs,
+ int overwrite)
+{
+ struct rchan *rchan;
+ unsigned int i, j;
+ char *tmpname;
+ int err = 0;
+
+ tmpname = kmalloc(PATH_MAX, GFP_KERNEL);
+ if(!tmpname) return -EPERM;
+ if(overwrite) {
+ strncpy(tmpname, LTT_FLIGHT_PREFIX, PATH_MAX-1);
+ strncat(tmpname, channel_name, PATH_MAX-1-sizeof(LTT_FLIGHT_PREFIX));
+ } else {
+ strncpy(tmpname, channel_name, PATH_MAX-1);
+ }
+
+ /* This channel info will be freed by relayfs if the relay_open
+ * succeeds.
+ * FIXME : in one error path, it will be freed by the kref_put */
+ *ltt_chan = kmalloc(sizeof(struct ltt_channel_struct), GFP_KERNEL);
+ if(!(*ltt_chan)) goto ltt_chan_alloc_error;
+
+ kref_get(&trace->kref);
+
+ (*ltt_chan)->trace = trace;
+ (*ltt_chan)->trans_channel_data = relay_open(tmpname,
+ dir,
+ subbuf_size,
+ n_subbufs,
+ overwrite,
+ THIS_MODULE,
+ &trace->callbacks,
+ &ltt_file_operations,
+ *ltt_chan,
+ ltt_channel_destroy);
+
+ if ((*ltt_chan)->trans_channel_data == NULL) {
+ printk(KERN_ERR "LTT : Can't open %s channel for trace %s\n",
+ tmpname, trace_name);
+ goto relay_open_error;
+ }
+
+ (*ltt_chan)->buffer_begin = ltt_buffer_begin_callback;
+ (*ltt_chan)->buffer_end = ltt_buffer_end_callback;
+ rchan = (*ltt_chan)->trans_channel_data;
+
+ strncpy((*ltt_chan)->channel_name, tmpname, PATH_MAX-1);
+
+ for(i=0;i<NR_CPUS;i++) {
+ atomic_set(&(*ltt_chan)->buf[i].offset,
+ ltt_subbuf_header_len());
+ atomic_set(&(*ltt_chan)->buf[i].consumed, 0);
+ atomic_set(&(*ltt_chan)->buf[i].active_readers, 0);
+ (*ltt_chan)->buf[i].reserve_count =
+ kmalloc(sizeof(atomic_t) * n_subbufs, GFP_KERNEL);
+ for(j=0; j<n_subbufs; j++)
+ atomic_set(&(*ltt_chan)->buf[i].reserve_count[j], 0);
+ (*ltt_chan)->buf[i].commit_count =
+ kmalloc(sizeof(atomic_t) * n_subbufs, GFP_KERNEL);
+ for(j=0; j<n_subbufs; j++)
+ atomic_set(&(*ltt_chan)->buf[i].commit_count[j], 0);
+ init_waitqueue_head(&(*ltt_chan)->buf[i].write_wait);
+ atomic_set(&(*ltt_chan)->buf[i].wakeup_readers, 0);
+ INIT_WORK(&(*ltt_chan)->buf[i].wake_writers,
+ ltt_wakeup_writers, &(*ltt_chan)->buf[i]);
+ spin_lock_init(&(*ltt_chan)->buf[i].full_lock);
+ /* Tracing not started yet :
+ * we don't deal with concurrency (no delivery) */
+ if (!rchan->buf[i])
+ continue;
+
+ ltt_buffer_begin_callback(rchan->buf[i], trace->start_tsc, 0);
+ atomic_add(ltt_subbuf_header_len(),
+ &(*ltt_chan)->buf[i].commit_count[0]);
+ }
+ err = 0;
+ goto end;
+
+relay_open_error:
+ ltt_channel_destroy(*ltt_chan);
+ltt_chan_alloc_error:
+ err = -EPERM;
+end:
+ kfree(tmpname);
+ return err;
+}
+
+static int ltt_relay_create_dirs(struct ltt_trace_struct *new_trace)
+{
+ new_trace->dentry.trace_root = relayfs_create_dir(new_trace->trace_name,
+ ltt_root_dentry);
+ if (new_trace->dentry.trace_root == NULL) {
+ printk(KERN_ERR "LTT : Trace directory name %s already taken\n",
+ new_trace->trace_name);
+ return -EEXIST;
+ }
+
+ new_trace->dentry.control_root = relayfs_create_dir(LTT_CONTROL_ROOT,
+ new_trace->dentry.trace_root);
+ if (new_trace->dentry.control_root == NULL) {
+ printk(KERN_ERR "LTT : Trace control subdirectory name "
+ "%s/%s already taken\n",
+ new_trace->trace_name, LTT_CONTROL_ROOT);
+ relayfs_remove_dir(new_trace->dentry.trace_root);
+ return -EEXIST;
+ }
+
+ //new_trace->callbacks.subbuf_start = ltt_subbuf_start_callback;
+ new_trace->callbacks.deliver = ltt_deliver_callback;
+ new_trace->callbacks.buf_mapped = ltt_buf_mapped_callback;
+ new_trace->callbacks.buf_unmapped = ltt_buf_unmapped_callback;
+ new_trace->callbacks.buf_full = ltt_buf_full_callback;
+ new_trace->callbacks.buf_unfull = ltt_buf_unfull_callback;
+
+ return 0;
+}
+
+static void ltt_relay_remove_dirs(struct ltt_trace_struct *trace)
+{
+ int err;
+
+ err = relayfs_remove_dir(trace->dentry.control_root);
+ WARN_ON(err);
+ err = relayfs_remove_dir(trace->dentry.trace_root);
+ WARN_ON(err);
+}
+
+/* Force a sub-buffer switch for a per-cpu buffer. This operation is
+ * completely reentrant : can be called while tracing is active with
+ * absolutely no lock held.
+ */
+static void ltt_force_switch(struct rchan_buf *buf, enum force_switch_mode mode)
+{
+ struct ltt_channel_struct *ltt_channel =
+ (struct ltt_channel_struct*)buf->chan->client_data;
+ struct ltt_channel_buf_struct *ltt_buf = &ltt_channel->buf[buf->cpu];
+ struct rchan *rchan = ltt_channel->trans_channel_data;
+
+ u64 tsc;
+ int offset_begin, offset_end, offset_old;
+ int reserve_commit_diff;
+ int consumed_old, consumed_new;
+ int commit_count, reserve_count;
+ int end_switch_old;
+
+ do {
+ offset_old = atomic_read(&ltt_buf->offset);
+ offset_begin = offset_old;
+ end_switch_old = 0;
+
+ if (SUBBUF_OFFSET(offset_begin, buf->chan) != 0) {
+ offset_begin = SUBBUF_ALIGN(offset_begin, buf->chan);
+ end_switch_old = 1;
+ } else {
+ /* we do not have to switch : buffer is empty */
+ return;
+ }
+ if (mode == FORCE_ACTIVE)
+ offset_begin += ltt_subbuf_header_len();
+ /* Always begin_switch in FORCE_ACTIVE mode */
+ /* Test new buffer integrity */
+ reserve_commit_diff = atomic_read(
+ &ltt_buf->reserve_count[SUBBUF_INDEX(offset_begin,
+ buf->chan)])
+ - atomic_read(
+ &ltt_buf->commit_count[SUBBUF_INDEX(offset_begin,
+ buf->chan)]);
+ if (reserve_commit_diff == 0) {
+ /* Next buffer not corrupted. */
+ if (mode == FORCE_ACTIVE && !buf->chan->overwrite &&
+ (offset_begin-atomic_read(&ltt_buf->consumed))
+ >= rchan->alloc_size) {
+ /* We do not overwrite non consumed buffers
+ * and we are full :
+ * ignore switch while tracing is active. */
+ return;
+ }
+ } else {
+ /* Next subbuffer corrupted. Force pushing reader even
+ * in normal mode */
+ }
+ offset_end = offset_begin;
+
+ tsc = ltt_get_timestamp64();
+ if (tsc == 0) {
+ /* Error in getting the timestamp : should not happen :
+ * it would mean we are called from an NMI during a
+ * write seqlock on xtime. */
+ return;
+ }
+ } while(atomic_cmpxchg(&ltt_buf->offset, offset_old, offset_end)
+ != offset_old);
+
+ if (mode == FORCE_ACTIVE) {
+ /* Push the reader if necessary */
+ do {
+ consumed_old = atomic_read(&ltt_buf->consumed);
+ /* If buffer is in overwrite mode, push the reader
+ * consumed count if the write position has reached it
+ * and we are not at the first iteration (don't push
+ * the reader farther than the writer). This operation
+ * can be done concurrently by many writers in the same
+ * buffer ; the writer at the farthest write
+ * position sub-buffer index in the buffer is the
+ * one which will win this loop.
+ * If the buffer is not in overwrite mode, pushing the
+ * reader only happens if a sub-buffer is corrupted */
+ if ((SUBBUF_TRUNC(offset_end-1, buf->chan)
+ - SUBBUF_TRUNC(consumed_old,
+ buf->chan))
+ >= rchan->alloc_size)
+ consumed_new =
+ SUBBUF_ALIGN(consumed_old, buf->chan);
+ else {
+ consumed_new = consumed_old;
+ break;
+ }
+ } while(atomic_cmpxchg(&ltt_buf->consumed, consumed_old,
+ consumed_new)
+ != consumed_old);
+
+ if (consumed_old != consumed_new) {
+ /* Reader pushed : we are the winner of the push, we
+ * can therefore reequilibrate reserve and commit.
+ * Atomic increment of the commit count permits other
+ * writers to play around with this variable before us.
+ * We keep track of corrupted_subbuffers even in
+ * overwrite mode :
+ * we never want to write over a non completely
+ * committed sub-buffer : possible causes : the buffer
+ * size is too low compared to the unordered data input,
+ * or there is a writer who died between the reserve
+ * and the commit. */
+ if (reserve_commit_diff) {
+ /* We have to alter the sub-buffer commit
+ * count : a sub-buffer is corrupted */
+ atomic_add(reserve_commit_diff,
+ &ltt_buf->commit_count[SUBBUF_INDEX(
+ offset_begin, buf->chan)]);
+ atomic_inc(&ltt_buf->corrupted_subbuffers);
+ }
+ }
+ }
+
+ /* Always switch */
+ if (end_switch_old) {
+ /* old subbuffer */
+ /* Concurrency safe because we are the last and only thread to
+ * alter this sub-buffer. As long as it is not delivered and
+ * read, no other thread can alter the offset, alter the
+ * reserve_count or call the client_buffer_end_callback on this
+ * sub-buffer. The only remaining threads could be the ones
+ * with pending commits. They will have to do the delivery
+ * themselves.
+ * Not concurrency safe in overwrite mode.
+ * We detect corrupted subbuffers with commit and reserve
+ * counts. We keep a corrupted sub-buffers count and push the
+ * readers across these sub-buffers. Not concurrency safe if a
+ * writer is stalled in a subbuffer and another writer switches
+ * in, finding out it's corrupted. The result will be that the
+ * old (uncommitted) subbuffer will be declared corrupted, and
+ * that the new subbuffer will be declared corrupted too because
+ * of the commit count adjustment.
+ * Offset old should never be 0. */
+ ltt_channel->buffer_end(buf, tsc, offset_old,
+ SUBBUF_INDEX((offset_old), buf->chan));
+ /* Setting this reserve_count will allow the sub-buffer to be
+ * delivered by the last committer. */
+ reserve_count = atomic_add_return((SUBBUF_OFFSET((offset_old-1),
+ buf->chan) + 1),
+ &ltt_buf->reserve_count[SUBBUF_INDEX((offset_old),
+ buf->chan)]);
+ if (reserve_count == atomic_read(
+ &ltt_buf->commit_count[SUBBUF_INDEX(
+ (offset_old), buf->chan)])) {
+ buf->chan->cb->deliver(buf,
+ SUBBUF_INDEX((offset_old), buf->chan), NULL);
+ }
+ }
+
+ if (mode == FORCE_ACTIVE) {
+ /* New sub-buffer */
+ /* This code can be executed unordered : writers may already
+ * have written to the sub-buffer before this code gets
+ * executed, caution. */
+ /* The commit makes sure that this code is executed before the
+ * deliver of this sub-buffer */
+ ltt_channel->buffer_begin(buf, tsc,
+ SUBBUF_INDEX(offset_begin, buf->chan));
+ commit_count =
+ atomic_add_return(ltt_subbuf_header_len(),
+ &ltt_buf->commit_count[SUBBUF_INDEX(offset_begin,
+ buf->chan)]);
+ /* Check if the written buffer has to be delivered */
+ if (commit_count == atomic_read(
+ &ltt_buf->reserve_count[SUBBUF_INDEX(offset_begin,
+ buf->chan)])) {
+ buf->chan->cb->deliver(buf,
+ SUBBUF_INDEX(offset_begin, buf->chan), NULL);
+ }
+ }
+}
+
+/* LTTng channel flush function.
+ *
+ * Must be called when no tracing is active in the channel, because of
+ * accesses across CPUs. */
+static void ltt_relay_buffer_flush(struct rchan *chan)
+{
+ unsigned int i;
+ for(i=0;i<NR_CPUS;i++) {
+ if (!chan->buf[i]) continue;
+ chan->buf[i]->finalized = 1;
+ ltt_force_switch(chan->buf[i], FORCE_FLUSH);
+ }
+}
+
+static void ltt_relay_async_wakeup_chan(struct ltt_channel_struct *ltt_chan)
+{
+ struct rchan *rchan = ltt_chan->trans_channel_data;
+ unsigned int i;
+
+ for(i=0; i<NR_CPUS; i++) {
+ if (!rchan->buf[i])
+ continue;
+ if (atomic_read(&ltt_chan->buf[i].wakeup_readers) == 1) {
+ atomic_set(&ltt_chan->buf[i].wakeup_readers, 0);
+ wake_up_interruptible(&rchan->buf[i]->read_wait);
+ }
+ }
+}
+
+/* Wake writers :
+ *
+ * This must be done after the trace is removed from the RCU list so that there
+ * are no stalled writers. */
+static void ltt_relay_wake_writers(struct rchan *chan)
+{
+ struct ltt_channel_struct *ltt_channel =
+ (struct ltt_channel_struct*)chan->client_data;
+ struct ltt_channel_buf_struct *ltt_buf;
+ unsigned int i;
+
+ for(i=0;i<NR_CPUS;i++) {
+ if (!chan->buf[i])
+ continue;
+ ltt_buf = &ltt_channel->buf[i];
+ if (waitqueue_active(&ltt_buf->write_wait))
+ schedule_work(&ltt_buf->wake_writers);
+ }
+}
+
+static void ltt_relay_finish_channel(struct ltt_channel_struct *channel)
+{
+ struct rchan *rchan = channel->trans_channel_data;
+#if 0
+ relay_flush(rchan);
+#endif //0
+ ltt_relay_buffer_flush(rchan);
+ ltt_relay_wake_writers(rchan);
+}
+
+static void ltt_relay_remove_channel(struct ltt_channel_struct *channel)
+{
+ struct rchan *rchan = channel->trans_channel_data;
+
+ relay_close(rchan);
+#if 0
+ ltt_channel_destroy(channel);
+#endif //0
+}
+
+/* ltt_relay_reserve_slot
+ *
+ * Atomic slot reservation in a LTTng buffer. It will take care of
+ * sub-buffer switching.
+ *
+ * Parameters:
+ *
+ * @trace : the trace structure to log to.
+ * @buf : the buffer to reserve space into.
+ * @data_size : size of the variable length data to log.
+ * @slot_size : pointer to total size of the slot (out)
+ * @tsc : pointer to the tsc at the slot reservation (out)
+ * @before_hdr_pad : dynamic padding before the event header.
+ * @after_hdr_pad : dynamic padding after the event header.
+ *
+ * Return : NULL if not enough space, else returns the pointer
+ * to the beginning of the reserved slot. */
+static void *ltt_relay_reserve_slot(
+ struct ltt_trace_struct *trace,
+ struct ltt_channel_struct *ltt_channel,
+ void **transport_data,
+ size_t data_size,
+ size_t *slot_size,
+ u64 *tsc,
+ size_t *before_hdr_pad,
+ size_t *after_hdr_pad,
+ size_t *header_size)
+{
+ struct rchan *rchan = ltt_channel->trans_channel_data;
+ struct rchan_buf *buf = *transport_data = rchan->buf[smp_processor_id()];
+ struct ltt_channel_buf_struct *ltt_buf = &ltt_channel->buf[buf->cpu];
+ int offset_begin, offset_end, offset_old;
+ int begin_switch, end_switch_current, end_switch_old;
+ int reserve_commit_diff = 0;
+ size_t size;
+ int consumed_old, consumed_new;
+ int commit_count, reserve_count;
+
+ if(ltt_nesting[smp_processor_id()] > 4) {
+ // every page fault would lose an event.
+ atomic_inc(&ltt_buf->events_lost);
+ return NULL;
+ }
+
+ do {
+ offset_old = atomic_read(&ltt_buf->offset);
+ offset_begin = offset_old;
+ begin_switch = 0;
+ end_switch_current = 0;
+ end_switch_old = 0;
+
+ if(SUBBUF_OFFSET(offset_begin, buf->chan) == 0) {
+ begin_switch = 1; /* For offset_begin */
+ } else {
+ size = ltt_get_header_size(trace,
+ buf->start + offset_begin,
+ before_hdr_pad, after_hdr_pad,
+ header_size) + data_size;
+ if((SUBBUF_OFFSET(offset_begin, buf->chan)+size)
+ > buf->chan->subbuf_size) {
+ end_switch_old = 1; /* For offset_old */
+ begin_switch = 1; /* For offset_begin */
+ }
+ }
+ if(begin_switch) {
+ if(end_switch_old) {
+ offset_begin =
+ SUBBUF_ALIGN(offset_begin, buf->chan);
+ }
+ offset_begin = offset_begin +
+ ltt_subbuf_header_len();
+ /* Test new buffer integrity */
+ reserve_commit_diff =
+ atomic_read(
+ &ltt_buf->reserve_count[SUBBUF_INDEX(
+ offset_begin,
+ buf->chan)])
+ - atomic_read(
+ &ltt_buf->commit_count[SUBBUF_INDEX(
+ offset_begin,
+ buf->chan)]);
+ if(reserve_commit_diff == 0) {
+ /* Next buffer not corrupted. */
+ if(!buf->chan->overwrite &&
+ (SUBBUF_TRUNC(offset_begin, buf->chan)
+ - SUBBUF_TRUNC(
+ atomic_read(&ltt_buf->consumed),
+ buf->chan))
+ >= rchan->alloc_size) {
+ /* We do not overwrite non consumed
+ * buffers and we are full : event
+ * is lost. */
+ atomic_inc(&ltt_buf->events_lost);
+ return NULL;
+ } else {
+ /* next buffer not corrupted, we are
+ * either in overwrite mode or the
+ * buffer is not full. It's safe to
+ * write in this new subbuffer.*/
+ }
+ } else {
+ /* Next subbuffer corrupted. Force pushing
+ * reader even in normal mode. It's safe to
+ * write in this new subbuffer. */
+ }
+ size = ltt_get_header_size(trace,
+ buf->start + offset_begin,
+ before_hdr_pad, after_hdr_pad,
+ header_size) + data_size;
+ if((SUBBUF_OFFSET(offset_begin,buf->chan) + size)
+ > buf->chan->subbuf_size) {
+ /* Event too big for subbuffers, report error,
+ * don't complete the sub-buffer switch. */
+ atomic_inc(&ltt_buf->events_lost);
+ return NULL;
+ } else {
+ /* We just made a successful buffer switch and
+ * the event fits in the new subbuffer. Let's
+ * write. */
+ }
+ } else {
+ /* Event fits in the current buffer and we are not on a
+ * switch boundary. It's safe to write */
+ }
+ offset_end = offset_begin + size;
+
+ if((SUBBUF_OFFSET(offset_end, buf->chan)) == 0) {
+ /* The offset_end will fall at the very beginning of
+ * the next subbuffer. */
+ end_switch_current = 1; /* For offset_begin */
+ }
+#ifdef CONFIG_LTT_HEARTBEAT_EVENT
+ if(begin_switch || end_switch_old || end_switch_current) {
+ *tsc = ltt_get_timestamp64();
+ } else {
+ *tsc = ltt_get_timestamp32();
+ }
+#else
+ *tsc = ltt_get_timestamp64();
+#endif //CONFIG_LTT_HEARTBEAT_EVENT
+ if(*tsc == 0) {
+ /* Error in getting the timestamp, event lost */
+ atomic_inc(&ltt_buf->events_lost);
+ return NULL;
+ }
+
+ } while(atomic_cmpxchg(&ltt_buf->offset, offset_old, offset_end)
+ != offset_old);
+
+
+ /* Push the reader if necessary */
+ do {
+ consumed_old = atomic_read(&ltt_buf->consumed);
+ /* If buffer is in overwrite mode, push the reader consumed
+ * count if the write position has reached it and we are not
+ * at the first iteration (don't push the reader farther than
+ * the writer). This operation can be done concurrently by many
+ * writers in the same buffer ; the writer at the farthest
+ * write position sub-buffer index in the buffer is the one
+ * which will win this loop. */
+ /* If the buffer is not in overwrite mode, pushing the reader
+ * only happens if a sub-buffer is corrupted */
+ if((SUBBUF_TRUNC(offset_end-1, buf->chan)
+ - SUBBUF_TRUNC(consumed_old, buf->chan))
+ >= rchan->alloc_size)
+ consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);
+ else {
+ consumed_new = consumed_old;
+ break;
+ }
+ } while(atomic_cmpxchg(&ltt_buf->consumed, consumed_old, consumed_new)
+ != consumed_old);
+
+ if(consumed_old != consumed_new) {
+ /* Reader pushed : we are the winner of the push, we can
+ * therefore reequilibrate reserve and commit. Atomic increment
+ * of the commit count permits other writers to play around
+ * with this variable before us. We keep track of
+ * corrupted_subbuffers even in overwrite mode :
+ * we never want to write over a non completely committed
+ * sub-buffer : possible causes : the buffer size is too low
+ * compared to the unordered data input, or there is a writer
+ * who died between the reserve and the commit. */
+ if(reserve_commit_diff) {
+ /* We have to alter the sub-buffer commit count : a
+ * sub-buffer is corrupted. We do not deliver it. */
+ atomic_add(
+ reserve_commit_diff,
+ &ltt_buf->commit_count[
+ SUBBUF_INDEX(offset_begin, buf->chan)]);
+ atomic_inc(&ltt_buf->corrupted_subbuffers);
+ }
+ }
+
+ if(end_switch_old) {
+ /* old subbuffer */
+ /* Concurrency safe because we are the last and only thread to
+ * alter this sub-buffer. As long as it is not delivered and
+ * read, no other thread can alter the offset, alter the
+ * reserve_count or call the client_buffer_end_callback on
+ * this sub-buffer.
+ * The only remaining threads could be the ones with pending
+ * commits. They will have to do the delivery themselves.
+ * Not concurrency safe in overwrite mode. We detect corrupted
+ * subbuffers with commit and reserve counts. We keep a
+ * corrupted sub-buffers count and push the readers across
+ * these sub-buffers.
+ * Not concurrency safe if a writer is stalled in a subbuffer
+ * and another writer switches in, finding out it's corrupted.
+ * The result will be that the old (uncommitted) subbuffer will
+ * be declared corrupted, and that the new subbuffer will be
+ * declared corrupted too because of the commit count
+ * adjustment.
+ * Note : offset_old should never be 0 here.*/
+ ltt_channel->buffer_end(buf, *tsc, offset_old,
+ SUBBUF_INDEX((offset_old-1), buf->chan));
+ /* Setting this reserve_count will allow the sub-buffer to be
+ * delivered by the last committer. */
+ reserve_count =
+ atomic_add_return(
+ (SUBBUF_OFFSET((offset_old-1), buf->chan)+1),
+ &ltt_buf->reserve_count[
+ SUBBUF_INDEX((offset_old-1), buf->chan)]);
+ if(reserve_count == atomic_read(
+ &ltt_buf->commit_count[SUBBUF_INDEX((offset_old-1),
+ buf->chan)])) {
+ buf->chan->cb->deliver(buf, SUBBUF_INDEX((offset_old-1),
+ buf->chan), NULL);
+ }
+ }
+
+ if(begin_switch) {
+ /* New sub-buffer */
+ /* This code can be executed unordered : writers may already
+ * have written to the sub-buffer before this code gets
+ * executed, caution. */
+ /* The commit makes sure that this code is executed before the
+ * deliver of this sub-buffer */
+ ltt_channel->buffer_begin(buf, *tsc, SUBBUF_INDEX(offset_begin,
+ buf->chan));
+ commit_count = atomic_add_return(
+ ltt_subbuf_header_len(),
+ &ltt_buf->commit_count[
+ SUBBUF_INDEX(offset_begin, buf->chan)]);
+ /* Check if the written buffer has to be delivered */
+ if(commit_count == atomic_read(
+ &ltt_buf->reserve_count[SUBBUF_INDEX(offset_begin,
+ buf->chan)])) {
+ buf->chan->cb->deliver(buf,
+ SUBBUF_INDEX(offset_begin, buf->chan), NULL);
+ }
+ }
+
+ if(end_switch_current) {
+ /* current subbuffer */
+ /* Concurrency safe because we are the last and only thread to
+ * alter this sub-buffer. As long as it is not delivered and
+ * read, no other thread can alter the offset, alter the
+ * reserve_count or call the client_buffer_end_callback on this
+ * sub-buffer.
+ * The only remaining threads could be the ones with pending
+ * commits. They will have to do the delivery themselves.
+ * Not concurrency safe in overwrite mode. We detect corrupted
+ * subbuffers with commit and reserve counts. We keep a
+ * corrupted sub-buffers count and push the readers across
+ * these sub-buffers.
+ * Not concurrency safe if a writer is stalled in a subbuffer
+ * and another writer switches in, finding out it's corrupted.
+ * The result will be that the old (uncommitted) subbuffer will
+ * be declared corrupted, and that the new subbuffer will be
+ * declared corrupted too because of the commit count
+ * adjustment. */
+ ltt_channel->buffer_end(buf, *tsc, offset_end,
+ SUBBUF_INDEX((offset_end-1), buf->chan));
+ /* Setting this reserve_count will allow the sub-buffer to be
+ * delivered by the last committer. */
+ reserve_count = atomic_add_return(
+ (SUBBUF_OFFSET((offset_end-1), buf->chan)+1),
+ &ltt_buf->reserve_count[SUBBUF_INDEX((offset_end-1),
+ buf->chan)]);
+ if(reserve_count == atomic_read(
+ &ltt_buf->commit_count[SUBBUF_INDEX((offset_end-1),
+ buf->chan)])) {
+ buf->chan->cb->deliver(buf,
+ SUBBUF_INDEX((offset_end-1), buf->chan), NULL);
+ }
+ }
+
+ *slot_size = size;
+
+ //BUG_ON(*slot_size != (data_size + *before_hdr_pad + *after_hdr_pad + *header_size));
+ //BUG_ON(*slot_size != (offset_end - offset_begin));
+
+ return buf->start + BUFFER_OFFSET(offset_begin, buf->chan);
+}
+
+
+/* ltt_relay_commit_slot
+ *
+ * Atomic unordered slot commit. Increments the commit count in the
+ * specified sub-buffer, and delivers it if necessary.
+ *
+ * Parameters:
+ *
+ * @buf : the buffer to commit to.
+ * @reserved : address of the beginning of the reserved slot.
+ * @slot_size : size of the reserved slot.
+ *
+ */
+static void ltt_relay_commit_slot(
+ struct ltt_channel_struct *ltt_channel,
+ void **transport_data,
+ void *reserved,
+ size_t slot_size)
+{
+ struct rchan_buf *buf = *transport_data;
+ struct ltt_channel_buf_struct *ltt_buf = &ltt_channel->buf[buf->cpu];
+ unsigned int offset_begin = reserved - buf->start;
+ int commit_count;
+
+ commit_count = atomic_add_return(slot_size,
+ &ltt_buf->commit_count[SUBBUF_INDEX(offset_begin,
+ buf->chan)]);
+ /* Check if all commits have been done */
+ if(commit_count == atomic_read(
+ &ltt_buf->reserve_count[
+ SUBBUF_INDEX(offset_begin, buf->chan)])) {
+ buf->chan->cb->deliver(buf,
+ SUBBUF_INDEX(offset_begin, buf->chan), NULL);
+ }
+}
+
+static void ltt_relay_print_subbuffer_errors(struct ltt_channel_struct *ltt_chan,
+ int cons_off, unsigned int i)
+{
+ struct rchan *rchan = ltt_chan->trans_channel_data;
+ int cons_idx;
+
+ printk(KERN_WARNING
+ "LTT : unread channel %s offset is %d "
+ "and cons_off : %d (cpu %u)\n",
+ ltt_chan->channel_name,
+ atomic_read(&ltt_chan->buf[i].offset), cons_off, i);
+ /* Check each sub-buffer for unequal reserve and commit count */
+ cons_idx = SUBBUF_INDEX(cons_off, rchan);
+ if (atomic_read(&ltt_chan->buf[i].reserve_count[cons_idx])
+ != atomic_read(&ltt_chan->buf[i].commit_count[cons_idx]))
+ printk(KERN_ALERT
+ "LTT : %s : subbuffer %u has unequal "
+ "reserve and commit counts.\n",
+ ltt_chan->channel_name, cons_idx);
+ printk(KERN_ALERT "LTT : %s : reserve count : %u, commit count : %u\n",
+ ltt_chan->channel_name,
+ atomic_read(&ltt_chan->buf[i].reserve_count[cons_idx]),
+ atomic_read(&ltt_chan->buf[i].commit_count[cons_idx]));
+}
+
+static void ltt_relay_print_errors(struct ltt_trace_struct *trace,
+ struct ltt_channel_struct *ltt_chan, int cpu)
+{
+ struct rchan *rchan = ltt_chan->trans_channel_data;
+ int cons_off;
+
+ for(cons_off = atomic_read(&ltt_chan->buf[cpu].consumed);
+ (SUBBUF_TRUNC(atomic_read(&ltt_chan->buf[cpu].offset),
+ rchan)
+ - cons_off) > 0;
+ cons_off = SUBBUF_ALIGN(cons_off, rchan)) {
+ ltt_relay_print_subbuffer_errors(ltt_chan, cons_off, cpu);
+ }
+}
+
+/* This is called with preemption disabled when user space has requested
+ * blocking mode. If one of the active traces has free space below a
+ * specific threshold value, we reenable preemption and block.
+ */
+static int ltt_relay_user_blocking(struct ltt_trace_struct *trace,
+ unsigned int index, size_t data_size, struct user_dbg_data *dbg)
+{
+ struct rchan *rchan;
+ struct ltt_channel_buf_struct *ltt_buf;
+ struct ltt_channel_struct *channel;
+ struct rchan_buf *relayfs_buf;
+ DECLARE_WAITQUEUE(wait, current);
+
+ channel = ltt_get_channel_from_index(trace, index);
+ rchan = channel->trans_channel_data;
+ relayfs_buf = rchan->buf[smp_processor_id()];
+ ltt_buf = &channel->buf[smp_processor_id()];
+ /* Check if data is too big for the channel : do not
+ * block for it */
+ if(LTT_RESERVE_CRITICAL + data_size
+ > relayfs_buf->chan->subbuf_size)
+ return 0;
+
+ /* If free space too low, we block. We restart from the
+ * beginning after we resume (cpu id may have changed
+ * while preemption is active).
+ */
+ spin_lock(&ltt_buf->full_lock);
+ if(!relayfs_buf->chan->overwrite &&
+ (dbg->avail_size = (dbg->write=atomic_read(
+ &channel->buf[relayfs_buf->cpu].offset))
+ + LTT_RESERVE_CRITICAL + data_size
+ - SUBBUF_TRUNC((dbg->read = atomic_read(
+ &channel->buf[relayfs_buf->cpu].consumed)),
+ relayfs_buf->chan))
+ >= rchan->alloc_size) {
+ __set_current_state(TASK_INTERRUPTIBLE);
+ add_wait_queue(&ltt_buf->write_wait, &wait);
+ spin_unlock(&ltt_buf->full_lock);
+ preempt_enable();
+ schedule();
+ __set_current_state(TASK_RUNNING);
+ remove_wait_queue(&ltt_buf->write_wait, &wait);
+ if (signal_pending(current))
+ return -ERESTARTSYS;
+ preempt_disable();
+ return 1;
+ }
+ spin_unlock(&ltt_buf->full_lock);
+ return 0;
+}
+
+static void ltt_relay_print_user_errors(struct ltt_trace_struct *trace,
+ unsigned int index, size_t data_size, struct user_dbg_data *dbg)
+{
+ struct rchan *rchan;
+ struct ltt_channel_buf_struct *ltt_buf;
+ struct ltt_channel_struct *channel;
+ struct rchan_buf *relayfs_buf;
+
+ channel = ltt_get_channel_from_index(trace, index);
+ rchan = channel->trans_channel_data;
+ relayfs_buf = rchan->buf[smp_processor_id()];
+ ltt_buf = &channel->buf[smp_processor_id()];
+ printk(KERN_ERR "Error in LTT usertrace : "
+ "buffer full : event lost in blocking "
+ "mode. Increase LTT_RESERVE_CRITICAL.\n");
+ printk(KERN_ERR "LTT nesting level is %u.\n",
+ ltt_nesting[smp_processor_id()]);
+ printk(KERN_ERR "LTT avail size %lu.\n",
+ dbg->avail_size);
+ printk(KERN_ERR "avail write : %lu, read : %lu\n",
+ dbg->write, dbg->read);
+ printk(KERN_ERR "LTT cur size %lu.\n",
+ (dbg->write = atomic_read(
+ &channel->buf[relayfs_buf->cpu].offset))
+ + LTT_RESERVE_CRITICAL + data_size
+ - SUBBUF_TRUNC((dbg->read = atomic_read(
+ &channel->buf[relayfs_buf->cpu].consumed)),
+ relayfs_buf->chan));
+ printk(KERN_ERR "cur write : %lu, read : %lu\n",
+ dbg->write, dbg->read);
+}
+
+static struct ltt_transport ltt_relay_transport = {
+ .name = "relay",
+ .owner = THIS_MODULE,
+ .ops = {
+ .create_dirs = ltt_relay_create_dirs,
+ .remove_dirs = ltt_relay_remove_dirs,
+ .create_channel = ltt_relay_create_channel,
+ .finish_channel = ltt_relay_finish_channel,
+ .remove_channel = ltt_relay_remove_channel,
+ .wakeup_channel = ltt_relay_async_wakeup_chan,
+ .commit_slot = ltt_relay_commit_slot,
+ .reserve_slot = ltt_relay_reserve_slot,
+ .print_errors = ltt_relay_print_errors,
+ .user_blocking = ltt_relay_user_blocking,
+ .user_errors = ltt_relay_print_user_errors,
+ },
+};
+
+static int __init ltt_relay_init(void)
+{
+ printk(KERN_INFO "LTT : ltt-relay init\n");
+ ltt_root_dentry = relayfs_create_dir(LTT_RELAYFS_ROOT, NULL);
+ if(ltt_root_dentry == NULL) return -EEXIST;
+
+ ltt_file_operations = relay_file_operations;
+ ltt_file_operations.owner = THIS_MODULE;
+ ltt_file_operations.poll = ltt_poll;
+ ltt_file_operations.ioctl = ltt_ioctl;
+#ifdef CONFIG_COMPAT
+ ltt_file_operations.compat_ioctl = ltt_compat_ioctl;
+#endif //CONFIG_COMPAT
+
+ ltt_transport_register(&ltt_relay_transport);
+
+ return 0;
+}
+
+static void __exit ltt_relay_exit(void)
+{
+ printk(KERN_INFO "LTT : ltt-relay exit\n");
+
+ ltt_transport_unregister(&ltt_relay_transport);
+
+ if(relayfs_remove_dir(ltt_root_dentry))
+ printk(KERN_INFO "LTT : stale ltt root relay entry\n");
+}
+
+module_init(ltt_relay_init);
+module_exit(ltt_relay_exit);
+
+MODULE_LICENSE("GPL");
+MODULE_AUTHOR("Mathieu Desnoyers");
+MODULE_DESCRIPTION("Linux Trace Toolkit Next Generation Tracer");
+
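
For reference, here is a minimal sketch of a per-cpu user space consumer for
the ioctl/poll interface implemented above. The header carrying the RELAYFS_*
ioctl numbers and the exact location of the per-cpu channel files are
assumptions (they come from the relayfs patches this transport depends on),
and a real consumer would also honour the lost_size field of the block start
header instead of writing out whole sub-buffers.

#include <fcntl.h>
#include <poll.h>
#include <stdint.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
/* Assumption : RELAYFS_GET_SUBBUF and friends are exported to user space by
 * the relayfs headers shipped with this patch set. */
#include <linux/relayfs.h>

/* Drain one per-cpu channel file (created under the trace directory by
 * relay_open()) into out_fd. */
static int consume_cpu_buffer(const char *path, int out_fd)
{
	uint32_t n_subbufs, subbuf_size, consumed;
	size_t buf_len;
	struct pollfd pfd;
	char *map;
	int fd = open(path, O_RDONLY | O_NONBLOCK);

	if (fd < 0)
		return -1;
	if (ioctl(fd, RELAYFS_GET_N_SUBBUFS, &n_subbufs) < 0 ||
	    ioctl(fd, RELAYFS_GET_SUBBUF_SIZE, &subbuf_size) < 0)
		goto err;
	buf_len = (size_t)n_subbufs * subbuf_size;
	map = mmap(NULL, buf_len, PROT_READ, MAP_SHARED, fd, 0);
	if (map == MAP_FAILED)
		goto err;

	pfd.fd = fd;
	pfd.events = POLLIN | POLLPRI;
	for (;;) {
		if (poll(&pfd, 1, -1) < 0)
			break;
		if (pfd.revents & POLLHUP)
			break;	/* buffer finalized and fully consumed */
		/* Reserve the next complete sub-buffer : never blocks,
		 * fails with EAGAIN when none is fully committed yet. */
		if (ioctl(fd, RELAYFS_GET_SUBBUF, &consumed) < 0)
			continue;
		/* consumed is a free-running byte count : fold it back
		 * into the mmap'd buffer to locate the sub-buffer. */
		write(out_fd, map + consumed % buf_len, subbuf_size);
		/* Release it : EIO means the writer pushed us and the
		 * sub-buffer just read was overwritten. */
		ioctl(fd, RELAYFS_PUT_SUBBUF, &consumed);
	}
	munmap(map, buf_len);
	close(fd);
	return 0;
err:
	close(fd);
	return -1;
}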
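
And a schematic of how a tracer call site is expected to drive the
reserve/commit pair above (normally reached through the ltt_transport ops
registered at the bottom of the file). This is based only on the parameter
documentation of ltt_relay_reserve_slot() and ltt_relay_commit_slot() ; the
real event write path and the event header layout belong to the ltt-core
patches of this series, so the header write is only indicated by a comment.

/* Sketch only : assumes this file's includes. */
static void example_write_event(struct ltt_trace_struct *trace,
		struct ltt_channel_struct *channel,
		const void *payload, size_t payload_size)
{
	void *transport_data, *slot;
	size_t slot_size, before_hdr_pad, after_hdr_pad, header_size;
	u64 tsc;

	/* reserve/commit use smp_processor_id() : keep the cpu stable. */
	preempt_disable();
	/* Atomically reserve room for header + payload in the current
	 * per-cpu sub-buffer, switching sub-buffers if needed. */
	slot = ltt_relay_reserve_slot(trace, channel, &transport_data,
			payload_size, &slot_size, &tsc,
			&before_hdr_pad, &after_hdr_pad, &header_size);
	if (!slot) {
		preempt_enable();
		return;	/* event dropped : accounted in events_lost */
	}
	/* The caller writes the event header at slot + before_hdr_pad
	 * (stamped with tsc), then the payload after after_hdr_pad. */
	memcpy((char *)slot + before_hdr_pad + header_size + after_hdr_pad,
			payload, payload_size);
	/* Unordered commit : the sub-buffer is delivered once every byte
	 * reserved in it has been committed. */
	ltt_relay_commit_slot(channel, &transport_data, slot, slot_size);
	preempt_enable();
}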