Program Listing for File onlinedecoder.cpp

Return to documentation for file (processors/onlinedecoder/onlinedecoder.cpp)

// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2021-now Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------

#include "onlinedecoder.hpp"

#include <cmath>

/**
 * Constructs the processor and registers its two user-configurable options.
 *
 * - "normalize"     -> normalize_ (passed to the decoder's decode call in Process)
 * - "decoding size" -> delta_t_   (time bin size used for decoding)
 */
OnlineDecoder::OnlineDecoder()
    : IProcessor()
{
    add_option("normalize",
               normalize_,
               "enable likelihood normalization");

    add_option("decoding size",
               delta_t_,
               "Time bin size for decoding (aka delta_t).");
}

void OnlineDecoder::CreatePorts(){
    data_in_port_ = create_input_port<ColumnsType<double>>(
        ColumnsType<double>::Capabilities(ChannelRange(1, MAX_NCHANNELS),
                                          SampleRange(0, std::numeric_limits<uint32_t>::max()), true),
        PortInPolicy(SlotRange(0, MAX_NCHANNELS)));

    data_out_port_ = create_output_port<ColumnsType<double>>("posterior", ColumnsType<double>::Parameters({"posterior likelihood"}, 0, true),
                                                             PortOutPolicy(SlotRange(1)));
    to_decode_ = create_follower_state("to decode", true, Permission::READ);

    shared_decoder_ = create_follower_state("decoder", (std::shared_ptr<Decoder>*)nullptr, Permission::NONE);

}

/**
 * Finalizes the stream parameters that depend on the upstream connections.
 *
 * - creates one empty event buffer per connected input slot,
 * - sets the output stream rate to one posterior per decoding bin (1 / delta_t),
 * - computes the number of input packets that make up one decoding bin.
 */
void OnlineDecoder::CompleteStreamInfo() {
    nslots_ = data_in_port_->number_of_slots();

    // Rebuild the per-slot buffers from scratch so that a repeated call does not
    // keep appending extra (stale) entries to events_.
    events_.clear();
    events_.resize(nslots_);

    data_out_port_->streaminfo(0).set_stream_rate(1/delta_t_());

    // rate * delta_t is a floating-point product that can land just below the
    // intended whole number (e.g. 100 * 0.29 == 28.999999999999996). Round to the
    // nearest packet count instead of silently truncating on conversion.
    buffer_packet_size_ = std::round(data_in_port_->streaminfo(0).stream_rate()*delta_t_());
}

/**
 * Main processing loop.
 *
 * For every input packet round (one packet from each input slot) the incoming
 * values are appended to the per-slot event buffers while the "to decode" state
 * is set. After buffer_packet_size_ rounds the buffered events are decoded with
 * the shared decoder and the result is published on the "posterior" output port,
 * stamped with the hardware timestamp of the first slot-0 packet of that bin.
 *
 * The loop ends when the context is terminated or when an input slot fails to
 * deliver data.
 *
 * @param context processing context; only its terminated() flag is used here.
 */
void OnlineDecoder::Process(ProcessingContext &context){

    ColumnsType<double>::Data *data_in = nullptr;
    ColumnsType<double>::Data *data_out = nullptr;
    // Initialized so that a defined value is published in every code path.
    uint64_t hardware_ts = 0;
    // NOTE(review): logL is never resized in this function, so logL.data() refers
    // to an empty buffer when it is handed to decode(). Confirm how the size of the
    // decoder output is supposed to be obtained and size this vector accordingly.
    std::vector<double> logL;
    bool should_break = false;
    size_t npacket = 0;

    // Start every run with empty buffers: npacket restarts at zero, so leftovers
    // from a previous (partial) bin must not leak into the first bin of this run.
    for (auto &slot_events : events_) {
        slot_events.clear();
    }

    while (!context.terminated() and !should_break) {
        for (slot_ = 0; slot_ < nslots_; ++slot_) {
            if (!data_in_port_->slot(slot_)->RetrieveData(data_in)) {
                // No data was delivered: data_in is not valid for this round, so
                // stop right away instead of visiting the remaining slots.
                should_break = true;
                break;
            }
            if (to_decode_->get()) {
                std::copy(data_in->data().begin(),
                          data_in->data().end(),
                          std::back_inserter(events_[slot_]));
            }
            if (slot_ == 0 and npacket == 0) {
                // The bin is timestamped with its first packet on slot 0.
                hardware_ts = data_in->hardware_timestamp();
            }

            data_in_port_->slot(slot_)->ReleaseData();
        }

        if (should_break) {
            // An incomplete round must neither count as a packet nor trigger a
            // decode/publish with partially collected data.
            break;
        }

        ++npacket;
        if (npacket == buffer_packet_size_) {

            std::fill(logL.begin(), logL.end(), 0);
            // Assumes another processor has stored a valid decoder pointer in the
            // "decoder" state before this point — TODO confirm; the pointer is
            // dereferenced unchecked.
            shared_decoder_->get()->get()->decode(events_,
                                                  delta_t_(),
                                                  logL.data(), 0,
                                                  normalize_());

            // The bin is done: drop its events so that the next bin only contains
            // its own events and the buffers do not grow without bound.
            for (auto &slot_events : events_) {
                slot_events.clear();
            }
            npacket = 0;

            data_out = data_out_port_->slot(0)->ClaimData(false);
            data_out->set_hardware_timestamp(hardware_ts);
            data_out->set_source_timestamp();
            data_out->set_data_column("posterior likelihood", logL);
            data_out_port_->slot(0)->PublishData();
        }
    }
}

// Presumably registers OnlineDecoder with the processor factory so that it can be
// instantiated by name — see the REGISTERPROCESSOR macro definition to confirm.
REGISTERPROCESSOR(OnlineDecoder)